Ordered Presentation Group

Concept

Usually DataReaders will receive data in the order that it was sent by a DataWriter. In addition, data is presented to the DataReader as soon as the application receives the next value expected.

Sometimes, you may want a set of data for the same DataWriter to be presented to the receiving DataReader only after ALL the elements of the set have been received, but not before. You may also want the data to be presented in a different order than it was received. Specifically, for keyed data, you may want Connext to present the data in keyed or instance order.

If we use Ordered Presentation, the access_scope controls the scope of the order in which samples are presented to the subscribing application. The access_scope may have four different values:

  • If access_scope is INSTANCE, the relative order of samples sent by a DataWriter is only preserved on an per-instance basis. If two samples refer to the same instance (identified by Topic and a particular value for the key) then the order in which they are stored in the DataReader’s queue is consistent with the order in which the changes occurred. However,if the two samples belong to different instances, the order in which they are presented may or may not match the order in which the changes occurred.
  • If access_scope is TOPIC, the relative order of samples sent by a DataWriter is preserved for all samples of all instances. The coherent  grouping and/or order in which samples appear in the DataReader’s queue is consistent with the grouping/order in which the changes occurred—even if the samples affect different instances.
  • If access_scope is GROUP, the scope spans all instances belonging to DataWriter entities within the same Publisher—even if they are instances of different topics. Changes made to instances via DataWriter entities attached to the same Publisher are made available to Subscribers on the same order they occurred.
  • If access_scope is HIGHEST_OFFERED, the Subscriber will use the access scope specified by each remote Publisher.

Example Description

In this example we illustrate how to use Group access_scope for the order in which samples are presented to the subscribing application.

The Publisher sets its presentation QoS properties to enable GROUP-level ordered access. This enforces ordering on instances from any DataWriters and Topics for a given Publisher. Also note that ordered-access configuration needs to be configured in the subscription side so samples are read in an ordered manner.

The Subscriber application illustrates the effects of the GROUP access_scope presentation QoS. Changes made to instances via DataWriter entities attached to the same Publisher are made available to Subscribers on the same order they occurred. For more information about these QoS you can see section 6.4.6 "PRESENTATION QosPolicy" of the User's Manual.

The example creates 3 Topics, 3 DataWriters, 3 DataReaders and 3 instances. Each DataWriter publishes two samples of the corresponding topic. This process is repeated once a second. The order in which the DataWriters publish the samples is: { DW1-Sample1, DW1-Sample2, DW2-Sample1, DW2-Sample2, DW3-Sample1, DW3-Sample2 }.

The key points in the example are:

  • In the Subscriber, we implement a subscriber listener for the callback ON_DATA_ON_READERS. This callback will activate every time any DataReader has data available.
    We want to read the samples in the order in which they were modified. For doing that, in the implementation of the callback we invoke begin_access() and end_access(). Inside this block, we call get_datareaders() to obtain an ordered sequence of the DataReaders. This sequence specifies the reading order for the samples. See more information in sections 7.2.5 "Beginning and Ending Group-Ordered Access" and 7.2.7 "Getting DataReaders with Specific Samples" of the User's Manual.
    We need to iterate across the sequence of DataReaders and read one sample each time. For doing this, use take_next_sample() instead of take().
  • In USER_QOS_PROFILES.xml, we set Group Access Scope and Ordered Access QoS:
    <publisher_qos> 
        <presentation> 
            <ordered_access>true</ordered_access> 
            <access_scope>GROUP_PRESENTATION_QOS</access_scope> 
        </presentation> 
    </publisher_qos>
    <subscriber_qos>
        <presentation> 
            <ordered_access>true</ordered_access> 
            <access_scope>GROUP_PRESENTATION_QOS</access_scope> 
        </presentation> 
    </subscriber_qos>

Download Example

Browse Example

Languages:

Comments

Thanks, examples made it very clear.

In these examples, the code of getting group ordered-access data from DataReaders is not the recommended method, and while it does not provide data out-of-order, the way that it's done is both inefficient and can lead to unexpected delays in processing received data as well as unlimited increase of unprocessed data stored in a DataReader's ReceiveQueue.

The examples explicitly take a single data sample from each DataReader in the sequence/vector of DataReaders that have received data obtained from the parent DDSSubscriber.

So, after getting an ordered list of DataReaders that have received data (the order of the DataReaders is the order in which the DataWriters of a DDSPublisher has sent the data), the example code iterates through the list and takes 1 and only 1 sample from each DataReader.

For example, in Modern C++ it does

            dds::sub::LoanedSamples<ordered_group> sample =
                reader.select().max_samples(1).take();

in Traditional C++ it does

 /* IMPORTANT. Use take_next_sample(). We need to take only 
   *
one sample each time, as we want to follow the sequence of
  * DataReaders. This way the samples will be returned in the
  * order in which they were modified */
    retcode = ordered_group_reader->take_next_sample(data, info);

The comment and the code is not correct...although the execution of the example code will not process the data received "out-of-order"...it's not doing it efficiently.  And it's possible that if the DataWriters stop sending data, that there will be unprocessed data left in the DataReaders since the Subscriber's on_data_on_readers() callback will not be invoked until new data is received.

What is happening is that when begin_access() is called, a snapshot of the DataReader's queues is taken, and DataReaders are ordered in the order that their data appears in the queue (a virtual queue is created for the order-access Subscriber).  So when you call

/* Obtain DataReaders. We obtain a sequence of DataReaders
    that specifies the order in which each sample should be read */
    DDSDataReaderSeq MyDataReaders;
    retcode = subscriber->get_datareaders( MyDataReaders, DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);

or in Modern C++

       // Get the sequence of DataReaders that specifies the order
        // in wich each sample should be read.
        std::vector<dds::sub::AnyDataReader> readers;
        int num_readers =
                find(subscriber,
                     dds::sub::status::DataState::new_data(),
                     std::back_inserter(readers));

A sequence or vector of ordered DataReaders is returned.

However, a DataReader only appears a single time for a contigious set of Data for that DataReader in the queue.

So, say the virtual ordered queue contains the following data samples

1, 1, 2, 2, 3, 3

where 1 is a sample for DataReader 1, 2 is a sample for DataReader 2, etc.

Then the returned sequence/vector of DataReaders would be

DR1, DR2, DR3

(only 3! DataReaders)

If code iterates through DR1 then DR2 and then DR3, and tries to take a single sample from each DataReader, then what happens is the DR 1 will return a sample, but DR2 and DR3 will not return any samples since there is still a sample for DR1 ahead of samples for DR2 and DR3 that the user must take first.

So the next time a set of data comes in, say the queue now holds

1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3

using begin_access() and getting a sequence of DataReaders would hold

DR1, DR2, DR3, DR1, DR2, DR3

Nofw when the code iterates through the list of DR's above and take 1 sample from each, it would get

1, 2, 2, 3

The first DR3 and second DR1 would return no samples, but what will remain on the receive queue is

3, 1, 1, 2, 2, 3, 3

You can see that this can easily lead to an ever increasing receive queue holding old samples waiting to be processed...

The CORRECT methodology...as well as documented here: https://community.rti.com/static/documentation/connext-dds/6.1.1/doc/manuals/connext_dds_professional/users_manual/index.htm#users_manual/Getting_DataReaders_with_Specific_DDS_Sa.htm#8.2.7_Getting_DataReaders_with_Specific_DDS_Samples%3FTocPath%3DPart%25202%253A%2520Core%2520Concepts%7C8.%2520Receiving%2520Data%7C8.2%2520Subscribers%7C_____7

So, the processing code should actually be (in Modern C++) :

        // Create a coherent group access.
        dds::sub::CoherentAccess coherent_access(subscriber);

        // Get the sequence of DataReaders that specifies the order
        // in wich each sample should be read.
        std::vector<dds::sub::AnyDataReader> readers;
        int num_readers =
                find(subscriber,
                     dds::sub::status::DataState::new_data(),
                     std::back_inserter(readers));

        for (int i = 0; i < num_readers; i++) {
            dds::sub::DataReader<ordered_group> reader =
                    readers[i].get<ordered_group>();

            dds::sub::LoanedSamples<ordered_group> samples =
                    reader.select().take();

            for (const auto& sample : samples) {
                if (sample.info().valid()) {
                    std::cout << sample.data() << std::endl;
                }
            }

or in Traditional C++

    /* IMPORTANT for GROUP access scope: Invoking begin_access() */
    subscriber->begin_access();

    /* Obtain DataReaders. We obtain a sequence of DataReaders that specifies
                the order in which each sample should be read */
    retcode = subscriber->get_datareaders(
            MyDataReaders,
            DDS_ANY_SAMPLE_STATE,
            DDS_ANY_VIEW_STATE,
            DDS_ANY_INSTANCE_STATE);
    if (retcode != DDS_RETCODE_OK) {
        std::cerr << "ERROR error " << retcode << std::endl;
        /* IMPORTANT. Remember to invoke end_access() before a return call.
                        Also reset DataReaders sequence */
        MyDataReaders.ensure_length(0, 0);
        subscriber->end_access();
        return;
    }

    /* Read the samples received, following the DataReaders sequence */
    for (i = 0; i < MyDataReaders.length(); i++) {
        ordered_groupDataReader *ordered_group_reader = NULL;
        ordered_groupSeq data_seq;
        DDS_SampleInfoSeq info_seq;

        ordered_group_reader =
                ordered_groupDataReader::narrow(MyDataReaders.get_at(i));
        if (ordered_group_reader == NULL) {
            std::cerr << "DataReader narrow error\n";
            /* IMPORTANT. Remember to invoke end_access() before a return call.
                                Also reset DataReaders sequence */
            MyDataReaders.ensure_length(0, 0);
            subscriber->end_access();
            return;
        }

        retcode = ordered_group_reader->take(
            data_seq, info_seq, DDS_LENGTH_UNLIMITED,
            DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);

        /* In case there is no data in current DataReader,
            check next in the sequence */
        if (retcode == DDS_RETCODE_NO_DATA) {
            continue;
        } else if (retcode != DDS_RETCODE_OK) {
            std::cerr << "take error %d\n", retcode;
            continue;
        }

        for (i = 0; i < data_seq.length(); ++i) {
            if (info_seq[i].valid_data) {
                ordered_groupTypeSupport::print_data(&data_seq[i]);
            }
        }

    }

So in the example above, if the ordered virtual queue has

1,1,2,2,3,3

the returned DataReader sequence would be

DR1, DR2, DR3

and then the processing logic would get values 1,1 for DR1, 2,2 for DR2 and 3,3 for DR3 and the queue would be emptied on a single invocation of the callback...which is what you want.