RTI Connext Modern C++ API Version 7.3.0
Creating Custom Content Filters

Creating a custom content filter.

Creating a custom content filter.

Introduction <br>

By default, RTI Connext creates content filters with the DDS SQL filter, which implements a superset of the DDS-specified SQL WHERE clause. However, in many cases this filter may not be what you want. Some examples are:

This HOW-TO explains how to write your own custom filter and is divided into the following sections:

The Custom Content Filter API

A custom content filter is created by calling the dds::domain::DomainParticipant::register_contentfilter function with a rti::topic::CustomFilter. A CustomFilter is created with either a rti::topic::ContentFilter or rti::topic::WriterContentFilter.

A ContentFilter contains a compile, an evaluate and a finalize function.

A WriterContentFilter contains a compile, an evaluate, a finalize, a writer_attach, writer_compile, writer_evaluate, wrtier_detach, and writer_finalize functions.

To use a custom filter in a dds::topic::ContentFilteredTopic or a dds::sub::QueryCondition, the name used to register it with the participant has to be set in the dds::topic::Filter
or in the dds::sub::Query that is used to create the corresponding ContentFilteredTopic or QueryCondition.

A custom ContentFilter is used by RTI Connext at the following times during the life-time of a ContentFilteredTopic (the function called is shown in parenthesis).

A custom WriterContentFilter is used by RTI Connext at the following times during the life-time of a ContentFilteredTopic (the function called is shown in parenthesis).

The compile function

The "compile" function is used to compile a filter expression and expression parameters. Please note that the term compile is intentionally loosely defined. It is up to the user to decide what this function should do and return.

The evaluate function

When using a ContentFilter, the "evaluate" function is called each time a sample is received to determine if a sample should be filtered out and discarded.

When using a WriterContentFilter, the "evaluate" function is called each time a sample is written to determine if a sample should be filtered out and discarded. It is called for each DataReader for which the DataWriter is filtering and for which the writer_compile function set the ExpressionProperty.writer_side_filter_optimization to false.

The finalize function

The "finalize" function is called when an instance of the custom content filter is no longer needed. When this function is called, it is safe to free all resources used by this particular instance of the custom content filter.

The writer_attach function

The "writer_attach" function is called the first time that a DataWriter matches with a DataReader with the same ContentFilter. It will not be called for subsequent DataReaders that are using the same filter. This function is used to create some state required to perform filtering on the writer-side. It is entirely up to you, as the implementer of the filter, to decide if the filter requires this state.

The writer_compile function

The "writer_compile" function is called when a DataWriter matches with a DataReader with the same ContentFilter. It is called every time that the DataWriter matches a DataReader that is using the same filter as well as each time the DataWriter is notified that a DataReader's filter parameters have changed. This function will receive as an input a rti::core::Cookie which uniquely identifies the DataReader for which the function was invoked.

The writer_evaluate function

The "writer_evaluate" function is called every time that a DataWriter writes a new sample. Its purpose is to evaluate the sample for all the readers for which the DataWriter is performing writer-side filtering and return the sequence of rti::core::Cookie associated with the DataReaders whose filter pass the sample.

The writer_detach function

The "writer_detach" function is called when an instance of the custom content filter is no longer needed. When this function is called, it is safe to free all resources used by this particular instance of the custom content filter.

The writer_return_loan function

The "writer_return_loan" function is called to return the loan on the rti::core::CookieSeq provided by the writer_evaluate function.

The writer_finalize function

The "writer_finalize" function will be called by Connext to notify the filter implementation that the DataWriter is no longer matching with a DataReader for which it was previously performing writer-side filtering. This will allow the filter to purge any state it was maintaining for the DataReader.

Example Custom Writer Content Filter

Assume that you have a type Foo.

Our filter will show how to enable the writer-side filter optimization for some readers and not for others. The ones with writer-side filter optimization will only pass samples where Foo.x == y where y is a value determined by an expression parameter, see the writer_evaluate function. Readers without the optimization turned on will pass all samples where Foo.x > 7, see the evaluate function. The filter will only be used to filter samples of type Foo.

The following #includes are needed for the examples on this page

#include <iostream>
#include <dds/topic/ddstopic.hpp>
#include <rti/topic/findImpl.hpp>
#include <dds/sub/ddssub.hpp>
#include <dds/pub/ddspub.hpp>
#include <dds/core/ddscore.hpp>
#include "Foo.hpp"

The following is the definition of the WriterFilterData, the state that is created and returned by the writer_attach method:

class WriterFilterData
{
public:
typedef std::pair<rti::core::Cookie, int32_t> CookieValue;
typedef std::vector<CookieValue> CookieValueSeq;
public:
WriterFilterData()
{
}
void add_pair(const CookieValue& cookie_value_pair)
{
reader_pairs_.push_back(cookie_value_pair);
}
CookieValueSeq& reader_pairs()
{
return reader_pairs_;
}
private:
CookieValueSeq reader_pairs_;
};

And here is the declaration of our custom writer content filter. Notice, we are inheriting from rti::topic::WriterContentFilter. We could have inherited from rti::topic::ContentFilter or rti::topic::WriterContentFilterHelper here too to create other custom content filters.

// A custom WriterContentFilter
class ExampleWriterContentFilter :
Foo,
rti::topic::no_compile_data_t,
WriterFilterData>
{
public:
ExampleWriterContentFilter() {}
~ExampleWriterContentFilter() {}
const std::string& expression,
const dds::core::StringSeq& parameters,
const std::string& type_class_name,
rti::topic::no_compile_data_t* old_compile_data);
bool evaluate(
const Foo& sample,
const rti::topic::FilterSampleInfo& meta_data);
WriterFilterData& writer_attach();
WriterFilterData& writer_filter_data,
const std::string& expression,
const dds::core::StringSeq& parameters,
const std::string& type_class_name,
const rti::core::Cookie& cookie);
rti::core::CookieSeq& writer_evaluate(
WriterFilterData& writer_filter_data,
const Foo& sample,
const rti::topic::FilterSampleInfo &meta_data);
WriterFilterData& writer_filter_data,
const rti::core::Cookie& cookie);
void writer_detach(WriterFilterData& writer_filter_data);
WriterFilterData& writer_filter_data,
rti::core::CookieSeq& cookies);
void writer_data(const WriterFilterData& is_writer_data) { writer_data_ = is_writer_data; }
WriterFilterData& writer_data() { return writer_data_; }
void reset_cookie_seq()
{
cookie_seq_.clear();
}
void add_cookie(rti::core::Cookie& cookie)
{
cookie_seq_.resize(cookie_seq_.size() + 1);
cookie_seq_[cookie_seq_.size() - 1] = cookie;
}
rti::core::CookieSeq& cookie_seq()
{
return cookie_seq_;
}
private:
rti::core::CookieSeq cookie_seq_;
WriterFilterData writer_data_;
};
An example topic-type.
Definition: types.dxx:34
Unique identifier for a written data sample in the form of a sequence of bytes.
Definition: Cookie.hpp:71
virtual bool evaluate(no_compile_data_t &compile_data, const T &sample, const FilterSampleInfo &meta_data)=0
Evaluate whether the sample is passing the filter or not according to the sample content.
virtual void finalize(no_compile_data_t &compile_data)=0
A previously compiled instance of the content filter is no longer in use and resources can now be cle...
virtual no_compile_data_t & compile(const std::string &expression, const dds::core::StringSeq &parameters, const dds::core::optional< dds::core::xtypes::DynamicType > &type_code, const std::string &type_class_name, no_compile_data_t *old_compile_data)=0
Compile an instance of the content filter according to the filter expression and parameters of the gi...
<<extension>> <<value-type>> Provides additional information about the filter expression passed to th...
Definition: ExpressionProperty.hpp:81
<<extension>> <<value-type>> Provides meta information associated with the sample.
Definition: FilterSampleInfo.hpp:78
<<extension>> A class to inherit from when implementing a writer-side custom content filter
Definition: ContentFilter.hpp:597
virtual void writer_compile(WriterFilterData &writer_filter_data, ExpressionProperty &prop, const std::string &expression, const dds::core::StringSeq &parameters, const dds::core::optional< dds::core::xtypes::DynamicType > &type_code, const std::string &type_class_name, const rti::core::Cookie &cookie)=0
A writer-side filtering API to compile an instance of the content filter according to the filter expr...
virtual rti::core::CookieSeq & writer_evaluate(WriterFilterData &writer_filter_data, const T &sample, const FilterSampleInfo &meta_data)=0
A writer-side filtering API to compile an instance of the content filter according to the filter expr...
virtual void writer_detach(WriterFilterData &writer_filter_data)=0
A writer-side filtering API to clean up a previously created state using writer_attach.
virtual void writer_finalize(WriterFilterData &writer_filter_data, const rti::core::Cookie &cookie)=0
A writer-side filtering API to clean up a previously compiled instance of the content filter.
virtual void writer_return_loan(WriterFilterData &writer_filter_data, rti::core::CookieSeq &cookies)=0
A writer-side filtering API to return the loan on the list of DataReaders returned by writer_evaluate...
virtual WriterFilterData & writer_attach()=0
A writer-side filtering API to create some state that can facilitate filtering on the writer side.
basic_string< char, rti::core::memory::OsapiAllocator< char > > string
A string convertible to std::string and with similar functionality.
Definition: String.hpp:266
std::vector< std::string > StringSeq
A vector of strings.
Definition: dds/core/types.hpp:59
The type to specify as the CompileData template parameter to your ContentFilter if your compile funct...
Definition: ContentFilter.hpp:62

Writing the Compile Function

Since we already know what the expression is (Foo.x > 7), we can simply return rti::topic::no_compile_data.

Below is the entire compile function.

rti::topic::no_compile_data_t& ExampleWriterContentFilter::compile(
const std::string&,
const std::string&,
{
// We don't have any compile data to setup in this compile function. We
// will do all necessary setup in the writer_compile function
return rti::topic::no_compile_data;
}

Writing the Evaluate Function

The next step is to implement the evaluate function. The evaluate function receives no_compile_data because it is unecessary in this example. The function then evaluates the received sample against our filter expression and passes the sample if Foo.x > 7. Below is the entire evaluate function.

bool ExampleWriterContentFilter::evaluate(
const Foo& sample,
{
if (sample.x() > 7) {
return true;
}
return false;
}

Writing the Finalize Function

The next function to write is the finalize function. It is safe to free all resources used by this particular instance of the custom content filter that is allocated in compile. Because we did not create any resources in the compile function, we have nothing to do in the finalize. Below is the entire finalize function.

void ExampleWriterContentFilter::finalize(
{
// There is nothing to finalize in this example
}

Writing the Writer Attach Function

The writer_attach is used to create some state required to perform filtering on the writer-side. In our example filter, this state is kept as part of our custom filter class, ExampleWriterContentFilter, so we therefore simply return a reference to our writer data that will be used during the writer_compile and writer_evaluate functions.

WriterFilterData& ExampleWriterContentFilter::writer_attach()
{
// Setup the writer_filter_data to point to our WriterFilterData
return writer_data();
}

Writing the Writer Compile Function

The writer_compile function is called when a DataWriter matches with a DataReader with the same ContentFilter. In our case, we use the parameters to determine if we should turn on the writer-side filtering optimization. If parameters[0] == 1, then we store the provided rti::core::Cookie along with parameters[1] in our writer_filter_data to be accessed during the writer_evaluate function whenever we receive a new sample.

void ExampleWriterContentFilter::writer_compile(
WriterFilterData& writer_filter_data,
const std::string& /* expression */,
const std::string& /* type_class_name */,
const rti::core::Cookie& cookie)
{
int length = parameters.size();
bool optimize = false;
int32_t x = 0;
if (length == 2) {
std::istringstream(parameters[0]) >> optimize;
std::istringstream(parameters[1]) >> x;
}
// Add this DataReader's cookie to our writer filter data
if (optimize) {
writer_filter_data.add_pair(std::make_pair(cookie, x));
}
}
void parameters(const dds::core::StringSeq &parameters)
<<extension>> Set the parameters
bool writer_side_filter_optimization() const
Get the value of writer_side_filter_optimization.
Definition: ExpressionProperty.hpp:145

Writing the Writer Evaluate Function

The writer_evaluate function receives our stored writer_filter_data and a sample to evaluate. We iterate through our (rti::core::Cookie, value) pairs and add any Cookie with a matching value of x to the sample's Foo.x to the rti::core::CookieSeq that we return. Any Cookie in this sequence then represents a DataReader to which this sample will be sent.

rti::core::CookieSeq& ExampleWriterContentFilter::writer_evaluate(
WriterFilterData& writer_filter_data,
const Foo& sample,
{
WriterFilterData::CookieValueSeq& reader_pairs =
writer_filter_data.reader_pairs();
// If x == value, pass the reader
for (uint32_t i = 0; i < reader_pairs.size(); i++) {
if (sample.x() == reader_pairs[i].second) {
add_cookie(reader_pairs[i].first);
}
}
return cookie_seq();
}

Writing the Writer Detach Function

It is safe to free all resources used by this particular instance of the custom content filter that is allocated in writer_attach. Because we did not allocate any resources in the writer_attach function, there is nothing to release in the writer_finalize. Below is the entire writer_detach function.

void ExampleWriterContentFilter::writer_detach(
WriterFilterData&)
{
// Nothing to do in writer detach for this example
}

Writing the Writer Return Loan Function

RTI Connext uses the writer_return_loan function specified in the WriterContentFilter to indicate to the filter implementation that it has finished using the sequence of cookies returned by the filter writer_evaluate function. Your filter implementation should not free the memory associated with the cookie sequence before the writer_return_loan function is called. You can also create your custom content filter by inheriting from the rti::topic::WriterContentFilterHelper, which manages the DataReader CookieSeq for you. If you do that, then the writer_return_loan is implemented for you. Below is the entire writer_return_loan function.

void ExampleWriterContentFilter::writer_return_loan(
WriterFilterData&,
rti::core::CookieSeq&)
{
// The cookie sequence's destructor will take care of returning the loan
// to the middleware, but we will reset the size to 0
const_cast<rti::core::CookieSeq&>(cookie_seq()).resize(0);
}

Writing the Writer Finalize Function

The writer_finalize function specified in the WriterContentFilter will be called when the DataWriter no longer matches with a DataReader that was created with ContentFilteredTopic. This will allow the filter implementation to delete any state it was maintaining for the DataReader. Because we did not create any resources in the writer_compile function, we have nothing to do in the writer_finalize. Below is the entire writer_finalize function.

void ExampleWriterContentFilter::writer_finalize(
WriterFilterData&,
{
// There is nothing to finalize in this example
}

Creating a CustomFilter

The first thing that an application needs to do when using a custom filter is to give it a name and wrap an instance of their custom filter with a rti::topic::CustomFilter. The CustomFilter class receives shared pointer to an instance of your custom filter and ensures that your filter does not go out-of-scope while it is being used. This means that you do not have to retain a reference to your filter throughout its lifetime, the CustomFilter class handles this detail for you.

// Create an instance of the ExampleWriterContentFilter and then
// create a CustomFilter object which holds a shared pointer to your
// ContentFilter.
std::string my_filter_name = "my_custom_filter";
my_custom_filter(new ExampleWriterContentFilter());
// To make sure that we don't miss any samples in this example,
// setup there DataWriter and DataReaders to be reliable
static Durability TransientLocal()
Creates a Durability instance with trasient-local kind.
Definition: TCorePolicy.hpp:204
static History KeepAll()
Creates a History with HistoryKind::KEEP_ALL.
Definition: TCorePolicy.hpp:1526
static Reliability Reliable(const dds::core::Duration &max_blocking_time=dds::core::Duration::from_millisecs(100))
Creates a policy with ReliabilityKind::RELIABLE and optionally a max blocking time.
Definition: TCorePolicy.hpp:1365
<<reference-type>> Container for all dds::core::Entity objects.
Definition: TDomainParticipant.hpp:63
<<value-type>> Container of the QoS policies that a dds::pub::DataWriter supports
Definition: DataWriterQosImpl.hpp:92
<<value-type>> Container of the QoS policies that a dds::sub::DataReader supports
Definition: DataReaderQosImpl.hpp:85
<<extension>> <<reference-type>> A wrapper class for the user-defined implementation of a ContentFilt...
Definition: CustomFilter.hpp:61

Registering the Filter

After wrapping your content filter with the CustomFilter class, and before the custom filter can be used, it must be registered with RTI Connext:

// Register the filter with both participants. Doing this will
// automatically enable writer-side filtering if any DataWriters match with
// any DataReaders that are using the same filter
participant1.extensions().register_contentfilter(my_custom_filter, my_filter_name);
participant2.extensions().register_contentfilter(my_custom_filter, my_filter_name);

Unregistering the Filter

When the filter is no longer needed, it can be unregistered from RTI Connext:

// You do not have to unregister your content filter unless you want to
// register a different filter with the same name
participant1.extensions().unregister_contentfilter(my_filter_name);
participant2.extensions().unregister_contentfilter(my_filter_name);

Using a CustomFilter

After the custom filter has been registered, you must create dds::topic::Filter that contain the filter expressions and parameters that will be used in your filter. You must give these Filters names that match the name with which the CustomFilter was registered in order to associate the two.

Next, create the readers that will be using the custom filter with dds::topic::ContentFilteredTopics that have been created with the Filters you created.

After that, everything is set up for you. Now, your readers will only receive samples matching the filter that you have set up for them.

dds::topic::Topic<Foo> topic1(participant1, "ExampleTopic");
dds::topic::Topic<Foo> topic2(participant2, "ExampleTopic");
// Create the parameter lists for the filters
std::vector<std::string> cft_parameters(2);
// In this example, the first parameter dictates whether or not the
// writer-side filter optimization will be turned on. Parameter 2
// tells the writer_evaluate which single value of x this reader wants to
// receive samples for
cft_parameters[0] = "1";
cft_parameters[1] = "5";
dds::topic::Filter filter1("x = %1", cft_parameters);
cft_parameters[0] = "1";
cft_parameters[1] = "10";
dds::topic::Filter filter2("x = %1", cft_parameters);
// Any reader that is not using the writer-side filter optimization will
// receive samples where x > 7. The second parameter in this case is
// ignored, but is set here just as a reminder
cft_parameters[0] = "0";
cft_parameters[1] = "7";
dds::topic::Filter filter3("x > %1", cft_parameters);
// Create a DataWriter that will perform writer-side filtering
dds::pub::Publisher(participant1), topic1, writer_qos);
// Assign filters a name or else the default, rti::topic::sql_filter_name,
// will be used
filter1.extensions().name(my_filter_name);
filter2.extensions().name(my_filter_name);
filter3.extensions().name(my_filter_name);
// Create the ContentFilteredTopics that will be used when creating the
// DataReaders
topic2, "MyContentFilteredTopic1", filter1);
topic2, "MyContentFilteredTopic2", filter2);
topic2, "MyContentFilteredTopic3", filter3);
// Create two readers that will make use of the writer-side filter
// optimization and one that won't
dds::sub::DataReader<Foo> optimizedReader1(
dds::sub::Subscriber(participant2), cft1, reader_qos);
dds::sub::DataReader<Foo> optimizedReader2(
dds::sub::Subscriber(participant2), cft2, reader_qos);
dds::sub::Subscriber(participant2), cft3, reader_qos);
// Wait for the writer to match all readers before writing any samples
dds::core::cond::StatusCondition status_condition(writer);
int32_t matched_publications = 0;
while (matched_publications != 3) {
writer.publication_matched_status();
}
// Write 11 samples with x = 0 through 10
for (int i = 0; i < 11; i++) {
writer.write(Foo(i, i));
}
// optimizedReader1 will only receive samples with x == 5
std::cout << "optimizedReader1's samples: " << std::endl;
dds::sub::LoanedSamples<Foo> samples = optimizedReader1.take();
std::copy(
samples.begin(),
samples.end(),
dds::sub::LoanedSamples<Foo>::ostream_iterator(std::cout, "\n"));
// optimizedReader2 will only receive samples with x == 10
std::cout << "optimizedReader2's samples: " << std::endl;
samples = optimizedReader2.take();
std::copy(
samples.begin(),
samples.end(),
dds::sub::LoanedSamples<Foo>::ostream_iterator(std::cout, "\n"));
// reader3 will receive samples with x == 8, 9, 10
std::cout << "reader3's samples: " << std::endl;
samples = reader3.take();
std::copy(
samples.begin(),
samples.end(),
dds::sub::LoanedSamples<Foo>::ostream_iterator(std::cout, "\n"));
<<reference-type>> A condition associated with each dds::core::Entity
Definition: TStatusCondition.hpp:50
Information about the status dds::core::status::StatusMask::publication_matched()
Definition: TStatus.hpp:361
int32_t total_count() const
The total cumulative number of times that this dds::pub::DataWriter discovered a "match" with a dds::...
Definition: TStatus.hpp:371
<<reference-type>> Allows an application to publish data for a dds::topic::Topic
Definition: TDataWriter.hpp:58
<<reference-type>> A publisher is the object responsible for the actual dissemination of publications...
Definition: TPublisher.hpp:52
<<reference-type>> Allows the application to: (1) declare the data it wishes to receive (i....
Definition: TDataReader.hpp:74
const ::dds::core::InstanceHandleSeq matched_publications(const dds::sub::DataReader< T > &reader)
Retrieve the list of publications currently "associated" with a DataReader.
Definition: dds/sub/discovery.hpp:96
<<move-only-type>> Provides temporary access to a collection of samples (data and info) from a DataRe...
Definition: LoanedSamplesImpl.hpp:77
iterator begin()
Gets an iterator to the first sample.
Definition: LoanedSamplesImpl.hpp:232
iterator end()
Gets an iterator to one past the last sample.
Definition: LoanedSamplesImpl.hpp:240
<<reference-type>> A subscriber is the object responsible for actually receiving data from a subscrip...
Definition: TSubscriber.hpp:49
<<reference-type>> Specialization of TopicDescription that allows for content-based subscriptions.
Definition: TContentFilteredTopic.hpp:57
Defines the filter to create a ContentFilteredTopic.
Definition: TFilter.hpp:46
<<reference-type>> Topic is the most basic description of the data to be published and subscribed.
Definition: TTopic.hpp:56

Looking up the Filter

A custom filter that is registered with a DomainParticipant can be looked up:

rti::topic::find_content_filter<ExampleWriterContentFilter>(
participant1, my_filter_name);
// You can retrieve your ContentFilter from the CustomFilter:
ExampleWriterContentFilter* retrieved_content_filter =
retrieved_custom_filter.get();
const T * get() const
Get a const pointer to the underlying content filter.
Definition: CustomFilter.hpp:82