rti.asyncio

This module must be imported in order to use the methods rti.connextdds.DataReader.take_async() and rti.connextdds.DataReader.take_data_async().

These two methods are added to the DataReader class when this module is imported.

The module also defines rti.asyncio.Application, an easy way to subscribe to topics by decorating functions that receive each topic update.

The module also defines a convenience function, rti.asyncio.run, which is similar to asyncio.run: it synchronously runs the main async function of an application and performs certain cleanup tasks at the end.

See Subscriptions.

class rti.asyncio.Application

An rti.asyncio.Application provides an easy way to subscribe to topics by simply decorating async functions to receive topic updates.

To use this class:

  1. Create an app instance of this class (typically one per application)

  2. For each topic you want to subscribe to, decorate a function with @app.subscribe()

  3. Run the application, typically with rti.asyncio.run(app.run(domain_id))

For example:

app = rti.asyncio.Application()

@app.subscribe(topic_name="Sensor Temperature")
async def on_sensor_temperature(data: Temperature):
    print(data)

@app.subscribe(topic_name="Alerts")
async def on_alert(data: Alert):
    print(data)

rti.asyncio.run(app.run(domain_id=0))

async run(domain_id: Union[int, DomainParticipant] = 0, qos_file: Optional[str] = None)

Start all subscriptions and start notifying decorated functions.

This function creates a dds.DomainParticipant with the given domain_id or uses an existing participant if one is provided. It then creates a DataReader for each function that was decorated with @app.subscribe.

The optional qos_file argument loads an XML file used to look up any QoS profiles specified in the @app.subscribe decorator. By default, profiles are looked up using the standard Connext mechanisms, including the <working directory>/USER_QOS_PROFILES.xml file.

The simplest way to run this method is with rti.asyncio.run:

if __name__ == "__main__":
    rti.asyncio.run(app.run(domain_id))

To stop the application (and all subscriptions), run as a task and cancel it when needed:

async def main():
    task = asyncio.create_task(app.run(domain_id))
    # subscriptions are running ...
    task.cancel()
    await task

if __name__ == "__main__":
    rti.asyncio.run(main())

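The text above does not say whether app.run() absorbs the cancellation. With a plain asyncio task, awaiting a task after cancelling it raises asyncio.CancelledError, so catching that exception is a safe default. The sketch below uses only the standard library; fake_run is a hypothetical stand-in for app.run(domain_id), not part of rti.asyncio.

```python
import asyncio


async def fake_run() -> None:
    """Hypothetical stand-in for app.run(): keeps running until cancelled."""
    await asyncio.Event().wait()  # the event is never set, so this blocks


async def main() -> str:
    task = asyncio.create_task(fake_run())
    await asyncio.sleep(0.01)  # let the task start; subscriptions would run here
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # A plain asyncio task re-raises the cancellation at this await.
        return "cancelled"
    return "finished"


result = asyncio.run(main())
```

If app.run() turns out to handle the cancellation itself, the except branch is simply never taken.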
subscribe(topic_name: str, qos_profile: Optional[str] = None)

Decorator for a function that will receive samples of the given topic.

The decorated function must meet the following requirements:

  • It must be async.

  • It must have at least one argument.

  • The first argument must have a type annotation that corresponds to the type of the topic.

The function may have an optional second argument with the type annotation dds.SampleInfo.
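These requirements can be checked mechanically with the standard inspect module. The following is a minimal sketch of such a check, not the library's actual implementation. check_handler, SampleInfo (a stand-in for dds.SampleInfo) and Temperature are hypothetical names introduced only for illustration.

```python
import inspect
from typing import Any, Callable, Dict


class SampleInfo:
    """Hypothetical stand-in for dds.SampleInfo."""


class Temperature:
    """Hypothetical topic type."""


def check_handler(func: Callable[..., Any]) -> Dict[str, Any]:
    """Validate a handler against the documented requirements.

    Returns the annotated data type and whether the handler also
    declares a second argument annotated with SampleInfo.
    """
    if not inspect.iscoroutinefunction(func):
        raise TypeError("handler must be async")
    params = list(inspect.signature(func).parameters.values())
    if not params:
        raise TypeError("handler must have at least one argument")
    data_type = params[0].annotation
    if data_type is inspect.Parameter.empty:
        raise TypeError("first argument needs a type annotation")
    wants_info = len(params) > 1 and params[1].annotation is SampleInfo
    return {"data_type": data_type, "wants_info": wants_info}


async def on_temperature(data: Temperature) -> None:
    pass


async def on_temperature_with_info(data: Temperature, info: SampleInfo) -> None:
    pass


def not_async(data: Temperature) -> None:
    pass


simple = check_handler(on_temperature)
detailed = check_handler(on_temperature_with_info)
```

Calling check_handler(not_async) raises TypeError, because the function is not declared with async def.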

The decorator receives the topic name and an optional QoS profile name. The QoS profile name is used to configure the DataReader and Subscriber QoS.

Example:

@app.subscribe(topic_name="MyTopic", qos_profile="my_library::my_profile")
async def on_my_topic(data: MyType):
    print(data)

Decorating a function has the following effect when run() is called:

  • A dds.Topic with name "MyTopic" and type MyType is created. If one already exists and the type is the same, it is re-used. If the type is different, an exception is raised.

  • A dds.DataReader is created for the topic. If a QoS profile is provided, the profile is loaded and used to configure the DataReader and Subscriber.

  • The decorated function is called with each data sample received by the DataReader.

  • If the function has a second argument with the type dds.SampleInfo, the sample meta-data is also passed to the function. For updates with meta-data only (e.g. when an instance is disposed), the first argument is None.
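Because the first argument can be None for meta-data-only updates, a two-argument handler should check for it before touching the data. The sketch below shows one way to write such a handler. It runs without Connext: Alert and FakeSampleInfo (a stand-in for dds.SampleInfo) are hypothetical, and the handler is called directly, whereas in a real application the Application instance would make these calls.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class Alert:
    """Hypothetical topic type."""
    message: str


@dataclass
class FakeSampleInfo:
    """Hypothetical stand-in for dds.SampleInfo; valid marks samples with data."""
    valid: bool


log: List[str] = []


# The annotation names the topic type, as the requirements state,
# even though the value passed at run time may be None.
async def on_alert(data: Alert, info: FakeSampleInfo) -> None:
    if data is None:
        # Meta-data-only update, e.g. an instance was disposed.
        log.append("meta-data only")
    else:
        log.append(f"alert: {data.message}")


async def demo() -> None:
    # Simulate one update with data and one with meta-data only.
    await on_alert(Alert("overheat"), FakeSampleInfo(valid=True))
    await on_alert(None, FakeSampleInfo(valid=False))


asyncio.run(demo())
```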

rti.asyncio.run(coroutine)

Uses the current event loop to run the given coroutine and waits until it finishes. If there is no running event loop, a new one is created. When the coroutine finishes, this function cleans up global resources.