DataReader data changes on multiple threads

A StatusCondition can be used to detect when data changes on a DataReader. This works well in a single thread, but if multiple threads query the same DataReader, a read() operation in one thread will affect the state of the DataReader, and the other thread(s) will miss an update. So I looked through the API and found the field received_sample_count in DataReaderProtocolStatus. It seems reasonable that each thread could store the last received_sample_count it saw and check for changes in this count to detect a change of data in the DataReader.

My questions are:

Is this a reasonable use of it?

Is it a reliable method for this type of use?

Are there any better solutions?

Thanks

Nico

Gerardo Pardo

Yes, I believe that if you create multiple DDS WaitSet objects, one per thread, attach the StatusCondition to all of the WaitSets, and wait on each WaitSet from its corresponding thread, then all of the WaitSets will be woken when the StatusCondition triggers. If you do not want to "wait" from some of the threads and would rather check at a time of your choosing whether some sample has arrived, then looking at the received_sample_count in DataReaderProtocolStatus seems like a good approach to me. But note that this may miss things like the sample that notifies that an instance has changed to NOT_ALIVE_NO_WRITERS, because this sample is locally generated and does not always correspond to a sample received from a DataWriter. To catch those as well, you will also need to monitor the counters in the LivelinessChangedStatus.
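As a rough illustration of the polling idea above, here is a minimal sketch in plain Python. It does not call any DDS API: ReaderCounters and ChangeDetector are hypothetical names, and the snapshot is assumed to be filled in by the application from received_sample_count (DataReaderProtocolStatus) and from the alive_count and not_alive_count counters (LivelinessChangedStatus). Each thread would own its own detector.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReaderCounters:
    """Hypothetical snapshot of the counters that one thread polls.

    received_sample_count stands in for the DataReaderProtocolStatus field;
    alive_count and not_alive_count stand in for LivelinessChangedStatus.
    How the values are fetched from a real DataReader is not shown here.
    """
    received_sample_count: int = 0
    alive_count: int = 0
    not_alive_count: int = 0


class ChangeDetector:
    """Per-thread helper that remembers the last snapshot this thread saw."""

    def __init__(self) -> None:
        self._last = ReaderCounters()

    def changed(self, current: ReaderCounters) -> bool:
        """Return True if any counter differs from the last snapshot seen."""
        is_new = current != self._last
        self._last = current
        return is_new


detector = ChangeDetector()
first = detector.changed(ReaderCounters(received_sample_count=3, alive_count=1))
second = detector.changed(ReaderCounters(received_sample_count=3, alive_count=1))
# A writer goes away: no new sample is received, but the liveliness counters move.
third = detector.changed(
    ReaderCounters(received_sample_count=3, alive_count=0, not_alive_count=1)
)
```

The third call shows why the liveliness counters are part of the snapshot: received_sample_count alone would have reported no change.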

Also, I do not think there is currently a way for each thread to read just the "new" samples that have been received. You will basically have to read all the samples and look for the new ones. You can do that efficiently by reading with the read sequence API, which returns a sequence of samples without making a copy. These samples are returned sorted by SampleInfo::reception_sequence_number. This number is incremented for each sample inserted into the DataReader cache, so if each thread saved the highest SampleInfo::reception_sequence_number that it had seen so far, it could search the sequence obtained from the DataReader for samples that have a higher SampleInfo::reception_sequence_number.
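The per-thread bookkeeping described above could look something like the following sketch. It is plain Python rather than DDS code: Sample and new_samples are hypothetical names, the reception sequence number is modelled as a plain positive integer (an assumption; the real type may differ), and the list stands in for the sequence returned by a read call.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class Sample:
    """Hypothetical stand-in for one (data, SampleInfo) pair."""
    reception_sequence_number: int  # assumed to be a positive integer
    data: str


def new_samples(
    samples: Sequence[Sample], high_water_mark: int
) -> Tuple[List[Sample], int]:
    """Return the samples newer than high_water_mark and the updated mark."""
    fresh = [s for s in samples if s.reception_sequence_number > high_water_mark]
    if fresh:
        high_water_mark = max(s.reception_sequence_number for s in fresh)
    return fresh, high_water_mark


# Each thread keeps its own mark; 0 means "nothing seen yet".
cache = [Sample(1, "a"), Sample(2, "b")]
mark = 0
fresh, mark = new_samples(cache, mark)

# Another sample arrives; the next read returns the whole cache again.
cache.append(Sample(3, "c"))
fresh_again, mark = new_samples(cache, mark)
```

The linear scan keeps the sketch short and does not depend on the ordering. Since the samples come back sorted by reception sequence number, a binary search for the first newer sample would also work.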

We have been thinking of adding some new "read" APIs that support the above use-case but unfortunately these are still not included in the product...

Worst case, if the above does not work for all cases, you could create some application-level queue to decouple the DDS DataReader from the samples the different threads see. As samples arrive, you can put them into this queue, signal the application readers, and use some reference counting to know when the samples have been seen by all the readers. But that is a lot of extra work, to say the least...
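To give an idea of what such a queue might involve, here is a minimal sketch in plain Python. FanOutQueue and its methods are hypothetical names, not part of any DDS API. It assumes a single thread takes samples from the DataReader and calls put(), and it tracks the set of readers that still have to see each sample; the size of that set plays the role of the reference count.

```python
import threading
from typing import Any, Iterable, List, Set, Tuple


class FanOutQueue:
    """Hypothetical queue that hands every sample to every registered reader."""

    def __init__(self, reader_ids: Iterable[str]) -> None:
        self._readers: Set[str] = set(reader_ids)
        # Each entry pairs a sample with the readers that have not seen it yet.
        self._entries: List[Tuple[Any, Set[str]]] = []
        self._cond = threading.Condition()

    def put(self, sample: Any) -> None:
        """Called by the one thread that takes samples from the DataReader."""
        with self._cond:
            self._entries.append((sample, set(self._readers)))
            self._cond.notify_all()  # signal the application readers

    def take_unseen(self, reader_id: str, timeout: float = 0.0) -> List[Any]:
        """Return the samples this reader has not seen yet, oldest first."""
        with self._cond:
            self._cond.wait_for(lambda: self._has_unseen(reader_id), timeout)
            unseen = []
            for sample, pending in self._entries:
                if reader_id in pending:
                    pending.discard(reader_id)
                    unseen.append(sample)
            # Release the entries that every reader has now seen.
            self._entries = [entry for entry in self._entries if entry[1]]
            return unseen

    def _has_unseen(self, reader_id: str) -> bool:
        return any(reader_id in pending for _, pending in self._entries)

    def __len__(self) -> int:
        with self._cond:
            return len(self._entries)


fanout = FanOutQueue(["ui", "logger"])
fanout.put("sample-1")
fanout.put("sample-2")
ui_first = fanout.take_unseen("ui")
retained = len(fanout)  # "logger" has not looked yet, so both are kept
logger_first = fanout.take_unseen("logger")
released = len(fanout)  # every reader has seen both samples
ui_second = fanout.take_unseen("ui")
```

A real implementation would also have to handle readers that join or leave, and bound the queue so that one slow reader cannot make it grow forever.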


It was a misunderstanding on my part, then. From your explanation I conclude that one can use a WaitSet per thread plus the StatusCondition. I will consider that option.

Thanks for the information on the other options that are also available. It answers the questions I had.

Nico.