Subscribe to all available topics using the Python API

Offline
Last seen: 2 years 6 months ago
Joined: 12/06/2021
Posts: 17

Hello, 

I'm pretty new to DDS and I want to build an application that requires subscribing to all the available topics. Do you have an example of how to do this? Or can you point me to the API functions and steps that may be helpful to receive all messages and be able to extract information from their data?

Thanks in advance.

Howard's picture
Offline
Last seen: 6 days 6 hours ago
Joined: 11/29/2012
Posts: 618

Hi,

What you've asked for isn't very easy to do. It's not a beginner task, and it requires the Builtin Discovery topics as well as the DynamicData API. I would suggest looking for examples and reading the manual to learn about Builtin Discovery topics first and DynamicData second. I don't know how much of the DynamicData API is supported in the Python API yet; you can check the docs to see whether it exists in the current version of the Python API. If not, you'll have to use a compiled language such as C++, or Java.

That said, if you're very new to DDS, you'll probably want to start with some basic tutorials and hello-world examples first. The Getting Started Guide works as a tutorial, and there are plenty of examples in the distribution as well as on community.rti.com.

--Howard


Hello Howard, thanks for the fast response. I have done some of the basic tutorials in Python and C++. I will read the manual sections on these two topics (Builtin Discovery topics and the DynamicData API), and I'll let you know if I have further questions.

 


Hello again Howard,

Do you have examples for either C++ or Java showing something similar to what I'm trying to achieve?

Howard's picture

If you search for "builtin topic" on the community.rti.com website, you'll find solutions, examples and HOWTOs related to what you want to do. The other search terms I would suggest are "DynamicData" and "Dynamic Data", which also produce a set of relevant links to follow.


Hi again, how can I get the topic name from a DataReader in Python?

The current Listener I'm using in reader.bind_listener looks like the following:

 

class Listener(dds.DynamicData.NoOpDataReaderListener):
    def on_data_available(self, reader):
        print("Received updates:", reader)
        with reader.take() as samples:
            for sample in filter(lambda s: s.info.valid, samples):
                data = sample.data
                print(data)
 
Thanks in advance.
Howard's picture

I'm not familiar with the Python API, but according to the documentation, rti.connextdds.AnyDataReader, which is derived from rti.connextdds.IAnyDataReader, has a topic_name property.

https://community.rti.com/static/documentation/connext-dds/current/api/connext_dds/api_python/rti.html#rti.connextdds.IAnyDataReader
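
A minimal sketch of how that property might be used with the listener from the question. It assumes the reader passed to on_data_available exposes topic_name alongside the take() call already used above; print_updates and its out parameter are hypothetical names introduced only for illustration. The helper is duck-typed, so it needs no Connext import of its own.

```python
def print_updates(reader, out=print):
    """Report every valid sample on `reader`, prefixed with the reader's topic name.

    `reader` is assumed to provide `topic_name` and a `take()` context manager,
    as the DataReader in the question does. Returns the number of valid samples.
    """
    topic = reader.topic_name
    count = 0
    with reader.take() as samples:
        for sample in samples:
            if sample.info.valid:
                # Format while the loan is still held; the data is not kept afterwards.
                out(f"[{topic}] {sample.data}")
                count += 1
    return count
```

Inside the Listener class from the question, on_data_available would then reduce to a single call: print_updates(reader).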

Marc.Chiesa's picture
Offline
Last seen: 2 years 10 months ago
Joined: 07/24/2017
Posts: 32

Hi Emilio,

I put together a simple example showing how to subscribe to all discovered publications that provide type information during discovery, using the builtin topics and a listener (though WaitSets are also an option and can be set up to use asyncio). When the DynamicData reader is created, the print statement uses the "topic_name" property of the reader to populate the formatted string. Please let me know if you have any questions.


import rti.connextdds as dds
import time
import argparse
import queue


# A listener for publication discovery samples (builtin topic data)
class AllPublicationListener(dds.PublicationBuiltinTopicData.NoOpDataReaderListener):
   def __init__(self, queue):
      super().__init__()
      self._queue = queue

   def on_data_available(self, reader):
      with reader.take() as samples:
         # Skip invalid samples and publications that sent no type information
         for sample in (s for s in samples if s.info.valid and s.data.type):
            # Create a non-loaned copy of the publication data
            sample_copy = dds.PublicationBuiltinTopicData.Sample(sample)
            print(f'Discovered publisher on topic "{sample_copy.data.topic_name}"')
            self._queue.put_nowait(sample_copy)


def application_main(domain_id):
   participant = dds.DomainParticipant(domain_id)
   pub_reader = participant.publication_reader
   pub_queue = queue.Queue(0)
   pub_reader.bind_listener(AllPublicationListener(pub_queue), dds.StatusMask.DATA_AVAILABLE)

   readers = {}
   while True:
      sample = pub_queue.get(block=True, timeout=None)
      if sample.data.topic_name in readers:
         print(f'already created reader for topic {sample.data.topic_name}')
      else:
         topic = dds.DynamicData.Topic.find(participant, sample.data.topic_name)
         if not topic:
            topic = dds.DynamicData.Topic(participant, sample.data.topic_name, sample.data.type)
         # Ideally set QoS appropriately and add data handling
         # User readers belong on a regular subscriber, not the builtin one
         reader = dds.DynamicData.DataReader(
            participant.implicit_subscriber,
            topic)
         readers[sample.data.topic_name] = reader
         print(f'Created DataReader for topic {reader.topic_name}')


if __name__ == "__main__":
   parser = argparse.ArgumentParser(
      description="RTI Connext DDS Example: Using Builtin Topics to subscribe to all"
   )
   parser.add_argument("-d", "--domain", type=int, default=0, help="DDS Domain ID")
   args = parser.parse_args()
   application_main(args.domain)
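
The example keeps the listener callback short: it only copies the discovery sample and queues it, while the main loop creates the readers. One likely reason is that listener callbacks run on middleware threads and should return quickly. The sketch below isolates that hand-off using only the standard library. A plain thread stands in for the callback thread, and discover, create_readers, run_demo and make_reader are hypothetical stand-ins, not Connext calls.

```python
import queue
import threading

_DONE = object()  # stop marker for this demo; the endless loop above has no equivalent


def discover(topic_names, out_queue):
    """Stand-in for the listener: queue each discovered topic name, then the stop marker."""
    for name in topic_names:
        out_queue.put_nowait(name)
    out_queue.put_nowait(_DONE)


def create_readers(in_queue, make_reader):
    """Consume topic names from the queue, creating at most one reader per topic."""
    readers = {}
    while True:
        name = in_queue.get()  # blocks until the producer delivers an item
        if name is _DONE:
            return readers
        if name not in readers:
            readers[name] = make_reader(name)


def run_demo(topic_names):
    """Run producer and consumer together and return the resulting reader table."""
    q = queue.Queue()
    producer = threading.Thread(target=discover, args=(topic_names, q))
    producer.start()
    readers = create_readers(q, make_reader=lambda name: f"reader<{name}>")
    producer.join()
    return readers
```

Calling run_demo(["Square", "Circle", "Square"]) yields one entry each for "Square" and "Circle"; the repeated name is ignored, which mirrors the "already created reader" branch in the example.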