Content Filtering#

Introduction#

What you’ll learn

In this module, you’ll learn how to use content filtering to subscribe to a subset of the data published for a Topic.

You will do the following:

  • Create a content filter

  • Run a simple publisher and subscriber application using a filter

  • Update the content filter at runtime

Let’s review the example in the Publish-Subscribe module, where we simulated a simple home automation system that monitored sensors indicating if a window was open or closed.

  • Each sensor published an update to the WindowStatus Topic when the window status changed.

  • A monitoring application subscribed to the WindowStatus Topic and printed a warning when any window was open.

This module builds on the above example. You may want different monitoring applications to track sensors for different rooms. A monitoring application may also want to be notified only when a window opens, not when it closes. You can achieve both objectives by using a content filter.

A ContentFilteredTopic (CFT) is a Topic with filtering properties. This feature allows you to subscribe to a subset of the data published for a Topic.

topic = dds.Topic(participant, "WindowStatus", DeviceStatus)
content_filtered_topic = dds.ContentFilteredTopic(
    topic,
    "FilterRoomAndOpenWindows",
    dds.Filter("is_open = true and room_name = 'LivingRoom'")
)

A CFT effectively partitions Topic data. When you use one, Connext can optimize data distribution by not sending samples that don’t pass the filter criteria, saving network bandwidth and CPU. Content filtering is therefore also a way to scale your distributed system.
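To make the effect of the filter concrete, the following plain-Python sketch models the expression above as a predicate over samples. It does not use the Connext API: `Sample` and `passes_filter` are hypothetical names introduced only for illustration, and in a real application the middleware evaluates the filter expression for you.

```python
from dataclasses import dataclass


@dataclass
class Sample:
    # Hypothetical stand-in for DeviceStatus; not a Connext type
    sensor_name: str
    room_name: str
    is_open: bool


def passes_filter(sample: Sample) -> bool:
    # Mirrors the expression "is_open = true and room_name = 'LivingRoom'"
    return sample.is_open and sample.room_name == "LivingRoom"


published = [
    Sample("Window1", "LivingRoom", True),
    Sample("Window1", "LivingRoom", False),
    Sample("Window3", "Kitchen", True),
]

# Only the samples that satisfy the predicate reach the filtered reader
received = [s for s in published if passes_filter(s)]
print([s.sensor_name for s in received])
```

Of the three samples, only the open LivingRoom window passes; the closed window and the Kitchen sensor are filtered out.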

How to complete this module#

To complete this module you’ll need the following:

  • 20-30 minutes

  • A Connext installation (full installation or pip-based installation). See Get Started.

  • A text editor or IDE to write your code.

  • Completing the Publish-Subscribe module first is recommended, but not required.

You can complete the module from scratch by copying and pasting the code provided in the sections below or you can get the full code from GitHub.

Cloning the GitHub repository

Clone the GitHub repository with the following command:

$ git clone --recurse-submodules https://github.com/rticommunity/rticonnextdds-examples.git

The code for this module is located in the tutorials/content_filtering directory. See the README.md files for additional instructions.

1. Define your data type#

Use the same data type as in the Publish-Subscribe module, DeviceStatus.

The data type is a simple structure that represents the status of a sensor:

home_automation.py#
from rti.types import struct, key

@struct(
    member_annotations={"sensor_name": [key]}
)
class DeviceStatus:
    sensor_name: str = ""
    room_name: str = ""
    is_open: bool = False
home_automation.idl#
struct DeviceStatus {
    @key string sensor_name;
    string room_name;
    boolean is_open;
};

Use rtiddsgen to generate the code for your language and platform:

rtiddsgen -language [python|c++11|c++98|c#|java|c] -platform <platform_name> home_automation.idl
Choosing your language and platform

The platform name depends on the language and the package you installed. The following table shows some platform names you can use:

-language: c++11, c++98, java, c
-platform:

  • Linux: x64Linux4gcc7.3.0, armv8Linux4gcc7.3.0, …

  • Windows: x64Win64VS2017, …

  • macOS: x64Darwin20clang12.0, arm64Darwin20clang12.0

  • and more…

-language: c#
-platform: net5, net6, net8, …

-language: python
-platform: universal

For example, to generate code for C++11 on Linux, you would use:

$ <install dir>/bin/rtiddsgen -language c++11 -platform x64Linux4gcc7.3.0 <file>.idl

Run rtiddsgen -help for all available options.

2. Create the publisher application#

Use the same publisher application as in the Publish-Subscribe module.

The publisher application, using the “WindowStatus” Topic, publishes an update when a window is opened or closed. This application simulates the window being opened or closed every two seconds:

home_automation_publisher.py#
import sys
from time import sleep
import rti.connextdds as dds
from home_automation import DeviceStatus

def publish_sensor(sensor_name: str, room_name: str):
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "WindowStatus", DeviceStatus)
    writer = dds.DataWriter(topic)

    device_status = DeviceStatus(sensor_name, room_name, is_open=False)
    for i in range(1000):
        # Simulate the window opening and closing
        device_status.is_open = not device_status.is_open
        writer.write(device_status)
        sleep(2)

if __name__ == '__main__':
    sensor_name = sys.argv[1] if len(sys.argv) > 1 else "Window1"
    room_name = sys.argv[2] if len(sys.argv) > 2 else "LivingRoom"
    try:
        publish_sensor(sensor_name, room_name)
    except KeyboardInterrupt:
        pass
home_automation_publisher.cxx#
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <rti/rti.hpp>
#include "home_automation.hpp"

void publish_sensor(
    const std::string& sensor_name,
    const std::string& room_name)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<DeviceStatus> topic(participant, "WindowStatus");
    dds::pub::DataWriter<DeviceStatus> writer(topic);

    DeviceStatus device_status { sensor_name, room_name, false };
    for (int i = 0; i < 1000; i++) {
        device_status.is_open(!device_status.is_open());
        writer.write(device_status);
        std::this_thread::sleep_for(std::chrono::seconds(2));
    }
}

int main(int argc, char **argv)
{
    std::string sensor_name = (argc > 1) ? argv[1] : "Window1";
    std::string room_name = (argc > 2) ? argv[2] : "LivingRoom";
    publish_sensor(sensor_name, room_name);
}
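
Because the loop toggles is_open before each write, the first sample each publisher sends reports the window as open. The following is a minimal sketch of that sequence in plain Python, with no Connext calls and no sleeping; `simulate_states` is a hypothetical helper introduced only for illustration.

```python
from typing import List


def simulate_states(count: int, initial: bool = False) -> List[bool]:
    """Return the is_open values the publisher loop would write, in order."""
    states = []
    is_open = initial
    for _ in range(count):
        is_open = not is_open   # toggle first, as the publisher does
        states.append(is_open)  # stands in for writer.write(device_status)
    return states


print(simulate_states(4))  # prints [True, False, True, False]
```

Half of the samples report a closed window, and those are the ones a filter on is_open keeps away from the subscriber.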

3. Create the subscriber with a ContentFilteredTopic#

Use the same subscriber application as in the Publish-Subscribe module. You’ll update it to use a content filter.

The subscriber application, using the “WindowStatus” Topic, subscribes to updates published when a window is opened or closed. When the filter is applied, this application will only receive updates when a window is open in the living room.

In Python, use the following code to create the “WindowStatus” Topic and apply a filter that limits the updates received:

topic = dds.Topic(participant, "WindowStatus", DeviceStatus)
content_filtered_topic = dds.ContentFilteredTopic(
    topic,
    "FilterRoomAndOpenWindows",
    dds.Filter("is_open = true and room_name = 'LivingRoom'")
)

The name "FilterRoomAndOpenWindows" is a local identifier and is not important for this module. The Topic is still called "WindowStatus".

Here is the full subscriber application:

home_automation_subscriber.py#
import rti.connextdds as dds
import rti.asyncio # required by take_data_async()
from home_automation import DeviceStatus

async def sensor_monitoring():
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "WindowStatus", DeviceStatus)
    content_filtered_topic = dds.ContentFilteredTopic(
        topic,
        "FilterRoomAndOpenWindows",
        dds.Filter("is_open = true and room_name = 'LivingRoom'")
    )
    reader = dds.DataReader(content_filtered_topic)

    async for data in reader.take_data_async():
        print(f"WARNING: {data.sensor_name} in {data.room_name} is open!")

if __name__ == '__main__':
    try:
        rti.asyncio.run(sensor_monitoring())
    except KeyboardInterrupt:
        pass

Since you’re now only receiving updates when a window is open, you no longer have to check data.is_open in the for loop.

In C++, use the following code to create the “WindowStatus” Topic and apply a filter that limits the updates received:

dds::topic::Topic<DeviceStatus> topic(participant, "WindowStatus");
dds::topic::ContentFilteredTopic<DeviceStatus> content_filtered_topic(
    topic,
    "FilterRoomAndOpenWindows",
    dds::topic::Filter("is_open = true and room_name = 'LivingRoom'"));

The name "FilterRoomAndOpenWindows" is a local identifier and is not important for this module. The Topic is still called "WindowStatus".

Here is the full C++ subscriber application:

home_automation_subscriber.cxx#
#include <chrono>
#include <iostream>
#include <thread>

#include "rti/rti.hpp"
#include "rti/sub/SampleProcessor.hpp"
#include "home_automation.hpp"

int main(int argc, char **argv)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<DeviceStatus> topic(participant, "WindowStatus");
    dds::topic::ContentFilteredTopic<DeviceStatus> content_filtered_topic(
            topic,
            "FilterRoomAndOpenWindows",
            dds::topic::Filter("is_open = true and room_name = 'LivingRoom'"));
    dds::sub::DataReader<DeviceStatus> reader(content_filtered_topic);

    rti::sub::SampleProcessor sample_processor;
    sample_processor.attach_reader(
            reader,
            [](const rti::sub::LoanedSample<DeviceStatus>& sample)
            {
                if (sample.info().valid()) { // ignore samples with only meta-data
                    std::cout << "WARNING: " << sample.data().sensor_name()
                            << " in " << sample.data().room_name()
                            << " is open!" << std::endl;
                }
            });

    while (true) { // wait in a loop
        std::this_thread::sleep_for(std::chrono::seconds(4));
    }
}

Since you’re now only receiving updates when a window is open, you no longer have to check sample.data().is_open() in the sample processor.

4. Run the applications#

Run one subscriber and three publisher instances, each in a different terminal.

For Python, run the alert subscriber application:

python home_automation_subscriber.py

Run a publisher application instance for each of three different sensors:

python home_automation_publisher.py Window1 LivingRoom
python home_automation_publisher.py Window2 LivingRoom
python home_automation_publisher.py Window3 Kitchen

The subscriber application now receives updates only from the LivingRoom:

WARNING: Window1 in LivingRoom is open!
WARNING: Window2 in LivingRoom is open!
...
WARNING: Window1 in LivingRoom is open!

For C++, first build the publisher and subscriber applications using the build files that rtiddsgen generated for your platform.

On Linux, build the applications using the makefile_home_automation_<platform> file. For example:

make -f makefile_home_automation_x64Linux4gcc7.3.0

On Windows, open home_automation-<platform>.sln in Visual Studio and compile the project.

On macOS, build the applications using the makefile_home_automation_<platform> file. For example:

make -f makefile_home_automation_arm64Darwin20clang12.0

Run the alert subscriber application:

./objs/<platform>/home_automation_subscriber

Run a publisher application instance for each of three different sensors:

./objs/<platform>/home_automation_publisher Window1 LivingRoom
./objs/<platform>/home_automation_publisher Window2 LivingRoom
./objs/<platform>/home_automation_publisher Window3 Kitchen

The subscriber application now receives updates only from the LivingRoom:

WARNING: Window1 in LivingRoom is open!
WARNING: Window2 in LivingRoom is open!
...
WARNING: Window1 in LivingRoom is open!
Troubleshooting

In case of errors building or running your application, make sure you have set up your environment and license file.

For instructions, go to Get Started, select your platform and installation method, then find the section Run a Hello World.

Optional: Update the filter#

Update the filter at runtime

In the previous section, you defined a content filter with the following expression: "is_open = true and room_name = 'LivingRoom'". You can parameterize the filter so that the subscriber can change the room name at runtime.

In Python, change the filter expression to use a parameter for the room_name field:

content_filtered_topic = dds.ContentFilteredTopic(
    topic,
    "FilterRoomAndOpenWindows",
    dds.Filter(
        expression="is_open = true and room_name = %0",
        parameters=["'LivingRoom'"])
)

Now you can update the filter to change the room name at runtime:

content_filtered_topic.filter_parameters = ["'Kitchen'"]

This update causes the subscriber application to start receiving updates for the Kitchen instead of the LivingRoom.

It’s also possible to update the whole expression with content_filtered_topic.set_filter(new_filter).

In C++, change the filter expression to use a parameter for the room_name field:

dds::core::StringSeq parameters(1, "'LivingRoom'");
dds::topic::ContentFilteredTopic<DeviceStatus> content_filtered_topic(
        topic,
        "FilterRoomAndOpenWindows",
        dds::topic::Filter("is_open = true and room_name = %0", parameters));

Now you can update the filter to change the room name at runtime:

parameters[0] = "'Kitchen'";
content_filtered_topic.filter_parameters(parameters.begin(), parameters.end());

This update causes the subscriber application to start receiving updates for the Kitchen instead of the LivingRoom.

It’s also possible to update the whole expression with content_filtered_topic.set_filter(new_filter).
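
The idea behind a parameterized filter can be sketched in plain Python: the expression stays fixed and only the value bound to %0 changes. This is a model of the behavior, not the Connext API; `RoomFilter` and its `passes` method are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class RoomFilter:
    # Hypothetical model of "is_open = true and room_name = %0"
    parameters: List[str] = field(default_factory=lambda: ["'LivingRoom'"])

    def passes(self, room_name: str, is_open: bool) -> bool:
        # Parameter values are quoted string literals, so strip the quotes
        expected_room = self.parameters[0].strip("'")
        return is_open and room_name == expected_room


room_filter = RoomFilter()
before = room_filter.passes("Kitchen", True)

# Changing the parameter changes the result; the expression is untouched
room_filter.parameters = ["'Kitchen'"]
after = room_filter.passes("Kitchen", True)
print(before, after)
```

The same open Kitchen window is rejected before the parameter update and accepted after it.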

Learn more#

In this module, you learned how to use a content filter to subscribe to a subset of the data published for a Topic. This is an efficient way to design a scalable distributed system, since the publisher applications don’t need to know the specific requirements of each subscriber. Connext can optimize the data distribution to send each update only to the subscribers whose filter it passes.
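
As a rough illustration of why this scales, the plain-Python sketch below counts how many deliveries are needed when each subscriber declares its own filter, compared with sending every update to every subscriber. It models only the outcome, not how Connext implements filtering; the subscriber names and the tuple layout are assumptions made for this example.

```python
from typing import Callable, Dict, List, Tuple

Update = Tuple[str, str, bool]  # (sensor_name, room_name, is_open)

# Each hypothetical subscriber registers its own predicate;
# the publishers never see these
subscribers: Dict[str, Callable[[Update], bool]] = {
    "living_room_monitor": lambda u: u[2] and u[1] == "LivingRoom",
    "kitchen_monitor": lambda u: u[2] and u[1] == "Kitchen",
}

updates: List[Update] = [
    ("Window1", "LivingRoom", True),
    ("Window2", "LivingRoom", False),
    ("Window3", "Kitchen", True),
    ("Window3", "Kitchen", False),
]

# Deliver each update only to the subscribers whose filter it passes
deliveries = {
    name: [u for u in updates if accepts(u)]
    for name, accepts in subscribers.items()
}

unfiltered = len(updates) * len(subscribers)
filtered = sum(len(received) for received in deliveries.values())
print(f"{filtered} of {unfiltered} deliveries needed")  # prints "2 of 8 deliveries needed"
```

Adding a third subscriber with a different filter requires no change to the list of updates or to the code that produces them.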

Next Steps

Related modules:

  • Partitioning. A content filter is a flexible and efficient way to scale your distributed system by partitioning a topic.
