Data Persistence#

Introduction#

What you’ll learn

In this module, you’ll learn how to persist Topics to disk so that subscribers can receive data after the publishers have stopped.

You will do the following:

  • Run Persistence Service to persist all Topics to disk

  • Run a subscriber that receives data from publishers that are no longer running

  • Save the subscriber state across restarts to receive only data it has not seen before

Persisting data serves two purposes:

  • It allows new subscribers to have access to data even after the original publishers have stopped.

  • It provides fault tolerance by allowing publishers and subscribers to recover from crashes or restarts.

The Last-Value Cache module shows how each publisher can keep data in memory for late-joining subscribers. In this module, you’ll persist data to disk with a Persistence Service.

[Figure: Persistence Service]

How to complete this module#

To complete this module you’ll need the following:

  • 20-30 minutes

  • A Connext installation (full installation). See Get Started.

  • A text editor or IDE to write your code and configuration.

  • (Recommended) Familiarity with some basic Connext concepts as described in Publish-Subscribe or Distributed Data Cache.

You can complete the module from scratch by copying and pasting the code provided in the sections below, or you can get the full code from RTI’s GitHub examples repository.

Cloning the GitHub repository

Clone the GitHub repository with the following command:

$ git clone --recurse-submodules https://github.com/rticommunity/rticonnextdds-examples.git

The code for this module is located in the tutorials/data_persistence directory. See the README.md files for additional instructions.

1. Create the applications#

For this example, you’ll create a simple temperature monitoring system. The temperature data will be persisted to disk so that subscribers can retrieve previously published values, even after the publisher has stopped.

Data type#

The data type is a simple structure that represents a temperature reading from a sensor.

If you’re developing in Python, you can directly define Temperature in a file called temperature.py:

temperature.py#
import rti.types as idl

@idl.struct(
    member_annotations={"sensor_name": [idl.key, idl.bound(100)]}
)
class Temperature:
    sensor_name: str = ""
    degrees: float = 0.0

If you're developing in another language, define the Temperature type in a file called temperature.idl:

temperature.idl#
struct Temperature {
    @key string<100> sensor_name;
    double degrees;
};

Use rtiddsgen to generate the code for your language and platform:

rtiddsgen -language [python|c++11|c++98|c#|java|c] -platform <platform_name> temperature.idl
Choosing your language and platform

The platform name depends on the language and the package you installed. The following table shows some platform names you can use:

-language                -platform
c++11, c++98, java, c    Linux: x64Linux4gcc7.3.0, armv8Linux4gcc7.3.0, …
                         Windows: x64Win64VS2017, …
                         macOS: x64Darwin20clang12.0, arm64Darwin20clang12.0
                         and more…
c#                       net5, net6, net8, …
python                   universal

For example, to generate code for C++11 on Linux, you would use:

$ <install dir>/bin/rtiddsgen -language c++11 -platform x64Linux4gcc7.3.0 <file>.idl

Run rtiddsgen -help for all available options.

Publisher#

The publisher application will publish random temperature readings every second.

Create a file called temperature_publisher.py:

temperature_publisher.py#
import sys
import random
from time import sleep

import rti.connextdds as dds
from temperature import Temperature

def publish_temperature(sensor_name: str):
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "Temperature", Temperature)
    writer = dds.DataWriter(topic)

    temp_reading = Temperature(sensor_name)
    for _ in range(1000):
        temp_reading.degrees = random.uniform(30, 40)
        writer.write(temp_reading)
        sleep(1)


if __name__ == "__main__":
    sensor_name = sys.argv[1] if len(sys.argv) > 1 else "Sensor1"
    try:
        publish_temperature(sensor_name)
    except KeyboardInterrupt:
        pass

Create a file called temperature_publisher.cxx:

temperature_publisher.cxx#
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <rti/rti.hpp>
#include "temperature.hpp"

void publish_temperature(const std::string& sensor_name)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<Temperature> topic(participant, "Temperature");
    dds::pub::DataWriter<Temperature> writer(topic);

    Temperature temp_reading;
    temp_reading.sensor_name(sensor_name);
    for (int i = 0; i < 1000; i++) {
        temp_reading.degrees((rand() % 10) + 30);
        writer.write(temp_reading);
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

int main(int argc, char **argv)
{
    std::string sensor_name = (argc > 1) ? argv[1] : "Sensor1";
    publish_temperature(sensor_name);
}

Subscriber#

The subscriber application will print the temperature readings it receives, along with the source timestamp (the time when the sample was published).

Create a file called temperature_subscriber.py:

temperature_subscriber.py#
from datetime import datetime

import rti.connextdds as dds
import rti.asyncio  # required by take_async()
from temperature import Temperature


async def sensor_monitoring():
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "Temperature", Temperature)
    reader = dds.DataReader(topic)

    async for data, info in reader.take_async():
        if not info.valid:
            continue  # skip updates with only meta-data

        timestamp = datetime.fromtimestamp(info.source_timestamp.to_seconds())
        print(
            f"{data.sensor_name}: {data.degrees:.2f} degrees ({timestamp})"
        )


if __name__ == "__main__":
    try:
        rti.asyncio.run(sensor_monitoring())
    except KeyboardInterrupt:
        pass

Create a file called temperature_subscriber.cxx:

temperature_subscriber.cxx#
#include <chrono>
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <thread>

#include "rti/rti.hpp"
#include "rti/sub/SampleProcessor.hpp"
#include "temperature.hpp"

int main(int argc, char **argv)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<Temperature> topic(participant, "Temperature");
    dds::sub::DataReader<Temperature> reader(topic);

    rti::sub::SampleProcessor sample_processor;
    sample_processor.attach_reader(
            reader,
            [](const rti::sub::LoanedSample<Temperature>& sample)
            {
                if (!sample.info().valid()) {
                    // ignore samples with only meta-data
                    return;
                }

                uint64_t timestamp =
                        sample.info().source_timestamp().to_millisecs();

                std::cout << sample.data().sensor_name() << ": "
                        << std::fixed << std::setprecision(2)
                        << sample.data().degrees() << " degrees ("
                        << timestamp / 1000.0 << "s)" << std::endl;
            });

    while (true) { // wait in a loop
        std::this_thread::sleep_for(std::chrono::seconds(4));
    }
}

2. Configure persistent durability#

In the same directory as your applications, create a file called USER_QOS_PROFILES.xml with the following content:

USER_QOS_PROFILES.xml#
<?xml version="1.0" encoding="UTF-8"?>
<dds xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="https://community.rti.com/schema/current/rti_dds_profiles.xsd">
    <qos_library name="MyLibrary">
        <qos_profile name="Persistence" is_default_qos="true">
            <base_name>
                <element>BuiltinQosSnippetLib::QosPolicy.Durability.Persistent</element>
                <element>BuiltinQosSnippetLib::QosPolicy.Reliability.Reliable</element>
            </base_name>

            <datawriter_qos>
                <history>
                    <kind>KEEP_LAST_HISTORY_QOS</kind>
                    <depth>10</depth>
                </history>
            </datawriter_qos>
            <datareader_qos>
                <history>
                    <kind>KEEP_LAST_HISTORY_QOS</kind>
                    <depth>10</depth>
                </history>
            </datareader_qos>
        </qos_profile>
    </qos_library>
</dds>

The profile sets is_default_qos="true" so that applications use it by default. The profile then configures Durability, History (to keep the last 10 updates per sensor name) and Reliability QoS policies for the DataWriters and DataReaders created by the applications.

You can persist all of the history by changing the history kind to KEEP_ALL_HISTORY_QOS.
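To make the effect of the History setting concrete, here is a small conceptual model in plain Python. It is only a sketch of what "keep the last 10 updates per sensor name" means, not how Connext stores samples; the KeepLastHistory class and its methods are hypothetical names used for illustration.

```python
from collections import defaultdict, deque


class KeepLastHistory:
    """Toy model of a keyed KEEP_LAST history (not the Connext implementation)."""

    def __init__(self, depth: int):
        # One bounded queue per key value; the oldest sample is dropped
        # automatically when a queue is full.
        self._instances = defaultdict(lambda: deque(maxlen=depth))

    def add(self, sensor_name: str, degrees: float) -> None:
        self._instances[sensor_name].append(degrees)

    def samples(self, sensor_name: str) -> list:
        return list(self._instances[sensor_name])


history = KeepLastHistory(depth=10)
for i in range(15):
    history.add("Sensor1", 30.0 + i)  # 15 updates for Sensor1
history.add("Sensor2", 35.0)          # 1 update for Sensor2

print(len(history.samples("Sensor1")))  # 10: only the newest updates remain
print(history.samples("Sensor1")[0])    # 35.0: the five oldest were dropped
print(history.samples("Sensor2"))       # [35.0]: each key has its own history
```

Because sensor_name is the key of the Temperature type, the depth applies to each sensor separately: a busy sensor does not push out the updates of a quiet one.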

3. Run Persistence Service#

In a new terminal, run Persistence Service as follows:

<install dir>/bin/rtipersistenceservice -cfgName defaultDisk

This command runs Persistence Service with a built-in default configuration, defaultDisk, that persists all Topics to disk.
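If it helps to picture what "persisting to disk" buys you, the following toy model stores samples in a SQLite file and replays them after a simulated restart. It is a conceptual sketch only: the function names, the table layout, and the use of SQLite here are assumptions made for illustration and say nothing about how Persistence Service is implemented or configured.

```python
import os
import sqlite3
import tempfile


def open_store(path: str) -> sqlite3.Connection:
    """Open (or create) the on-disk sample store."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS samples ("
        "topic TEXT, sensor_name TEXT, degrees REAL)"
    )
    return conn


def store_sample(conn, topic, sensor_name, degrees):
    conn.execute(
        "INSERT INTO samples VALUES (?, ?, ?)", (topic, sensor_name, degrees)
    )
    conn.commit()  # write to disk so the sample survives a restart


def replay(conn, topic):
    """Return the stored samples for a topic, in insertion order."""
    cur = conn.execute(
        "SELECT sensor_name, degrees FROM samples "
        "WHERE topic = ? ORDER BY rowid",
        (topic,),
    )
    return cur.fetchall()


db_path = os.path.join(tempfile.mkdtemp(), "toy_persistence.db")

# First "run" of the toy service: the publisher's samples are stored.
service = open_store(db_path)
store_sample(service, "Temperature", "Sensor1", 38.94)
store_sample(service, "Temperature", "Sensor1", 39.60)
service.close()  # the toy service and the publisher both stop

# Second "run": a late-joining subscriber still gets the data.
service = open_store(db_path)
late_joiner_data = replay(service, "Temperature")
service.close()
print(late_joiner_data)
```

The point of the model is the second half: the data is read back from a file after everything that produced it has stopped, which is the behavior you will observe in the next step.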

4. Run the applications#

With Persistence Service running, launch a publisher for a few seconds, kill it, and then run a subscriber.

Run the publisher application:

python temperature_publisher.py Sensor1

After about ten seconds, kill the publisher (Ctrl+C).

Run the subscriber application:

python temperature_subscriber.py

The subscriber will receive from Persistence Service the data that was published before the publisher stopped.

Sensor1: 38.94 degrees (2024-05-02 15:03:13.143907)
Sensor1: 39.60 degrees (2024-05-02 15:03:13.906925)
Sensor1: 37.34 degrees (2024-05-02 15:03:14.911064)
Sensor1: 37.80 degrees (2024-05-02 15:03:15.916752)
Sensor1: 30.09 degrees (2024-05-02 15:03:16.920809)
Sensor1: 31.31 degrees (2024-05-02 15:03:17.921355)
Sensor1: 38.15 degrees (2024-05-02 15:03:18.926678)
Sensor1: 38.69 degrees (2024-05-02 15:03:19.944812)
Sensor1: 38.77 degrees (2024-05-02 15:03:20.951795)
Sensor1: 33.13 degrees (2024-05-02 15:03:21.953126)

Run the publisher application:

./objs/<platform>/temperature_publisher Sensor1

After about ten seconds, kill the publisher (Ctrl+C).

Run the subscriber application:

./objs/<platform>/temperature_subscriber

The subscriber will receive from Persistence Service the data that was published before the publisher stopped.

Sensor1: 37.00 degrees (1716806062.65s)
Sensor1: 35.00 degrees (1716806063.65s)
Sensor1: 34.00 degrees (1716806064.65s)
Sensor1: 34.00 degrees (1716806065.65s)
Sensor1: 35.00 degrees (1716806066.65s)
Sensor1: 30.00 degrees (1716806067.65s)
Sensor1: 35.00 degrees (1716806068.65s)
Sensor1: 34.00 degrees (1716806069.65s)
Sensor1: 32.00 degrees (1716806070.65s)
Sensor1: 38.00 degrees (1716806071.65s)
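The C++ subscriber prints the source timestamp as seconds since the Unix epoch. If you want to read these values as dates, a short Python helper can convert them. This is a minimal sketch; epoch_to_text is a hypothetical name, and it formats in UTC, whereas the Python subscriber above prints local time.

```python
from datetime import datetime, timezone


def epoch_to_text(epoch_seconds: float) -> str:
    """Format seconds since the Unix epoch as a UTC date and time."""
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S UTC")


# First timestamp from the sample output above.
print(epoch_to_text(1716806062.65))
```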

Kill and restart the subscriber. It will receive the data again.

Kill and rerun Persistence Service. Since the data is persisted to disk, the subscriber will still receive the data.

Troubleshooting

In case of errors building or running your application, make sure you have set up your environment and license file.

For instructions, go to Get Started, select your platform and installation method, then find the section Run a Hello World.

5. Save the reader state across restarts#

To achieve fault tolerance, when the subscriber restarts (for example, after crashing), you may want it to continue where it left off, instead of receiving all the data again from Persistence Service.

To achieve this, each subscriber can save its own state to disk and restore it when it restarts, remembering the last sample it received. Connext will only deliver samples that the subscriber didn’t receive before.

You’ll enable durable reader state by setting the storage settings of the Durability QoS policy. Since each subscriber will have its own state, you’ll pass a subscriber name to the application, and use it to specify the file name where the state will be saved.
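Before changing the application, it may help to see the idea in isolation. The sketch below is a toy model in plain Python, not the Connext mechanism: it assumes each sample carries an increasing sequence number and keeps the last one seen in a JSON file. The DurableReaderState class, its deliver method, and the file format are all hypothetical.

```python
import json
import os
import tempfile


class DurableReaderState:
    """Toy model: remember the last sequence number seen, in a file."""

    def __init__(self, file_name: str):
        self._file_name = file_name
        self.last_seen = 0
        if os.path.exists(file_name):
            # Restore the state saved by a previous run.
            with open(file_name) as f:
                self.last_seen = json.load(f)["last_seen"]

    def deliver(self, samples):
        """Return only unseen (sequence_number, value) pairs; save progress."""
        new = [s for s in samples if s[0] > self.last_seen]
        if new:
            self.last_seen = max(s[0] for s in new)
            with open(self._file_name, "w") as f:
                json.dump({"last_seen": self.last_seen}, f)
        return new


state_dir = tempfile.mkdtemp()
persisted = [(1, 38.94), (2, 39.60), (3, 37.34)]  # data held by the service

# First run of "sub1": everything is new.
sub1 = DurableReaderState(os.path.join(state_dir, "sub1.json"))
first_run = sub1.deliver(persisted)

# More data is published while sub1 is down, then sub1 restarts.
persisted += [(4, 31.02), (5, 39.10)]
sub1 = DurableReaderState(os.path.join(state_dir, "sub1.json"))
second_run = sub1.deliver(persisted)

# "sub2" uses a different file, so it starts from scratch.
sub2 = DurableReaderState(os.path.join(state_dir, "sub2.json"))
sub2_run = sub2.deliver(persisted)

print(len(first_run), second_run, len(sub2_run))
```

The model mirrors the three situations you will try later in this section: a first run that gets everything, a restart that gets only the new samples, and a second subscriber with its own state file.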

Copy temperature_subscriber.py to a new file called temperature_durable_subscriber.py.

Get the base configuration from the previously used USER_QOS_PROFILES.xml QoS profile and configure the durability storage settings:

reader_qos = dds.QosProvider.default.datareader_qos_from_profile(
    "MyLibrary::Persistence"
)
reader_qos.durability.storage_settings.enable = True
reader_qos.durability.storage_settings.file_name = subscriber_name

reader = dds.DataReader(topic, reader_qos)

Here is the full code for the subscriber, where we also get the subscriber_name as a command-line argument:

temperature_durable_subscriber.py#
import sys
from datetime import datetime
import rti.connextdds as dds
import rti.asyncio
from temperature import Temperature


async def sensor_monitoring(subscriber_name: str):
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "Temperature", Temperature)

    reader_qos = dds.QosProvider.default.datareader_qos_from_profile(
        "MyLibrary::Persistence"
    )
    reader_qos.durability.storage_settings.enable = True
    reader_qos.durability.storage_settings.file_name = subscriber_name

    reader = dds.DataReader(topic, reader_qos)

    async for data, info in reader.take_async():
        if not info.valid:
            continue

        timestamp = datetime.fromtimestamp(info.source_timestamp.to_seconds())
        print(
            f"{data.sensor_name}: {data.degrees:.2f} degrees ({timestamp})"
        )


if __name__ == "__main__":
    subscriber_name = sys.argv[1] if len(sys.argv) > 1 else "subscriber1"
    try:
        rti.asyncio.run(sensor_monitoring(subscriber_name))
    except KeyboardInterrupt:
        pass

Using durability storage settings requires setting up the Connext environment to load an additional library.

Setting up the environment for Connext

Use the following script to configure your shell environment variables to run Connext executables and load dynamic libraries.

If you’re using Bash, run this:

$ source <installdir>/resource/scripts/rtisetenv_<architecture>.bash

If you’re using the Z shell, run this:

$ source <installdir>/resource/scripts/rtisetenv_<architecture>.zsh

If you’re using the tcsh shell, run this:

$ source <installdir>/resource/scripts/rtisetenv_<architecture>.tcsh

If you’re using the Windows command prompt, run this:

> <installdir>\resource\scripts\rtisetenv_<architecture>.bat

When a directory name has a space, enclose the path in quotation marks. For example: "C:\Program Files\rti_connext_dds-<version>\resource\scripts\rtisetenv_x64Win64VS2017.bat".

<installdir> refers to the installation directory for Connext.

Run the new subscriber:

python temperature_durable_subscriber.py sub1

First, it will receive all the data.

Sensor1: 38.94 degrees (2024-05-02 15:03:13.143907)
Sensor1: 39.60 degrees (2024-05-02 15:03:13.906925)
...
Sensor1: 33.13 degrees (2024-05-02 15:03:21.953126)

Kill the subscriber and run a publisher:

python temperature_publisher.py Sensor2

After a few seconds, kill the publisher and re-run the subscriber:

python temperature_durable_subscriber.py sub1

The subscriber now receives only the data it didn’t see before being killed.

Sensor2: 31.02 degrees (2024-05-03 10:00:10.430114)
Sensor2: 39.10 degrees (2024-05-03 10:00:11.023483)
Sensor2: 36.26 degrees (2024-05-03 10:00:12.028700)
Sensor2: 39.31 degrees (2024-05-03 10:00:13.033077)

Now run a subscriber with a different input argument, sub2:

python temperature_durable_subscriber.py sub2

It will receive all the data, since it has a different state.

Copy temperature_subscriber.cxx to a new file called temperature_durable_subscriber.cxx.

Get the base configuration from the previously used USER_QOS_PROFILES.xml QoS profile and configure the durability storage settings:

dds::sub::qos::DataReaderQos reader_qos {
        dds::core::QosProvider::Default().datareader_qos("MyLibrary::Persistence")
};

reader_qos.policy<dds::core::policy::Durability>().extensions()
        .storage_settings().enable(true);
reader_qos.policy<dds::core::policy::Durability>().extensions()
        .storage_settings().file_name(subscriber_name);

dds::sub::DataReader<Temperature> reader(topic, reader_qos);

Here is the full code for the subscriber, where we also get the subscriber_name as a command-line argument:

temperature_durable_subscriber.cxx#
#include <chrono>
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <string>
#include <thread>

#include "rti/rti.hpp"
#include "rti/sub/SampleProcessor.hpp"
#include "temperature.hpp"


void sensor_monitoring(std::string subscriber_name)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<Temperature> topic(participant, "Temperature");

    dds::sub::qos::DataReaderQos reader_qos {
            dds::core::QosProvider::Default().datareader_qos("MyLibrary::Persistence")
    };

    reader_qos.policy<dds::core::policy::Durability>().extensions()
            .storage_settings().enable(true);
    reader_qos.policy<dds::core::policy::Durability>().extensions()
            .storage_settings().file_name(subscriber_name);

    dds::sub::DataReader<Temperature> reader(topic, reader_qos);

    rti::sub::SampleProcessor sample_processor;
    sample_processor.attach_reader(
            reader,
            [](const rti::sub::LoanedSample<Temperature>& sample)
            {
                if (!sample.info().valid()) {
                    // ignore samples with only meta-data
                    return;
                }

                uint64_t timestamp =
                        sample.info().source_timestamp().to_millisecs();

                std::cout << sample.data().sensor_name() << ": "
                        << std::fixed << std::setprecision(2)
                        << sample.data().degrees() << " degrees ("
                        << timestamp / 1000.0 << "s)" << std::endl;
            });

    while (true) { // wait in a loop
        std::this_thread::sleep_for(std::chrono::seconds(4));
    }
}

int main(int argc, char **argv)
{
    std::string subscriber_name = (argc > 1) ? argv[1] : "Subscriber1";
    sensor_monitoring(subscriber_name);
}

Using the durability storage settings requires setting up the Connext environment to load an additional library.

Setting up the environment for Connext

Use the following script to configure your shell environment variables to run Connext executables and load dynamic libraries.

If you’re using Bash, run this:

$ source <installdir>/resource/scripts/rtisetenv_<architecture>.bash

If you’re using the Z shell, run this:

$ source <installdir>/resource/scripts/rtisetenv_<architecture>.zsh

If you’re using the tcsh shell, run this:

$ source <installdir>/resource/scripts/rtisetenv_<architecture>.tcsh

If you’re using the Windows command prompt, run this:

> <installdir>\resource\scripts\rtisetenv_<architecture>.bat

When a directory name has a space, enclose the path in quotation marks. For example: "C:\Program Files\rti_connext_dds-<version>\resource\scripts\rtisetenv_x64Win64VS2017.bat".

<installdir> refers to the installation directory for Connext.

Run the new subscriber:

./objs/<platform>/temperature_durable_subscriber sub1

First, it will receive all the data.

Sensor1: 37.00 degrees (1716806062.65s)
Sensor1: 35.00 degrees (1716806063.65s)
...
Sensor1: 38.00 degrees (1716806071.65s)

Kill the subscriber and run a publisher:

./objs/<platform>/temperature_publisher Sensor2

After a few seconds, kill the publisher and re-run the subscriber:

./objs/<platform>/temperature_durable_subscriber sub1

The subscriber now receives only the data it didn’t see before being killed.

Sensor2: 36.00 degrees (1716808538.82s)
Sensor2: 30.00 degrees (1716808539.82s)
Sensor2: 30.00 degrees (1716808540.82s)
Sensor2: 31.00 degrees (1716808541.82s)

Now run a subscriber with a different input argument, sub2:

./objs/<platform>/temperature_durable_subscriber sub2

It will receive all the data, since it has a different state.

Optional: persist the data without Persistence Service#

Connext also allows publishers to save their own data to disk locally, without needing Persistence Service. This functionality allows publishers to restore their history when they restart.

Enable durable writer history (optional)

To enable durable writer history, you simply need to configure the durability storage settings on the publisher side, the same way you did for the durable reader state on the subscriber side.
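As a rough mental model of durable writer history, the following plain-Python sketch reloads previously written samples from a file when the "writer" is created again. It is an illustration under stated assumptions, not the Connext implementation: the DurableWriterHistory class and the one-value-per-line file format are hypothetical.

```python
import json
import os
import tempfile


class DurableWriterHistory:
    """Toy model: a writer history that is reloaded from a file on restart."""

    def __init__(self, file_name: str):
        self._file_name = file_name
        self.samples = []
        if os.path.exists(file_name):
            # Restore the samples written by a previous run.
            with open(file_name) as f:
                self.samples = [json.loads(line) for line in f]

    def write(self, degrees: float) -> None:
        self.samples.append(degrees)
        # Append one JSON value per line so earlier samples are untouched.
        with open(self._file_name, "a") as f:
            f.write(json.dumps(degrees) + "\n")


history_file = os.path.join(tempfile.mkdtemp(), "Sensor3.jsonl")

writer = DurableWriterHistory(history_file)
writer.write(31.5)
writer.write(36.0)
del writer  # the publisher stops

restarted = DurableWriterHistory(history_file)  # the publisher restarts
restarted.write(33.25)
print(restarted.samples)  # [31.5, 36.0, 33.25]
```

Unlike the Persistence Service approach, the history here is only available while the writer itself is running, which matches what you will see when you run the durable publisher below.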

Copy temperature_publisher.py to a new file called temperature_durable_publisher.py and modify it to configure the durability storage settings:

writer_qos = dds.QosProvider.default.datawriter_qos_from_profile(
    "MyLibrary::Persistence"
)
writer_qos.durability.storage_settings.enable = True
writer_qos.durability.storage_settings.file_name = sensor_name

writer = dds.DataWriter(topic, writer_qos)

You can find the full code for the publisher on the GitHub repository: temperature_durable_publisher.py.

Without Persistence Service running, start the new publisher:

python temperature_durable_publisher.py Sensor3

After a few seconds, kill the publisher and, on a different terminal, run the subscriber:

python temperature_subscriber.py

The subscriber won’t receive any data, because the publisher has stopped and Persistence Service is not running.

Re-run the publisher:

python temperature_durable_publisher.py Sensor3

The subscriber now receives the data published before the publisher stopped, as well as new updates.

Copy temperature_publisher.cxx to a new file called temperature_durable_publisher.cxx and modify it to configure the durability storage settings:

dds::pub::qos::DataWriterQos writer_qos {
        dds::core::QosProvider::Default().datawriter_qos("MyLibrary::Persistence")
};

writer_qos.policy<dds::core::policy::Durability>().extensions()
        .storage_settings().enable(true);
writer_qos.policy<dds::core::policy::Durability>().extensions()
        .storage_settings().file_name(sensor_name);

dds::pub::DataWriter<Temperature> writer(topic, writer_qos);

You can find the full code for the publisher on the GitHub repository: temperature_durable_publisher.cxx.

Without Persistence Service running, start the new publisher:

./objs/<platform>/temperature_durable_publisher Sensor3

After a few seconds, kill the publisher and, on a different terminal, run the subscriber:

./objs/<platform>/temperature_subscriber

The subscriber won’t receive any data, because the publisher has stopped and Persistence Service is not running.

Re-run the publisher:

./objs/<platform>/temperature_durable_publisher Sensor3

The subscriber now receives the data published before the publisher stopped, as well as new updates.

Learn more#

This module showed how to persist the data you publish to disk with Persistence Service, both to achieve fault tolerance and to keep the data available beyond the life of its publishers.

You used a built-in configuration that persists all Topics to disk. Persistence Service can read a configuration file to specify which Topics to persist in memory or to disk, among other settings.

Connext also provides Recording Service, which can record and replay data for offline analysis, testing, or debugging.
