RTI Connext Traditional C++ API Version 7.1.0
HelloWorld_subscriber.cxx

RTI Connext Subscription Example

The unmodified subscription example generated by rtiddsgen (see the Code Generator User's Manual for more information).

HelloWorld_subscriber.cxx

/*
* (c) Copyright, Real-Time Innovations, 2020. All rights reserved.
* RTI grants Licensee a license to use, modify, compile, and create derivative
* works of the software solely for use with RTI Connext DDS. Licensee may
* redistribute copies of the software provided that all such copies are subject
* to this license. The software is provided "as is", with no warranty of any
* type, including any warranty for fitness for any purpose. RTI is under no
* obligation to maintain or support the software. RTI shall not be liable for
* any incidental or consequential damages arising out of the use or inability
* to use the software.
*/
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include "HelloWorld.h"
#include "HelloWorldSupport.h"
#include "ndds/ndds_cpp.h"
#include "application.h"
using namespace application;
static int shutdown_participant(
DDSDomainParticipant *participant,
const char *shutdown_message,
int status);
// Process data. Returns number of samples processed.
unsigned int process_data(HelloWorldDataReader *typed_reader)
{
HelloWorldSeq data_seq; // Sequence of received data
DDS_SampleInfoSeq info_seq; // Metadata associated with samples in data_seq
unsigned int samples_read = 0;
// Take available data from DataReader's queue
typed_reader->take(
data_seq,
info_seq,
// Iterate over all available data
for (int i = 0; i < data_seq.length(); ++i) {
// Check if a sample is an instance lifecycle event
if (info_seq[i].valid_data) {
// Print data
std::cout << "Received data" << std::endl;
HelloWorldTypeSupport::print_data(&data_seq[i]);
samples_read++;
} else { // This is an instance lifecycle event with no data payload.
std::cout << "Received instance state notification" << std::endl;
}
}
// Data loaned from Connext for performance. Return loan when done.
DDS_ReturnCode_t retcode = typed_reader->return_loan(data_seq, info_seq);
if (retcode != DDS_RETCODE_OK) {
std::cerr << "return loan error " << retcode << std::endl;
}
return samples_read;
}
int run_subscriber_application(unsigned int domain_id, unsigned int sample_count)
{
// Start communicating in a domain, usually one participant per application
DDSDomainParticipant *participant =
DDSTheParticipantFactory->create_participant(
domain_id,
NULL /* listener */,
if (participant == NULL) {
return shutdown_participant(participant, "create_participant error", EXIT_FAILURE);
}
// A Subscriber allows an application to create one or more DataReaders
DDSSubscriber *subscriber = participant->create_subscriber(
NULL /* listener */,
if (subscriber == NULL) {
return shutdown_participant(participant, "create_subscriber error", EXIT_FAILURE);
}
// Register the datatype to use when creating the Topic
const char *type_name = HelloWorldTypeSupport::get_type_name();
DDS_ReturnCode_t retcode =
HelloWorldTypeSupport::register_type(participant, type_name);
if (retcode != DDS_RETCODE_OK) {
return shutdown_participant(participant, "register_type error", EXIT_FAILURE);
}
// Create a Topic with a name and a datatype
DDSTopic *topic = participant->create_topic(
"Example HelloWorld",
type_name,
NULL /* listener */,
if (topic == NULL) {
return shutdown_participant(participant, "create_topic error", EXIT_FAILURE);
}
// This DataReader reads data on "Example HelloWorld" Topic
DDSDataReader *untyped_reader = subscriber->create_datareader(
topic,
NULL,
if (untyped_reader == NULL) {
return shutdown_participant(participant, "create_datareader error", EXIT_FAILURE);
}
// Narrow casts from a untyped DataReader to a reader of your type
HelloWorldDataReader *typed_reader =
HelloWorldDataReader::narrow(untyped_reader);
if (typed_reader == NULL) {
return shutdown_participant(participant, "DataReader narrow error", EXIT_FAILURE);
}
// Create ReadCondition that triggers when unread data in reader's queue
DDSReadCondition *read_condition = typed_reader->create_readcondition(
if (read_condition == NULL) {
return shutdown_participant(participant, "create_readcondition error", EXIT_FAILURE);
}
// WaitSet will be woken when the attached condition is triggered
DDSWaitSet waitset;
retcode = waitset.attach_condition(read_condition);
if (retcode != DDS_RETCODE_OK) {
return shutdown_participant(participant, "attach_condition error", EXIT_FAILURE);
}
// Main loop. Wait for data to arrive, and process when it arrives
unsigned int samples_read = 0;
while (!shutdown_requested && samples_read < sample_count) {
DDSConditionSeq active_conditions_seq;
// Wait for data and report if it does not arrive in 1 second
DDS_Duration_t wait_timeout = { 1, 0 };
retcode = waitset.wait(active_conditions_seq, wait_timeout);
if (retcode == DDS_RETCODE_OK) {
// If the read condition is triggered, process data
samples_read += process_data(typed_reader);
} else {
if (retcode == DDS_RETCODE_TIMEOUT) {
std::cout << "No data after 1 second" << std::endl;
}
}
}
// Cleanup
return shutdown_participant(participant, "Shutting down", 0);
}
// Delete all entities
static int shutdown_participant(
DDSDomainParticipant *participant,
const char *shutdown_message,
int status)
{
std::cout << shutdown_message << std::endl;
if (participant != NULL) {
// Cleanup everything created by this Participant
retcode = participant->delete_contained_entities();
if (retcode != DDS_RETCODE_OK) {
std::cerr << "delete_contained_entities error" << retcode
<< std::endl;
status = EXIT_FAILURE;
}
retcode = DDSTheParticipantFactory->delete_participant(participant);
if (retcode != DDS_RETCODE_OK) {
std::cerr << "delete_participant error" << retcode << std::endl;
status = EXIT_FAILURE;
}
}
return status;
}
int main(int argc, char *argv[])
{
// Parse arguments and handle control-C
ApplicationArguments arguments;
parse_arguments(arguments, argc, argv);
if (arguments.parse_result == PARSE_RETURN_EXIT) {
return EXIT_SUCCESS;
} else if (arguments.parse_result == PARSE_RETURN_FAILURE) {
return EXIT_FAILURE;
}
setup_signal_handlers();
// Sets Connext verbosity to help debugging
int status = run_subscriber_application(arguments.domain_id, arguments.sample_count);
// Releases the memory used by the participant factory. Optional at
// application exit
if (retcode != DDS_RETCODE_OK) {
std::cerr << "finalize_instance error" << retcode << std::endl;
status = EXIT_FAILURE;
}
return status;
}
<<interface>> Allows the application to: (1) declare the data it wishes to receive (i....
Definition: subscription.ifcxx:565
static DDS_ReturnCode_t finalize_instance()
<<extension>> Destroys the singleton instance of this class.
<<interface>> Container for all DDSDomainEntity objects.
Definition: domain.ifcxx:191
virtual DDS_ReturnCode_t delete_contained_entities()=0
Delete all the entities that were created by means of the "create" operations on the DDSDomainPartici...
virtual DDSTopic * create_topic(const char *topic_name, const char *type_name, const DDS_TopicQos &qos, DDSTopicListener *listener, DDS_StatusMask mask)=0
Creates a DDSTopic with the desired QoS policies and attaches to it the specified DDSTopicListener.
virtual DDSSubscriber * create_subscriber(const DDS_SubscriberQos &qos, DDSSubscriberListener *listener, DDS_StatusMask mask)=0
Creates a DDSSubscriber with the desired QoS policies and attaches to it the specified DDSSubscriberL...
<<interface>> Conditions specifically dedicated to read operations and attached to one DDSDataReader.
Definition: subscription.ifcxx:247
<<interface>> A subscriber is the object responsible for actually receiving data from a subscription.
Definition: subscription.ifcxx:366
virtual DDSDataReader * create_datareader(DDSTopicDescription *topic, const DDS_DataReaderQos &qos, DDSDataReaderListener *listener, DDS_StatusMask mask)=0
Creates a DDSDataReader that will be attached and belong to the DDSSubscriber.
<<interface>> The most basic description of the data to be published and subscribed.
Definition: topic.ifcxx:257
<<interface>> Allows an application to wait until one or more of the attached DDSCondition objects ha...
Definition: dds_cpp.1.0/interface/infrastructure.ifcxx:281
virtual DDS_ReturnCode_t attach_condition(DDSCondition *cond)
Attaches a DDSCondition to the DDSWaitSet.
virtual DDS_ReturnCode_t wait(DDSConditionSeq &active_conditions, const DDS_Duration_t &timeout)
Allows an application thread to wait for the occurrence of certain conditions.
static NDDSConfigLogger * get_instance()
Get the singleton instance of this type.
void set_verbosity(NDDS_Config_LogVerbosity verbosity)
Set the verbosity at which RTI Connext will log diagnostic information.
const struct DDS_DomainParticipantQos DDS_PARTICIPANT_QOS_DEFAULT
Special value for creating a DomainParticipant with default QoS.
#define DDSTheParticipantFactory
Can be used as an alias for the singleton factory returned by the operation DDSDomainParticipantFacto...
Definition: domain.ifcxx:1233
const struct DDS_SubscriberQos DDS_SUBSCRIBER_QOS_DEFAULT
Special value for creating a DDSSubscriber with default QoS.
const struct DDS_TopicQos DDS_TOPIC_QOS_DEFAULT
Special value for creating a DDSTopic with default QoS.
const DDS_InstanceStateMask DDS_ANY_INSTANCE_STATE
Any instance state ALIVE_INSTANCE_STATE | NOT_ALIVE_DISPOSED_INSTANCE_STATE | NOT_ALIVE_NO_WRITERS_IN...
const DDS_Long DDS_LENGTH_UNLIMITED
A special value indicating an unlimited quantity.
DDS_ReturnCode_t
Type for return codes.
Definition: infrastructure.ifc:1336
@ DDS_RETCODE_OK
Successful return.
Definition: infrastructure.ifc:1339
@ DDS_RETCODE_TIMEOUT
The operation timed out.
Definition: infrastructure.ifc:1379
const DDS_SampleStateMask DDS_ANY_SAMPLE_STATE
Any sample state DDS_READ_SAMPLE_STATE | DDS_NOT_READ_SAMPLE_STATE.
@ DDS_NOT_READ_SAMPLE_STATE
Sample has not been read.
Definition: subscription.ifc:1007
#define DDS_STATUS_MASK_NONE
No bits are set.
Definition: infrastructure.ifc:1410
const struct DDS_DataReaderQos DDS_DATAREADER_QOS_DEFAULT
Special value for creating data reader with default QoS.
const DDS_ViewStateMask DDS_ANY_VIEW_STATE
Any view state DDS_NEW_VIEW_STATE | DDS_NOT_NEW_VIEW_STATE.
Instantiates FooSeq < DDSCondition >
Definition: dds_cpp.1.0/interface/infrastructure.ifcxx:126
Type for duration representation.
Definition: infrastructure.ifc:642
Declares IDL sequence < DDS_SampleInfo > .
Definition: subscription.ifc:1305