Reliable message delivery without persistence service

Joined: 05/21/2014
Posts: 46

Hi, is there a way to do reliable messaging without using the RTI Persistence Service? For example: sending a message to a device that joins later, resending a message if a device goes offline, or sending a message again if a device doesn't receive it within a set amount of time.

rip
Joined: 04/06/2012
Posts: 324

Persistence (Durability) and Reliability are two different QoS settings.  Durability > VOLATILE requires Reliability == RELIABLE, but otherwise they are independent of each other.

Durability:  Late Joiner Behavior.

Reliability:  Reliability protocol on top of unreliable transports.

"Sending a message if a device joins later" is Durability.  Without a Persistence Service, you only have TRANSIENT_LOCAL (i.e., as long as the original writer is alive, data will also be sent to late-joining readers that have Durability == TRANSIENT_LOCAL).
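
As a rough illustration of the late-joiner behavior described above, here is a toy model in plain Java. It is only a sketch: ToyTransientLocalWriter and its methods are made-up names, not part of the RTI Connext API, and the real middleware does considerably more. It assumes a KEEP_LAST history with a fixed depth that lives inside the writer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: this class is hypothetical and not part of the RTI Connext API.
class ToyTransientLocalWriter {
    private final int depth;                                  // KEEP_LAST history depth
    private final Deque<String> history = new ArrayDeque<>(); // lives only as long as the writer
    private final List<List<String>> readers = new ArrayList<>();

    ToyTransientLocalWriter(int depth) {
        if (depth < 1) {
            throw new IllegalArgumentException("depth must be at least 1");
        }
        this.depth = depth;
    }

    // Store the sample (evicting the oldest one beyond depth) and deliver it
    // to every reader that is currently matched.
    void write(String sample) {
        if (history.size() == depth) {
            history.removeFirst();
        }
        history.addLast(sample);
        for (List<String> inbox : readers) {
            inbox.add(sample);
        }
    }

    // A late joiner first receives whatever is still in the writer's history.
    List<String> matchReader() {
        List<String> inbox = new ArrayList<>(history);
        readers.add(inbox);
        return inbox;
    }

    public static void main(String[] args) {
        ToyTransientLocalWriter writer = new ToyTransientLocalWriter(5);
        for (int i = 1; i <= 7; i++) {
            writer.write("msg" + i);
        }
        List<String> lateJoiner = writer.matchReader();
        writer.write("msg8");
        System.out.println(lateJoiner); // [msg3, msg4, msg5, msg6, msg7, msg8]
    }
}
```

Because the history is a field of the writer object, it disappears when the writer does, which is the limitation of TRANSIENT_LOCAL compared with a separate persistence service.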

"Resending a message if a device goes offline" is Reliability, assuming the device comes back before its Discovery data is purged by the Reliable Writer's participant.  If it returns before the purge, the reliable data will be resent in response to a NACK.  If it returns after the purge, it is treated as a new reader and Durability takes over (see the previous paragraph for the behavior in this case).

"Sending a message again if [it isn't received]" is Reliability.  The period is controlled by heartbeats and a number of QoS fields governing resend windows, etc.
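
To picture the heartbeat/NACK repair loop, here is a heavily simplified sketch in plain Java. All class and method names are hypothetical and nothing here is RTI API. It leaves out timing, acknowledgements that free the writer's queue, and the purge case, and only shows how a heartbeat lets a reader notice a gap and ask for a resend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: these classes are hypothetical and not part of any DDS API.
class ToyReliableWriter {
    private final Map<Long, String> sendQueue = new TreeMap<>(); // samples kept for repair
    private long nextSeq = 1;

    // Assign the next sequence number and keep the sample for a possible resend.
    long write(String sample) {
        long seq = nextSeq++;
        sendQueue.put(seq, sample);
        return seq;
    }

    // A heartbeat advertises the highest sequence number written so far.
    long heartbeat() {
        return nextSeq - 1;
    }

    // Answer a NACK by returning the requested sample.
    String resend(long seq) {
        return sendQueue.get(seq);
    }
}

class ToyReliableReader {
    private final Map<Long, String> received = new TreeMap<>();

    void receive(long seq, String sample) {
        received.put(seq, sample);
    }

    // Compare the heartbeat against what arrived; the gaps become the NACK.
    List<Long> missingUpTo(long lastSeq) {
        List<Long> missing = new ArrayList<>();
        for (long seq = 1; seq <= lastSeq; seq++) {
            if (!received.containsKey(seq)) {
                missing.add(seq);
            }
        }
        return missing;
    }

    List<String> samplesInOrder() {
        return new ArrayList<>(received.values());
    }
}

class ToyReliabilityDemo {
    public static void main(String[] args) {
        ToyReliableWriter writer = new ToyReliableWriter();
        ToyReliableReader reader = new ToyReliableReader();
        for (String s : new String[] {"a", "b", "c"}) {
            long seq = writer.write(s);
            if (seq != 2) { // pretend the network drops sample 2
                reader.receive(seq, s);
            }
        }
        // Heartbeat arrives, reader NACKs the gap, writer repairs it.
        for (long seq : reader.missingUpTo(writer.heartbeat())) {
            reader.receive(seq, writer.resend(seq));
        }
        System.out.println(reader.samplesInOrder()); // [a, b, c]
    }
}
```

In the real protocol the heartbeat period and the resend windows are what the QoS fields tune; the sketch collapses all of that into a single repair pass.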

The only time you need a Persistence Service is when Durability is TRANSIENT or PERSISTENT, both of which require Reliability == RELIABLE.

 

Joined: 05/21/2014
Posts: 46

I have been having some trouble getting an example program to work and keep getting an "inconsistent writer" error. I haven't worked on the reader yet. Could you take a look at the code to see what could cause this?

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import com.rti.dds.domain.DomainParticipant;
import com.rti.dds.domain.DomainParticipantFactory;
import com.rti.dds.infrastructure.DurabilityQosPolicyKind;
import com.rti.dds.infrastructure.HistoryQosPolicyKind;
import com.rti.dds.infrastructure.InstanceHandle_t;
import com.rti.dds.infrastructure.RETCODE_ERROR;
import com.rti.dds.infrastructure.ReliabilityQosPolicy;
import com.rti.dds.infrastructure.ReliabilityQosPolicyKind;
import com.rti.dds.infrastructure.ResourceLimitsQosPolicy;
import com.rti.dds.infrastructure.StatusKind;
import com.rti.dds.publication.DataWriterQos;
import com.rti.dds.publication.Publisher;
import com.rti.dds.topic.Topic;
import com.rti.dds.type.builtin.StringDataWriter;
import com.rti.dds.type.builtin.StringTypeSupport;

//****************************************************************************
public class HelloPublisher {
    public static final void main(String[] args) {
        // Create the DDS Domain participant on domain ID 0
        DomainParticipant participant = DomainParticipantFactory.get_instance().create_participant(
                0, // Domain ID = 0
                DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                null, // listener
                StatusKind.STATUS_MASK_NONE);
        if (participant == null) {
            System.err.println("Unable to create domain participant");
            return;
        }

        // Create the topic "Hello World" for the String type
        Topic topic = participant.create_topic(
                "Hello, World",
                StringTypeSupport.get_type_name(),
                DomainParticipant.TOPIC_QOS_DEFAULT,
                null, // listener
                StatusKind.STATUS_MASK_NONE);
        if (topic == null) {
            System.err.println("Unable to create topic.");
            return;
        }

        DataWriterQos writer_qos = new DataWriterQos();
        writer_qos.reliability.kind = ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
        writer_qos.durability.kind = DurabilityQosPolicyKind.TRANSIENT_LOCAL_DURABILITY_QOS;
        writer_qos.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;
        writer_qos.history.depth = 5;
        // Create the data writer using the default publisher
        StringDataWriter dataWriter =
                (StringDataWriter) participant.create_datawriter(
                        topic,
                        writer_qos,
                        null, // listener
                        StatusKind.STATUS_MASK_NONE);
        if (dataWriter == null) {
            System.err.println("Unable to create data writer\n");
            return;
        }

        System.out.println("Ready to write data.");
        System.out.println("When the subscriber is ready, you can start writing.");
        System.out.print("Press CTRL+C to terminate or enter an empty line to do a clean shutdown.\n\n");

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            while (true) {
                System.out.print("Please type a message> ");
                String toWrite = reader.readLine();
                if (toWrite == null) break; // shouldn't happen
                dataWriter.write(toWrite, InstanceHandle_t.HANDLE_NIL);
                if (toWrite.equals("")) break;
            }
        } catch (IOException e) {
            // This exception can be thrown from the BufferedReader class
            e.printStackTrace();
        } catch (RETCODE_ERROR e) {
            // This exception can be thrown from DDS write operation
            e.printStackTrace();
        }

        System.out.println("Exiting...");
        participant.delete_contained_entities();
        DomainParticipantFactory.get_instance().delete_participant(participant);
    }
}

Joined: 05/21/2014
Posts: 46

Also, here is a reader I tried a short while ago; it has the same problem.

// ****************************************************************************
// (c) Copyright, Real-Time Innovations, All rights reserved.
//
// Permission to modify and use for internal purposes granted.
// This software is provided "as is", without warranty, express or implied.
//
// ****************************************************************************

 

import com.rti.dds.domain.DomainParticipant;
import com.rti.dds.domain.DomainParticipantFactory;
import com.rti.dds.infrastructure.DurabilityQosPolicyKind;
import com.rti.dds.infrastructure.HistoryQosPolicyKind;
import com.rti.dds.infrastructure.RETCODE_ERROR;
import com.rti.dds.infrastructure.RETCODE_NO_DATA;
import com.rti.dds.infrastructure.ReliabilityQosPolicy;
import com.rti.dds.infrastructure.ReliabilityQosPolicyKind;
import com.rti.dds.infrastructure.ResourceLimitsQosPolicy;
import com.rti.dds.infrastructure.StatusKind;
import com.rti.dds.subscription.DataReader;
import com.rti.dds.subscription.DataReaderAdapter;
import com.rti.dds.subscription.DataReaderQos;
import com.rti.dds.subscription.SampleInfo;
import com.rti.dds.subscription.Subscriber;
import com.rti.dds.topic.Topic;
import com.rti.dds.type.builtin.StringDataReader;
import com.rti.dds.type.builtin.StringTypeSupport;

//****************************************************************************
public class HelloSubscriber extends DataReaderAdapter {

    // For clean shutdown sequence
    private static boolean shutdown_flag = false;

    public static final void main(String[] args) {
        // Create the DDS Domain participant on domain ID 0
        DomainParticipant participant = DomainParticipantFactory.get_instance().create_participant(
                0, // Domain ID = 0
                DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                null, // listener
                StatusKind.STATUS_MASK_NONE);
        if (participant == null) {
            System.err.println("Unable to create domain participant");
            return;
        }

        // Create the topic "Hello, World" for the String type
        Topic topic = participant.create_topic(
                "Hello, World",
                StringTypeSupport.get_type_name(),
                DomainParticipant.TOPIC_QOS_DEFAULT,
                null, // listener
                StatusKind.STATUS_MASK_NONE);
        if (topic == null) {
            System.err.println("Unable to create topic.");
            return;
        }

        // Create the data reader using the default subscriber
        DataReaderQos reader_qos = new DataReaderQos();
        reader_qos.reliability.kind = ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
        reader_qos.durability.kind = DurabilityQosPolicyKind.TRANSIENT_LOCAL_DURABILITY_QOS;
        reader_qos.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;
        reader_qos.history.depth = 5;
        StringDataReader dataReader =
                (StringDataReader) participant.create_datareader(
                        topic,
                        reader_qos,
                        new HelloSubscriber(), // Listener
                        StatusKind.DATA_AVAILABLE_STATUS);
        if (dataReader == null) {
            System.err.println("Unable to create DDS Data Reader");
            return;
        }

        System.out.println("Ready to read data.");
        System.out.println("Press CTRL+C to terminate.");
        for (;;) {
            try {
                Thread.sleep(2000);
                if (shutdown_flag) break;
            } catch (InterruptedException e) {
                // Nothing to do...
            }
        }

        System.out.println("Shutting down...");
        participant.delete_contained_entities();
        DomainParticipantFactory.get_instance().delete_participant(participant);
    }

    /*
     * This method gets called back by DDS when one or more data samples have
     * been received.
     */
    public void on_data_available(DataReader reader) {
        StringDataReader stringReader = (StringDataReader) reader;
        SampleInfo info = new SampleInfo();
        for (;;) {
            try {
                String sample = stringReader.take_next_sample(info);
                if (info.valid_data) {
                    System.out.println(sample);
                    if (sample.equals("")) {
                        shutdown_flag = true;
                    }
                }
            } catch (RETCODE_NO_DATA noData) {
                // No more data to read
                break;
            } catch (RETCODE_ERROR e) {
                // An error occurred
                e.printStackTrace();
            }
        }
    }
}

rip
Joined: 04/06/2012
Posts: 324

Don't try to set QoS in code.  Use USER_QOS_PROFILES.xml in the working directory.  You want to inherit from base_name="BuiltinQosLibExp::Generic.KeepLastReliable.TransientLocal", and set history.kind to KEEP_LAST_HISTORY_QOS and history.depth to 5:

USER_QOS_PROFILES.xml

<?xml version="1.0"?>
<dds>
    <qos_library name="Test_Library">
        <qos_profile name="Test_Profile" base_name="BuiltinQosLibExp::Generic.KeepLastReliable.TransientLocal" is_default_qos="true">
            <datawriter_qos>
                <history><kind>KEEP_LAST_HISTORY_QOS</kind><depth>5</depth></history>
                <publication_name>
                    <name>TestDataWriter</name>
                </publication_name>
            </datawriter_qos>
            <datareader_qos>
                <history><kind>KEEP_LAST_HISTORY_QOS</kind><depth>5</depth></history>
                <subscription_name>
                    <name>TestDataReader</name>
                </subscription_name>
            </datareader_qos>
        </qos_profile>
    </qos_library>
</dds>

And this is the change to the code:

StringDataWriter dataWriter = (StringDataWriter) participant.create_datawriter(
  topic,
  Publisher.DATAWRITER_QOS_DEFAULT, // this is the change, you don't need the DataWriterQos
  null, // listener
  StatusKind.STATUS_MASK_NONE);
if (dataWriter == null) {
  System.err.println("Unable to create data writer\n");
  return;
}

And the result:

lakealpine:foo rip$ java -cp ".:${NDDSHOME}/class/nddsjava.jar" HelloPublisher
Ready to write data.
When the subscriber is ready, you can start writing.
Press CTRL+C to terminate or enter an empty line to do a clean shutdown.

Please type a message> ^Clakealpine:foo rip$

 

Joined: 05/21/2014
Posts: 46

Thanks, that worked. Is there also a way in code to flush the queue at any point, so that old messages from days ago that are no longer relevant are not sitting there and getting delivered to a new device?

rip
Joined: 04/06/2012
Posts: 324

You can dispose the keyed samples (<writer>.dispose(instance_data, instance_handle*)).  This will tell the late joiner data reader that the samples are no longer valid.  There are no documented API calls to reset the writer's cache.

Alternatively, you can set the LIFESPAN QoS to a fixed time period, and the middleware will then remove each sample individually at the end of its lifespan.
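
To show what per-sample expiry means in practice, here is a toy model in plain Java. It is a sketch under stated assumptions: ToyLifespanCache is a made-up class, not the RTI writer queue, and the clock is passed in explicitly (and assumed never to go backwards) so the behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: a hypothetical cache, not the RTI writer queue.
class ToyLifespanCache {
    private static final class Entry {
        final String sample;
        final long expiresAtMillis;

        Entry(String sample, long expiresAtMillis) {
            this.sample = sample;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long lifespanMillis;
    private final Deque<Entry> entries = new ArrayDeque<>();

    ToyLifespanCache(long lifespanMillis) {
        this.lifespanMillis = lifespanMillis;
    }

    // Stamp each sample with its own expiry time when it is written.
    void write(String sample, long nowMillis) {
        entries.addLast(new Entry(sample, nowMillis + lifespanMillis));
    }

    // Drop expired samples, then return what a late joiner would still get.
    List<String> samplesFor(long nowMillis) {
        while (!entries.isEmpty() && entries.peekFirst().expiresAtMillis <= nowMillis) {
            entries.removeFirst();
        }
        List<String> live = new ArrayList<>();
        for (Entry entry : entries) {
            live.add(entry.sample);
        }
        return live;
    }

    public static void main(String[] args) {
        ToyLifespanCache cache = new ToyLifespanCache(1000);
        cache.write("old", 0);
        cache.write("new", 900);
        System.out.println(cache.samplesFor(1000)); // [new]
    }
}
```

Each sample ages out on its own schedule, so a device that joins late only sees messages that are still within their lifespan, without anyone having to flush the queue by hand.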