Last-Value Cache#

Introduction#

What you’ll learn

In this module, you’ll learn how publishers can save the last value of each instance so that new subscribers have access to the latest data when they join the system.

You will do the following:

  • Run a simple publisher and subscriber application with volatile durability

  • Update the quality of service (QoS) configuration to achieve a higher level of durability

  • Update the QoS configuration to increase the history depth

By default, the data you publish with Connext is volatile. Publishers send it only to subscribers that are currently running. Sometimes, though, a subscriber may want to know the last value of a topic instance without having to wait for new updates from the publishers.

In the Publish-Subscribe and Distributed Data Cache modules, we defined a “WindowStatus” topic where different sensors indicated if a window was open or closed. If a subscriber starts after a sensor has already published updates, the subscribers won’t know the current state of that sensor until the sensor publishes a new update. We can change this behavior by simply configuring the Durability and History QoS.

How to complete this module#

To complete this module, you’ll need the following:

  • 20-30 minutes

  • A Connext installation (full installation or pip-based installation). See Get Started.

  • A text editor or IDE to write your code.

  • It is recommended to have completed the Publish-Subscribe or Distributed Data Cache modules, but not required.

The code examples in this module are available in RTI’s GitHub examples repository. You will need to clone the repository to build the C++ examples. If you are developing in Python, you can also complete the exercises from scratch by simply copying and pasting the code snippets in this module.

Cloning the GitHub repository

Clone the GitHub repository with the following command:

$ git clone --recurse-submodule https://github.com/rticommunity/rticonnextdds-examples.git

The code for this module is located in the tutorials/last_value_cache directory. See the README.md files for additional instructions.

1. Run with volatile durability#

Use the same data type and publisher applications from the Publish-Subscribe module.

The data type is a simple structure that represents the status of a sensor:

home_automation.py#
from rti.types import struct, key

@struct(
    member_annotations={"sensor_name": [key]}
)
class DeviceStatus:
    sensor_name: str = ""
    room_name: str = ""
    is_open: bool = False
home_automation.idl#
struct DeviceStatus {
    @key string sensor_name;
    string room_name;
    boolean is_open;
};

The publisher applications publish an update when a window is opened or closed. Our application simulates the windows getting opened or closed every ten seconds.

home_automation_publisher.py#
import sys
from time import sleep
import rti.connextdds as dds
from home_automation import DeviceStatus

def publish_sensor(sensor_name: str, room_name: str):
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "WindowStatus", DeviceStatus)
    writer = dds.DataWriter(topic)

    device_status = DeviceStatus(sensor_name, room_name, is_open=False)
    for i in range(1000):
        # Simulate the window opening and closing
        device_status.is_open = not device_status.is_open
        print(f"{sensor_name} is now: {'open' if device_status.is_open else 'closed'}")
        writer.write(device_status)
        sleep(10)

if __name__ == '__main__':
    sensor_name = sys.argv[1] if len(sys.argv) > 1 else "Window1"
    room_name = sys.argv[2] if len(sys.argv) > 2 else "LivingRoom"
    try:
        publish_sensor(sensor_name, room_name)
    except KeyboardInterrupt:
        pass
home_automation_publisher.cxx#
#include <iostream>
#include <thread>
#include <rti/rti.hpp>
#include "home_automation.hpp"

void publish_sensor(
    const std::string& sensor_name,
    const std::string& room_name)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<DeviceStatus> topic(participant, "WindowStatus");
    dds::pub::DataWriter<DeviceStatus> writer(topic);

    DeviceStatus device_status { sensor_name, room_name, false };
    for (int i = 0; i < 1000; i++) {
        device_status.is_open(!device_status.is_open());
        std::cout << sensor_name << " is now: "
                << (device_status.is_open() ? "open" : "closed")
                << std::endl;
        writer.write(device_status);
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }
}

int main(int argc, char **argv)
{
    std::string sensor_name = (argc > 1) ? argv[1] : "Sensor1";
    std::string room_name = (argc > 2) ? argv[2] : "LivingRoom";
    publish_sensor(sensor_name, room_name);
}

The subscriber is an alert application that simply warns when any window is opened. For this exercise, we will also print an information message when the window is closed.

home_automation_subscriber.py#
import rti.connextdds as dds
import rti.asyncio # required by take_data_async()
from home_automation import DeviceStatus

async def sensor_monitoring():
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "WindowStatus", DeviceStatus)
    reader = dds.DataReader(topic)

    async for data in reader.take_data_async():
        if data.is_open:
            print(f"WARNING: {data.sensor_name} in {data.room_name} is open!")
        else:
            print(f"INFO: {data.sensor_name} in {data.room_name} is closed")

if __name__ == '__main__':
    try:
        rti.asyncio.run(sensor_monitoring())
    except KeyboardInterrupt:
        pass
home_automation_subscriber.cxx#
#include <iostream>
#include <thread>

#include "rti/rti.hpp"
#include "rti/sub/SampleProcessor.hpp"
#include "home_automation.hpp"

int main(int argc, char **argv)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<DeviceStatus> topic(participant, "WindowStatus");
    dds::sub::DataReader<DeviceStatus> reader(topic);

    rti::sub::SampleProcessor sample_processor;
    sample_processor.attach_reader(
            reader,
            [](const rti::sub::LoanedSample<DeviceStatus>& sample)
            {
                if (sample.info().valid()) { // ignore samples with only meta-data
                    if (sample.data().is_open()) {
                        std::cout << "WARNING: " << sample.data().sensor_name()
                                << " in " << sample.data().room_name()
                                << " is open!" << std::endl;
                    } else {
                        std::cout << "INFO: " << sample.data().sensor_name()
                                << " in " << sample.data().room_name()
                                << " is closed." << std::endl;
                    }
                }
            });

    while (true) { // wait in a loop
        std::this_thread::sleep_for(std::chrono::seconds(4));
    }
}

The problem is that, when the data is volatile and the alert application is launched, the alert application doesn’t know the current state of the existing windows. It may have to wait until the sensors publish an update to get the latest value for that instance.

Let’s see this problem by running a couple of sensor applications followed by the alert application:

Terminal 1 - run the first sensor publisher:

python home_automation_publisher.py Window1 LivingRoom

Terminal 2 - run the second sensor publisher:

python home_automation_publisher.py Window2 LivingRoom

Terminal 3 - run the alert subscriber:

python home_automation_subscriber.py

Build the applications by following the instructions in the README.md from the tutorials/last_value_cache/c++11 directory.

Then run the following commands in separate terminals:

Terminal 1 - run the first sensor publisher:

./objs/<platform>/home_automation_publisher Window1 LivingRoom

Terminal 2 - run the second sensor publisher:

./objs/<platform>/home_automation_publisher Window2 LivingRoom

Terminal 3 - run the alert subscriber:

./objs/<platform>/home_automation_subscriber

You will see that the subscriber doesn’t receive any updates about the windows until they are opened or closed.

... after up to 10 seconds ...

WARNING: Window1 in LivingRoom is open!
WARNING: Window2 in LivingRoom is open!
INFO: Window1 in LivingRoom is closed
INFO: Window2 in LivingRoom is closed
...

Let’s update the example so that subscribers can get the last status of each sensor without having to wait for new updates from the publishers.

2. Configure the data durability#

The Durability QoS policy allows publishers to “offer” to store the data they publish for late-joiners. Subscribers can then “request” whether they need only new data or also the data that was published before they joined.

Set the durability QoS policy as follows, in a file called USER_QOS_PROFILES.xml. This file is automatically loaded by Connext applications from the current directory:

USER_QOS_PROFILES.xml#
<?xml version="1.0" encoding="UTF-8"?>
<dds xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="https://community.rti.com/schema/current/rti_dds_profiles.xsd">
    <qos_library name="MyLibrary">
        <qos_profile name="MyProfile" is_default_qos="true">
            <base_name>
                <element>BuiltinQosSnippetLib::QosPolicy.Durability.TransientLocal</element>
                <element>BuiltinQosSnippetLib::QosPolicy.Reliability.Reliable</element>
                <element>BuiltinQosSnippetLib::QosPolicy.History.KeepLast_1</element>
            </base_name>
        </qos_profile>
    </qos_library>
</dds>

This file defines a QoS profile called “MyProfile” inside a library called “MyLibrary”. These names are not important for this example, but they can be used in code to select a specific profile. The is_default_qos="true" attribute means that this profile is used by default by the publisher and subscriber applications.

The base_name tags set some QoS policies. Connext provides a pre-defined set of common QoS profiles. The profiles above indicates that the data is kept in memory for late-joiners and that only the last update for each instance is kept. It also uses reliable communications (Reliability QoS policy) and a history of one update per instance (History QoS policy).

Now that you have updated the QoS profile, run the publishers first and then the subscriber as you did before. This time the subscriber should receive the last reported temperature for each sensor without having to wait for the next update.

INFO: Window1 in LivingRoom is closed
INFO: Window2 in LivingRoom is closed
WARNING: Window1 in LivingRoom is open!
WARNING: Window2 in LivingRoom is open!

Stop the subscriber and run it again. It will immediately get the most up-to-date temperature for each sensor. For example:

WARNING: Window1 in LivingRoom is open!
WARNING: Window2 in LivingRoom is open!
...

Note that Connext understands the data structure and knows the sensor_name is the key field that defines an instance, so it keeps one update per sensor name. (See the sensor_name definition at the beginning of this module.)

3. Increase the history#

Let’s say that in addition to the window alerts, we want an audit application. The audit application will log all the updates for each sensor and the time when the update was sent. In addition, we want to receive up to ten updates for each sensor that was already running when the audit application starts.

Modify the USER_QOS_PROFILES.xml file to add a new profile named Last10Cache:

USER_QOS_PROFILES.xml#
<?xml version="1.0" encoding="UTF-8"?>
<dds xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="https://community.rti.com/schema/current/rti_dds_profiles.xsd">
    <qos_library name="MyLibrary">
        <qos_profile name="LastValueCache" is_default_qos="true">
            ...
        </qos_profile>
        <qos_profile name="Last10Cache">
            <base_name>
                <element>BuiltinQosSnippetLib::QosPolicy.Durability.TransientLocal</element>
                <element>BuiltinQosSnippetLib::QosPolicy.Reliability.Reliable</element>
            </base_name>

            <datawriter_qos>
                <history>
                    <kind>KEEP_LAST_HISTORY_QOS</kind>
                    <depth>10</depth>
                </history>
            </datawriter_qos>
            <datareader_qos>
                <history>
                    <kind>KEEP_LAST_HISTORY_QOS</kind>
                    <depth>10</depth>
                </history>
            </datareader_qos>
        </qos_profile>
    </qos_library>
</dds>

Then write the audit application:

home_automation_audit.py#
from datetime import datetime

import rti.connextdds as dds
import rti.asyncio # required by take_data_async()
from home_automation import DeviceStatus

async def sensor_monitoring():
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "WindowStatus", DeviceStatus)
    reader = dds.DataReader(
        topic,
        qos=dds.QosProvider.default.datareader_qos_from_profile("Last10Cache"))

    async for data, info in reader.take_async():
        if not info.valid:
            continue

        timestamp = datetime.fromtimestamp(info.source_timestamp.to_seconds())
        print(f"{timestamp} - {data}")

if __name__ == '__main__':
    try:
        rti.asyncio.run(sensor_monitoring())
    except KeyboardInterrupt:
        pass

Modify the QoS profile used by the publisher to use the new “Last10Cache” profile, by updating the publisher code as follows:

writer = dds.DataWriter(
    topic,
    qos=dds.QosProvider.default.datawriter_qos_from_profile("Last10Cache"))

Now run all the sensor publishers and the alert subscriber as before, followed by the audit subscriber:

python home_automation_audit.py

You will see that the audit subscriber receives up to ten updates for each sensor as soon as it starts, with the original time when the update was sent, followed by new updates.

2021-08-25 10:00:00 - DeviceStatus("Window1", "LivingRoom", True)
2021-08-25 10:00:10 - DeviceStatus("Window1", "LivingRoom", False)
2021-08-25 10:00:20 - DeviceStatus("Window1", "LivingRoom", True)
2021-08-25 10:00:30 - DeviceStatus("Window1", "LivingRoom", False)
...
home_automation_audit.cxx#
#include <iostream>
#include <thread>

#include "rti/rti.hpp"
#include "rti/sub/SampleProcessor.hpp"
#include "home_automation.hpp"

int main(int argc, char **argv)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<DeviceStatus> topic(participant, "WindowStatus");
    dds::sub::DataReader<DeviceStatus> reader(
            topic,
            dds::core::QosProvider::Default().datareader_qos("Last10Cache"));

    rti::sub::SampleProcessor sample_processor;
    sample_processor.attach_reader(
            reader,
            [](const rti::sub::LoanedSample<DeviceStatus>& sample)
            {
                if (!sample.info().valid()) {
                    // ignore samples with only meta-data
                    return;
                }

                uint64_t timestamp =
                        sample.info().source_timestamp().to_millisecs();
                std::cout << std::fixed << std::setprecision(2)
                        << timestamp / 1000.0 << " - "
                        << sample.data() << std::endl;
            });

    while (true) { // wait in a loop
        std::this_thread::sleep_for(std::chrono::seconds(4));
    }
}

You also need to modify the QoS profile used by the publisher to use the new “Last10Cache” profile. This can be done by updating the publisher code as follows:

dds::pub::DataWriter<DeviceStatus> writer(
        topic,
        dds::core::QosProvider::Default().datawriter_qos("Last10Cache"));

Build the applications as before.

You can now run all the sensor publishers and the alert subscriber as before followed by the audit subscriber:

./objs/<platform>/home_automation_audit

You will see that the audit subscriber will receive up to 10 updates for each sensor as soon as it starts, with the original time where the update was sent, followed by new updates.

1716815811.11 - [sensor_name: Window1, room_name: LivingRoom, is_open: 1]
1716815821.11 - [sensor_name: Window1, room_name: LivingRoom, is_open: 0]
1716815831.11 - [sensor_name: Window1, room_name: LivingRoom, is_open: 1]
1716815841.11 - [sensor_name: Window1, room_name: LivingRoom, is_open: 0]
...

Learn more#

This module showed how the Durability and History QoS policies allow implementing the last-value cache pattern. Publishers keep their last update for each instance in memory so that new subscribers can receive it. This data pattern allows an application to get the current state of the system when it starts.

Next Steps

Related modules:

  • Data Persistence - A higher durability level and Persistence Service allow persisting data beyond the life of the original publisher.

  • Debugging - When your DataReaders request a higher QoS than your DataWriters offer (for example, a higher durability or history), your applications may not communicate. See how you can debug this using Admin Console.

Reference documentation:

Was this page helpful?

Back to Learn