How to Filter Samples using RTI Connext DDS Micro

Differences between Filtering Data using RTI Connext DDS Professional and RTI Connext DDS Micro

RTI Connext DDS Professional and RTI Connext DDS Micro differ in several ways; one of them is how to filter the samples that a subscriber will receive.

RTI Connext DDS Professional allows defining ContentFilteredTopics, which are topics that have a set of filtering properties. This allows subscribers to subscribe to topics and, at the same time, specify that they are only interested in a subset of the topic data. ContentFilteredTopics are defined when the DataReader is created.

By default, a DataReader sends the ContentFilteredTopic information to the DataWriters as part of the endpoint discovery phase. This way, each DataWriter will do the filtering on its side and will not send samples that are not in the defined subset. This approach avoids sending samples to DataReaders that will drop them anyway, thus reducing traffic on the network and improving performance. However, this requires the DataWriters to store the ContentFilteredTopic information and apply that filter to every sample, deciding whether a sample will be sent or not.

Currently, this mechanism for filtering data does not exist in RTI Connext DDS Micro. RTI Connext DDS Micro is designed to enable DDS capabilities for a wide range of embedded systems and boards with reduced resources, where applications have special requirements regarding memory usage and hardware capabilities. The additional processing and storage that ContentFilteredTopics require may be difficult to accommodate on constrained devices. This is why data filtering needs to be addressed differently.

You can filter samples in RTI Connext DDS Micro at the application level on the DataReader side by using listeners. Samples can be dropped at different stages of their processing, depending on your application. The two callbacks that can be used to do this are on_before_sample_deserialize and on_before_sample_commit. These callbacks allow the subscriber to drop samples right after they are received, and before committing the samples to the application logic, respectively.

How to Filter Samples with RTI Connext DDS Micro

When a sample is received by a DataReader, it is deserialized before being stored in the DataReader’s queue. However, if this sample is going to be dropped by a filter anyway, this deserialization consumes CPU time unnecessarily.

Therefore, it is possible to determine if the sample should be dropped or not by using the callback on_before_sample_deserialize. This callback allows you to drop samples right after receiving them and even before processing their information or deserializing them. The amount of processing required to drop samples using this callback is smaller than filtering after deserializing the sample, but the process is more complex since the samples must be filtered using serialized data.

For example, let’s consider samples in which one of the fields indicates the status of a sensor, which can be NORMAL, WARNING or CRITICAL, and the rest of the fields provide a large amount of information about the sensor measurements, such as temperature, power consumption, video images, or pressure. If a DataReader is only interested in samples with CRITICAL status, the samples with WARNING or NORMAL status can be dropped without needing to deserialize the other fields. In this case, we can use on_before_sample_deserialize and check only the status field, avoiding any unnecessary processing.
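The status check described above can be sketched on raw bytes. This is a minimal sketch under stated assumptions, not the Connext DDS Micro API: it assumes a hypothetical type whose first member is the 32-bit status enum, that the buffer starts at the first serialized member (after any encapsulation header), and that the payload is little-endian. In a real callback the bytes would be read through the CDR_Stream_t argument; the names SensorStatus, read_le32, and sensor_sample_should_drop are illustrative only.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical status values; the real ones come from the IDL type. */
enum SensorStatus { STATUS_NORMAL = 0, STATUS_WARNING = 1, STATUS_CRITICAL = 2 };

/* Read a 32-bit little-endian value from a byte buffer. */
static uint32_t read_le32(const unsigned char *p)
{
    return (uint32_t)p[0] | ((uint32_t)p[1] << 8) |
           ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
}

/*
 * Decide from the serialized bytes alone whether to drop a sample.
 * Assumption: status is the first member of the type, so it occupies the
 * first four bytes of the payload. Returns 1 to drop, 0 to keep.
 */
int sensor_sample_should_drop(const unsigned char *payload, size_t length)
{
    if (payload == NULL || length < 4) {
        return 1; /* too short to contain a status: treat as unusable */
    }
    return read_le32(payload) != (uint32_t)STATUS_CRITICAL;
}
```

Only four bytes are inspected, no matter how large the rest of the sample is; that is the saving this callback is meant to provide.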

In addition to the callback on_before_sample_deserialize, it is possible to apply a filter right after deserialization and before storing the sample in the DataReader’s queue. To do so, you can use the callback on_before_sample_commit. This callback allows you to determine if a sample should be dropped or not using the deserialized information of the sample.

Using the sensor measurement example, we could drop the samples in which temperature is lower than a given value, for instance. This kind of filtering could be done in the on_before_sample_commit callback, avoiding sending the sample to the application logic and dropping it according to the user criteria.
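A temperature filter of this kind could look roughly like the following. This is a sketch under assumptions: the typedefs at the top are stand-ins so that the snippet compiles without the Connext DDS Micro headers, SensorData and TemperatureFilter are hypothetical types, passing the threshold through listener_data is one possible design, and the return value is assumed here to mean only that the callback itself ran without error.

```c
#include <stddef.h>

/* Stand-ins so the sketch compiles on its own; a real application
 * would include the Connext DDS Micro headers instead. */
typedef unsigned char DDS_Boolean;
#define DDS_BOOLEAN_TRUE  ((DDS_Boolean)1)
#define DDS_BOOLEAN_FALSE ((DDS_Boolean)0)
typedef struct DDS_DataReaderImpl DDS_DataReader;
struct DDS_SampleInfo;

/* Hypothetical deserialized sample type. */
struct SensorData {
    int status;
    double temperature;
};

/* Hypothetical filter settings, handed to the callback as listener_data. */
struct TemperatureFilter {
    double min_temperature;
};

DDS_Boolean temperature_on_before_sample_commit(
        void *listener_data,
        DDS_DataReader *reader,
        const void *const sample,
        const struct DDS_SampleInfo *const sample_info,
        DDS_Boolean *dropped)
{
    const struct TemperatureFilter *filter =
            (const struct TemperatureFilter *)listener_data;
    const struct SensorData *data = (const struct SensorData *)sample;

    (void)reader;      /* unused in this sketch */
    (void)sample_info; /* unused in this sketch */

    if (filter == NULL || data == NULL || dropped == NULL) {
        return DDS_BOOLEAN_FALSE; /* assumed to signal a callback failure */
    }

    /* Drop samples whose temperature is below the configured minimum. */
    *dropped = (data->temperature < filter->min_temperature)
            ? DDS_BOOLEAN_TRUE
            : DDS_BOOLEAN_FALSE;
    return DDS_BOOLEAN_TRUE;
}
```

Because the sample is already deserialized at this stage, the filter can use any field directly, at the cost of having paid for the deserialization.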

How to Use on_before_sample_deserialize and on_before_sample_commit

Using C:

To enable a filter callback in C, the corresponding callback member of the DataReader's listener must be non-null. To do this, implement the callback function and assign it to the DataReader's listener. Note that it is not mandatory to implement both filters.

The following code snippet shows the function prototype that must be implemented for the on_before_sample_deserialize callback:

DDS_Boolean user_defined_on_before_sample_deserialize(
        void *listener_data,
        DDS_DataReader *reader,
        struct NDDS_Type_Plugin *plugin,
        struct CDR_Stream_t *stream,
        DDS_Boolean *dropped);

The following code snippet shows the function prototype that must be implemented for the on_before_sample_commit callback:

DDS_Boolean user_defined_on_before_sample_commit(
        void *listener_data,
        DDS_DataReader *reader,
        const void *const sample,
        const struct DDS_SampleInfo *const sample_info,
        DDS_Boolean *dropped);

The user code must set the variable dropped to DDS_BOOLEAN_TRUE if the sample should be dropped, or to DDS_BOOLEAN_FALSE if the sample should not be dropped.

Once these functions are implemented, assign them to the DataReader listener members on_before_sample_deserialize and/or on_before_sample_commit; it is not mandatory to implement both functions. Do this before creating the DataReader.

The following code snippet shows an example of how to perform this operation:

// Assign to the DataReaderListener the on_before_sample_deserialize function
dataReaderListener.on_before_sample_deserialize = user_defined_on_before_sample_deserialize;
// Assign to the DataReaderListener the on_before_sample_commit function
dataReaderListener.on_before_sample_commit = user_defined_on_before_sample_commit;
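To make the role of the two listener members concrete, the following is a small stand-alone model of the receive path described in this article. It is not RTI code: ModelListener, model_receive, and the two example hooks are hypothetical names, and the "deserialization" is reduced to reading one byte. It only illustrates that a null member means no filter at that stage, and that a sample dropped by the first hook is never deserialized.

```c
#include <stddef.h>

/* Stand-in hook types; NOT the RTI callback signatures. */
typedef int (*PreDeserializeHook)(const unsigned char *bytes, size_t len,
                                  int *dropped);
typedef int (*PreCommitHook)(const int *value, int *dropped);

struct ModelListener {
    PreDeserializeHook on_before_sample_deserialize; /* NULL = no filter */
    PreCommitHook on_before_sample_commit;           /* NULL = no filter */
};

/* Returns 1 if the sample reaches the queue, 0 if it was dropped.
 * deserialize_count counts how often deserialization was paid for. */
int model_receive(const struct ModelListener *listener,
                  const unsigned char *bytes, size_t len,
                  int *deserialize_count)
{
    int dropped = 0;
    int value;

    if (len == 0) {
        return 0; /* nothing to process */
    }
    if (listener->on_before_sample_deserialize != NULL) {
        listener->on_before_sample_deserialize(bytes, len, &dropped);
        if (dropped) {
            return 0; /* dropped before deserialization */
        }
    }

    value = (int)bytes[0]; /* stands in for real deserialization */
    ++*deserialize_count;

    if (listener->on_before_sample_commit != NULL) {
        listener->on_before_sample_commit(&value, &dropped);
        if (dropped) {
            return 0; /* dropped before reaching the queue */
        }
    }
    return 1;
}

/* Example hook: drop when the first serialized byte is odd. */
int drop_odd_first_byte(const unsigned char *bytes, size_t len, int *dropped)
{
    *dropped = (len > 0 && (bytes[0] & 1u)) ? 1 : 0;
    return 1;
}

/* Example hook: drop deserialized values below 10. */
int drop_small_values(const int *value, int *dropped)
{
    *dropped = (*value < 10) ? 1 : 0;
    return 1;
}
```

In this model, a listener initialized as { drop_odd_first_byte, drop_small_values } filters at both stages, while { NULL, NULL } lets every sample through.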

Using C++:

The process to define these callbacks in C++ consists of overriding the functions on_before_sample_commit and on_before_sample_deserialize of the DataReader listener. The following code snippet shows the function prototypes that must be overridden to implement the callbacks:

// on_before_sample_deserialize callback function
virtual DDS_Boolean on_before_sample_deserialize(
        DDSDataReader *reader,
        NDDS_Type_Plugin *plugin,
        CDR_Stream_t *stream_t,
        DDS_Boolean *sample_dropped);
// on_before_sample_commit callback function
virtual DDS_Boolean on_before_sample_commit(
        DDSDataReader *reader,
        const void *const sample, 
        const struct DDS_SampleInfo *const sample_info, 
        DDS_Boolean *sample_dropped);  

Example

The attached example shows how to apply both callbacks using RTI Shapes Demo. This example will filter samples published by Shapes Demo according to certain user-specified parameters. The filter consists of accepting only a subset of squares published in the network.

The filter applied with on_before_sample_deserialize will filter by color, dropping the squares with the colors that you want to reject. The filter applied with on_before_sample_commit will drop samples according to their x, y position. You can specify the minimum and maximum values for x and y.

How to Execute the Example

You can define different parameters to indicate the samples that you want to reject. To do so, add the following options when executing the example:

To filter by color, using the on_before_sample_deserialize callback, use this option:

  • -rejectColor <value>: Indicates the color that must be rejected for the square. Samples with that color will be dropped. For example, -rejectColor RED

To filter by position, using the on_before_sample_commit callback, use these options:

  • -xMin <value>: Indicates the minimum value for the x parameter of the square. Samples with an x field below this value will be rejected.
  • -xMax <value>: Indicates the maximum value for the x parameter of the square. Samples with an x field above this value will be rejected.
  • -yMin <value>: Indicates the minimum value for the y parameter of the square. Samples with a y field below this value will be rejected.
  • -yMax <value>: Indicates the maximum value for the y parameter of the square. Samples with a y field above this value will be rejected.

The following example shows how to use these options to filter squares in Domain 9, dropping all the red squares and accepting only non-red squares whose position lies between (x: 50, y: 50) and (x: 150, y: 150).

$ ShapeType_subscriber -domain 9 -rejectColor RED -xMin 50 -xMax 150 -yMin 50 -yMax 150
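The decision logic implied by these options can be sketched as two small predicates. This is not the code of the attached example: ShapeFilterOptions and both function names are hypothetical, and the sketch only mirrors the option semantics described above (a matching color is rejected; a position outside the inclusive [min, max] range is rejected).

```c
#include <string.h>

/* Hypothetical container for the command-line options described above. */
struct ShapeFilterOptions {
    const char *reject_color; /* NULL when -rejectColor is not given */
    int x_min;
    int x_max;
    int y_min;
    int y_max;
};

/* Color check (the stage handled by on_before_sample_deserialize).
 * Returns 1 if the sample should be rejected, 0 otherwise. */
int shape_color_rejected(const struct ShapeFilterOptions *opts,
                         const char *color)
{
    return opts->reject_color != NULL &&
           strcmp(color, opts->reject_color) == 0;
}

/* Position check (the stage handled by on_before_sample_commit).
 * Values equal to a bound are accepted; values outside are rejected. */
int shape_position_rejected(const struct ShapeFilterOptions *opts,
                            int x, int y)
{
    return x < opts->x_min || x > opts->x_max ||
           y < opts->y_min || y > opts->y_max;
}
```

With the options from the command above, a BLUE square at (100, 100) passes both checks, while any RED square or any square at x = 49 is rejected.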