Subscriptions
An application uses DataReaders to access data received over DDS. A DataReader is associated with a single Topic. You can have multiple DataReaders and Topics in a single application. In addition, you can have more than one DataReader for a particular Topic in a single application. Subscribers own and manage DataReaders.
Creating a DataReader
To create a DataReader, you need a Topic (see Topics) and a Subscriber, which is itself created from a DomainParticipant. Optionally, you can also pass a QoS parameter and a listener.
The following code creates a DataReader for the Topic we created in the Topics section:
subscriber = dds.Subscriber(participant)
reader = dds.DataReader(subscriber, topic)
A DataReader can be created with a ContentFilteredTopic instead of a regular Topic to define a content-based subscription, in which a filter on the content of the data selects the samples the reader receives.
Reading data
When data is available, the DataReader.read_data() and DataReader.take_data() methods return a collection of data samples of the type specified in the Topic. (take_data() removes the data from the reader, and read_data() keeps it so it can be accessed again.)
for sample in reader.take_data():
    print(sample)
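The difference between "read" and "take" can be illustrated with a small in-memory model. This is only a sketch: ToyReader is a hypothetical stand-in written for this example, not a Connext class, and it models nothing beyond the "keep" versus "remove" behavior described above.

```python
from collections import deque


class ToyReader:
    """Hypothetical in-memory stand-in for a reader's sample queue."""

    def __init__(self, samples):
        self._queue = deque(samples)

    def read_data(self):
        # Return a copy of the samples and leave them in the queue.
        return list(self._queue)

    def take_data(self):
        # Return the samples and remove them from the queue.
        taken = list(self._queue)
        self._queue.clear()
        return taken


toy = ToyReader(["a", "b"])
first_read = toy.read_data()   # samples stay in the reader
taken = toy.take_data()        # samples are removed
second_read = toy.read_data()  # nothing is left to read
```

After the take, a second read returns an empty list, which is why "take" is the usual choice when each sample only needs to be processed once.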
To also access the meta-data associated with each data sample, use DataReader.take() or DataReader.read():
for data, info in reader.take():
    if info.valid:
        print(f"Data received: {data}")
    else:
        print(f"State changed: {info.state}")
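If you prefer to handle valid data and state changes separately, the pairs can be partitioned first. The sketch below assumes only what the loop above uses: each item is a (data, info) pair and info has a valid attribute. The split_samples helper and the stand-in samples are hypothetical names introduced for illustration, and the state strings are placeholders rather than real Connext values.

```python
from types import SimpleNamespace


def split_samples(samples):
    """Separate (data, info) pairs into valid data and metadata-only updates."""
    valid_data, state_changes = [], []
    for data, info in samples:
        if info.valid:
            valid_data.append(data)
        else:
            # No usable data here; only the metadata describes what changed.
            state_changes.append(info)
    return valid_data, state_changes


# Stand-in pairs for illustration; in an application the argument would be
# the result of reader.take() or reader.read().
fake_samples = [
    ("temperature=21", SimpleNamespace(valid=True, state="alive")),
    (None, SimpleNamespace(valid=False, state="disposed")),
]
valid_data, state_changes = split_samples(fake_samples)
```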
Each call to these methods returns newly created objects (even read()), unlike other Connext language bindings, which return temporary loaned objects.
The DataReader.select() method allows selecting which data to read.
Being notified when data is available
There are a few ways to check if a reader has data available:
Polling for data
Using a Condition and a WaitSet
Reading with an asynchronous generator
Using the DataReaderListener
Polling for data means that you call the “read” or “take” methods described above at regular intervals to check whether they return any data.
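A polling loop might look like the following sketch. The poll helper and its parameters (take_fn, interval_s, max_polls, handle) are hypothetical names chosen for this example. The only assumption is that take_fn returns an iterable of samples, as reader.take_data does.

```python
import time


def poll(take_fn, interval_s, max_polls, handle=print):
    """Call take_fn() up to max_polls times, sleeping interval_s between calls.

    Returns the number of samples handled.
    """
    handled = 0
    for attempt in range(max_polls):
        for sample in take_fn():
            handle(sample)
            handled += 1
        if attempt < max_polls - 1:
            time.sleep(interval_s)
    return handled


# Stand-in for a reader: the second poll finds two samples, the others none.
batches = iter([[], ["s1", "s2"], []])
received = []
count = poll(lambda: next(batches), interval_s=0.01, max_polls=3,
             handle=received.append)
```

With a real reader the call would be along the lines of poll(reader.take_data, interval_s=1.0, max_polls=10). Polling is simple, but it adds up to one interval of latency and wakes the application even when nothing has arrived.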
A StatusCondition and a WaitSet allow waiting synchronously until a DataReader status changes, including the DATA_AVAILABLE status:
def process_data(_):
    # The handler receives the triggered condition, which is not needed here.
    for sample in reader.take_data():
        print(sample)

# Each Entity has a StatusCondition
status_condition = dds.StatusCondition(reader)

# Specify which status to get notified about and set the handler:
status_condition.enabled_statuses = dds.StatusMask.DATA_AVAILABLE
status_condition.set_handler(process_data)

# Attach the condition to a waitset and call dispatch() to execute the
# condition handlers when they become active
waitset = dds.WaitSet()
waitset += status_condition

while True:
    waitset.dispatch(dds.Duration(4))  # Wait up to 4 seconds
The async versions of the “take” methods provide a simple way to write your subscriber application. The methods DataReader.take_data_async() and DataReader.take_async() work as asynchronous generators, returning data as it is received and awaiting as necessary.
To use these functions, your application must import rti.asyncio, which requires Python 3.7+.
import rti.asyncio

# ...

async def print_infinite(reader: dds.DataReader):
    # Print data as it arrives, suspending the coroutine until data is
    # available.
    async for data in reader.take_data_async():
        print(data)

if __name__ == "__main__":
    # create reader...

    # you can use Python's asyncio.run() as well
    rti.asyncio.run(print_infinite(reader))
take_data_async() and take_async() receive an optional condition argument (a dds.ReadCondition or dds.QueryCondition) that can select data by state or content.
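Because these methods are asynchronous generators, ordinary async for control flow applies to them. The sketch below stops after a fixed number of samples instead of looping forever. The take_first helper and the fake_take_data_async generator are hypothetical names for this example, and the generator stands in for reader.take_data_async() so the code runs without a DDS domain.

```python
import asyncio


async def take_first(samples, count):
    """Collect the first `count` items from an asynchronous generator, then stop."""
    collected = []
    if count <= 0:
        return collected
    async for sample in samples:
        collected.append(sample)
        if len(collected) >= count:
            break
    return collected


async def fake_take_data_async():
    # Stand-in for reader.take_data_async(): yields samples with small delays.
    for i in range(5):
        await asyncio.sleep(0.01)
        yield f"sample {i}"


first_three = asyncio.run(take_first(fake_take_data_async(), 3))
```

In an application, the first argument would be reader.take_data_async().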
Finally, you can use a DataReaderListener to get notified of status updates, including new data. This approach is recommended only for lightweight processing, since the listener callback is executed in an internal Connext thread and should not block or perform CPU-heavy operations.
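One common way to keep a listener callback lightweight is to hand samples off to a queue and process them in an application thread. The following is a sketch under assumptions, not the library's prescribed pattern: QueueingListener, worker, and FakeReader are hypothetical names; the on_data_available(reader) callback name is assumed to match the listener interface; and a real listener would need to derive from the library's listener base class, which is omitted here so the example runs on its own.

```python
import queue
import threading


class QueueingListener:
    """Hypothetical listener that only enqueues samples in its callback."""

    def __init__(self, work_queue):
        self._work_queue = work_queue

    def on_data_available(self, reader):
        # Keep the callback cheap: move samples to the queue and return.
        for sample in reader.take_data():
            self._work_queue.put(sample)


def worker(work_queue, results):
    # Runs in an application thread, so slower processing is acceptable here.
    while True:
        sample = work_queue.get()
        if sample is None:  # sentinel: stop the worker
            break
        results.append(sample.upper())


class FakeReader:
    # Stand-in so the sketch runs without a DDS domain.
    def take_data(self):
        return ["a", "b"]


work_queue = queue.Queue()
results = []
thread = threading.Thread(target=worker, args=(work_queue, results))
thread.start()
QueueingListener(work_queue).on_data_available(FakeReader())
work_queue.put(None)
thread.join()
```

The callback does nothing but enqueue, so the internal thread that invokes it is released quickly, while the expensive work (here, a placeholder upper() call) happens elsewhere.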
Special DataReaders
In addition to the class DataReader, there are a few separate DataReader classes for certain types:
- For DynamicData DataReaders: DynamicData.DataReader (see DynamicType and DynamicData)
- For the built-in discovery Topics: ParticipantBuiltinTopicData.DataReader, SubscriptionBuiltinTopicData.DataReader, PublicationBuiltinTopicData.DataReader, TopicBuiltinTopicData.DataReader