Event Streaming¶
The PlanetSide 2 API supports streaming in-game events directly to your client.
This is useful for obtaining real-time event data, but can also be used to gain access to data that would otherwise not be available through the REST API (see the Examples below).
To gain access to event streaming functionality in Auraxium, you must use the auraxium.event.EventClient
class. This is a subclass of auraxium.Client
and still supports the full REST API.
Note
When using the event client, be wary of using the asynchronous context manager interface. When the context manager body finishes execution, the WebSocket connection is shut down as well:
async with auraxium.event.EventClient() as client:
@client.trigger(...)
async def action(event):
...
# This block is left immediately, shutting the client
You can mitigate this by using an asyncio.Event
or a similar asynchronous flag to keep the control flow within the context manager until you are ready to shut it down.
Alternatively, you can use asyncio.loop.run_forever()
to keep the event loop running even after the enclosing method finishes:
loop = asyncio.new_event_loop()
loop.create_task(main())
loop.run_forever()
Trigger System Overview¶
Auraxium wraps the real-time event endpoint of the PlanetSide 2 API in an event trigger system, which has been inspired by scripting hooks used in many game engines.
Note
Users familiar with the discord.py package can skip ahead to Event Types section.
The system used to define event listeners and commands in d.py is very similar to Auraxium’s trigger system, with Conditions being comparable to d.py’s checks.
Usage examples and the trigger definition syntax are covered further below.
Triggers¶
Triggers are the main building block of the event streaming interface. They store what events to listen to (e.g. players deaths) and then execute a function when such an event is encountered (e.g. mock them in Discord).
In the case of the PlanetSide 2 event stream, this information is also used to dynamically generate or update the subscription messages needed to receive the appropriate payloads through the WebSocket API.
A trigger can be set up to listen to more than event at once. Information about the event types available and the data they make accessible can be found in the event types section below.
Example
The minimum code required to set up an event trigger and its action uses the auraxium.event.EventClient.trigger()
decorator:
client = auraxium.event.EventClient()
@client.trigger(auraxium.event.Death)
async def print_death(event):
... # Do stuff
This version is shortest and will be used for most examples as it covers most use cases, but does not support some advanced trigger features like conditions.
For the full set of features, instantiate the auraxium.event.Trigger
manually, add any actions and conditions, and finally register it to the client via auraxium.event.EventClient.add_trigger()
:
client = auraxium.event.EventClient()
my_trigger = auraxium.Trigger(auraxium.event.Death)
@my_trigger.action
async def print_death(event):
... # Do stuff
client.add_trigger(my_trigger)
Conditions¶
Whether a trigger fires is mainly controlled by its events. However, sometimes you may have multiple triggers who’s event definitions overlap to some extent.
To keep your event callbacks tidy and not cause unnecessary triggering of potentially expensive actions, you can additionally specify any number of conditions that must be met for the trigger to fire.
Conditions are stored in the Trigger.conditions
list and are safe to be modified or updated at any point.
This list may contain synchronous callables or any object that evaluates to bool
. Callables must take a single argument: the auraxium.event.Event
encountered.
Important
Coroutines (i.e. functions defined via async def
) are not supported as conditions and will be cast to bool
instead (i.e. always pass).
Example
valid_character_ids = [5428072203494645969, ...]
def check_killer_id(event):
"""Example condition that checks the payload's killing player."""
payload = event['payload']
assert payload['event_name'] == 'Death'
char_id = int(payload['attacker_character_id'])
return char_id in valid_character_ids
trigger = auraxium.EventTrigger(auraxium.event.Death)
trigger.conditions.append(check_killer_id)
@trigger.action
async def filtered_death(event):
... # Do stuff
Actions¶
A trigger action is a method or function that will be run when the corresponding trigger fires (i.e. a matching event is encountered and all conditions were met).
Actions may be synchronous functions or coroutines; anything that is a coroutine as determined by inspect.iscoroutinefunction()
will be awaited.
The only argument passed to the trigger action is the auraxium.event.Event
received.
Example
async def example_action(event: Event) -> None:
"""Example function to showcase the signature used for actions.
Keep in mind that this could also be a regular function (i.e. one
defined without the "async" keyword).
"""
# Do stuff
Event Types¶
You can find a list of all known event types in the auraxium.event
module documentation.
Filtering by Experience ID¶
Due to the high volume of events matching auraxium.event.GainExperience
, it is also possible to only listen for specific experience IDs.
Due to the dynamic nature of these events, they are not part of the auraxium.event
namespace, but are instead generated dynamically via the GainExperience.filter_experience
method:
- classmethod GainExperience.filter_experience(id_)
Factory for custom, experience ID specific events.
This method is used to generate custom event names that allow only subscribing to a single type of experience gain. The returned string can be passed to a
auraxium.event.Trigger
.- Parameters:
id (int) – The experience ID to subscribe to.
- Returns:
A custom event name for the given experience type.
The strings generated by this method can then be used in place of auraxium.event.Event
sub classes to define triggers and conditions.
Examples¶
Levelup Tracker¶
A single trigger listening to players gaining a new battle rank and printing the character’s name, their title, and the newly gained battle rank.
"""Print whenever any player receives a new battle rank."""
import asyncio
import auraxium
from auraxium import event, ps2
async def main():
# NOTE: Depending on player activity, this script may exceed the ~6
# requests per minute and IP address limit for the default service ID.
client = auraxium.event.EventClient(service_id='s:example')
@client.trigger(event.BattleRankUp)
async def print_levelup(evt: event.BattleRankUp):
char = await client.get_by_id(ps2.Character, evt.character_id)
# Brand new characters may not be available in the API yet
if char is None:
print('Skipping anonymous player')
return
# NOTE: This value is likely more up-to-date than the one from the
# char.battle_rank attribute.
print(f'{await char.name_long()} has reached BR {evt.battle_rank}!')
_ = print_levelup
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.create_task(main())
loop.run_forever()
Detecting Mutual Deaths¶
This script listens to player deaths, caching the ones it sees for a few seconds and looking for the opposite death (i.e. another death with the killer and victim reversed).
"""Example for detecting mutual deaths between players.
This script showcases using the event client to detect mutual deaths
between players; i.e. two players killing each other at (nearly) the
same time.
"""
import asyncio
import datetime
from typing import Dict, Tuple
import auraxium
from auraxium import event, ps2
# The maximum time difference between kills for them to be considered mutual.
MUTUAL_DEATH_WINDOW = 5.0
async def main() -> None:
"""Main script method."""
# Instantiate the event client
client = auraxium.event.EventClient(service_id='s:example')
# This dictionary is used to track recent deaths
cache: Dict[int, Tuple[int, datetime.datetime]] = {}
@client.trigger(event.Death)
async def on_death(evt: event.Death) -> None:
"""Run whenever a death event is received."""
now = evt.timestamp
victim_id = evt.character_id
killer_id = evt.attacker_character_id
# Ignore deaths not caused by enemy players
if killer_id == 0 or victim_id == killer_id:
return
# Remove outdated kills from cache
for cache_killer, cache_data in list(cache.items()):
cache_victim, cache_timestamp = cache_data
age = now - cache_timestamp
if age.total_seconds() > MUTUAL_DEATH_WINDOW:
del cache[cache_killer]
# Check remaining cache items for mutual deaths
for cache_killer, cache_data in cache.items():
cache_victim, _ = cache_data
if (cache_killer, cache_victim) == (victim_id, killer_id):
# Mutual death found!
# Get the names of the players involved
ids = ','.join((str(i) for i in (killer_id, victim_id)))
results = await client.find(ps2.Character, character_id=ids)
if not results:
# Ignore events if you cannot resolve the player names
return
victim, killer = results
# Get the name of the server these players are on
server = await victim.world()
print(f'{now}: [{server}] - Mutual death between '
f'{victim.name} and {killer.name}')
# Remove cache item as it was "consumed" for this mutual death
del cache[cache_killer]
return
# No-op; this mostly serves to fix "unused name" errors in some linters
_ = on_death
# No match found, add current event to cache instead
cache[killer_id] = victim_id, now
if __name__ == '__main__':
# NOTE: Be sure to use `run_forever()` rather than `run_until_complete()`
# when using the event client, otherwise the client would shut down as soon
# as the `main()` method finishes.
loop = asyncio.new_event_loop()
loop.create_task(main())
loop.run_forever()