Data Persistence
Introduction
What you’ll learn
In this module, you’ll learn how to persist Topics to disk so that subscribers can receive data after the publishers have stopped.
You will do the following:
Run Persistence Service to persist all Topics to disk
Run a subscriber that receives data from publishers that are no longer running
Save the subscriber state across restarts to receive only data it has not seen before
Persisting data serves two purposes:
It allows new subscribers to have access to data even after the original publishers have stopped.
It provides fault tolerance by allowing publishers and subscribers to recover from crashes or restarts.
The Last-Value Cache module shows how each publisher can keep data in memory for late-joining subscribers. In this module, you’ll persist data to disk with a Persistence Service.
How to complete this module
To complete this module you’ll need the following:
20-30 minutes
A Connext installation (full installation). See Get Started.
A text editor or IDE to write your code and configuration.
(Recommended) Familiarity with some basic Connext concepts as described in Publish-Subscribe or Distributed Data Cache.
You can complete the module from scratch by copying and pasting the code provided in the sections below, or you can get the full code from RTI’s GitHub examples repository.
Cloning the GitHub repository
Clone the GitHub repository with the following command:
$ git clone --recurse-submodules https://github.com/rticommunity/rticonnextdds-examples.git
The code for this module is located in the tutorials/data_persistence directory. See the README.md files for additional instructions.
1. Create the applications
For this example, you’ll create a simple temperature monitoring system. The temperature data will be persisted to disk so that subscribers can retrieve previously published values, even after the publisher has stopped.
Data type
The data type is a simple structure that represents a temperature reading from a sensor.
If you’re developing in Python, you can directly define Temperature in a file called temperature.py:
import rti.types as idl
@idl.struct(
    member_annotations={"sensor_name": [idl.key, idl.bound(100)]}
)
class Temperature:
    sensor_name: str = ""
    degrees: float = 0.0
Alternatively, for any supported language, define the Temperature type in a file called temperature.idl:
struct Temperature {
    @key string<100> sensor_name;
    double degrees;
};
Use rtiddsgen to generate the code for your language and platform:
rtiddsgen -language [python|c++11|c++98|c#|java|c] -platform <platform_name> temperature.idl
Choosing your language and platform
The platform name depends on the language and the package you installed.
For example, to generate code for C++11 on Linux, you would use:
$ <install dir>/bin/rtiddsgen -language c++11 -platform x64Linux4gcc7.3.0 <file>.idl
Run rtiddsgen -help for all available options.
Publisher
The publisher application will publish random temperature readings every second.
Create a file called temperature_publisher.py:
import sys
import random
from time import sleep
import rti.connextdds as dds
from temperature import Temperature
def publish_temperature(sensor_name: str):
    participant = dds.DomainParticipant(domain_id=0)
    # The Topic name must match the one the subscribers use ("Temperature")
    topic = dds.Topic(participant, "Temperature", Temperature)
    writer = dds.DataWriter(topic)
    temp_reading = Temperature(sensor_name=sensor_name)
    for _ in range(1000):
        temp_reading.degrees = random.uniform(30, 40)
        writer.write(temp_reading)
        sleep(1)
if __name__ == "__main__":
    sensor_name = sys.argv[1] if len(sys.argv) > 1 else "Sensor1"
    try:
        publish_temperature(sensor_name)
    except KeyboardInterrupt:
        pass
Create a file called temperature_publisher.cxx:
#include <chrono>
#include <cstdlib>  // std::rand()
#include <iostream>
#include <string>
#include <thread>
#include <rti/rti.hpp>
#include "temperature.hpp"
void publish_temperature(const std::string& sensor_name)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<Temperature> topic(participant, "Temperature");
    dds::pub::DataWriter<Temperature> writer(topic);
    Temperature temp_reading;
    temp_reading.sensor_name(sensor_name);
    for (int i = 0; i < 1000; i++) {
        // Whole-degree reading between 30 and 39
        temp_reading.degrees((std::rand() % 10) + 30);
        writer.write(temp_reading);
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
int main(int argc, char **argv)
{
    std::string sensor_name = (argc > 1) ? argv[1] : "Sensor1";
    publish_temperature(sensor_name);
}
Subscriber
The subscriber application will print the temperature readings it receives, along with the source timestamp (the time when the sample was published).
Create a file called temperature_subscriber.py:
from datetime import datetime
import rti.connextdds as dds
import rti.asyncio  # required by take_async()
from temperature import Temperature
async def sensor_monitoring():
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "Temperature", Temperature)
    reader = dds.DataReader(topic)
    async for data, info in reader.take_async():
        if not info.valid:
            continue  # skip updates with only meta-data
        timestamp = datetime.fromtimestamp(info.source_timestamp.to_seconds())
        print(
            f"{data.sensor_name}: {data.degrees:.2f} degrees ({timestamp})"
        )
if __name__ == "__main__":
    try:
        rti.asyncio.run(sensor_monitoring())
    except KeyboardInterrupt:
        pass
Create a file called temperature_subscriber.cxx:
#include <chrono>
#include <cstdint>
#include <iomanip>  // std::setprecision
#include <iostream>
#include <thread>
#include "rti/rti.hpp"
#include "rti/sub/SampleProcessor.hpp"
#include "temperature.hpp"
int main(int argc, char **argv)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<Temperature> topic(participant, "Temperature");
    dds::sub::DataReader<Temperature> reader(topic);
    rti::sub::SampleProcessor sample_processor;
    sample_processor.attach_reader(
        reader,
        [](const rti::sub::LoanedSample<Temperature>& sample)
        {
            if (!sample.info().valid()) {
                // ignore samples with only meta-data
                return;
            }
            uint64_t timestamp =
                sample.info().source_timestamp().to_millisecs();
            std::cout << sample.data().sensor_name() << ": "
                      << std::fixed << std::setprecision(2)
                      << sample.data().degrees() << " degrees ("
                      << timestamp / 1000.0 << "s)" << std::endl;
        });
    while (true) { // wait in a loop
        std::this_thread::sleep_for(std::chrono::seconds(4));
    }
}
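The two subscribers print the same source timestamp in different forms: the Python subscriber converts it to a date and time, while the C++ subscriber prints seconds since the Unix epoch (to_millisecs() divided by 1000). The following stdlib-only Python sketch relates the two forms. It does not use Connext, the helper names millis_to_seconds and millis_to_datetime are hypothetical, and it converts to UTC so that the result doesn’t depend on your time zone.

```python
from datetime import datetime, timedelta, timezone


def millis_to_seconds(millis: int) -> float:
    """Seconds since the Unix epoch, in the form the C++ subscriber prints."""
    return millis / 1000.0


def millis_to_datetime(millis: int, tz=timezone.utc) -> datetime:
    """Date and time for a Unix-epoch timestamp given in milliseconds.

    Whole seconds and the millisecond remainder are converted separately,
    which avoids floating-point rounding in the fractional part.
    """
    whole_seconds, remainder_ms = divmod(millis, 1000)
    return datetime.fromtimestamp(whole_seconds, tz) + timedelta(milliseconds=remainder_ms)


# The first C++ timestamp from the sample output in step 4, in milliseconds
millis = 1716806062650
print(f"{millis_to_seconds(millis):.2f}s")  # 1716806062.65s
print(millis_to_datetime(millis))           # 2024-05-27 10:34:22.650000+00:00
```

Passing tz=None instead returns a naive local time, which is the form datetime.fromtimestamp() produces in the Python subscriber.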
2. Configure persistent durability
In the same directory as your applications, create a file called USER_QOS_PROFILES.xml with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<dds xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:noNamespaceSchemaLocation="https://community.rti.com/schema/current/rti_dds_profiles.xsd">
    <qos_library name="MyLibrary">
        <qos_profile name="Persistence" is_default_qos="true">
            <base_name>
                <element>BuiltinQosSnippetLib::QosPolicy.Durability.Persistent</element>
                <element>BuiltinQosSnippetLib::QosPolicy.Reliability.Reliable</element>
            </base_name>
            <datawriter_qos>
                <history>
                    <kind>KEEP_LAST_HISTORY_QOS</kind>
                    <depth>10</depth>
                </history>
            </datawriter_qos>
            <datareader_qos>
                <history>
                    <kind>KEEP_LAST_HISTORY_QOS</kind>
                    <depth>10</depth>
                </history>
            </datareader_qos>
        </qos_profile>
    </qos_library>
</dds>
The profile sets is_default_qos="true" so that applications use it by default. It then configures persistent Durability and reliable Reliability through built-in QoS snippets, and History to keep the last 10 updates per sensor name, for the DataWriters and DataReaders created by the applications.
You can persist all of the history by changing the history kind to KEEP_ALL_HISTORY_QOS.
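In this profile, the History depth of 10 applies per instance, that is, per value of the key field sensor_name. As a conceptual illustration only (this is not the Connext API, and the KeepLastHistory class is hypothetical), the following stdlib Python model keeps the 10 newest readings of each sensor independently:

```python
from collections import deque


class KeepLastHistory:
    """Conceptual model of KEEP_LAST history: the newest `depth` samples per key."""

    def __init__(self, depth: int = 10):
        self.depth = depth
        self._instances = {}  # key (here, sensor_name) -> deque of samples

    def write(self, key: str, sample: float) -> None:
        if key not in self._instances:
            self._instances[key] = deque(maxlen=self.depth)
        # A deque with maxlen discards its oldest element when it is full.
        self._instances[key].append(sample)

    def samples(self, key: str) -> list:
        return list(self._instances.get(key, ()))


history = KeepLastHistory(depth=10)
for i in range(15):
    history.write("Sensor1", 30.0 + i)  # 15 readings for Sensor1
history.write("Sensor2", 35.0)          # 1 reading for Sensor2
print(history.samples("Sensor1"))  # the last 10 readings: 35.0 through 44.0
print(history.samples("Sensor2"))  # [35.0]
```

Writing more Sensor1 readings never evicts Sensor2’s, which is why the profile keeps the last 10 updates per sensor name rather than 10 updates in total.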
3. Run Persistence Service
In a new terminal, run Persistence Service as follows:
<install dir>/bin/rtipersistenceservice -cfgName defaultDisk
This command runs Persistence Service with a built-in default configuration, defaultDisk, that persists all Topics to disk.
4. Run the applications
With Persistence Service running, launch a publisher for a few seconds, kill it, and then run a subscriber.
If you’re using Python, run the publisher application:
python temperature_publisher.py Sensor1
After about ten seconds, kill the publisher (Ctrl+C).
Run the subscriber application:
python temperature_subscriber.py
The subscriber receives, from Persistence Service, the data that was published before the publisher stopped.
Sensor1: 38.94 degrees (2024-05-02 15:03:13.143907)
Sensor1: 39.60 degrees (2024-05-02 15:03:13.906925)
Sensor1: 37.34 degrees (2024-05-02 15:03:14.911064)
Sensor1: 37.80 degrees (2024-05-02 15:03:15.916752)
Sensor1: 30.09 degrees (2024-05-02 15:03:16.920809)
Sensor1: 31.31 degrees (2024-05-02 15:03:17.921355)
Sensor1: 38.15 degrees (2024-05-02 15:03:18.926678)
Sensor1: 38.69 degrees (2024-05-02 15:03:19.944812)
Sensor1: 38.77 degrees (2024-05-02 15:03:20.951795)
Sensor1: 33.13 degrees (2024-05-02 15:03:21.953126)
If you’re using C++, run the publisher application:
./objs/<platform>/temperature_publisher Sensor1
After about ten seconds, kill the publisher (Ctrl+C).
Run the subscriber application:
./objs/<platform>/temperature_subscriber
The subscriber receives, from Persistence Service, the data that was published before the publisher stopped.
Sensor1: 37.00 degrees (1716806062.65s)
Sensor1: 35.00 degrees (1716806063.65s)
Sensor1: 34.00 degrees (1716806064.65s)
Sensor1: 34.00 degrees (1716806065.65s)
Sensor1: 35.00 degrees (1716806066.65s)
Sensor1: 30.00 degrees (1716806067.65s)
Sensor1: 35.00 degrees (1716806068.65s)
Sensor1: 34.00 degrees (1716806069.65s)
Sensor1: 32.00 degrees (1716806070.65s)
Sensor1: 38.00 degrees (1716806071.65s)
Kill and restart the subscriber. It will receive the data again.
Kill and restart Persistence Service, then restart the subscriber. Because the data is persisted to disk, the subscriber still receives it.
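Both restarts still deliver the data because Persistence Service keeps its own copy of the samples, outside the publisher, and with defaultDisk that copy is on disk. The following stdlib Python toy model captures only that idea; the ToyPersistenceStore class and its JSON-lines file are hypothetical and do not describe how Persistence Service actually stores or delivers data.

```python
import json
import tempfile
from pathlib import Path


class ToyPersistenceStore:
    """Toy stand-in for a persistence service that appends every sample to a file."""

    def __init__(self, path):
        self.path = Path(path)

    def store(self, topic: str, sample: dict) -> None:
        # One JSON object per line; appending never touches earlier samples.
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps({"topic": topic, "sample": sample}) + "\n")

    def replay(self, topic: str) -> list:
        """Return every stored sample of `topic`, oldest first."""
        if not self.path.exists():
            return []
        with self.path.open(encoding="utf-8") as f:
            records = [json.loads(line) for line in f if line.strip()]
        return [r["sample"] for r in records if r["topic"] == topic]


with tempfile.TemporaryDirectory() as tmp:
    storage_file = Path(tmp) / "persisted.jsonl"
    service = ToyPersistenceStore(storage_file)
    for degrees in (38.94, 39.60, 37.34):  # the publisher runs for a while...
        service.store("Temperature", {"sensor_name": "Sensor1", "degrees": degrees})
    # ...then the publisher stops and the "service" restarts: a new instance
    # reads the same file, so a subscriber can still get every sample.
    restarted = ToyPersistenceStore(storage_file)
    print(len(restarted.replay("Temperature")))  # 3
```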
Troubleshooting
In case of errors building or running your application, make sure you have set up your environment and license file.
For instructions, go to Get Started, select your platform and installation method, then find the section Run a Hello World.
5. Save the reader state across restarts
To achieve fault tolerance, when the subscriber restarts (for example, after crashing), you may want it to continue where it left off, instead of receiving all the data again from Persistence Service.
To do this, each subscriber can save its own state to disk and restore it when it restarts, so that it remembers the last sample it received. Connext then delivers only the samples that the subscriber hasn’t received before.
You’ll enable durable reader state by setting the storage settings of the Durability QoS policy. Since each subscriber will have its own state, you’ll pass a subscriber name to the application, and use it to specify the file name where the state will be saved.
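Conceptually, durable reader state works like the following toy model: the subscriber saves, in a file named after it, the highest sequence number it has received from each publisher, and after a restart it skips anything at or below that number. This stdlib Python sketch is a simplification, not how Connext stores reader state; the DurableReaderState class and its JSON file are hypothetical, and it assumes each publisher’s samples arrive in increasing sequence-number order starting at 1.

```python
import json
import tempfile
from pathlib import Path


class DurableReaderState:
    """Toy model: remembers the highest sequence number received from each publisher."""

    def __init__(self, directory, subscriber_name: str):
        # One state file per subscriber name (the role file_name plays in the QoS).
        self.path = Path(directory) / f"{subscriber_name}.json"
        self.last_seen = json.loads(self.path.read_text()) if self.path.exists() else {}

    def deliver(self, samples):
        """Return values of unseen (publisher, sequence_number, value) samples, then save state."""
        fresh = []
        for publisher, seq, value in samples:
            if seq > self.last_seen.get(publisher, 0):
                fresh.append(value)
                self.last_seen[publisher] = seq
        self.path.write_text(json.dumps(self.last_seen))
        return fresh


with tempfile.TemporaryDirectory() as state_dir:
    stored = [("Sensor1", 1, 38.94), ("Sensor1", 2, 39.60)]
    print(DurableReaderState(state_dir, "sub1").deliver(stored))  # [38.94, 39.6]
    stored.append(("Sensor2", 1, 31.02))  # Sensor2 publishes while sub1 is down
    print(DurableReaderState(state_dir, "sub1").deliver(stored))  # [31.02]
    print(DurableReaderState(state_dir, "sub2").deliver(stored))  # [38.94, 39.6, 31.02]
```

Restarting sub1 resumes from its saved state, while sub2 has no state file yet and therefore gets everything, which mirrors the behavior of the two subscribers in this step.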
If you’re using Python, copy temperature_subscriber.py to a new file called temperature_durable_subscriber.py.
Load the base QoS from the MyLibrary::Persistence profile in the USER_QOS_PROFILES.xml file you created earlier, then configure the durability storage settings:
reader_qos = dds.QosProvider.default.datareader_qos_from_profile(
    "MyLibrary::Persistence"
)
reader_qos.durability.storage_settings.enable = True
reader_qos.durability.storage_settings.file_name = subscriber_name
reader = dds.DataReader(topic, reader_qos)
Here is the full code for the subscriber, which also reads subscriber_name from the command line:
import sys
from datetime import datetime
import rti.connextdds as dds
import rti.asyncio
from temperature import Temperature
async def sensor_monitoring(subscriber_name: str):
    participant = dds.DomainParticipant(domain_id=0)
    topic = dds.Topic(participant, "Temperature", Temperature)
    reader_qos = dds.QosProvider.default.datareader_qos_from_profile(
        "MyLibrary::Persistence"
    )
    reader_qos.durability.storage_settings.enable = True
    reader_qos.durability.storage_settings.file_name = subscriber_name
    reader = dds.DataReader(topic, reader_qos)
    async for data, info in reader.take_async():
        if not info.valid:
            continue
        timestamp = datetime.fromtimestamp(info.source_timestamp.to_seconds())
        print(
            f"{data.sensor_name}: {data.degrees:.2f} degrees ({timestamp})"
        )
if __name__ == "__main__":
    subscriber_name = sys.argv[1] if len(sys.argv) > 1 else "subscriber1"
    try:
        rti.asyncio.run(sensor_monitoring(subscriber_name))
    except KeyboardInterrupt:
        pass
Using durability storage settings requires setting up the Connext environment to load an additional library.
Setting up the environment for Connext
Use the rtisetenv script for your shell to configure your environment variables to run Connext executables and load dynamic libraries.
If you’re using Bash, run this:
$ source <installdir>/resource/scripts/rtisetenv_<architecture>.bash
If you’re using the Z shell, run this:
$ source <installdir>/resource/scripts/rtisetenv_<architecture>.zsh
If you’re using the tcsh shell, run this:
$ source <installdir>/resource/scripts/rtisetenv_<architecture>.tcsh
On Windows, run this:
> <installdir>\resource\scripts\rtisetenv_<architecture>.bat
When a directory name has a space, enclose the path in quotation marks. For example: "C:\Program Files\rti_connext_dds-<version>\resource\scripts\rtisetenv_x64Win64VS2017.bat".
<installdir> refers to the installation directory for Connext.
Run the new subscriber:
python temperature_durable_subscriber.py sub1
First, it will receive all the data.
Sensor1: 38.94 degrees (2024-05-02 15:03:13.143907)
Sensor1: 39.60 degrees (2024-05-02 15:03:13.906925)
...
Sensor1: 33.13 degrees (2024-05-02 15:03:21.953126)
Kill the subscriber and run a publisher:
python temperature_publisher.py Sensor2
After a few seconds, kill the publisher and re-run the subscriber:
python temperature_durable_subscriber.py sub1
The subscriber now receives only the data it didn’t see before being killed.
Sensor2: 31.02 degrees (2024-05-03 10:00:10.430114)
Sensor2: 39.10 degrees (2024-05-03 10:00:11.023483)
Sensor2: 36.26 degrees (2024-05-03 10:00:12.028700)
Sensor2: 39.31 degrees (2024-05-03 10:00:13.033077)
Now run a subscriber with a different input argument, sub2:
python temperature_durable_subscriber.py sub2
It receives all the data, because it uses a different state file.
If you’re using C++, copy temperature_subscriber.cxx to a new file called temperature_durable_subscriber.cxx.
Load the base QoS from the MyLibrary::Persistence profile in the USER_QOS_PROFILES.xml file you created earlier, then configure the durability storage settings:
dds::sub::qos::DataReaderQos reader_qos {
    dds::core::QosProvider::Default().datareader_qos("MyLibrary::Persistence")
};
reader_qos.policy<dds::core::policy::Durability>().extensions()
    .storage_settings().enable(true);
reader_qos.policy<dds::core::policy::Durability>().extensions()
    .storage_settings().file_name(subscriber_name);
dds::sub::DataReader<Temperature> reader(topic, reader_qos);
Here is the full code for the subscriber, which also reads subscriber_name from the command line:
#include <chrono>
#include <cstdint>
#include <iomanip>  // std::setprecision
#include <iostream>
#include <string>
#include <thread>
#include "rti/rti.hpp"
#include "rti/sub/SampleProcessor.hpp"
#include "temperature.hpp"
void sensor_monitoring(const std::string& subscriber_name)
{
    dds::domain::DomainParticipant participant(0);
    dds::topic::Topic<Temperature> topic(participant, "Temperature");
    // Start from the default profile and save the reader state in a file
    // named after this subscriber
    dds::sub::qos::DataReaderQos reader_qos {
        dds::core::QosProvider::Default().datareader_qos("MyLibrary::Persistence")
    };
    reader_qos.policy<dds::core::policy::Durability>().extensions()
        .storage_settings().enable(true);
    reader_qos.policy<dds::core::policy::Durability>().extensions()
        .storage_settings().file_name(subscriber_name);
    dds::sub::DataReader<Temperature> reader(topic, reader_qos);
    rti::sub::SampleProcessor sample_processor;
    sample_processor.attach_reader(
        reader,
        [](const rti::sub::LoanedSample<Temperature>& sample)
        {
            if (!sample.info().valid()) {
                // ignore samples with only meta-data
                return;
            }
            uint64_t timestamp =
                sample.info().source_timestamp().to_millisecs();
            std::cout << sample.data().sensor_name() << ": "
                      << std::fixed << std::setprecision(2)
                      << sample.data().degrees() << " degrees ("
                      << timestamp / 1000.0 << "s)" << std::endl;
        });
    while (true) { // wait in a loop
        std::this_thread::sleep_for(std::chrono::seconds(4));
    }
}
int main(int argc, char **argv)
{
    std::string subscriber_name = (argc > 1) ? argv[1] : "Subscriber1";
    sensor_monitoring(subscriber_name);
}
Using the durability storage settings requires setting up the Connext environment to load an additional library. If you haven’t done so already, run the rtisetenv script for your shell (or the .bat script on Windows), as described in Setting up the environment for Connext above.
Run the new subscriber:
./objs/<platform>/temperature_durable_subscriber sub1
First, it will receive all the data.
Sensor1: 37.00 degrees (1716806062.65s)
Sensor1: 35.00 degrees (1716806063.65s)
...
Sensor1: 38.00 degrees (1716806071.65s)
Kill the subscriber and run a publisher:
./objs/<platform>/temperature_publisher Sensor2
After a few seconds, kill the publisher and re-run the subscriber:
./objs/<platform>/temperature_durable_subscriber sub1
The subscriber now receives only the data it didn’t see before being killed.
Sensor2: 36.00 degrees (1716808538.82s)
Sensor2: 30.00 degrees (1716808539.82s)
Sensor2: 30.00 degrees (1716808540.82s)
Sensor2: 31.00 degrees (1716808541.82s)
Now run a subscriber with a different input argument, sub2:
./objs/<platform>/temperature_durable_subscriber sub2
It receives all the data, because it uses a different state file.
Optional: persist the data without Persistence Service
Connext also allows publishers to save their own data to disk locally, without needing Persistence Service. This allows publishers to restore their history when they restart.
Enable durable writer history (optional)
To enable durable writer history, configure the durability storage settings on the publisher side, the same way you did for the durable reader state on the subscriber side.
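A toy model of durable writer history: the publisher saves its own history (here, its last depth samples) to a file named after it, and a restarted publisher reloads that file, so a subscriber that joins later can still receive the older samples without Persistence Service. This stdlib Python sketch is a conceptual illustration only; the DurableWriterHistory class and its JSON file are hypothetical, and in Connext you enable this behavior through the durability storage settings rather than code like this.

```python
import json
import tempfile
from collections import deque
from pathlib import Path


class DurableWriterHistory:
    """Toy model: a publisher's KEEP_LAST history, saved to and restored from disk."""

    def __init__(self, directory, sensor_name: str, depth: int = 10):
        # One history file per publisher name (the role file_name plays in the QoS).
        self.path = Path(directory) / f"{sensor_name}.json"
        saved = json.loads(self.path.read_text()) if self.path.exists() else []
        self.samples = deque(saved, maxlen=depth)  # restore what survived the restart

    def write(self, degrees: float) -> None:
        self.samples.append(degrees)
        self.path.write_text(json.dumps(list(self.samples)))  # save after every write

    def history_for_late_joiner(self) -> list:
        return list(self.samples)


with tempfile.TemporaryDirectory() as storage:
    first_run = DurableWriterHistory(storage, "Sensor3")
    for degrees in (31.5, 32.0, 33.25):
        first_run.write(degrees)
    del first_run  # the publisher is killed
    second_run = DurableWriterHistory(storage, "Sensor3")  # the publisher restarts
    second_run.write(34.0)
    print(second_run.history_for_late_joiner())  # [31.5, 32.0, 33.25, 34.0]
```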
If you’re using Python, copy temperature_publisher.py to a new file called temperature_durable_publisher.py and modify it to configure the durability storage settings:
writer_qos = dds.QosProvider.default.datawriter_qos_from_profile(
    "MyLibrary::Persistence"
)
writer_qos.durability.storage_settings.enable = True
writer_qos.durability.storage_settings.file_name = sensor_name
writer = dds.DataWriter(topic, writer_qos)
You can find the full code for the publisher in the GitHub repository: temperature_durable_publisher.py.
Without Persistence Service running, start the new publisher:
python temperature_durable_publisher.py Sensor3
After a few seconds, kill the publisher and, in a different terminal, run the subscriber:
python temperature_subscriber.py
The subscriber doesn’t receive any data, because neither Persistence Service nor the publisher is running.
Re-run the publisher:
python temperature_durable_publisher.py Sensor3
The subscriber now receives the data published before the publisher stopped, as well as new updates.
If you’re using C++, copy temperature_publisher.cxx to a new file called temperature_durable_publisher.cxx and modify it to configure the durability storage settings:
dds::pub::qos::DataWriterQos writer_qos {
    dds::core::QosProvider::Default().datawriter_qos("MyLibrary::Persistence")
};
writer_qos.policy<dds::core::policy::Durability>().extensions()
    .storage_settings().enable(true);
writer_qos.policy<dds::core::policy::Durability>().extensions()
    .storage_settings().file_name(sensor_name);
dds::pub::DataWriter<Temperature> writer(topic, writer_qos);
You can find the full code for the publisher in the GitHub repository: temperature_durable_publisher.cxx.
Without Persistence Service running, start the new publisher:
./objs/<platform>/temperature_durable_publisher Sensor3
After a few seconds, kill the publisher and, in a different terminal, run the subscriber:
./objs/<platform>/temperature_subscriber
The subscriber doesn’t receive any data, because neither Persistence Service nor the publisher is running.
Re-run the publisher:
./objs/<platform>/temperature_durable_publisher Sensor3
The subscriber now receives the data published before the publisher stopped, as well as new updates.
Learn more
This module showed how to use Persistence Service to persist the data you publish to disk, both to achieve fault tolerance and to keep the data available beyond the life of its publishers.
You used a built-in configuration that persists all Topics to disk. Persistence Service can read a configuration file to specify which Topics to persist in memory or to disk, among other settings.
Connext also provides Recording Service, which can record and replay data for offline analysis, testing, or debugging.