Data Reader QOS Question

Joined: 01/09/2013
Posts: 2

Hi all, I am just starting out with DDS and I am trying to accomplish the following:

Simplified Example:

struct SampleData {
    long key; //@key
    long data;
};

DataWriter -> Sends different data for different keys. For example:

DataReader 1 JOINS

DataWriter -> Writes: key=0; data=1;

DataWriter -> Writes: key=0; data=2;

DataWriter -> Writes: key=1; data=3;

DataWriter -> Writes: key=1; data=4;

Unregister Key 1 Data as it is no longer valid;

DataWriter -> Writes: key=2; data=5;

DataWriter -> Writes: key=2; data=6;

DataReader 2 JOINS Late

DataReader 3 JOINS Late

etc...

 

What I would like is 3 DataReaders that perform similar actions but receive different views of the data:

DataReader 1) Stores every message sent to a file for potential data replay/data analysis. Seeing: {key=0; data=1;} {key=0; data=2;}{key=1; data=3}{key=1; data=4;} {key=2; data=5} {key=2; data=6;} 

DataReader 2) Joins late and only receives the latest data for each key value. Seeing: {key=0; data=2;} {key=2; data=6}

DataReader 3) Joins late and receives every piece of data for each key value that is still registered, in order. Seeing: {key=0; data=1;} {key=0; data=2;} {key=2; data=5;} {key=2; data=6;}

 

What it boils down to is some readers need all of the data and others only need the latest data. This is essentially what I am trying to accomplish with a simple example. If anyone has any pointers/general strategies to accomplish this I would greatly appreciate them!

 

Thanks!

Gerardo Pardo
Joined: 06/02/2010
Posts: 565

Hi,

I will start with some background and then get to some suggestions. I think the specific use case you are asking about will require not only QoS settings but also something like the RTI DDS Persistence Service.

There are several ways different DataReaders on the same DDS Topic can get different subsets of the data. The most direct ones are the use of ContentFilteredTopic (so the decision of what data goes to a DataReader is based on the content) and the use of a TimeBasedFilter (where the decision is based on the frequency at which the instance (Key) is updated).  

Unfortunately, neither of these mechanisms allows the DataReader to control the data received based on the amount of history per key.

DDS does have a HISTORY QoS which can be configured on the DataWriter and the DataReader. If you use the KEEP_LAST History kind you can also specify a history depth, which controls the amount of data that is kept per key. However, this controls how much data is held in the respective caches, not directly how much is sent or received. I will explain further.

If you set the DataWriter HISTORY cache to kind=KEEP_LAST with depth=1, then the DataWriter will only keep one (the last) sample per instance. If you also configure the DURABILITY QoS to non-VOLATILE, it will keep that last value per key for late joiners. This allows a late-joiner DataReader that is also configured with non-VOLATILE DURABILITY to get the last sample per key (and no more). However, if another late-joiner DataReader wanted more, it would not be able to get it because the DataWriter does not have it. Also, KEEP_LAST could cause some intermediate samples to be lost if they are written faster than a DataReader can process them, so the DataReader that wants every message would not get that either.
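To make the depth=1 case concrete, here is a toy Python model of a per-key KEEP_LAST writer cache. It is not the RTI API: the class `WriterCache` and its methods are made up for illustration, and unregistering is simplified to dropping the instance from the cache. It replays the sequence from the original question.

```python
from collections import OrderedDict, deque


class WriterCache:
    """Toy model of a DataWriter history cache with HISTORY kind=KEEP_LAST."""

    def __init__(self, depth):
        self.depth = depth
        self.instances = OrderedDict()  # key -> deque of the newest `depth` values

    def write(self, key, data):
        # deque(maxlen=depth) drops the oldest value once it is full.
        self.instances.setdefault(key, deque(maxlen=self.depth)).append(data)

    def unregister(self, key):
        # Simplification: the instance and its samples leave the cache.
        self.instances.pop(key, None)

    def history_for_late_joiner(self):
        # What a non-VOLATILE late joiner could be sent: everything still cached.
        return [(key, data)
                for key, values in self.instances.items()
                for data in values]


def replay_scenario(depth):
    cache = WriterCache(depth)
    cache.write(0, 1)
    cache.write(0, 2)
    cache.write(1, 3)
    cache.write(1, 4)
    cache.unregister(1)
    cache.write(2, 5)
    cache.write(2, 6)
    return cache.history_for_late_joiner()


last_only = replay_scenario(depth=1)      # one sample per live key
everything = replay_scenario(depth=100)   # deep enough to keep every sample
print(last_only)
print(everything)
```

With depth=1 the cache can only serve the "latest value" view; with a large depth it can only serve the "full history" view. A single writer cache cannot offer both.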

If you set the DataWriter HISTORY cache to kind=KEEP_ALL, or to KEEP_LAST with a sufficiently large history depth, and configure DURABILITY as before, then the late-joining DataReader that wants only the most recent values per key would get more than that. This happens even if the DataReader cache is configured with kind=KEEP_LAST and depth=1. The late-joining DataReader cache would only hold one sample per key, but because the DataWriter will send everything it has in its cache, and because the DataReader processes samples incrementally as they arrive, it will likely see all of them. If the late-joining DataReader does not "read or take" them as they arrive, then as they are placed in the DataReader cache they will replace other samples with the same key, so if the DataReader looks only at the end it will see just one sample per key. However, all these samples were still sent and received on the wire, so this is probably not what you wanted.
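The difference between what ends up in the reader cache and what crosses the wire can be sketched with another toy Python model. Again this is not the RTI API: `ReaderCache` is a hypothetical class, and `writer_history` is the history a KEEP_ALL writer would still hold in the original example.

```python
from collections import deque


class ReaderCache:
    """Toy model of a DataReader cache with HISTORY kind=KEEP_LAST."""

    def __init__(self, depth):
        self.depth = depth
        self.instances = {}   # key -> deque of the newest `depth` values
        self.received = 0     # samples that crossed the wire

    def on_sample(self, key, data):
        self.received += 1
        self.instances.setdefault(key, deque(maxlen=self.depth)).append(data)

    def read(self):
        return [(key, data)
                for key, values in sorted(self.instances.items())
                for data in values]


# History a KEEP_ALL writer would still hold in the original example.
writer_history = [(0, 1), (0, 2), (2, 5), (2, 6)]

reader = ReaderCache(depth=1)
for key, data in writer_history:
    reader.on_sample(key, data)

print(reader.received)  # number of samples sent and received
print(reader.read())    # what is left if the application only looks at the end
```

The reader ends up holding one sample per key, but every cached sample was still transmitted.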

So basically the only way to offer different amounts of historical samples to different late-joining DataReaders is to have different DataWriters write the data, each DataWriter keeping the proper amount of history.

I think there is a reasonable way to do this using the RTI DDS Persistence Service.

The Persistence Service is a standard DDS service that automatically stores samples from DataWriters that have DURABILITY TRANSIENT or PERSISTENT and makes them available to late-joining DataReaders which specify DURABILITY TRANSIENT or PERSISTENT.

The use of the RTI Persistence Service is transparent to the publishing and subscribing applications. Specifically, the DDS DataReader will get the data from the Persistence Service and/or the original DataWriter and merge it all automatically, so the application reading from the DataReader will see a single consolidated stream. Source timestamp, order, etc. are all preserved.

You can configure the QoS settings of the DataReaders and DataWriters in the Persistence Service. So if you set up two Persistence Services, each with a DataWriter that uses a different History depth, you would have different amounts of history available on each DataWriter. The trick is then to have the DataReader get the initial data from the proper Persistence Service DataWriter...

To have different DataReaders match different Persistence Service DataWriters you can use the PARTITION QoS. Partitions are a set of tags (or tag expressions) that you can associate with DataWriters and DataReaders. A DataWriter and a DataReader will only communicate if, in addition to specifying the same Topic and compatible QoS, they also have a matching partition.

So if you configured the DataWriter in one of the Persistence Services to have a PARTITION of "all_values" and the DataWriter in the other Persistence Service to have a PARTITION of "last_values", then a late-joiner DataReader that specified DURABILITY TRANSIENT or PERSISTENT and a PARTITION of "all_values" would match the Persistence Service with the full history and get all the data. Similarly, a late-joiner DataReader that also specified DURABILITY TRANSIENT or PERSISTENT and a PARTITION of "last_values" would match the Persistence Service with just one sample per key.
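As a rough illustration of the matching rule, the following Python sketch checks whether two partition lists overlap. It is a simplified model and not RTI code: the function `partitions_match` is hypothetical, Python's `fnmatch` only approximates the POSIX-style wildcard matching that DDS partition expressions use, and the variable names for the two Persistence Services and the two GUIs are invented for this example.

```python
from fnmatch import fnmatchcase


def partitions_match(writer_partitions, reader_partitions):
    """Toy check: True if any writer partition matches any reader partition."""
    # An empty list stands for the default (empty-string) partition.
    writer_names = writer_partitions or [""]
    reader_names = reader_partitions or [""]
    return any(
        fnmatchcase(w, r) or fnmatchcase(r, w)
        for w in writer_names
        for r in reader_names
    )


full_history_writer = ["all_values"]   # Persistence Service with the full history
last_value_writer = ["last_values"]    # Persistence Service with depth=1

history_gui = ["all_values"]
latest_gui = ["last_values"]

print(partitions_match(full_history_writer, history_gui))
print(partitions_match(full_history_writer, latest_gui))
print(partitions_match(last_value_writer, latest_gui))
```

Each reader only pairs with the Persistence Service DataWriter whose partition it names, which is what steers it to the right amount of history.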

There are a couple more details regarding how to configure the DataWriter so it has enough history to make sure your "Archiver DataReader" that wants all the data gets it, but at the same time does not send that history to the late-joiner DataReader that only wants a subset. I think the best way to do this is to leverage the "Delegated Reliability" feature. This is described in sections 30.2 and 30.3 of the RTI DDS Core Library and Utilities User's Manual.

Hope this helps you get started...

Gerardo

Joined: 01/09/2013
Posts: 2

Wow! That's a great answer! Thanks. I tried something very similar to what you suggested and it seemed to work very closely to what I was expecting. My concern is that this solution overcomplicates a fairly simple problem. This is probably because of my lack of experience with DDS and how data should be flowing through the system. Maybe I can expand my simple example from above to give you an idea of what I am trying to solve with this approach.

struct SampleData {
    long key; //@key
    long data;
};

Let's say that my data represents wheels rolling down a hill: the "key" field represents a unique wheel's id and the "data" field represents the distance the wheel has rolled so far. If the wheel falls over, the wheel id ("key" field) is unregistered and should not be accounted for any further. Each wheel will send its current distance rolled ("data" field) on a random interval determined by the DataWriter. If the wheel has stopped rolling and has not fallen over, it will keep repeating the same "data" field until it falls (some wheels may report the same "data" field value forever). If the wheel were to roll backwards, the distance rolled would decrease. The normal case would be that a wheel rolls for some distance (forward and backward), then falls over and is removed from the system.

 

Here is where my DataReaders come in:

DataReader 1) Stores every message sent to a file for potential data replay/data analysis after the whole scenario has been played out.

DataReader 2) Is in a GUI app that displays the latest distance for every wheel still standing

DataReader 3) Is in a GUI app that displays the complete history of all distances for every wheel still standing

 

My original thinking was that DataReader 2 would only receive the latest information for each of the wheels. If DataReader 2 joined late and was flooded with multiple values per wheel on startup, it would potentially update the GUI multiple times before the latest "data" field was read, unless there was a way for DataReader 2 to determine that a sample was older and not the latest. If it knew that, it could skip the old data and only update the GUI once with the latest information.
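One way to picture the "skip the old data" idea is a small Python sketch that collapses a burst of samples to the newest one per wheel before touching the GUI. This is only an illustration under assumptions: it supposes the application can drain all pending samples in one batch and that each sample carries a source timestamp (DDS provides one in the sample metadata). The function `latest_per_key` and the sample tuples are hypothetical.

```python
def latest_per_key(samples):
    """Return {key: (timestamp, data)} keeping the newest sample for each key."""
    latest = {}
    for key, timestamp, data in samples:
        if key not in latest or timestamp >= latest[key][0]:
            latest[key] = (timestamp, data)
    return latest


# Hypothetical burst seen by a late joiner: (wheel id, source timestamp, distance).
burst = [(0, 10.0, 1), (0, 11.0, 2), (2, 12.0, 5), (2, 13.0, 6)]

for wheel_id, (timestamp, distance) in sorted(latest_per_key(burst).items()):
    print(f"wheel {wheel_id}: distance {distance}")  # one GUI update per wheel
```

This only reduces GUI updates; the older samples would still have been sent over the network.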

 

DataReader 3 definitely needs all the data so it can display an accurate history for each wheel. My thinking is that DataReader 3 is easy.

 

Looking at the above example maybe I was approaching my problem in the wrong way?

Gerardo Pardo
Joined: 06/02/2010
Posts: 565

Note: I am trying to provide a "practical answer" here, meaning something that will work in RTI Connext DDS and probably several other DDS implementations. Maybe there are other approaches that would theoretically be possible with some interpretation of the DDS specification, but this would not be too practical without a set of products that use that interpretation...

Given the scenario I can suggest a perhaps simple solution. But there are some details like how "long" the complete history will be and whether each DataWriter  has the resources it would need to store the "whole" history in its own DataWriter cache. If it does not, then you will need to use something like the Persistence Service I mentioned to keep around the data that the DataWriter cannot keep in its own memory cache.

I agree it should be simpler. The problem is that, as I mentioned earlier, a (reliable) DataWriter with non-VOLATILE DURABILITY will send all the data in its cache to a (reliable) DataReader with non-VOLATILE DURABILITY, regardless of the setting of the HISTORY depth in the DataReader. This is what the DDS specification implies, given that the HISTORY QoS is local and not propagated via discovery. So the DataWriter does not even know what the DataReader HISTORY configuration is. We have run into this issue several times before and we are designing some extensions that will solve it. But they are not in the product yet, hence the need for a more involved approach.

One thing that can make everything simpler is if you can make sure that the DataWriters representing wheels that have not fallen down always write data at least at some minimum rate. For example, a wheel that is moving could send its data faster, perhaps whenever its position changes by more than a certain amount. However, a wheel that moves very slowly or stops would still send its data at a slower rate, for example every 5 seconds.
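A minimal Python sketch of such a publishing rule might look like the following. The function name `should_publish`, the 0.5 change threshold, and the way time is passed in are all assumptions for illustration; only the 5-second period comes from the example above.

```python
def should_publish(now, last_sent_time, last_sent_distance, distance,
                   min_change=0.5, max_period=5.0):
    """Decide whether a wheel should write a new sample.

    Publish when the distance moved by at least `min_change` since the last
    sample, or when `max_period` seconds have passed without a write.
    """
    moved_enough = abs(distance - last_sent_distance) >= min_change
    period_elapsed = (now - last_sent_time) >= max_period
    return moved_enough or period_elapsed


# Barely moved, 1 second after the last write: stay quiet.
print(should_publish(now=1.0, last_sent_time=0.0, last_sent_distance=10.0, distance=10.1))
# Moved a full unit: publish right away.
print(should_publish(now=1.0, last_sent_time=0.0, last_sent_distance=10.0, distance=11.0))
# Standing still, but 5 seconds have passed: publish the heartbeat value.
print(should_publish(now=5.0, last_sent_time=0.0, last_sent_distance=10.0, distance=10.0))
```

The periodic branch is what guarantees that a reader with no history eventually learns the state of every standing wheel.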

With this assumption, which you can enforce using the DEADLINE QoS, you can then take advantage of the RELIABILITY, HISTORY, and DURABILITY QoS as follows:

a) The DataWriters would publish information with RELIABILITY kind=RELIABLE, HISTORY kind=KEEP_ALL, DURABILITY kind=TRANSIENT_LOCAL and RESOURCE_LIMITS large enough to hold the data needed for DataReader 3.

b) DataReader 1 would always be running, so it should not need historical data. You could use RELIABILITY kind=RELIABLE, HISTORY kind=KEEP_ALL. If you want it to get historical data when it joins late, use DURABILITY kind=TRANSIENT_LOCAL; if you do not, use DURABILITY kind=VOLATILE.

Even easier: use the RTI Recording Service, which does exactly this and supports record, replay, remote administration, has a GUI, etc.

c) DataReader 2 would subscribe with RELIABILITY kind=BEST_EFFORT (or RELIABLE), HISTORY kind=KEEP_LAST, DURABILITY kind=VOLATILE

The point is that since the DataReader is VOLATILE the DataWriter will not send historical data to it. The "latest state" is then provided by the fact that the DataWriters are publishing periodically so while the DataReader does not get history its initial state will be initialized once the 'slowest' update period elapses.

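d) DataReader 3 needs all the historical data, so you can subscribe using RELIABILITY kind=RELIABLE, HISTORY kind=KEEP_ALL, DURABILITY kind=TRANSIENT_LOCAL (a DataReader cannot request a stronger DURABILITY kind than the DataWriters offer, which in (a) is TRANSIENT_LOCAL).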

As mentioned this assumes the DataWriters can really keep in their cache the historical data for late-joining DataReader 3. If they cannot, then you need an external application (the RTI Persistence Service) to do that.
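The matching rules behind this configuration can be sketched in Python. This is a toy model of the requested/offered check for DURABILITY and RELIABILITY only, not RTI code: the dictionaries and the functions `compatible` and `gets_history` are hypothetical, and real matching involves more policies than these two.

```python
DURABILITY_ORDER = ["VOLATILE", "TRANSIENT_LOCAL", "TRANSIENT", "PERSISTENT"]
RELIABILITY_ORDER = ["BEST_EFFORT", "RELIABLE"]


def compatible(offered, requested):
    """Toy check: the writer must offer at least what the reader requests."""
    durability_ok = (DURABILITY_ORDER.index(offered["durability"])
                     >= DURABILITY_ORDER.index(requested["durability"]))
    reliability_ok = (RELIABILITY_ORDER.index(offered["reliability"])
                      >= RELIABILITY_ORDER.index(requested["reliability"]))
    return durability_ok and reliability_ok


def gets_history(offered, requested):
    """A matched reader is sent cached samples only if it is non-VOLATILE."""
    return compatible(offered, requested) and requested["durability"] != "VOLATILE"


writer = {"reliability": "RELIABLE", "durability": "TRANSIENT_LOCAL"}
latest_reader = {"reliability": "BEST_EFFORT", "durability": "VOLATILE"}
history_reader = {"reliability": "RELIABLE", "durability": "TRANSIENT_LOCAL"}

print(compatible(writer, latest_reader), gets_history(writer, latest_reader))
print(compatible(writer, history_reader), gets_history(writer, history_reader))
```

The VOLATILE reader still matches the writers and receives live updates, but it is never sent the cached history, while a reader requesting the same durability the writers offer is.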

In addition, if you do not want the DataWriters to guarantee a minimum update period (so that, for example, no data is sent while the wheel does not move), then you have the problem that I described in my first answer. In that case you need a different DataWriter to keep that "last value cache" for DataReader 2, and you need to configure DataReader 2 to get its initial history from that specific Persistence Service. This can be done using partitions and also taking advantage of the Collaborative DataWriter features I pointed out.

Gerardo