ASYNCHRONOUS_PUBLISHER QoS and in-order sample delivery across multiple topics


Hi. I would like to know if there is a way to guarantee the order of samples sent by multiple DataWriters belonging to the same Publisher when the ASYNCHRONOUS_PUBLISHER QoS policy is used. At the moment the topics I am writing to are not keyed; in any case, I don't see how keyed topics would help here.

 

I am noticing that the order of samples is not guaranteed if I use the ASYNCHRONOUS_PUBLISHER policy, even when samples are small in size (the order is observed using DataReader listeners on the receiving side). The same is not true if I use the SYNCHRONOUS_PUBLISHER policy.
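For reference, this is roughly how the asynchronous publishing mode is being enabled on my side (a simplified sketch using the classic C++ API; the 'publisher' and 'topic' variables are assumed to already exist and error handling is omitted):

    // Sketch: enable asynchronous publishing on a DataWriter.
    DDS_DataWriterQos writer_qos;
    publisher->get_default_datawriter_qos(writer_qos);

    // Samples are handed off to a separate publication thread instead of
    // being sent from within the write() call itself.
    writer_qos.publish_mode.kind = DDS_ASYNCHRONOUS_PUBLISH_MODE_QOS;

    DDSDataWriter *writer = publisher->create_datawriter(
        topic, writer_qos, NULL /* no listener */, DDS_STATUS_MASK_NONE);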

 

Does DDS support this or is it a limitation?

 

Thanks.

 

EDIT: Forgot to mention. I know there are wait-based methods that allow me to wait until a particular sample has been delivered; however, these will block the caller for an indefinite amount of time. I need to know if it is possible to send messages asynchronously without the need to block the caller.

 

P.S.: I am using the C++ NDDS v4.5c library running on Linux. The machines in question are connected directly using a point-to-point protocol. No other network services are running on either machine.


 

Hello,

 

If I understood correctly you have multiple DataWriters, each on a separate Topic, but all inside the same DomainParticipant. And you would like to configure the QoS so that DataReaders within the same DomainParticipant (and Subscriber) can receive the samples in the same order that they were published.

 

If this is the case, then what you want fits what DDS calls presentation "ordered access" with access-scope GROUP. This is a configuration of the PRESENTATION QoS policy. This policy has two relevant attributes for this use case: the 'access_scope' attribute with a setting of GROUP and the 'ordered_access' attribute with a setting of TRUE. According to the DDS spec, when the PRESENTATION QoS policy is configured this way the order of samples written by multiple DataWriters within the same Publisher is preserved, and on the Subscriber side you can use a specialized access pattern to get the data in the same order it was published. This is described in sections 2.1.3.6, 2.1.2.5.1.9, and 2.1.2.5.2.8 of the DDS spec version 1.2.
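To make it concrete, the configuration would look roughly like this in the classic C++ API (a sketch only; the same presentation settings have to be applied on both the Publisher and the matching Subscriber, and entity creation details and error handling are omitted):

    // Sketch: PRESENTATION QoS with GROUP access scope and ordered access.
    DDS_PublisherQos publisher_qos;
    participant->get_default_publisher_qos(publisher_qos);
    publisher_qos.presentation.access_scope = DDS_GROUP_PRESENTATION_QOS;
    publisher_qos.presentation.ordered_access = DDS_BOOLEAN_TRUE;
    DDSPublisher *publisher = participant->create_publisher(
        publisher_qos, NULL, DDS_STATUS_MASK_NONE);

    DDS_SubscriberQos subscriber_qos;
    participant->get_default_subscriber_qos(subscriber_qos);
    subscriber_qos.presentation.access_scope = DDS_GROUP_PRESENTATION_QOS;
    subscriber_qos.presentation.ordered_access = DDS_BOOLEAN_TRUE;
    DDSSubscriber *subscriber = participant->create_subscriber(
        subscriber_qos, NULL, DDS_STATUS_MASK_NONE);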

 

However, unfortunately, the GROUP setting for the PRESENTATION QoS policy is not supported by RTI DDS version 4.5d (currently the latest publicly released version from RTI).

The good news is that we have an upcoming scheduled maintenance release of RTI DDS this year that will include this feature... In the meantime, if you describe your scenario a bit more, perhaps I can think of alternative ways you can achieve your goals...

 

Regarding the wait-based methods: they do not have to block you for an 'indefinite' amount of time. The WaitSet::wait() operation takes a 'timeout' parameter. You can set it to your maximum desired wait and the operation will return control to you at that time at the latest, even if no event matched the associated conditions. That way you can control how long your thread stays suspended.
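For example, a bounded wait would look roughly like this (a sketch; the 'reader' variable and the 5-second timeout are just for illustration):

    // Sketch: wait for data with a finite timeout instead of blocking forever.
    DDSWaitSet waitset;
    DDSStatusCondition *condition = reader->get_statuscondition();
    condition->set_enabled_statuses(DDS_DATA_AVAILABLE_STATUS);
    waitset.attach_condition(condition);

    DDS_Duration_t max_wait = {5, 0};   // wait at most 5 seconds
    DDSConditionSeq active_conditions;
    DDS_ReturnCode_t retcode = waitset.wait(active_conditions, max_wait);

    if (retcode == DDS_RETCODE_TIMEOUT) {
        // Nothing happened within the bound; the thread regains control
        // and can decide what to do next.
    } else if (retcode == DDS_RETCODE_OK) {
        // One or more of the attached conditions became active.
    }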

 

Regards,

Gerardo


Hi Gerardo.

 

Thank you for your response.

 

"If I understood correctly you have multiple DataWriters, each on a separate Topic, but all inside the same DomainParticipant. And you would like to configure the QoS so that DataReaders within the same Domain articipant (and and Subscriber)  can receive the samples in the same order that they were published."

 

Yes, this is my scenario. A basic way to visualize it is to think of a single buffer to which all the samples are added one at a time. All that the DataWriters need to do is append their serialized samples to the end of the buffer and let the publisher send them out one at a time.

 

"If this is the case, then what you want fits what DDS calls presentation "ordered access" with access-scope GROUP.  This is a configuration of the PRESENTATION QoS policy. This policy has two relevant attributes for the use case: the  'access_scope' attribute with a setting of GROUP and the 'ordered_access' attribute with a setting of TRUE. According to the DDS spec, with the presentation QoS policy is configured this way, the order of samples written by multiple DataWriters within the same Publisher can be preserved and on the Subscriber side you can use a specialized access pattern to get the data in the same order it was published. This is described in sections 2.1.3.6, 2.1.2.5.1.9, and 2.1.2.5.2.8 of the DDS spec version 1.2"

 

I have one question about this GROUP access-scope. Once it is set up (assuming support for it is provided by RTI's DDS), will DataReader listeners trigger in the correct order, or do I really need to use another access method to obtain my samples? It would be very convenient if I could just keep using my DataReader listeners and get my samples in the order that I want.

 

"Regarding the wait-based methods. They do not have to block you for an 'indefinite' amount of time. The WaitSet::wait() operation takes a 'timeout' parameter. You can set it to your maximum desired wait and the operation will return control to you at that time at the latest if no event matched the associated conditions. So that way you can control how long your thread suspends control."

 

Sorry, I was referring to the wait_for_asynchronous_publishing or the wait_for_acknowledgments functions, not a WaitSet.

 

My issue with using the above functions (or a WaitSet for that matter) is that you really cannot know exactly how long it will take to deliver a sample to the other computer. Since messages are not sent in the order they are written (as it appears at the moment when using asynchronous publishing mode), I feel like there is no guarantee that I will get an acknowledgment from the other side before my waiting period expires.

Maybe for smaller messages it can work, but if I send a large message that spans multiple packets, it may time out before an acknowledgment is sent. Also, it is not 100% clear what to do next after a timeout. Does a timeout mean that I lost the other side, or does it mean that I've spent an inadequate amount of time waiting for the acknowledgment? Perhaps all of this can be determined some other way (using various statuses that RTI provides); however, at the moment, it deviates from the simple send-it-and-forget-it approach that I am after. Please correct me if I am wrong in thinking this way.
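To make the concern concrete, this is the kind of blocking pattern I am trying to avoid (a sketch; 'writer' is the typed DataWriter for my data type and the 2-second timeout is arbitrary):

    // Sketch: write, then block until the sample is acknowledged or a timeout expires.
    DDS_ReturnCode_t retcode = writer->write(sample, DDS_HANDLE_NIL);

    DDS_Duration_t max_wait = {2, 0};   // 2 seconds, chosen arbitrarily
    retcode = writer->wait_for_acknowledgments(max_wait);

    if (retcode == DDS_RETCODE_TIMEOUT) {
        // Ambiguous: is the other side gone, or did a large multi-packet
        // sample simply not get acknowledged within the bound?
    }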

 

Thanks.


 

Hello,

 

OK so your use case does indeed fit the PRESENTATION QoS policy with the access scope set to 'GROUP'.

Regarding your question:

I have one question about this GROUP access-scope. Once it is set up (assuming support for it is provided by RTI's DDS), will DataReader listeners trigger in the correct order, or do I really need to use another access method to obtain my samples? It would be very convenient if I could just keep using my DataReader listeners and get my samples in the order that I want.

 

Yes and no. There is a way to get the samples in order, but it's a bit more involved than that. The issue really is not whether the DataReader listeners are called in any particular order. Even if they were, that would not be enough... Let me explain:

Each DDS DataReader has a cache. The model is that the application can access the data in the DataReader cache (via the 'read' and 'take' operations) at any point it wants and for any reason. When accessing the DataReader cache the samples can be accessed in several orders. For example, you can call 'read_instance', 'read_next_instance', or 'read_next_sample'; you can pass various flags to these operations to control whether you want to see only new samples never read before; or you can even pass a QueryCondition to access samples with a particular content, etc. You are free to call these on any DataReader, in any order, at any time.

 

The DataReader listener just provides notification that a particular DataReader has data to be read. Similarly, a Condition can be activated and a WaitSet be signalled to wake up an application thread that is waiting for this to happen. Either way you just get a hint that something happened, and it is up to the application to read as it sees fit.
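For example, a typical (non-ordered) listener simply drains whatever its own DataReader has, independently of the other DataReaders. A sketch, assuming a hypothetical 'Foo' type:

    // Sketch: the usual per-reader access pattern. The listener is only a
    // notification; the data itself comes from the DataReader cache.
    class FooListener : public DDSDataReaderListener {
    public:
        virtual void on_data_available(DDSDataReader *reader) {
            FooDataReader *foo_reader = FooDataReader::narrow(reader);
            FooSeq data_seq;
            DDS_SampleInfoSeq info_seq;

            if (foo_reader->take(data_seq, info_seq, DDS_LENGTH_UNLIMITED,
                                 DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE,
                                 DDS_ANY_INSTANCE_STATE) != DDS_RETCODE_OK) {
                return;
            }
            for (int i = 0; i < data_seq.length(); ++i) {
                if (info_seq[i].valid_data) {
                    // process data_seq[i] ...
                }
            }
            foo_reader->return_loan(data_seq, info_seq);
        }
    };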

 

Given that you are trying to get samples in the right order across DataReaders, you will have to do something extra to inform DDS of your intent so that DDS can do the right thing when delivering the samples to you. For example, assume that you have two DataReaders and have received the following samples: S1, S2, S3, S4, and S5; samples S1, S2, and S5 have been received by DataReader1 and samples S3 and S4 by DataReader2. To read them in the correct order two things need to happen:

 

a) You will need to know that you have to read first from DataReader1, then from DataReader2, and then from DataReader1 again.

b) The middleware will have to know that you are trying to do this, so that when you read from DataReader1 you get just S1 and S2 (not S5), then when you read from DataReader2 you get S3 and S4, and finally when you read from DataReader1 again you get S5.

 

So here is what you need to do to make it happen (aside from setting the QoS policy as explained):

 

1) You need to call the operation begin_access() on the Subscriber. This logically initializes an internal iterator.

2) Then you call get_datareaders() to get the sequence of DataReaders in the correct order to read from. In the example above, you will get {DataReader1, DataReader2, DataReader1}. Notice that the same DataReader can appear multiple times if, like in the example, its samples are not all consecutive.

3) Then you call read/take (or any other sample-accessing function) on each DataReader in sequence. These operations will only let you see the samples that are in the right order at that point in the iteration. For example, the take on DataReader1 will return {S1, S2} only, and you cannot see anything else from DataReader1 until you access S3 and S4 from DataReader2.

4) When you are done accessing samples in order you call end_access() on the Subscriber.

 

You can do this at any time. Whenever you call begin_access() the DataReaders are put in this mode and you are 'forced' to see samples in order. Outside the begin_access()/end_access() bracket you see samples without any ordering across writers.

 

Furthermore, you can take advantage of the SubscriberListener operation 'on_data_on_readers' to know that there is data in some DataReader of that Subscriber and, if you so choose, call begin_access()/end_access() inside that callback.
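Putting the steps together, here is a sketch of the whole pattern, assuming a single hypothetical 'Foo' type for all Topics (the exact signatures may differ slightly in the release that adds GROUP support):

    // Sketch: group-ordered access across the DataReaders of one Subscriber.
    // Could be called, for example, from SubscriberListener::on_data_on_readers().
    void read_in_publication_order(DDSSubscriber *subscriber)
    {
        subscriber->begin_access();

        // The readers come back in the order their samples must be accessed;
        // the same reader may appear more than once.
        DDSDataReaderSeq readers;
        subscriber->get_datareaders(readers,
                                    DDS_NOT_READ_SAMPLE_STATE,
                                    DDS_ANY_VIEW_STATE,
                                    DDS_ANY_INSTANCE_STATE);

        for (int i = 0; i < readers.length(); ++i) {
            FooDataReader *reader = FooDataReader::narrow(readers[i]);
            FooSeq data_seq;
            DDS_SampleInfoSeq info_seq;

            // Only the samples that come next in the global order are visible here.
            if (reader->take(data_seq, info_seq, DDS_LENGTH_UNLIMITED,
                             DDS_NOT_READ_SAMPLE_STATE, DDS_ANY_VIEW_STATE,
                             DDS_ANY_INSTANCE_STATE) == DDS_RETCODE_OK) {
                for (int j = 0; j < data_seq.length(); ++j) {
                    if (info_seq[j].valid_data) {
                        // process data_seq[j] ...
                    }
                }
                reader->return_loan(data_seq, info_seq);
            }
        }

        subscriber->end_access();
    }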

 

Regarding the other issue of knowing when a sample (or a collection of samples) has been delivered to the other side:

 

My issue with using the above functions (or a WaitSet for that matter) is that you really cannot know exactly how long it will take to deliver a sample to the other computer. Since messages are not sent in the order they are written (as it appears at the moment when using asynchronous publishing mode), I feel like there is no guarantee that I will get an acknowledgment from the other side before my waiting period expires.

Maybe for smaller messages it can work, but if I send a large message that spans multiple packets, it may time out before an acknowledgment is sent. Also, it is not 100% clear what to do next after a timeout. Does a timeout mean that I lost the other side, or does it mean that I've spent an inadequate amount of time waiting for the acknowledgment? Perhaps all of this can be determined some other way (using various statuses that RTI provides); however, at the moment, it deviates from the simple send-it-and-forget-it approach that I am after. Please correct me if I am wrong in thinking this way.

 

 

I agree the WaitSet or the wait_for_acknowledgments are not well suited to that use case.

I do not have the complete picture of your use case: Are you simply monitoring progress? Are there certain points in time when you must know that everything you sent was acknowledged? In other words, what level of delivery guarantee are you looking for behind your 'send and forget'? In any case here are some thoughts/suggestions:

 

- You can hook up some logic in the DataWriterListener operation 'on_reliable_writer_cache_changed'; there you can look at the DDS_ReliableWriterCacheChangedStatus and specifically at the field empty_reliable_writer_cache. This will tell you whether all the samples in your cache have been acknowledged by all the reliable readers. You can monitor that without blocking the DataWriter (see the sketch after this list).

- You could set up a separate DataWriter/DataReader channel flowing back, where your DataReader application publishes its processing state and the DataWriter application monitors it.

- We are also working on another feature called 'ApplicationAcknowledgment' that would let a DataWriter know that a specific DataReader processed its sample, and even get some result back from that processing.
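To illustrate the first suggestion, here is a sketch of a DataWriterListener that watches the reliable writer cache without blocking the writing thread (how the notification is propagated to the rest of the application is up to you):

    // Sketch: non-blocking monitoring of the reliable writer cache.
    // When unacknowledged_sample_count drops to zero (equivalently, when the
    // empty_reliable_writer_cache event count changes), everything written so
    // far has been acknowledged by all reliable readers.
    class CacheMonitor : public DDSDataWriterListener {
    public:
        virtual void on_reliable_writer_cache_changed(
            DDSDataWriter * /* writer */,
            const DDS_ReliableWriterCacheChangedStatus &status)
        {
            if (status.unacknowledged_sample_count == 0) {
                // All samples currently in the writer cache have been acknowledged;
                // notify the application however is convenient (flag, queue, ...).
            }
        }
    };

The listener would be installed on the DataWriter with the DDS_RELIABLE_WRITER_CACHE_CHANGED_STATUS bit enabled in its status mask.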

 

If you care to elaborate your use-case, perhaps I can give you a better suggestion once I understand it better.

 

Regards,

 

-Gerardo

 


Hi Gerardo.

 

Thank you for your comments.

 

"...

So here is what you need to do to make it happen (aside from setting the QoS policy as explained):

 

1) You need to call the operation begin_access() on the Subscriber. This logically initializes an internal iterator.

2) Then you call get_datareaders() to get the sequence of DataReaders in the correct order to read from. In the example above, you will get {DataReader1, DataReader2, DataReader1}. Notice that the same DataReader can appear multiple times if, like in the example, its samples are not all consecutive.

3) Then you call read/take (or any other sample-accessing function) on each DataReader in sequence. These operations will only let you see the samples that are in the right order at that point in the iteration. For example, the take on DataReader1 will return {S1, S2} only, and you cannot see anything else from DataReader1 until you access S3 and S4 from DataReader2.

4) When you are done accessing samples in order you call end_access() on the Subscriber.

 

You can do this at any time. Whenever you call begin_access() the DataReaders are put in this mode and you are 'forced' to see samples in order. Outside the begin_access()/end_access() bracket you see samples without any ordering across writers.

 

Furthermore, you can take advantage of the SubscriberListener operation 'on_data_on_readers' to know that there is data in some DataReader of that Subscriber and, if you so choose, call begin_access()/end_access() inside that callback."

 

The approach described above seems like it will work; however, I have one concern that I would like you to address. Is there a possibility of missing some samples when using this approach? Let's say that a connection was temporarily lost and, due to the publishing frequency, a lot of samples across multiple DataWriters have accumulated. Assuming reliability is turned on, if the connection is re-established just before the liveliness-change event is generated (so the message caches/queues for all the DataWriters remain intact), I would anticipate a lot of messages being sent out by the publisher to try to recover the samples lost during the network outage. If I use this approach in a listener as you suggest, how many times do I need to repeat the steps to guarantee that I'll get all of my old and new samples? Is it once, assuming that more on_data_on_readers events will be generated? Or is it until the DataReaders report that there is no more data to read?

 

"

- You can hook up some logic in the DataWriterListener operation 'on_reliable_writer_cache_changed'; there you can look at the DDS_ReliableWriterCacheChangedStatus and specifically at the field empty_reliable_writer_cache. This will tell you whether all the samples in your cache have been acknowledged by all the reliable readers. You can monitor that without blocking the DataWriter.

- You could set up a separate DataWriter/DataReader channel flowing back, where your DataReader application publishes its processing state and the DataWriter application monitors it.

- We are also working on another feature called 'ApplicationAcknowledgment' that would let a DataWriter know that a specific DataReader processed its sample, and even get some result back from that processing.

 

If you care to elaborate your use-case, perhaps I can give you a better suggestion once I understand it better."

 

My use case is fairly simple. The caller makes a single function call to send one sample of a particular data type. The send call has to return as quickly as possible (a send-it-and-forget-it approach), so blocking or monitoring on the caller's side is not an option. This is why I was hoping to use the ASYNCHRONOUS publishing mode to guarantee that sending is done in the required fashion. I like the fact that there are a lot of ways to see what is going on on the network; however, none of this is needed when messages are being sent out.

So as you can see, using WaitSets or wait functions is not really suitable here.

 

Actually, when does RTI plan to release the update that includes the GROUP presentation ordered_access? I would like to try it to see how well it works.

 

Thanks.