Python Connext API: getting discovered topic data

32 posts / 0 new
Last post
Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22
Python Connext API: getting discovered topic data

Really basic question about the new Python Connext API: I'm attempting to get discovery information about all discovered topics from any DataWriter from any other participant in a particular domain. This needs to be done without type information or any preconcieved knowledge about the domain and needs to use only builtin information on topics.

I've hit a snag: I've managed to get discovery information on the other participants and have been able to get the discovered participant data. I'm at the point that I have a ParticipantBuiltinTopicDataSeq:

with dds.DomainParticipant(DOMAIN_ID) as participant:
    subscriber = dds.Subscriber(participant)


     other_participants = participant.discovered_participants()
     data = participant.discovered_participant_data(other_participants)

     #type ParticipantBuiltinTopicDataSeq
     print(data.pop())

I do not know how to get from ParticipantBuiltinTopicData to getting the writers or even getting the topic data. Please advise.

Howard's picture
Offline
Last seen: 1 day 49 min ago
Joined: 11/29/2012
Posts: 610

Hi Wade,

You should check out all of the xxxBuiltinTopicData, there's

ParticipantBuiltinTopicData

TopicBuiltinTopicData

   (look up the function, discovered_topics(), to get a sequence of topics discovered by the participant)

PublicationBuiltinTopicData

SubscriptionBuiltinTopicData

Then you can get access to the builtin datareaders used by a Participant for discovery

publication_reader()

subscription_reader()

participant_reader()

this isn't well documented in the python API, so you kinda have to look at the Connext DDS Users manual to get an understanding of the builtin topics used for discovery.  Then you have to install a reader listener to get the discovery information.

 

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

Hi Wade,

Howard is correct that you need to retrieve this data through the builtin topics. In the examples/builtin_topics folder of the repo, the msg_publisher.py file demonstrates how to retrieve ParticipantBuiltinTopicData and SubscriptionBuiltinTopicData using listeners, though WaitSets would also work.

Since this relies on triggering on_data_available, it is recommended you start with the DomainParticipant disabled until the listeners have been installed so that discovery data is not received before your callbacks are ready to process it.

If you are interested in a polling mechanism to retrieve this information we don't currently have one in the master branch that allows you to get all publication/subscription data, but it was fairly easy to implement so I just put in a PR to add this capability.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

I really appreciate both of your inputs and will give it a go. I was looking at the examples/builtin_topics folder of the repo, the msg_subscriber example - it didn't dawn on me to check the publisher. That should help clarify things. Also, outstanding work on this polling mechanism. I'm equally familiarizng myself with the underlying library as well as this API, so I anticipate having more questions soon and it's nice to know this community is so responsive.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

Looking forward to trying out the polling mechanism once the PR goes through. In the meantime, I have managed to adapt the mentioned example to create a BuiltinPublicationListener. my only question is, with the provided PublicationBuiltinTopicData that get discovered how would I go about recovering the topic and typecode information so I can build DataReader(s) outside of the binded listener? what are the steps to get from a PublicationBuiltinTopicData to a appropriate structype and Topic to subscribe to the participant and retrieve data?

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

There are a couple of gotchas here. Discovery does not always contain the TypeCode/TypeObject, so you will only be able to create DataReaders for those discovered endpoints that do provide that information during discovery and otherwise you would have to load the type information locally via XML.

The below is code you could use with the proposed polling mechanism, though you should be able to adapt it to your use case with listeners. I will note that due to the listener callback running from an RTI thread within an exclusive area you will have to move significant portions of the processing (i.e. the creation of entities) outside of the callback. A WaitSet might be preferable for this use case.

The below code will create a DynamicData DataReader for discovered writers if the type info is available. If you need to do more dynamically with the underlying type's structure like parse all of the fields, that's also possible but it is a more complex exercise. In this basic example you could set up listeners or waitsets to process received samples from each created DataReader and simply print the received data values.

# proposed polling for publication info
# False arg means only return remote publication data
pub_data = participant.publication_data(False)
 
# starting with no readers...
reader_dict = {}
 
for entry in pub_data:
   # only create a reader if we don't have one for this Topic
   if not entry.topic_name in reader_dict:
      # find returns existing Topic object or None if not found
      topic = dds.DynamicData.Topic.find(participant, entry.topic_name)
      topic_type = entry.type
      if not topic:
         # type information may not be propagated during discovery, so
         # verify that the object is valid
         if topic_type:
            topic = dds.DynamicData.Topic(participant, entry.topic_name, topic_type)
        else:
            continue
      reader = dds.DynamicData.DataReader(participant.implicit_subscriber, topic)
      reader_dict[entry.topic_name] = reader
 
Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

Thank you - this is very clever. I'm lucky in a sense that most signals we can encounter with our usecase will have the typecode info. I just found the topic_name luckly prior to your response. Since the callback is running in a thread would it be sensible to make a multiprocessing manager dictionary to pass out dictionary info, or would there be any foreseeable issues with that?

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

Also just to give a little background, this will be hooked into the backend of a server that will be piping output into a websocket endpoint for a dynamic data discovery dashboard. One the reader is set up, I think it will be straightforward to jsonfy the deserialized object which will have implicit type info encoded into it's structure - or should, at least.

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

Honestly I am not too familiar with the multiprocessing.Manager.dict. The biggest concern would be any potentially blocking operations inside the callback (I think this collection uses a lock/mutex), as that can delay the callback from returning to allow the thread to start reading data on the transport again. An alternative approach might be to push the publication data to a queue.Queue with the put_nowait method and then have another thread block on reading the Queue.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

I imagine with a queue I would remove the .state(dds.DataState.new_instance()) filtering and handle the new instances in a seperate thread in case the nowait fails.

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

Yes, I would remove that so that you can be alerted to QoS updates or metadata-only updates. Otherwise I would not expect to receive publication data from the same DataWriters periodically; unlike DomainParticipants, endpoints are not periodically refreshed with announcements. To avoid "missing" publication data, you could specify a large enough max size for the queue to accommodate a reasonable reception/processing rate of endpoint data or try making it unlimited because I think the only time nowait fails is if there is no space available in the queue.

Speaking of QoS updates, the example I posted doesn't account for any QoS policies you may wish to match (for example, using DDS reliability if the DataWriter offers it or setting Ownership to match). Request-offered policy settings for the DataWriter are provided in the PublicationBuiltinTopicData object so you should be able to create a QoS object with the desired policy settings that you can provide to the DataReader init.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

Thanks to your help, I'm getting closer to having a working back end. I'm having a couple of issues, however. I think you may be right regarding the QoS issues. Before I get to attempting to duplicate QoS is there a way to set a general QoS to be permissable enough to exempt most possibilities?

Also, I'm having an issue with keyboard interrupts while my script is running. I'm not sure why - I do not think it is my code because I'm not doing any error handling yet.

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

The default DataReader QoS is fairly permissive in terms of matching, though there are some policies (such as Ownership) that are more strict in their matching behavior. If you have access to RTI Admin Console, it provides a quick way to see if there are any QoS mismatches between endpoints on a topic. An alternative would be installing a listener callback for "on_requested_incompatible_qos" to determine which policies are giving you problems. This is a DataReader oriented callback, but you can set it in the listener for a Subscriber or DomainParticipant and have the status "bubble up" so you only have to have one listener to catch the status for all contained DataReaders.

As for the KeyboardInterrupt, this is unfortunately a known issue with native Python extensions. The main Python thread catches the SIGINT (ctrl-C) signal and sets a value that doesn't get checked until it gets back to running Python code. While the Python API attempts to release the GIL when performing long-running DDS operations, the KeyboardInterrupt exception is only thrown if the DDS call is running on a background thread in these scenarios. I have verified that sending a SIGQUIT (or SIGBREAK on Windows) via keyboard shortcut can still kill a script. Other possible solutions (though not tested) would be to override the SIGINT handler with the Python signal package or take long running/infinite wait operations and put them in loops with a shorter timeout or run your DDS code in another thread that can be signaled for exit (e.g. with a GuardCondition to a WaitSet) or run DDS code in a separate thread that can be monitored by the main thread and signaled if necessary (e.g with a GuardCondition on a WaitSet). That all being said, can you provide any details about what the application is doing when interrupted?

Edit: it seems signals processed through the Python signals package are subject to the same restrictions.

Howard's picture
Offline
Last seen: 1 day 49 min ago
Joined: 11/29/2012
Posts: 610

If you use the QOS discovered and provided in PublicationBuiltinTopicData to configure the DataReader's QOS, then the datareader should be compatible with the datawriter....however, only that datawriter.  It could be not compatible with other datawriters of the same topic.

If you use Admin Console, you can see which QOS values are used for compatibility matching.  If you create a DataReader with the most permissive value for each QOS, then it should be OK for that QOS for any datawriters.  For example, if you don't set the Deadline QOS value, leave it at infinite, that should match any datawriter.

However, as Marc mentioned, there are QOS values for which an exact setting is require...for example, OWNERSHIP.  The reader and writer OWNERSHIP Kind need to be the same for a match.

 

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

Sorry for the delay in response. as it stands, I am using several callbacks for handling the numerous listeners that are being discovered - I realize now that I was inadvertently starving one of the threads but the issues requiring an interrupt are no longer present. I may need a little guidance on something else: for the DynamicType data coming off of these readers I need to recursively deserialize them. Is there a quick method to do this - aka convert them to python dictionary and list-objects. How would I go about this?

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

It looks like I still am getting performance issues/lock up after all. For the current application, we are looking at up to 40 topics per domain right now and I think having that many bindings on the readers are causing a host of problems. Currently, I am using:

reader.bind_listener(MsgListener(self.msg_q, entry, self.domain_id), dds.StatusMask.data_available())

to get values from the readers into a shared queue for processing. For the usecase it is acceptable to drop some messages and instead update as able. Would this be a QoS setting (I am still getting my bearings with the robust QoS system for DDS), or would it be preferable to iteratively poll values from the readers in some way as opposed to having callbacks whenever data is available?

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

I think I will be moving forward with WaitSet experimentation to potentially solve this.

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

I think WaitSets will improve the responsiveness on the DDS side, though if you have any additional information on the lockups you have experienced with Listeners that can help us improve performance in the Python API I would be happy to look into it.

As far as recursively processing DynamicData objects without knowing their types a priori, it is possible though not simple from an implementation standpoint. Unfortunately, while we provide methods to create and update DynamicData objects from a Python dictionaries there is currently no method to go in the opposite direction. I'll make a note of that as a potential API enhancement.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

I was just able to improve some performance issues/lockup by changing the QoS history on all constructed DataReaders to History.keep_last(1) as well as duplicating the reliability and ownership from the publications in question. Before the entire system was locking up once it started hitting around 20 readers, whether this was by binding a listener to the reader or building out waitsets around the reader. This seems to resolve now. I think I will be sticking to waitsets and use awaits/async to handle them. Each domain participant will be spun off into its own process through the multiprocessing library to get the GIL out of the way. 

I will have to deserialize these. My thoughts are using list(dynamic_data_obj) to get member names per each 'layer' of the object, then attempt to parse the variables as strings, ints, bools, etc. and if all that fails, recursively get a list again. This doesn't account for what to do with actual lists. Do you have an idea for a better flow to achieve this right now?

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

You can definitely iterate through the DynamicData objects. The object has all of the information necessary to determine how many fields are present (data.member_count) whether a field exists (data.member_exists(field_name_or_index)), info about the member including the type kind and, if it is a collection, the type of its elements (data.member_info(field_name_or_index)). You

For another option, if your data model isn't very complex I just pushed a small change to the master branch that allows you to print a DynamicData object to json which you could then convert using the Python json package:

format = rti.connextdds.PrintFormatProperty.json()
...
json.loads(data.to_string(format))
 
Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

As the end goal is to generate JSON strings, this is entirely convenient. Thank you. As the data is mostly nested structs and lists I think this will work very well. Are there datatypes you do not foresee working appropriately?

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

I'm not aware of anything specifically that would be incompatible in terms of data types, but if you encounter one that is problematic please feel free to share it here and I'll look into it.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

So, I'm still running into some issues at scale where I'm experiencing lockup: it's entirely spurious, however, and it acts as if a potential race condition. Mostly, it seems to be when a callback occurs on the publication reader for the discovery of a new topic and in a separate thread that is building the readers for the topic. In particular, the code halts where:

reader = dds.DynamicData.DataReader(
    self.participant.implicit_subscriber,
    topic,
    qos
)

Note that while this is happening, an on_data_available callback is occurring where it is reading from the publication reader. Is there a better way to troubleshoot this? I'm not getting any errors, just the threads locking up. RTI manager is throwing some warnings, but they're mostly about type compression and something about remote endpoints not being asserted by a plugin.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

I just verified that when the halt occurs, it freezes all of the threads within the process. I suspect this has something to do with the GIL

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

Yes, I suspect it is a GIL issue as well. When processing a listener callback and creating an entity simulataneously, the two threads are competing for an internal Exclusive Area lock. I'll see what I can do to resolve this. If this is the problem, I believe that a WaitSet that processes the incoming discovery data and creates the readers within the same thread would be a workaround for the issue because any EA accesses would be serialized.

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

I just pushed an update to master that expands GIL releases in DDS Entity methods, as those are the ones that have to deal with the EAs. Please let me know if that resolves the issue you are seeing.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

Unfortunately, it seems I'm hitting the same issues. To give a better background, this is the callback method that is occurring on the publisher:

def __init__(self, data_q):
    super(BuiltinPublicationListener, self).__init__()
    self.data_q = data_q

def on_data_available(self, reader):
    with reader.take() as samples:
        for sample in filter(lambda s: s.info.valid, samples):
            topic = sample.data.topic_name
            topic_type = sample.data.type
            type_name = sample.data.type_name
            participant_key = sample.data.participant_key
            key = sample.data.key
            instance_handle = sample.info.instance_handle
            ownership = sample.data.ownership
            reliability = sample.data.reliability

            try:
                self.data_q.put_nowait(
                    {
                        'instance_handle': instance_handle,
                        'key': key,
                        'participant_key': participant_key,
                        'topic': topic,
                        'topic_type': topic_type,
                        'type_name': type_name,
                        'ownership': ownership,
                        'reliability': reliability
                    })

            except queue.Full:
                ...
Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

Thanks for the code! I was able to reproduce the issue and attach to the Python process with a debugger to find that it is still a GIL issue. Even though I had put a call guard to release the GIL on all functions/properties for the DomainParticipant, I now suspect that it does not get properly applied to properties based on other policies not working correctly with properties (https://github.com/pybind/pybind11/issues/2618). I was able to circumvent the issue by applying the call guard within the lambda of the offending property getter and now the issue seems resolved for me. I will go through the code and apply this universally. I'll post again once I have tested and pushed the fix.

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

Ah, interesting bug! I've begun to work around this by utilizing waitsets but I'm just not getting the same performance as before on reads. I'm working around this by utilizing async/await right now in the thread, but if I can use callbacks that would be amazing.

Marc.Chiesa's picture
Offline
Last seen: 2 years 9 months ago
Joined: 07/24/2017
Posts: 32

Ok, just pushed the commit with the workaround property definitions. Let me know if you have better success with these changes!

Offline
Last seen: 4 years 3 weeks ago
Joined: 10/22/2020
Posts: 22

Sorry for the Delay - I've pulled in changes and have tested them against the branch that was still using callbacks and it looks like everything is working pretty well so far! My most recent branch is using coroutines instead of threading and async_waits and it seems to be just as performant. Thank you for your help through all of this - I will be sure to circle back if I have any other questions/concerns.

Offline
Last seen: 2 months 3 weeks ago
Joined: 06/24/2024
Posts: 1

I know this issue dealt with the old RTI Python Connector, but I'm running into a GIL issue in the new 7.3 python API. I've got a waitset on a data reader. The wait set is triggered correctly by another python process, but when I try to take data from the reader python throws the following error: "Fatal Python error: PyThreadState_Get: the function must be called with the GIL held, but the GIL is released (the current Python thread state is NULL) Python runtime state: initialized". I've tried acquiring a lock with the threading package `threading.lock()` and acquiring the GIL manually with the ctypes package:

import ctypes

# Access the Python C API
PyGILState_Ensure = ctypes.pythonapi.PyGILState_Ensure
PyGILState_Release = ctypes.pythonapi.PyGILState_Release

def handle_data(self):
    gil_state = PyGILState_Ensure()
    try:
        samples = self.reader.take()
    finally:
        PyGILState_Release(gil_state)     

It's pretty clear it's the C/C++ code getting run just doesn't have the GIL. I'm not sure if this problem is related to the GIL discussion here though.