6. ContentFilteredTopics
Prerequisites |
Time to complete | 45 minutes
Concepts covered in this module | Content Filtering
In the Publish/Subscribe communication pattern, DataWriters send data to DataReaders with the same Topic. But sometimes a DataReader may be interested in only a subset of the data that is being sent. Some examples are:
A DataReader only cares about the temperature when it goes outside of a certain bound.
A DataReader only cares about log messages with levels WARNING or ERROR.
A DataReader only cares about the ChocolateLotState if the DataReader belongs to the next station that processes the chocolate lot.
Note that in all of these examples, there might be some DataReaders that want to receive a subset of data, and other DataReaders that might want to receive all data. For example, in our chocolate factory, an individual station only cares about a chocolate lot if it is the next station to process the lot, but the monitoring application wants to receive every update about every chocolate lot.
Connext DDS offers a mechanism to allow DataReaders to filter data so that they only receive the data they are interested in. Applications can do this by creating ContentFilteredTopics instead of normal Topics. Since different DataReaders may want to receive a different subset of the data, ContentFilteredTopics are specified on the DataReaders. No changes are required on DataWriters to use content-filters: they continue to use a normal Topic, and they can communicate both with DataReaders that use a normal Topic and DataReaders that use a ContentFilteredTopic.
Although ContentFilteredTopics are specified on DataReaders, in most cases the DataWriter does the filtering, in which case no network bandwidth is used for those filtered samples. (The DataWriter finds out during discovery that a DataReader has a ContentFilteredTopic.)
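To make the bandwidth point concrete, here is a minimal standalone sketch of writer-side filtering. It does not use the Connext DDS API: ModelWriter, ModelReader, LogMessage, and run_log_filter_demo are hypothetical names invented for this illustration, and the "network" is just a counter. It reuses the log-level example from the list above and assumes a C++11 compiler.

```cpp
// Standalone model of writer-side content filtering.
// NOT the Connext DDS API: every type and function here is hypothetical.
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

enum class LogLevel { Debug, Info, Warning, Error };

struct LogMessage {
    LogLevel level;
    std::string text;
};

// A model reader: an optional filter plus the samples it has received.
struct ModelReader {
    std::function<bool(const LogMessage &)> filter;  // empty = accept all
    std::vector<LogMessage> received;
};

// A model writer that evaluates each matched reader's filter before
// "sending", so rejected samples never count as network traffic.
class ModelWriter {
public:
    void match(ModelReader *reader)
    {
        assert(reader != nullptr);
        readers_.push_back(reader);
    }

    void write(const LogMessage &sample)
    {
        for (std::size_t i = 0; i < readers_.size(); ++i) {
            ModelReader *reader = readers_[i];
            if (!reader->filter || reader->filter(sample)) {
                reader->received.push_back(sample);
                ++samples_sent_;
            }
        }
    }

    std::size_t samples_sent() const { return samples_sent_; }

private:
    std::vector<ModelReader *> readers_;
    std::size_t samples_sent_ = 0;
};

struct DemoResult {
    std::size_t unfiltered_count;
    std::size_t filtered_count;
    std::size_t samples_sent;
};

// One writer, one unfiltered reader, one reader that wants only
// WARNING and ERROR messages.
DemoResult run_log_filter_demo()
{
    ModelReader monitor;  // no filter: receives everything
    ModelReader alerts;
    alerts.filter = [](const LogMessage &m) {
        return m.level == LogLevel::Warning || m.level == LogLevel::Error;
    };

    ModelWriter writer;
    writer.match(&monitor);
    writer.match(&alerts);

    writer.write({LogLevel::Info, "started"});
    writer.write({LogLevel::Warning, "disk almost full"});
    writer.write({LogLevel::Debug, "heartbeat"});
    writer.write({LogLevel::Error, "write failed"});

    DemoResult result = {
        monitor.received.size(),
        alerts.received.size(),
        writer.samples_sent()
    };
    return result;
}
```

In this model, run_log_filter_demo() writes four messages to two readers. The unfiltered reader receives all four, the filtered reader receives two, and six samples are "sent" rather than the eight that would be sent without filtering.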
Definition
A ContentFilteredTopic is a Topic with filtering properties. It makes it possible to subscribe to Topics and at the same time specify that you are only interested in a subset of the Topic’s data.
A ContentFilteredTopic consists of a Topic, a filter expression, and optional parameters. The filter expression and parameters can be changed at runtime. You can think of the filter expression as being like the “where” clause in SQL.

Figure 6.1 A ContentFilteredTopic that only allows samples with degrees > 32.
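The idea of a fixed filter expression with a parameter that can change at runtime can be sketched without any middleware. The class below models the expression degrees > %0 and holds its single parameter as a string. DegreesAboveFilter, set_parameter, and passing_degrees are hypothetical names for this illustration, not Connext DDS calls, and the int type of degrees is an assumption.

```cpp
// Standalone model of a filter expression with one runtime parameter.
// NOT the Connext DDS API: all names here are hypothetical.
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <string>
#include <vector>

struct Temperature {
    int degrees;  // field type assumed for this sketch
};

// Models the expression "degrees > %0". The expression is fixed;
// only the parameter value changes.
class DegreesAboveFilter {
public:
    explicit DegreesAboveFilter(const std::string &threshold)
        : threshold_(threshold)
    {
        assert(!threshold_.empty());
    }

    // Change the parameter while the "application" keeps running.
    void set_parameter(const std::string &threshold)
    {
        assert(!threshold.empty());
        threshold_ = threshold;
    }

    bool passes(const Temperature &sample) const
    {
        return sample.degrees > std::atoi(threshold_.c_str());
    }

private:
    std::string threshold_;
};

// Return the readings that the filter lets through, in order.
std::vector<int> passing_degrees(
        const DegreesAboveFilter &filter,
        const std::vector<int> &readings)
{
    std::vector<int> passed;
    for (std::size_t i = 0; i < readings.size(); ++i) {
        Temperature sample = { readings[i] };
        if (filter.passes(sample)) {
            passed.push_back(sample.degrees);
        }
    }
    return passed;
}
```

With a parameter of "32", readings of 30, 32, 33, and 40 are reduced to 33 and 40. After set_parameter("35"), the same readings are reduced to 40 only, without rebuilding the filter.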
6.1. The Complete Chocolate Factory
ContentFilteredTopics are the final piece we need to put together the last part of the Chocolate Factory: the ingredient stations.
If you recall, the process the chocolate lot takes through the chocolate factory looks like this:

Figure 6.2 DataReaders in our system will use content filtering to filter out any ChocolateLotState that doesn’t have “next_station” equal to that DataReader’s station.
The Monitoring/Control Application starts each lot by writing a sample to the ChocolateLotState Topic, with next_station set to one of the ingredient stations. Each ingredient station processes the lot by adding an ingredient, and then updates the next_station field. The last station is the tempering machine, which processes the lot by tempering it, and then calls dispose() on the lot instance to say that the lot has completed.

Figure 6.3 A chocolate lot is waiting, processing, or completed by an ingredient station. When a lot is completed, the next_station field indicates the next station to process the chocolate lot.

Figure 6.4 A chocolate lot is waiting, processing, or disposed by the tempering application.
The Monitoring/Control Application wants to monitor every state of the chocolate lots as they are processed by the stations. However, an individual station only wants to know about a lot if the next_station field indicates it is the station that will process the lot next.
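The difference between what the monitoring application sees and what each station sees can be sketched with a small standalone model. It is not Connext DDS code. The station names are the ones used in this tutorial, but the order of the route, the WAITING and COMPLETED status names, the station value of the first update, and the helpers make_update, publish_one_lot, and count_for_station are assumptions made for illustration.

```cpp
// Standalone model of how one chocolate lot moves through the stations.
// NOT the Connext DDS API: helper names and the route order are assumed.
#include <cassert>
#include <cstddef>
#include <vector>

enum StationKind {
    INVALID_CONTROLLER,
    COCOA_BUTTER_CONTROLLER,
    SUGAR_CONTROLLER,
    MILK_CONTROLLER,
    VANILLA_CONTROLLER,
    TEMPERING_CONTROLLER
};

enum LotStatus { WAITING, PROCESSING, COMPLETED };  // names partly assumed

struct LotState {
    int lot_id;
    StationKind station;
    StationKind next_station;
    LotStatus lot_status;
};

LotState make_update(
        int lot_id,
        StationKind station,
        StationKind next_station,
        LotStatus lot_status)
{
    LotState state;
    state.lot_id = lot_id;
    state.station = station;
    state.next_station = next_station;
    state.lot_status = lot_status;
    return state;
}

// Every update written for one lot, in order. An unfiltered reader
// (the monitoring application) would see all of them.
std::vector<LotState> publish_one_lot(
        int lot_id,
        const std::vector<StationKind> &route)
{
    assert(!route.empty());
    std::vector<LotState> updates;

    // The Monitoring/Control Application starts the lot.
    updates.push_back(
            make_update(lot_id, INVALID_CONTROLLER, route[0], WAITING));

    for (std::size_t i = 0; i < route.size(); ++i) {
        // The station announces that it is processing the lot.
        updates.push_back(
                make_update(lot_id, route[i], INVALID_CONTROLLER, PROCESSING));
        if (i + 1 < route.size()) {
            // An ingredient station hands the lot to the next station.
            updates.push_back(
                    make_update(lot_id, route[i], route[i + 1], COMPLETED));
        }
        // The last station disposes the instance instead of writing a
        // COMPLETED sample; the dispose is not modeled as data here.
    }
    return updates;
}

// What a reader filtered on "next_station = <me>" would receive.
std::size_t count_for_station(
        const std::vector<LotState> &updates,
        StationKind me)
{
    std::size_t count = 0;
    for (std::size_t i = 0; i < updates.size(); ++i) {
        if (updates[i].next_station == me) {
            ++count;
        }
    }
    return count;
}
```

For a route of four ingredient stations followed by the tempering station, this model produces ten updates for one lot. A station filtering on its own name receives exactly one of them, while an unfiltered reader receives all ten.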
In the previous exercises, we had code in the process_lot function in tempering_application.cxx to check whether the tempering station was the next station:
    void process_lot(
            ChocolateLotStateDataReader *lot_state_reader,
            ChocolateLotStateDataWriter *lot_state_writer)
    {
        // Take all samples. Samples are loaned to the application; the
        // loan must be returned with return_loan() when processing is done.
        ChocolateLotStateSeq data_seq;
        DDS_SampleInfoSeq info_seq;
        // Take available data from DataReader's queue
        DDS_ReturnCode_t retcode = lot_state_reader->take(data_seq, info_seq);
        if (retcode != DDS_RETCODE_OK && retcode != DDS_RETCODE_NO_DATA) {
            std::cerr << "take error " << retcode << std::endl;
            return;
        }
        // Process lots waiting for tempering
        for (int i = 0; i < data_seq.length(); ++i) {
            // Skip instance lifecycle events: only samples with valid
            // data carry a lot update
            if (info_seq[i].valid_data) {
                if (data_seq[i].next_station == TEMPERING_CONTROLLER) {
                    std::cout << "Processing lot #" << data_seq[i].lot_id
                              << std::endl;
                    ...
                }
                ...
    }
Instead of doing that, we can use a ContentFilteredTopic to filter out all the lot updates where the tempering controller is not the next_station. This cleans up the logic when the tempering application receives data, because it no longer needs to check whether a chocolate lot update is intended for it. It can also save network bandwidth, because in most cases the DataWriter performs the filtering. When the application uses a ContentFilteredTopic, Connext DDS can filter out the data more efficiently than the application can, and may not even send updates to an application that is not interested.
6.2. Hands-On 1: Update the ChocolateLotState DataReader with a ContentFilteredTopic
We are going to update the DataReader in the Tempering Application to use a ContentFilteredTopic instead of checking whether the data is important to this application every time a DataWriter updates the ChocolateLotState. Remember that the Tempering Application only cares about a lot if the next_station field indicates that the tempering machine is the next station.
Find the examples as described in Get Example Files.
Run CMake.
In the 6_content_filters/c++98 directory, type the following, depending on your operating system:
On Linux or macOS:
    $ mkdir build
    $ cd build
    $ cmake ..
On Windows, enter the following commands:
    > mkdir build
    > cd build
    > "c:\program files\CMake\bin\cmake" --help
Your path name may vary. Substitute "c:\program files\CMake\bin\cmake" with the correct path for your CMake installation. --help will list all the compilers you can generate project files for. Choose an installed compiler version, such as “Visual Studio 15 2017”.
Create a ContentFilteredTopic that filters out ChocolateLotState data unless the next_station field in the data refers to this application.
Open tempering_application.cxx and look for the comment:
    // Exercise #1.1: Create a Content-Filtered Topic that filters out
    // chocolate lot state unless the next_station = TEMPERING_CONTROLLER
Add the following code after the comment, so that it looks like this:
    // Exercise #1.1: Create a Content-Filtered Topic that filters out
    // chocolate lot state unless the next_station = TEMPERING_CONTROLLER

    // String sequence owns memory for the strings it contains, must allocate
    DDS_StringSeq parameters(0);
    DDSContentFilteredTopic *filtered_lot_state_topic =
            participant->create_contentfilteredtopic(
                    "FilteredLotState",
                    lot_state_topic,
                    "next_station = 'TEMPERING_CONTROLLER'",
                    parameters);
This code creates a ContentFilteredTopic, using the ChocolateLotState Topic, that will filter out all chocolate lot state data that the Tempering Application does not care about.
Use the ContentFilteredTopic instead of a plain Topic.
In tempering_application.cxx, look for the comment:
    // Exercise #1.2: Change the DataReader's Topic to use a
    // Content-Filtered Topic
Replace these lines:
    DDSDataReader *generic_lot_state_reader =
            subscriber->create_datareader_with_profile(
                    lot_state_topic,
                    "ChocolateFactoryLibrary",
                    "ChocolateLotStateProfile",
                    NULL,
                    DDS_STATUS_MASK_NONE);
So the code looks like this (only the first argument, the Topic, has changed):
    // Exercise #1.2: Change the DataReader's Topic to use a
    // Content-Filtered Topic
    DDSDataReader *generic_lot_state_reader =
            subscriber->create_datareader_with_profile(
                    filtered_lot_state_topic,
                    "ChocolateFactoryLibrary",
                    "ChocolateLotStateProfile",
                    NULL,
                    DDS_STATUS_MASK_NONE);
Now the DataReader is using the ContentFilteredTopic. You don’t need to make any changes on the DataWriter side.
Remove the code from the process_lot function that checks that next_station is the Tempering Application.
In tempering_application.cxx, look for the comment:
    // Exercise #1.3: Remove the check that the Tempering Application is
    // the next_station. This will now be filtered automatically.
Remove the if statement (and its matching closing brace):
    if (data_seq[i].next_station == TEMPERING_CONTROLLER) {
So the code looks like this:
    // Exercise #1.3: Remove the check that the Tempering Application
    // is the next_station. This will now be filtered automatically.
    std::cout << "Processing lot #" << data_seq[i].lot_id << std::endl;

    // Send an update that the tempering station is processing lot
    ChocolateLotState updated_state(data_seq[i]);
    updated_state.lot_status = PROCESSING;
    updated_state.next_station = INVALID_CONTROLLER;
    updated_state.station = TEMPERING_CONTROLLER;
    DDS_ReturnCode_t retcode =
            lot_state_writer->write(updated_state, DDS_HANDLE_NIL);
    if (retcode != DDS_RETCODE_OK) {
        std::cerr << "write error " << retcode << std::endl;
    }

    // "Processing" the lot.
    DDS_Duration_t processing_time = { 5, 0 };
    NDDSUtility::sleep(processing_time);

    // Since this is the last step in processing, notify the
    // monitoring application that the lot is complete using a
    // dispose
    retcode = lot_state_writer->dispose(
            updated_state,
            DDS_HANDLE_NIL);
    std::cout << "Lot completed" << std::endl;
Now Connext DDS automatically filters out ChocolateLotState data before your application processes it.
6.3. Hands-On 2: Review the Temperature DataReader’s ContentFilteredTopic
We have already made a change similar to the one you made in Hands-On 1, this time to the Monitoring/Control Application’s Temperature DataReader: we added content filtering to it. That application now filters out data unless the temperature is outside of the expected range. (And it prints an error when the temperature is above or below that range.)
In monitoring_ctrl_application.cxx, review the following code:
    // Register the datatype to use when creating the Topic
    const char *temperature_type_name =
            TemperatureTypeSupport::get_type_name();
    retcode = TemperatureTypeSupport::register_type(
            participant,
            temperature_type_name);
    if (retcode != DDS_RETCODE_OK) {
        shutdown(participant, "register_type error", EXIT_FAILURE);
    }

    // A Topic has a name and a datatype. Create a Topic called
    // "ChocolateTemperature" with your registered data type
    DDSTopic *temperature_topic = participant->create_topic(
            CHOCOLATE_TEMPERATURE_TOPIC,
            temperature_type_name,
            DDS_TOPIC_QOS_DEFAULT,
            NULL /* listener */,
            DDS_STATUS_MASK_NONE);
    if (temperature_topic == NULL) {
        shutdown(participant, "create_topic error", EXIT_FAILURE);
    }

    DDS_StringSeq parameters(2);
    const char *param_list[] = { "32", "30" };
    parameters.from_array(param_list, 2);
    DDSContentFilteredTopic *filtered_temperature_topic =
            participant->create_contentfilteredtopic(
                    "FilteredTemperature",
                    temperature_topic,
                    "degrees > %0 or degrees < %1",
                    parameters);
Notice that we have added a ContentFilteredTopic that will filter out temperature data unless it’s outside of the expected range. This uses a slightly more complex filter expression than the one you added to the Tempering Application in Hands-On 1: Update the ChocolateLotState DataReader with a ContentFilteredTopic. More information about the possible filter expressions can be found in SQL Filter Expression Notation, in the RTI Connext DDS Core Libraries User’s Manual.
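To see how the %0 and %1 placeholders relate to the parameter list, the following standalone sketch substitutes parameters into an expression string and evaluates the resulting range check by hand. It is only a model of the idea, not how Connext DDS parses or evaluates filters. substitute_parameters and is_out_of_range are hypothetical helpers, and degrees is assumed to be an int.

```cpp
// Standalone illustration of positional filter parameters.
// NOT the Connext DDS API: both helpers are hypothetical.
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdlib>
#include <string>
#include <vector>

// Replace each %N in the expression with parameters[N]. Placeholders
// whose index is out of range are left untouched.
std::string substitute_parameters(
        const std::string &expression,
        const std::vector<std::string> &parameters)
{
    std::string result;
    std::size_t i = 0;
    while (i < expression.size()) {
        const bool is_placeholder = expression[i] == '%'
                && i + 1 < expression.size()
                && std::isdigit(static_cast<unsigned char>(expression[i + 1]));
        if (!is_placeholder) {
            result += expression[i];
            ++i;
            continue;
        }
        // Parse the (possibly multi-digit) index after '%'.
        std::size_t j = i + 1;
        std::size_t index = 0;
        while (j < expression.size()
               && std::isdigit(static_cast<unsigned char>(expression[j]))) {
            index = index * 10 + static_cast<std::size_t>(expression[j] - '0');
            ++j;
        }
        if (index < parameters.size()) {
            result += parameters[index];
        } else {
            result.append(expression, i, j - i);
        }
        i = j;
    }
    return result;
}

// Hand-written equivalent of "degrees > %0 or degrees < %1".
bool is_out_of_range(int degrees, const std::vector<std::string> &parameters)
{
    assert(parameters.size() == 2);
    const int upper = std::atoi(parameters[0].c_str());
    const int lower = std::atoi(parameters[1].c_str());
    return degrees > upper || degrees < lower;
}
```

With the parameters "32" and "30", the expression reads degrees > 32 or degrees < 30, so in this model readings of 29 and 33 pass the filter while 30, 31, and 32 are filtered out.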
You can see later in the code that the Temperature DataReader has been updated to use the ContentFilteredTopic:
    // This DataReader reads data of type Temperature on Topic
    // "ChocolateTemperature". DataReader QoS is configured in
    // USER_QOS_PROFILES.xml
    DDSDataReader *temperature_generic_reader =
            subscriber->create_datareader_with_profile(
                    filtered_temperature_topic,
                    "ChocolateFactoryLibrary",
                    "ChocolateTemperatureProfile",
                    NULL,
                    DDS_STATUS_MASK_NONE);
    if (temperature_generic_reader == NULL) {
        shutdown(participant, "create_datareader error", EXIT_FAILURE);
    }
The logic in the monitor_temperature function no longer checks whether the temperature is out of range, because temperature data is only received when it’s out of range. Now the function looks like this:
    // Receive updates from tempering station about chocolate temperature.
    // Only an error if below 30 or over 32 degrees Fahrenheit.
    for (int i = 0; i < data_seq.length(); ++i) {
        // Check if a sample is an instance lifecycle event
        if (!info_seq[i].valid_data) {
            std::cout << "Received instance state notification" << std::endl;
            continue;
        }
        // Print data
        std::cout << "Tempering temperature out of range: ";
        TemperatureTypeSupport::print_data(&data_seq[i]);
    }
Build the applications.
On Linux or macOS:
    $ make
After you run make, a script called start_all.sh is copied into your local directory. This script opens up all the different chocolate factory applications. (Note that the script runs the debug versions of the applications, so you will need to edit it if you want to run the release versions.)
On Windows:
Open rticonnextdds-getting-started-keys-instances.sln in Visual Studio by entering rticonnextdds-getting-started-keys-instances.sln at the command prompt or using File > Open Project in Visual Studio.
Right-click ALL_BUILD and choose Build. (See 2_hello_world\<language>\README_<architecture>.txt if you need more help.) Since “Debug” is the default option at the top of Visual Studio, a Debug directory will be created with the compiled files.
After building, a script called start_all.bat is copied into your local directory. This script opens up all the different chocolate factory applications. (Note that the script runs the debug versions of the applications, so you will need to edit it if you want to run the release versions.)
Use the script to start all the applications at once:
    $ ./start_all.sh
    > start_all.bat
The script runs four copies of the Ingredient Application, each one specifying a different type of ingredient it is providing. The script also runs the Tempering Application and the Monitoring/Control Application. You can watch each lot get processed by the ingredient applications, and then the tempering machine.
If you do not want to run the script, you can start up each of the applications by creating a new terminal for each application, and starting each application in a different shell as follows:
    $ ./ingredient_application -k COCOA_BUTTER_CONTROLLER
    $ ./ingredient_application -k SUGAR_CONTROLLER
    $ ./ingredient_application -k MILK_CONTROLLER
    $ ./ingredient_application -k VANILLA_CONTROLLER
    $ ./tempering_application
    $ ./monitoring_ctrl_application

    > Debug\ingredient_application.exe -k COCOA_BUTTER_CONTROLLER
    > Debug\ingredient_application.exe -k SUGAR_CONTROLLER
    > Debug\ingredient_application.exe -k MILK_CONTROLLER
    > Debug\ingredient_application.exe -k VANILLA_CONTROLLER
    > Debug\tempering_application.exe
    > Debug\monitoring_ctrl_application.exe
6.4. Hands-On 3: Review the Larger System in Admin Console
We now have six applications publishing and subscribing to data. That is not a very large distributed system, but it can already start to look overwhelming, so we’re going to open Admin Console to get an overview of the system. This hands-on exercise does not relate directly to content filtering, but it shows how multiple applications look in Admin Console.
Make sure all of your applications are still running.
Open Admin Console.
Open Admin Console from RTI Launcher.
Choose the Administration view:
(The Administration view might be selected for you already.)
In the DDS Logical View, click on the Domain in your system:
Explore the chocolate factory system. Even though it looks complex with multiple applications running, you can navigate by clicking on the Topic name or the application. This makes it much easier to visualize the larger system.
Figure 6.5 Each Ingredient Application shows one DataReader and one DataWriter communicating on the ChocolateLotState Topic.