Why does my DDS DataReader miss the first few samples?

Note: Applies to RTI Connext 4.x and above.

Discovery is not instantaneous; it takes some time for the discovery process between applications to complete. A DataWriter and a DataReader must discover each other before they can start communicating. Therefore, if you send data immediately after creating the RTI Connext DDS entities, DataReaders will not receive the first few samples because they are still in the process of discovering the DataWriters, and vice versa. This is true even when the DataWriters and DataReaders are reliable, because the Reliability QoS on its own does not guarantee delivery to DataReaders that have not yet been discovered. This is expected behavior.

Note: For more information on the Discovery process, please refer to the chapter titled “Discovery” in the Core Libraries and Utilities User Manual.

If you need to ensure that all samples are received, including samples sent before the discovery process completes, you can use one of two methods:

  1. Use the Durability QoS policy:

    The Durability QoS policy specifies whether RTI Connext DDS should store and send data to late-joining or not-yet-discovered DataReaders.  Depending on the value of the durability kind, the middleware may not store data for late-joiners; it may store data in the DataWriter's local cache for late-joiners; or it may store data in an external cache for late-joiners.  The durability kind can have the following values:

    - DDS_VOLATILE_DURABILITY_QOS (default): RTI Connext DDS will only attempt to provide data to existing DataReaders.

    - DDS_TRANSIENT_LOCAL_DURABILITY_QOS: RTI Connext DDS will keep a set of samples in the DataWriter's queue to deliver to late-joining DataReaders. The number of samples the DataWriter keeps depends on the settings in the DataWriter's History QoS policy and Resource Limits QoS policy. These samples are stored in the DataWriter's queue, meaning they will not be available if the DataWriter is shut down. To enable this level of durability, you must also set the Reliability QoS policy kind to DDS_RELIABLE_RELIABILITY_QOS.

    - DDS_TRANSIENT_DURABILITY_QOS (requires RTI Persistence Service): RTI Connext DDS will keep samples using RTI Persistence Service, which will deliver them to late-joining DataReaders. Samples are kept in memory and not in permanent storage, so they will not survive the end of the RTI Persistence Service session (but they will outlive the DataWriters). The number of samples kept depends on the History QoS policy and Resource Limits QoS policy configured for RTI Persistence Service.

    - DDS_PERSISTENT_DURABILITY_QOS (requires RTI Persistence Service): This kind is similar to DDS_TRANSIENT_DURABILITY_QOS but in this case, samples are kept in permanent storage so they survive the session of RTI Persistence Service.

    Note: The number of samples kept depends on the History QoS policy kind. When the kind is set to KEEP_ALL, the size of the DataWriter's queue is determined by the Resource Limits QoS policy. When the kind is set to KEEP_LAST, the size of the queue is determined by the history depth.
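
    To make the effect of the durability kind and the history depth concrete, here is a minimal, self-contained C++ toy model. It does not use the RTI Connext API: ModelWriter, DurabilityKind, and samples_for_late_joiner are hypothetical names invented for this illustration, and the model assumes a single instance with a KEEP_LAST history.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy model only: these types are hypothetical and are NOT part of the
// RTI Connext API. They imitate a single-instance KEEP_LAST writer queue.
enum class DurabilityKind { Volatile, TransientLocal };

class ModelWriter {
public:
    ModelWriter(DurabilityKind kind, std::size_t history_depth)
        : kind_(kind), depth_(history_depth) {
        assert(history_depth > 0);
    }

    // Store a sample; once the queue holds depth_ samples,
    // the oldest one is dropped to make room.
    void write(int sample) {
        history_.push_back(sample);
        if (history_.size() > depth_) {
            history_.pop_front();
        }
    }

    // Samples that a reader matched after the writes would be given.
    std::vector<int> samples_for_late_joiner() const {
        if (kind_ == DurabilityKind::Volatile) {
            return {};  // nothing is provided to readers that join late
        }
        return std::vector<int>(history_.begin(), history_.end());
    }

private:
    DurabilityKind kind_;
    std::size_t depth_;
    std::deque<int> history_;
};
```

    In this model, a writer with depth 3 that has written samples 1 through 5 hands samples 3, 4, and 5 to a late joiner under TransientLocal, and nothing under Volatile.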

  2. Have the DataWriter wait until the DataReader has been discovered:

    Using the publication matched status, the DataWriter can wait until it has discovered matching DataReaders before it starts publishing. This method requires reliable communication: the DataWriter can wait until it discovers the DataReader, but that does not guarantee that the DataReader has discovered the DataWriter. If the DataReader receives samples from an unknown DataWriter, it will reject them. If you set both the DataWriter's and the DataReader's Reliability QoS kind to DDS_RELIABLE_RELIABILITY_QOS, the DataWriter will keep resending (repairing) the samples until the DataReader acknowledges them.

    The downside to this technique is that it is brittle:  If you write code in your application to wait until the DataWriter discovers a single DataReader before sending, you have no guarantee that the DataReader you have discovered is the one you care about.  For example, you might want to use the rtiddsspy utility to debug your system.  The rtiddsspy utility creates DataReaders to match all the DataWriters it discovers - including your DataWriter that is waiting to send.  This means that your DataWriter might discover rtiddsspy before it discovers your application DataReader, causing your DataReader to miss the first sample.  This also means that if your system design changes in the future to include multiple DataReaders for this data, you will have to update your code to wait for multiple DataReaders.
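
    As a rough sketch of the multi-reader case mentioned above, the wait can be written against an expected reader count with an upper bound on the number of polls. The helper below is hypothetical and not part of the RTI Connext API; poll_current_count is an assumed stand-in for code that reads current_count from the DataWriter's publication matched status. Note that a count alone still cannot tell your application's DataReader apart from one created by rtiddsspy.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper, NOT part of the RTI Connext API.
// poll_current_count stands in for a call that returns the DataWriter's
// current number of matched DataReaders.
bool wait_for_matched_readers(
        const std::function<int()>& poll_current_count,
        int expected_readers,
        std::chrono::milliseconds nap_period,
        int max_polls) {
    for (int attempt = 0; attempt < max_polls; ++attempt) {
        if (poll_current_count() >= expected_readers) {
            return true;  // enough DataReaders are matched
        }
        std::this_thread::sleep_for(nap_period);
    }
    return false;  // gave up; the caller decides what to do next
}
```

    Bounding the number of polls is a design choice for this sketch: it keeps the DataWriter from blocking forever if an expected DataReader never appears.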

    Here's an example of how to poll the publication matched status:
DDS_PublicationMatchedStatus pubstatus;
DDS_ReturnCode_t retcode;
DDS_Duration_t nap_period = { 0, 100000000 }; // example value: 100 ms

// Poll until at least one matching DataReader has been discovered
do {
    retcode = my_writer->get_publication_matched_status(pubstatus);
    if (retcode != DDS_RETCODE_OK) {
        printf("get publication matched status error %d\n", retcode);
        return -1;
    }
    // sleep is needed if on the same board with default priorities
    NDDSUtility::sleep(nap_period);
} while (pubstatus.current_count == 0);
 

You can also use the on_publication_matched callback of the DataWriter's listener to be notified when a matching DataReader has been found.

In RTI Data Distribution Service 4.1 and later, you can also do this by using a StatusCondition and a WaitSet. Here is an example in C++: 

/* set up the waitset: */ 
DDSWaitSet* waitset = new DDSWaitSet(); 
DDSStatusCondition* cond = writer->get_statuscondition(); 
DDS_ReturnCode_t retcode; 

cond->set_enabled_statuses(DDS_PUBLICATION_MATCHED_STATUS); 

retcode = waitset->attach_condition(cond); 
if (retcode != DDS_RETCODE_OK) { 
    // ... error 
} 

/* use the waitset to wait for the condition to become true */ 
DDS_Duration_t timeout = { 1, 0 }; // 1 second 
DDS_ConditionSeq active_conditions; 

do { 
    retcode = waitset->wait(active_conditions, timeout); 
} while(retcode == DDS_RETCODE_TIMEOUT); 

if (retcode != DDS_RETCODE_OK) { 
    // ... error 
} 

/* active_conditions should include a single entry which matches cond */ 

/* ... */ 

/* clean up */ 
delete waitset;
 

Note regarding the above code: Keep in mind that the StatusCondition is owned by the DataWriter; if you are also using a StatusCondition for other purposes, your call to set_enabled_statuses() may include additional statuses. In that case, when wait() returns, you would need to check the triggering condition to ensure it is the one you are looking for here.