RTI Connext Modern C++ API  Version 5.3.1
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
Creating Custom Content Filters

Creating a custom content filter. More...

Creating a custom content filter.

Introduction

By default, RTI Connext creates content filters with the DDS_SQL_FILTER, which implements a superset of the DDS-specified SQL WHERE clause. However, in many cases this filter may not be what you want. Some examples are:

This HOWTO explains how to write your own custom filter and is divided into the following sections:

The Custom Content Filter API

A custom content filter is created by calling the DomainParticipant::register_contentfilter function with a rti::topic::CustomFilter. A CustomFilter is created with either a rti::topic::ContentFilter or rti::topic::WriterContentFilter.

A ContentFilter contains a compile, an evaluate and a finalize function.

A WriterContentFilter contains a compile, an evaluate, a finalize, a writer_attach, writer_compile, writer_evaluate, wrtier_detach, and writer_finalize functions.

A ContentFilteredTopic can be created with a dds::topic::Filter that has been created with the name that is registered with the participant.

A custom ContentFilter is used by RTI Connext at the following times during the life-time of a ContentFilteredTopic (the function called is shown in parenthesis).

A custom WriterContentFilter is used by RTI Connext at the following times during the life-time of a ContentFilteredTopic (the function called is shown in parenthesis).

The compile function

The "compile" function is used to compile a filter expression and expression parameters. Please note that the term compile is intentionally loosely defined. It is up to the user to decide what this function should do and return.

The evaluate function

When using a ContentFilter, the "evaluate" function is called each time a sample is received to determine if a sample should be filtered out and discarded.

When using a WriterContentFilter, the "evaluate" function is called each time a sample is written to determine if a sample should be filtered out and discarded. It is called for each DataReader for which the DataWriter is filtering and for which the writer_compile function set the ExpressionProperty.writer_side_filter_optimization to false.

The finalize function

The "finalize" function is called when an instance of the custom content filter is no longer needed. When this function is called, it is safe to free all resources used by this particular instance of the custom content filter.

The writer_attach function

The "writer_attach" function is called the first time that a DataWriter matches with a DataReader with the same ContentFilter. It will not be called for subsequent DataReaders that are using the same filter. This function is used to create some state required to perform filtering on the writer-side. It is entirely up to you, as the implementer of the filter, to decide if the filter requires this state.

The writer_compile function

The "writer_compile" function is called when a DataWriter matches with a DataReader with the same ContentFilter. It is called every time that the DataWriter matches a DataReader that is using the same filter as well as each time the DataWriter is notified that a DataReader's filter parameters have changed. This function will receive as an input a rti::core::Cookie which uniquely identifies the DataReader for which the function was invoked.

The writer_evaluate function

The "writer_evaluate" function is called every time that a DataWriter writes a new sample. Its purpose is to evaluate the sample for all the readers for which the DataWriter is performing writer-side filtering and return the sequence of rti::core::Cookie associated with the DataReaders whose filter pass the sample.

The writer_detach function

The "writer_detach" function is called when an instance of the custom content filter is no longer needed. When this function is called, it is safe to free all resources used by this particular instance of the custom content filter.

The writer_return_loan function

The "writer_return_loan" function is called to return the loan on the rti::core::CookieSeq provided by the writer_evaluate function.

The writer_finalize function

The "writer_finalize" function will be called by Connext to notify the filter implementation that the DataWriter is no longer matching with a DataReader for which it was previously performing writer-side filtering. This will allow the filter to purge any state it was maintaining for the DataReader.

Example Custom Writer Content Filter

Assume that you have a type Foo.

Our filter will show how to enable the writer-side filter optimization for some readers and not for others. The ones with writer-side filter optimization will only pass samples where Foo.x == y where y is a value determined by an expression parameter, see the writer_evaluate function. Readers without the optimization turned on will pass all samples where Foo.x > 7, see the evaluate function. The filter will only be used to filter samples of type Foo.

The following #includes are needed for the examples on this page

#include <iostream>
#include <dds/topic/ddstopic.hpp>
#include <rti/topic/findImpl.hpp>
#include <dds/sub/ddssub.hpp>
#include <dds/pub/ddspub.hpp>
#include <dds/core/ddscore.hpp>
#include "Foo.hpp"

The following is the definition of the WriterFilterData, the state that is created and returned by the writer_attach method:

class WriterFilterData
{
public:
typedef std::pair<rti::core::Cookie, int32_t> CookieValue;
typedef std::vector<CookieValue> CookieValueSeq;
public:
WriterFilterData()
{
}
void add_pair(const CookieValue& cookie_value_pair)
{
reader_pairs_.resize(reader_pairs_.size() + 1);
reader_pairs_[reader_pairs_.size() - 1] = cookie_value_pair;
}
CookieValueSeq& reader_pairs()
{
return reader_pairs_;
}
private:
CookieValueSeq reader_pairs_;
};

And here is the declaration of our custom writer content filter. Notice, we are inheriting from rti::topic::WriterContentFilter. We could have inherited from rti::topic::ContentFilter or rti::topic::WriterContentFilterHelper here too to create other custom content filters.

// A custom WriterContentFilter
class ExampleWriterContentFilter :
Foo,
rti::topic::no_compile_data_t,
WriterFilterData>
{
public:
ExampleWriterContentFilter() {}
~ExampleWriterContentFilter() {}
const std::string& expression,
const dds::core::StringSeq& parameters,
const std::string& type_class_name,
rti::topic::no_compile_data_t* old_compile_data);
bool evaluate(
const Foo& sample,
const rti::topic::FilterSampleInfo& meta_data);
WriterFilterData& writer_attach();
WriterFilterData& writer_filter_data,
const std::string& expression,
const dds::core::StringSeq& parameters,
const std::string& type_class_name,
const rti::core::Cookie& cookie);
rti::core::CookieSeq& writer_evaluate(
WriterFilterData& writer_filter_data,
const Foo& sample,
const rti::topic::FilterSampleInfo &meta_data);
WriterFilterData& writer_filter_data,
const rti::core::Cookie& cookie);
void writer_detach(WriterFilterData& writer_filter_data);
WriterFilterData& writer_filter_data, rti::core::CookieSeq& cookies);
void writer_data(const WriterFilterData& is_writer_data) { writer_data_ = is_writer_data; }
WriterFilterData& writer_data() { return writer_data_; }
void reset_cookie_seq()
{
cookie_seq_.resize(0);
}
void add_cookie(rti::core::Cookie& cookie)
{
cookie_seq_.resize(cookie_seq_.size() + 1);
cookie_seq_[cookie_seq_.size() - 1] = cookie;
}
rti::core::CookieSeq& cookie_seq()
{
return cookie_seq_;
}
private:
rti::core::CookieSeq cookie_seq_;
WriterFilterData writer_data_;
};

Writing the Compile Function

Since we already know what the expression is (Foo.x > 7), we can simply return rti::topic::no_compile_data.

Below is the entire compile function.

rti::topic::no_compile_data_t& ExampleWriterContentFilter::compile(
const std::string&,
const std::string&,
{
// We don't have any compile data to setup in this compile function. We
// will do all necessary setup in the writer_compile function
return rti::topic::no_compile_data;
}

Writing the Evaluate Function

The next step is to implement the evaluate function. The evaluate function receives no_compile_data because it is unecessary in this example. The function then evaluates the received sample against our filter expression and passes the sample if Foo.x > 7. Below is the entire evaluate function.

bool ExampleWriterContentFilter::evaluate(
const Foo& sample,
{
if (sample.x() > 7) {
return true;
}
return false;
}

Writing the Finalize Function

The next function to write is the finalize function. It is safe to free all resources used by this particular instance of the custom content filter that is allocated in compile. Because we did not create any resources in the compile function, we have nothing to do in the finalize. Below is the entire finalize function.

void ExampleWriterContentFilter::finalize(
{
// There is nothing to finalize in this example
}

Writing the Writer Attach Function

The writer_attach is used to create some state required to perform filtering on the writer-side. In our example filter, this state is kept as part of our custom filter class, ExampleWriterContentFilter, so we therefore simply return a reference to our writer data that will be used during the writer_compile and writer_evaluate functions.

WriterFilterData& ExampleWriterContentFilter::writer_attach()
{
// Setup the writer_filter_data to point to our WriterFilterData
return writer_data();
}

Writing the Writer Compile Function

The writer_compile function is called when a DataWriter matches with a DataReader with the same ContentFilter. In our case, we use the parameters to determine if we should turn on the writer-side filtering optimization. If parameters[0] == 1, then we store the provided rti::core::Cookie along with parameters[1] in our writer_filter_data to be accessed during the writer_evaluate function whenever we receive a new sample.

void ExampleWriterContentFilter::writer_compile(
WriterFilterData& writer_filter_data,
const std::string& /* expression */,
const std::string& /* type_class_name */,
const rti::core::Cookie& cookie)
{
int length = parameters.size();
bool optimize = false;
int32_t x = 0;
if (length == 2) {
std::istringstream(parameters[0]) >> optimize;
std::istringstream(parameters[1]) >> x;
}
// Add this DataReader's cookie to our writer filter data
if (optimize) {
writer_filter_data.add_pair(
std::pair<rti::core::Cookie, int32_t>(cookie, x));
}
}

Writing the Writer Evaluate Function

The writer_evaluate function receives our stored writer_filter_data and a sample to evaluate. We iterate through our (rti::core::Cookie, value) pairs and add any Cookie with a matching value of x to the sample's Foo.x to the rti::core::CookieSeq that we return. Any Cookie in this sequence then represents a DataReader to which this sample will be sent.

rti::core::CookieSeq& ExampleWriterContentFilter::writer_evaluate(
WriterFilterData& writer_filter_data,
const Foo& sample,
{
WriterFilterData::CookieValueSeq& reader_pairs =
writer_filter_data.reader_pairs();
// If x == value, pass the reader
for (uint32_t i = 0; i < reader_pairs.size(); i++) {
if (sample.x() == reader_pairs[i].second) {
add_cookie(reader_pairs[i].first);
}
}
return cookie_seq();
}

Writing the Writer Detach Function

It is safe to free all resources used by this particular instance of the custom content filter that is allocated in writer_attach. Because we did not allocate any resources in the writer_attach function, there is nothing to release in the writer_finalize. Below is the entire writer_detach function.

void ExampleWriterContentFilter::writer_detach(
WriterFilterData&)
{
// Nothing to do in writer detach for this example
}

Writing the Writer Return Loan Function

RTI Connext uses the writer_return_loan function specified in the WriterContentFilter to indicate to the filter implementation that it has finished using the sequence of cookies returned by the filter’s writer_evaluate function. Your filter implementation should not free the memory associated with the cookie sequence before the writer_return_loan function is called. You can also create your custom content filter by inheriting from the rti::topic::WriterContentFilterHelper, which manages the DataReader CookieSeq for you. If you do that, then the writer_return_loan is implemented for you. Below is the entire writer_return_loan function.

void ExampleWriterContentFilter::writer_return_loan(
WriterFilterData&,
rti::core::CookieSeq&)
{
// The cookie sequence's destructor will take care of returning the loan
// to the middleware, but we will reset the size to 0
const_cast<rti::core::CookieSeq&>(cookie_seq()).resize(0);
}

Writing the Writer Finalize Function

The writer_finalize function specified in the WriterContentFilter will be called when the DataWriter no longer matches with a DataReader that was created with ContentFilteredTopic. This will allow the filter implementation to delete any state it was maintaining for the DataReader. Because we did not create any resources in the writer_compile function, we have nothing to do in the writer_finalize. Below is the entire writer_finalize function.

void ExampleWriterContentFilter::writer_finalize(
WriterFilterData&,
{
// There is nothing to finalize in this example
}

Creating a CustomFilter

The first thing that an application needs to do when using a custom filter is to give it a name and wrap an instance of their custom filter with a rti::topic::CustomFilter. The CustomFilter class receives shared pointer to an instance of your custom filter and ensures that your filter does not go out-of-scope while it is being used. This means that you do not have to retain a reference to your filter throughout its lifetime, the CustomFilter class handles this detail for you.

// Create an instance of the ExampleWriterContentFilter and then
// create a CustomFilter object which holds a shared pointer to your
// ContentFilter.
std::string my_filter_name = "my_custom_filter";
my_custom_filter(new ExampleWriterContentFilter());
// To make sure that we don't miss any samples in this example,
// setup there DataWriter and DataReaders to be reliable

Registering the Filter

After wrapping your content filter with the CustomFilter class, and before the custom filter can be used, it must be registered with RTI Connext:

// Register the filter with both participants. Doing this will
// automatically enable writer-side filtering if any DataWriters match with
// any DataReaders that are using the same filter
participant1->register_contentfilter(my_custom_filter, my_filter_name);
participant2->register_contentfilter(my_custom_filter, my_filter_name);

Unregistering the Filter

When the filter is no longer needed, it can be unregistered from RTI Connext:

// You do not have to unregister your content filter unless you want to
// register a different filter with the same name
participant1->unregister_contentfilter(my_filter_name);
participant2->unregister_contentfilter(my_filter_name);

Using a CustomFilter

After the custom filter has been registered, you must create dds::topic::Filter that contain the filter expressions and parameters that will be used in your filter. You must give these Filters names that match the name with which the CustomFilter was registered in order to associate the two.

Next, create the readers that will be using the custom filter with dds::topic::ContentFilteredTopics that have been created with the Filters you created.

After that, everything is set up for you. Now, your readers will only receive samples matching the filter that you have set up for them.

dds::topic::Topic<Foo> topic1(participant1, "ExampleTopic");
dds::topic::Topic<Foo> topic2(participant2, "ExampleTopic");
// Create the parameter lists for the filters
std::vector<std::string> cft_parameters(2);
// In this example, the first parameter dictates whether or not the
// writer-side filter optimization will be turned on. Parameter 2
// tells the writer_evaluate which single value of x this reader wants to
// receive samples for
cft_parameters[0] = "1";
cft_parameters[1] = "5";
dds::topic::Filter filter1("x = %1", cft_parameters);
cft_parameters[0] = "1";
cft_parameters[1] = "10";
dds::topic::Filter filter2("x = %1", cft_parameters);
// Any reader that is not using the writer-side filter optimization will
// receive samples where x > 7. The second parameter in this case is
// ignored, but is set here just as a reminder
cft_parameters[0] = "0";
cft_parameters[1] = "7";
dds::topic::Filter filter3("x > %1", cft_parameters);
// Create a DataWriter that will perform writer-side filtering
dds::pub::Publisher(participant1), topic1, writer_qos);
// Assign filters a name or else the default, DDS_SQL_FILTER will be
// used
filter1->name(my_filter_name);
filter2->name(my_filter_name);
filter3->name(my_filter_name);
// Create the ContentFilteredTopics that will be used when creating the
// DataReaders
topic2, "MyContentFilteredTopic1", filter1);
topic2, "MyContentFilteredTopic2", filter2);
topic2, "MyContentFilteredTopic3", filter3);
// Create two readers that will make use of the writer-side filter
// optimization and one that won't
dds::sub::DataReader<Foo> optimizedReader1(
dds::sub::Subscriber(participant2), cft1, reader_qos);
dds::sub::DataReader<Foo> optimizedReader2(
dds::sub::Subscriber(participant2), cft2, reader_qos);
dds::sub::Subscriber(participant2), cft3, reader_qos);
// Wait for the writer to match all readers before writing any samples
dds::core::cond::StatusCondition status_condition(writer);
int32_t matched_publications = 0;
while (matched_publications != 3) {
writer.publication_matched_status();
matched_publications = status.total_count();
}
// Write 11 samples with x = 0 through 10
for (int i = 0; i < 11; i++) {
writer.write(Foo(i, i));
}
// optimizedReader1 will only receive samples with x == 5
std::cout << "optimizedReader1's samples: " << std::endl;
dds::sub::LoanedSamples<Foo> samples = optimizedReader1.take();
std::copy(
samples.begin(),
samples.end(),
dds::sub::LoanedSamples<Foo>::ostream_iterator(std::cout, "\n"));
// optimizedReader2 will only receive samples with x == 10
std::cout << "optimizedReader2's samples: " << std::endl;
samples = optimizedReader2.take();
std::copy(
samples.begin(),
samples.end(),
dds::sub::LoanedSamples<Foo>::ostream_iterator(std::cout, "\n"));
// reader3 will receive samples with x == 8, 9, 10
std::cout << "reader3's samples: " << std::endl;
samples = reader3.take();
std::copy(
samples.begin(),
samples.end(),
dds::sub::LoanedSamples<Foo>::ostream_iterator(std::cout, "\n"));

Looking up the Filter

A custom filter that is registered with a DomainParticipant can be looked up:

rti::topic::find_content_filter<ExampleWriterContentFilter>(
participant1, my_filter_name);
// You can retrieve your ContentFilter from the CustomFilter:
ExampleWriterContentFilter* retrieved_content_filter =
retrieved_custom_filter.get();

RTI Connext Modern C++ API Version 5.3.1 Copyright © Mon Feb 19 2018 Real-Time Innovations, Inc