RTI Connext Modern C++ API  Version 6.1.0
rti::sub::SampleProcessor Class Reference

<<extension>> <<reference-type>> Utility to read and process the data samples that one or more DataReaders receive using a sample handler. More...

#include <rti/sub/SampleProcessor.hpp>

Inheritance diagram for rti::sub::SampleProcessor:
dds::core::Reference< SampleProcessorImpl >

Public Member Functions

 SampleProcessor (const rti::core::cond::AsyncWaitSetProperty &aws_property)
 Single-argument constructor that allows creating a a rti::sub::SampleProcessor with the configuration of the underlying rti::core::cond::AsyncWaitSet. More...
 
 SampleProcessor ()
 Creates a rti::sub::SampleProcessor with the default rti::core::cond::AsyncWaitSetProperty. More...
 
 SampleProcessor (rti::core::cond::AsyncWaitSet aws)
 Constructor that allows specifying an externally created rti::core::cond::AsyncWaitSet. More...
 
template<typename T , typename FUNCTOR >
SampleProcessorattach_reader (dds::sub::DataReader< T > reader, const FUNCTOR &handler)
 Attaches the specified dds::sub::DataReader with an associated sample handler to this rti::sub::SampleProcessor. More...
 
template<typename T >
SampleProcessordetach_reader (dds::sub::DataReader< T > reader)
 Detaches the specified dds::sub::DataReader from this rti::sub::SampleProcessor. More...
 
std::vector< dds::sub::AnyDataReaderreaders () const
 Returns the list of attached dds::sub::DataReader (s). More...
 

Detailed Description

<<extension>> <<reference-type>> Utility to read and process the data samples that one or more DataReaders receive using a sample handler.

The following example shows how to create an full application that subscribes to a topic and prints each sample it receives:

int subscriber_main(int domain_id, int sample_count)
{
dds::domain::DomainParticipant participant(domain_id);
dds::topic::Topic<Foo> topic(participant, "Example Foo");
dds::sub::DataReader<Foo> reader(dds::sub::Subscriber(participant), topic);
int count = 0;
rti::sub::SampleProcessor sample_processor;
sample_processor.attach_reader(reader, [](const rti::sub::LoanedSample<Foo>& sample) {
if (sample.info().valid()) {
std::cout << "Received " << sample.data() << std::endl;
}
});
while(1) {
// wait forever
}
return 1;
}

Note that the expected sample handler is a functor with signature std::function<void(const rti::sub::LoanedSample<T>&)>, where T is the type associated with the dds::sub::DataReader.

A rti::sub::SampleProcessor relies on an underlying rti::core::cond::AsyncWaitSet to read and dispatch the data from the DataReaders. It internally creates a dds::sub::cond::ReadCondition for each attached DataReader, an associates a custom handler that contains state to read samples and notify the corresponding handler .

The SampleProcessor uses this ReadCondition to wait for data and then calls dds::sub::DataReader::select() to take all the available data.

Notifications to any handler may be concurrent if the thread pool size set in rti::core::cond::AsyncWaitSetProperty::thread_pool_size is greater than 1. The same or different handlers may be called in parallel. The SampleProcessor cycles through all attached DataReaders dispatching a sample at a time with the next available thread. This mechanism guarantees concurrent dispatching of the samples accross all DataReaders.

The rti::sub::SampleProcessor internally creates and uses a dds::sub::cond::ReadCondition for each attached dds::sub::DataReader to read the data. It's recommended to perform all the reading of the dds::sub::DataReader's samples through the rti::sub::SampleProcessor and avoid reading with through other mechanisms in different part of your applications. Similarly, your application must be careful to not modify the dds::sub::cond::ReadCondition associated with an attached dds::sub::DataReader.

In general, avoid or beware of using the following operations in combination with a rti::sub::SampleProcessor:

On the other hand, you can externally provide the underlying rti::sub::SampleProcessor's rti::core::cond::AsyncWaitSet if you want to access additional capabilities of the rti::core::cond::AsyncWaitSet and use it to attach other dds::core::cond::Condition to handle other aspects of your application.

SampleProcessor Thread Safety

Similar to the rti::core::cond::AsyncWaitSet, the rti::sub::SampleProcessor provides a thread-safe interface. All the operations of this class can be called concurrently from multiple threads.

Because the rti::sub::SampleProcessor relies on an rti::core::cond::AsyncWaitSet to dispatch the samples the same threading concepts apply. This means that operations on a rti::sub::SampleProcessor may require synchronizing with the thread pool for safety. rti::sub::SampleProcessor also relies on the asynchrnous completion pattern to effectively interact with the underlying thread pool.

For instance to detach a dds::sub::DataReader, the rti::sub::SampleProcessor generates an internal request to its thread pool to process it. As soon as the detachment completes, the thread pool provides the notification through an associated completion token on which the rti::sub::SampleProcessor waits and blocks until it completes.

Due to the concurrent processing nature of operations on a rti::sub::SampleProcessor as well as the sample handling, it's important to keep in mind the following aspects:

  • The handler operation can be called concurrently for each sample. Therefore, handler implementations may need to apply thread synchronization strategies to protect shared resources.

  • Calling a rti::sub::SampleProcessor operation that requires internal synchronization (e.g., rti::sub::SampleProcessor::attach_reader) from a handler notification will result in a error.

Note that the interface of the rti::sub::SampleProcessor is similar to that of the rti::core::cond::AsyncWaitSet but reduced and simplified. As noted above, if more advanced use and control of the thread pool is required, you can always create the rti::core::cond::AsyncWaitSet externally when calling dds::sub::SampleProcessor::SampleProcessor(rti::core::cond::AsyncWaitSet).

MT Safety:
Safe.
See also
rti::core::cond::AsyncWaitSet
dds::core::cond::Condition
rti::core::cond::AsyncWaitSetProperty
dds::sub::DataReader

Constructor & Destructor Documentation

◆ SampleProcessor() [1/3]

rti::sub::SampleProcessor::SampleProcessor ( const rti::core::cond::AsyncWaitSetProperty aws_property)
inline

Single-argument constructor that allows creating a a rti::sub::SampleProcessor with the configuration of the underlying rti::core::cond::AsyncWaitSet.

You can provide rti::core::cond::AsyncWaitSetProperty::AsyncWaitSetProperty() as property to create the underlying rti::core::cond::AsyncWaitSet with default behavior.

This constructor creates rti::core::cond::AsyncWaitSet with no listener installed and will call rti::core::cond::AsyncWaitSet::start rigt after its creation.

Parameters
aws_property<<in>> configuraiton of the underlying rti::core::cond::AsyncWaitSet.
See also
dds::sub::SampleProcessor::SampleProcessor(rti::core::cond::AsyncWaitSet)

◆ SampleProcessor() [2/3]

rti::sub::SampleProcessor::SampleProcessor ( )
inline

◆ SampleProcessor() [3/3]

rti::sub::SampleProcessor::SampleProcessor ( rti::core::cond::AsyncWaitSet  aws)
inline

Constructor that allows specifying an externally created rti::core::cond::AsyncWaitSet.

Creates a new rti::sub::SampleProcessor with the specified rti::core::cond::AsyncWaitSet.

Tnis constructor flavor decouples the lifecycle of the rti::sub::SampleProcessor from the rti::core::cond::AsyncWaitSet. It allows the application to have more control on the rti::core::cond::AsyncWaitSet, such as to install an rti::core::cond::AsyncWaitSetListener, a custom ThreadFactory, and controlling when it starts or stops.

Note that this constructor will not call rti::core::cond::AsyncWaitSet::start, so it's the caller responsiblity to start and stop it. You can provide the external rti::core::cond::AsyncWaitSet::start in either started or stopped state.

Parameters
aws<<in>> the externally created rti::core::cond::AsyncWaitSet.
See also
dds::sub::SampleProcessor::SampleProcessor(const rti::core::cond::AsyncWaitSetProperty&)

Member Function Documentation

◆ attach_reader()

template<typename T , typename FUNCTOR >
SampleProcessor& rti::sub::SampleProcessor::attach_reader ( dds::sub::DataReader< T >  reader,
const FUNCTOR &  handler 
)
inline

Attaches the specified dds::sub::DataReader with an associated sample handler to this rti::sub::SampleProcessor.

Note that the expected sample handler is a functor with signature std::function<void(const rti::sub::LoanedSample<T>&)>, where T is the type associated with the dds::sub::DataReader.

This operation will block until the attach request completes. Upon successful return, it is guaranteed that the specified dds::sub::DataReader is attached and notifications to the handler may occur.

If this operation is called multiple times for an already attached dds::sub::DataReader, it will result in no-op and return sucessfully, ignoring the specified handler. So if the handler is different, it will not be updated.

Parameters
reader<<in>> dds::sub::DataReader to be attached.
handler<<in>> handler to be notified on dispatching of the reader samples.
Exceptions
Oneof the Standard Exceptions
See also
rti::sub::SampleProcessor::detach_reader

References attach_reader().

Referenced by attach_reader().

◆ detach_reader()

template<typename T >
SampleProcessor& rti::sub::SampleProcessor::detach_reader ( dds::sub::DataReader< T >  reader)
inline

Detaches the specified dds::sub::DataReader from this rti::sub::SampleProcessor.

Once the dds::sub::DataReader is detached, it is guaranteed that the rti::sub::SampleProcessor will no longer process it so it is safe for your application to release any resources associated with the detached dds::sub::DataReader.

This operation blocks until the detach request completes. Upon successful return, it is guaranteed that the specified dds::sub::DataReader is detached.

dds::sub::DataReader may be detached at any time independently of the state of the rti::sub::SampleProcessor.

Parameters
reader<<in>> dds::sub::DataReader to be detached.
Exceptions
Oneof the Standard Exceptions
See also
rti::sub::SampleProcessor::attach_reader

References detach_reader().

Referenced by detach_reader().

◆ readers()

std::vector<dds::sub::AnyDataReader> rti::sub::SampleProcessor::readers ( ) const
inline

Returns the list of attached dds::sub::DataReader (s).