How to read "historical" persistent data

I'm using a Persistence Service process to store persistent data to disk. How can I read historical data that is beyond the local history of my publisher?

Both my Publisher and my Subscriber use RELIABLE_RELIABILITY_QOS and PERSISTENT_DURABILITY_QOS with KEEP_LAST_HISTORY_QOS and depth = 20.

Samples are generated and published to each of 10 instances in turn. Samples are given a sequence number which is independent of the instance key. Once I'm in a situation where my instances contain more than 20 samples each, how am I able to retrieve an arbitrary set of samples?

I'd like to use dynamic filters such as "seq_num BETWEEN %0 AND %1"

Or maybe "(seq_num BETWEEN %0 AND %1) AND key_num = %2" 

Where %0, %1, and %2 can be modified at run-time to retrieve different sets of data.

Where do I start looking?

Thanks

Dallas

Hi Dallas,

You can find a good code example on how to use content filters here. Also, the RTI Connext Core Libraries and Utilities User's Manual includes an explanation of how to set up content filters, including the use of parameters that can be changed at run-time, in its Chapter 12 (Section 5.4.3).

Basically, to be able to modify a set of parameters in a filter expression at run-time, you have to create your content filtered topic with the sequence of parameters that can be modified.

    // Initial values for the filter parameters %0 and %1
    DDS_StringSeq parameters(2);
    const char* param_list[] = {"1", "4"};
    parameters.from_array(param_list, 2);

    // Create the content filtered topic on top of the existing topic
    DDSContentFilteredTopic *cft = NULL;
    if (sel_cft) {
        cft = participant->create_contentfilteredtopic(
            "ContentFilteredTopic", topic, "(x >= %0 and x <= %1)", parameters);
        if (cft == NULL) {
            printf("create_contentfilteredtopic error\n");
            subscriber_shutdown(participant);
            return -1;
        }
    }

Then, whenever you want to change the value of those parameters, you have to (1) change their values in the parameters string sequence, and (2) call set_expression_parameters() to update the filter.

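    // Free the old strings before replacing them, so they are not leaked
    DDS_String_free(parameters[0]);
    DDS_String_free(parameters[1]);
    parameters[0] = DDS_String_dup("5");
    parameters[1] = DDS_String_dup("9");
    retcode = cft->set_expression_parameters(parameters);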

Regarding your first question, with KEEP_LAST_HISTORY_QOS the history depth sets the number of samples per instance that the middleware stores locally for data writers and data readers. If the data writer of your publisher application (or the persistence service data writer) keeps the last 20 samples of each of your 10 instances, it won't be able to send data readers more than those 20 samples per instance. On the data reader side, only the samples that pass the filter will be stored.

Therefore, if you want to increase the number of samples you receive for each instance you have to increase the KEEP_LAST_HISTORY_QOS depth on your data writers (including persistence service for its potential use), and accordingly its depth on your data readers.
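
To make the depth limit concrete, here is a small self-contained toy model of a KEEP_LAST history cache. It is only a sketch and does not use the RTI Connext API: the class `KeepLastHistory` and the helper `write_round_robin` are hypothetical names made up for illustration. It assumes the scenario from the question (samples written to 10 instances in turn, with a sequence number independent of the key).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>

// Toy model (NOT the RTI API): each instance keeps only its newest `depth` samples.
class KeepLastHistory {
public:
    explicit KeepLastHistory(std::size_t depth) : depth_(depth) {}

    // Store a sample; if the instance is already full, drop its oldest sample.
    void add(int key_num, int seq_num) {
        std::deque<int>& samples = per_instance_[key_num];
        samples.push_back(seq_num);
        if (samples.size() > depth_) {
            samples.pop_front();
        }
    }

    // True if the given sample is still in the history of that instance.
    bool contains(int key_num, int seq_num) const {
        auto it = per_instance_.find(key_num);
        if (it == per_instance_.end()) {
            return false;
        }
        return std::find(it->second.begin(), it->second.end(), seq_num)
                != it->second.end();
    }

    // Number of samples currently held for that instance.
    std::size_t size(int key_num) const {
        auto it = per_instance_.find(key_num);
        return it == per_instance_.end() ? 0 : it->second.size();
    }

private:
    std::size_t depth_;
    std::map<int, std::deque<int>> per_instance_;
};

// Hypothetical helper: write `total_samples` samples to `instances` keys in turn.
KeepLastHistory write_round_robin(std::size_t depth, int instances, int total_samples) {
    KeepLastHistory history(depth);
    for (int seq = 0; seq < total_samples; ++seq) {
        history.add(seq % instances, seq);
    }
    return history;
}
```

With `write_round_robin(20, 10, 300)`, instance 0 receives sequence numbers 0, 10, ..., 290 but only retains 100 through 290, so `contains(0, 90)` is false. No filter expression can bring sample 90 back from this cache.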

Fernando.

Hi,

Regarding the depth of the history received by the DataReader, I think it is enough to increase the history depth on the persistence service DataWriter. RTI Connext DDS will automatically combine what is in the persistence service DataWriter with the history in the original DataWriter (if it is still around), so that a DataReader that specified a DURABILITY of TRANSIENT or PERSISTENT will get the larger history.

Gerardo

Thanks for your responses, guys.

I think that answers about half of my question. The example in Content_Filtered_Topic.zip in Fernando's post is useful.

But in that example, the dynamic changes to the content filtered topic only affect the reception of newly published samples. 

I'd like to be able to modify this example so that the filter is on the "count" field so that I can read, for example, samples 10 to 15, then 25 to 30, then back to 5 to 10, and so on, arbitrarily back and forth through the entire history.

I've tried to implement a trivial example of this, without success. Using a DataReaderListener (as in the example) is clearly not the way to go; it will only notify me of "new" samples. I've tried using "read" on a reader of a content filtered topic, and I've also tried using "read_w_condition" on a reader of a non-filtered topic. Neither seems to work. In both cases, when the filter parameters are set to read samples "older" than the "oldest" sample currently in the DataReaders' histories, no data is read.

So, my (limited) experience so far indicates that content filters will filter data that is within the DataReader's local history. Once data is too "old" to be contained within this local history it seems to be gone forever. 

Obviously, once the data samples are gone from the DataReader's local cache, the only way to get the data again would be for the Persistence Service to send the samples again. Is this even possible?

If this is possible, how would it be done?

Alternatively, is there something I've missed? A different solution altogether maybe?

Thanks again

Dallas

Hello,

Your observation below is exactly right:

    "dynamic changes to the content filtered topic only affect the reception of newly published samples"

This is how it works:

  • If a DataReader is subscribed to a ContentFilteredTopic and the filter parameters are changed, the change causes DDS to send an update of the new DataReader filter to the DataWriters via the discovery writer (BuiltinSubscriptionDataWriter). Once the DataWriter sees the change, it will apply the new filter parameters to the samples it writes after it receives the change in the filter.
  • At the same time, as soon as the filter parameters are changed, the DataReader starts using the new filter parameters. So if the DataWriter is still sending samples filtered with the old filter parameters, the DataReader will see that the filter is different and apply the new filter parameters, effectively refiltering the samples.
  • So the change of parameters will cause the stream to be filtered first with the old filter, then with both filters, and then with the new filter.

In this process the DataWriter never re-evaluates the history of samples that were already filtered for that DataReader. The reason is that it would be quite complicated to define the proper behavior. For instance:

  1. Is the DataReader supposed to see duplicate samples (if they passed both the old and new filters), or only the ones that pass the new filter but not the old one?
  2. Is the DataReader allowed to see "old" samples prior to the ones that it has already received? Doing so would make time "go backwards" and could leave it with the impression that a particular instance has one value when it really has something different.

These may or may not be problems in particular situations. But it is very hard to get the behavior to be correct and intuitive in general, and for this reason the DataWriter does not go back and replay its history to the DataReader.

So if you need this old history, I think you have two options:

  1. You can create a new DataReader with the new filter. For this to work as you expect, you also need the DataReader and DataWriter configured with the DURABILITY QoS kind set to something other than VOLATILE (for example, you configure both to TRANSIENT_LOCAL; or else you configure the DataReader to TRANSIENT and have a Persistence Service configured to maintain the durable data). In addition, the DataWriter HISTORY QoS needs to have the RefilterQosPolicyKind set to DDS_ALL_REFILTER_QOS.
  2. You can create the DataReader without a filter so that it receives all the samples, and then use a QueryCondition to locally read the samples that match the filter of interest. As the samples are all in the DataReader cache, you can change the filter parameters in the QueryCondition and read other samples in the cache.
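
As a rough illustration of the second option, the sketch below models an unfiltered reader cache and a local query with changeable parameters. It is a self-contained toy and not RTI code: `Sample`, `QueryParams`, `read_matching`, and `make_cache` are hypothetical names, and the predicate mirrors the expression "(seq_num BETWEEN %0 AND %1) AND key_num = %2" from the original question. It assumes the reader's own history is deep enough to hold every sample you want to query.

```cpp
#include <cassert>
#include <vector>

// One sample as seen in the (toy) reader cache.
struct Sample {
    int key_num;
    int seq_num;
};

// Run-time parameters of "(seq_num BETWEEN %0 AND %1) AND key_num = %2".
struct QueryParams {
    int seq_lo;   // %0
    int seq_hi;   // %1
    int key_num;  // %2
};

// Toy stand-in for reading with a query on an unfiltered reader:
// scans the whole local cache and returns the samples that match.
std::vector<Sample> read_matching(const std::vector<Sample>& cache,
                                  const QueryParams& params) {
    std::vector<Sample> result;
    for (const Sample& s : cache) {
        const bool in_range =
                s.seq_num >= params.seq_lo && s.seq_num <= params.seq_hi;
        if (in_range && s.key_num == params.key_num) {
            result.push_back(s);
        }
    }
    return result;
}

// Hypothetical helper: a cache holding every sample written to
// `instances` keys in turn, with seq_num independent of the key.
std::vector<Sample> make_cache(int instances, int total_samples) {
    std::vector<Sample> cache;
    for (int seq = 0; seq < total_samples; ++seq) {
        cache.push_back(Sample{seq % instances, seq});
    }
    return cache;
}
```

Because nothing is discarded before the query runs, you can ask for sequence numbers 100 to 150, then 20 to 45, then 100 to 150 again, and get consistent answers each time. The cost is that every sample crosses the network and sits in the reader's memory.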

 

The first option works because a DataWriter that is configured with DURABILITY kind TRANSIENT_LOCAL, TRANSIENT, or PERSISTENT will keep samples in its cache for future, to-be-discovered DataReaders. When the DataWriter discovers a new DataReader that also specifies non-VOLATILE DURABILITY, the DataWriter will send the data in its history cache. If the DataWriter HISTORY QoS has been configured with the RefilterQosPolicyKind set to DDS_ALL_REFILTER_QOS, then the DataWriter will filter the historical samples and only send the ones that pass the filter. If the RefilterQosPolicyKind is set to DDS_NONE_REFILTER_QOS, then it will just send all the samples and let the DataReader filter them, which is probably not what you are looking for.
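
The difference between the two refilter settings can be sketched with a toy model of a writer delivering its history to a newly discovered reader. This is not the RTI API: the `Refilter` enum is only a stand-in for DDS_NONE_REFILTER_QOS and DDS_ALL_REFILTER_QOS, and `Delivery` and `deliver_history` are hypothetical names used for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Stand-ins for DDS_NONE_REFILTER_QOS and DDS_ALL_REFILTER_QOS (toy model only).
enum class Refilter { None, All };

struct Delivery {
    std::size_t sent_on_wire;   // historical samples the writer actually sends
    std::vector<int> accepted;  // seq_nums the reader keeps after filtering
};

// Toy model: a writer sends its history to a late-joining reader that
// has a content filter. With Refilter::All the writer filters first;
// with Refilter::None it sends everything and the reader filters.
Delivery deliver_history(const std::vector<int>& writer_history,
                         const std::function<bool(int)>& reader_filter,
                         Refilter mode) {
    Delivery d{0, {}};
    for (int seq_num : writer_history) {
        const bool passes = reader_filter(seq_num);
        if (mode == Refilter::All && !passes) {
            continue;  // filtered on the writer side, never sent
        }
        ++d.sent_on_wire;
        if (passes) {
            d.accepted.push_back(seq_num);  // reader-side filter
        }
    }
    return d;
}
```

In this model both settings leave the new reader with the same samples. What changes is how many historical samples travel to the reader before being thrown away, which matters when the writer's history is large and the filter is narrow.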

Let us know if this helps,

Gerardo