rti.asyncio
This module must be imported in order to use the methods rti.connextdds.DataReader.take_async() and rti.connextdds.DataReader.take_data_async().
These two methods are added to the DataReader class when this module is imported.
The module also defines rti.asyncio.Application, an easy way to subscribe to topics by decorating functions that receive each topic update.
The module also defines a convenience function, rti.asyncio.run, which is similar to asyncio.run: it synchronously runs the main async function in an application and performs certain cleanup tasks at the end.
See Subscriptions.
- class rti.asyncio.Application
An rti.asyncio.Application provides an easy way to subscribe to topics by simply decorating async functions to receive topic updates.
To use this class:
Create an app instance of this class (typically one per application).
For each topic you want to subscribe to, decorate a function with @app.subscribe().
Run the application, typically with rti.asyncio.run(app.run(domain_id)).
For example:
    app = rti.asyncio.Application()

    @app.subscribe(topic_name="Sensor Temperature")
    async def on_sensor_temperature(data: Temperature):
        print(data)

    @app.subscribe(topic_name="Alerts")
    async def on_alert(data: Alert):
        print(data)

    rti.asyncio.run(app.run(domain_id=0))
- async run(domain_id: Union[int, DomainParticipant] = 0, qos_file: Optional[str] = None)
Start all subscriptions and start notifying decorated functions.
This function creates a dds.DomainParticipant with the given domain_id or uses an existing participant if one is provided. It then creates a DataReader for each function that was decorated with @app.subscribe.
The optional qos_file argument loads an XML file used to look up any QoS profiles specified in the @app.subscribe decorator. If it is omitted, Connext looks for profiles in its default locations, including the <working directory>/USER_QOS_PROFILES.xml file.
The simplest way to run this method is with rti.asyncio.run:
    if __name__ == "__main__":
        rti.asyncio.run(app.run(domain_id))
To stop the application (and all subscriptions), run as a task and cancel it when needed:
    async def main():
        task = asyncio.create_task(app.run(domain_id))
        # subscriptions are running
        ...
        task.cancel()
        await task

    if __name__ == "__main__":
        rti.asyncio.run(main())
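The cancel-and-await pattern can be tried without Connext installed. The following is a minimal sketch using only the standard library: fake_app_run is a hypothetical stand-in for app.run(domain_id), and because this page does not say whether the real app.run re-raises asyncio.CancelledError, the sketch catches it defensively when awaiting the task.

```python
import asyncio


async def fake_app_run(log):
    # Hypothetical stand-in for app.run(domain_id): works until cancelled.
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Cancellation arrives here; release resources, then re-raise.
        log.append("stopped")
        raise


async def main():
    log = []
    task = asyncio.create_task(fake_app_run(log))
    await asyncio.sleep(0.05)  # "subscriptions" are running
    task.cancel()
    try:
        await task  # wait until the task has actually finished
    except asyncio.CancelledError:
        pass  # expected when the task re-raises the cancellation
    return log


log = asyncio.run(main())
```

Awaiting the task after cancel() matters: cancel() only requests cancellation, and the await is what guarantees the coroutine has finished its cleanup before main() continues.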
- subscribe(topic_name: str, qos_profile: Optional[str] = None)
Decorator for a function that will receive samples of the given topic.
The decorated function must meet the following requirements:
It must be async.
It must have at least one argument.
The first argument must have a type annotation that corresponds to the type of the topic.
The function may have an optional second argument with the type annotation dds.SampleInfo.
The decorator receives the topic name and an optional QoS profile name. The QoS profile name is used to configure the DataReader and Subscriber QoS.
Example:
    @app.subscribe(topic_name="MyTopic", qos_profile="my_library::my_profile")
    async def on_my_topic(data: MyType):
        print(data)
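The requirements on the decorated function can be checked mechanically with the standard inspect module. The sketch below is only an illustration of those rules, not the library's actual validation code: check_handler, and the SampleInfo and MyType classes, are hypothetical stand-ins so that the example runs without Connext.

```python
import inspect


class SampleInfo:
    """Hypothetical stand-in for dds.SampleInfo."""


class MyType:
    """Hypothetical stand-in for a topic type."""


def check_handler(func):
    """Validate a handler and return the topic type from its first argument."""
    if not inspect.iscoroutinefunction(func):
        raise TypeError(f"{func.__name__} must be async")
    params = list(inspect.signature(func).parameters.values())
    if not params:
        raise TypeError(f"{func.__name__} must have at least one argument")
    if params[0].annotation is inspect.Parameter.empty:
        raise TypeError(f"{func.__name__}: first argument needs a type annotation")
    if len(params) > 1 and params[1].annotation is not SampleInfo:
        raise TypeError(f"{func.__name__}: second argument must be SampleInfo")
    return params[0].annotation


async def good(data: MyType):
    pass


async def with_info(data: MyType, info: SampleInfo):
    pass


def not_async(data: MyType):
    pass


async def no_annotation(data):
    pass


topic_type = check_handler(good)  # the annotation tells us the topic type
```

Reading the topic type from the annotation is what lets a decorator take only a topic name: the data type does not have to be passed separately.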
Decorating a function has the following effect when run() is called:
A dds.Topic with name "MyTopic" and type MyType is created. If one already exists and the type is the same, it is re-used. If the type is different, an exception is raised.
A dds.DataReader is created for the topic. If a QoS profile is provided, the profile is loaded and used to configure the DataReader and Subscriber.
The decorated function is called with each data sample received by the DataReader.
If the function has a second argument with the type dds.SampleInfo, the sample meta-data is also passed to the function. For updates with meta-data only (e.g. when an instance is disposed), the first argument is None.
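A handler that takes the second argument has to be prepared for the first one to be None. The sketch below shows one way to write such a handler, assuming stand-in types so that it runs without Connext: SampleInfo here is a hypothetical dataclass modelling only a valid flag, Temperature is a hypothetical topic type, and feed merely imitates what the application would do when updates arrive.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SampleInfo:
    """Hypothetical stand-in for dds.SampleInfo; only a 'valid' flag is modelled."""
    valid: bool


@dataclass
class Temperature:
    """Hypothetical topic type."""
    degrees: float


received = []


async def on_temperature(data: Temperature, info: SampleInfo):
    # data is None for meta-data-only updates (e.g. an instance was disposed).
    if data is None:
        received.append("metadata-only update")
    else:
        received.append(f"{data.degrees:.1f} degrees")


async def feed(handler, updates):
    # Imitates the application delivering (data, info) pairs to the handler.
    for data, info in updates:
        await handler(data, info)


asyncio.run(feed(on_temperature, [
    (Temperature(21.5), SampleInfo(valid=True)),
    (None, SampleInfo(valid=False)),
]))
```

Checking data against None before touching its fields keeps the handler from failing on dispose or similar notifications.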
- rti.asyncio.run(coroutine)
Uses the current event loop to run the given coroutine and waits until it finishes. If there is no running event loop, a new one is created. When the coroutine ends, global resources are cleaned up.
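The run-then-clean-up behaviour can be approximated with the standard library alone. This is a rough analogue rather than the implementation of rti.asyncio.run: run_with_cleanup and its cleanup callback are hypothetical, and unlike the function described above, asyncio.run always creates a new event loop.

```python
import asyncio


def run_with_cleanup(coroutine, cleanup):
    """Run the coroutine to completion, then always call cleanup()."""
    try:
        return asyncio.run(coroutine)
    finally:
        # Runs whether the coroutine returned normally or raised.
        cleanup()


events = []


async def main():
    events.append("main ran")
    return 42


result = run_with_cleanup(main(), lambda: events.append("cleaned up"))
```

Putting the cleanup in a finally clause means it still runs when the main coroutine raises, which is the usual reason to wrap the entry point this way.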