RTI Routing Service Version 7.1.0
RTI Routing Service Processor API

This module describes the Processor API.

Data Structures

class  rti::routing::processor::Input
 Generic representation of a Route's input.

class  rti::routing::processor::TypedInput< Data, Info >
 Representation of an Input whose data representation is DataRep, whose info representation is InfoRep.

class  rti::routing::processor::Selector< Data, Info >
 An element that allows reading data that meet a set of specified attributes.

class  rti::routing::processor::LoanedSamples< T, U >
 Provides temporary access to a collection of samples (data and info) from a TypedInput.

class  rti::routing::processor::Output
 Generic representation of a Route's output.

class  rti::routing::processor::TypedOutput< Data, Info >
 Representation of an Output whose data representation is DataRep, whose info representation is InfoRep.

class  rti::routing::processor::Processor
 Processor interface definition. Provides a way to process Route events and control the data forwarding process.

class  rti::routing::processor::NoOpProcessor
 An empty implementation of the pure virtual interface rti::routing::processor::Processor.

class  rti::routing::processor::ProcessorPlugin
 The top-level plug-in class.

class  rti::routing::processor::Query
 Encapsulates a content query to select data from an rti::routing::adapter::StreamReader.

class  rti::routing::processor::Route
 Representation of the Route object that owns a Processor.

class  rti::routing::processor::SampleIterator< T, U >
 A random-access iterator of LoanedSample.
 

Macros

#define RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL(PROCESSOR_CLASS)
 Utility macro that declares a native extern function that can be used to load a ProcessorPlugin through a shared library.

#define RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DEF(PROCESSOR_CLASS)
 Utility macro that implements the ProcessorPlugin entry point declared with RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL.
 

Detailed Description

This module describes the Processor API.

RTI Routing Service Processors (rti::routing::processor::Processor) are event-oriented pluggable components that allow you to control the forwarding process that occurs within a Route.

Development Requirements

                   Linux/macOS Systems          Windows Systems
Shared Libraries   librtiroutingservice.so      librtiroutingservice.lib
                   librtirsinfrastructure.so    rtirsinfrastructure.lib
                   libnddscpp2.so               nddscpp2.lib
                   libnddsc.so                  nddsc.lib
                   libnddscore.so               nddscore.lib
Headers            rti/routing/processor/ProcessorPlugin.hpp

Architecture

An rti::routing::processor::Processor represents a multiple-input, multiple-output component attached to an rti::routing::processor::Route, which dispatches events to it. The figure below shows the role of a Processor within RTI Routing Service.

Overview of a Processor component in RTI Routing Service

A Processor can access the N inputs and M outputs of the owner Route. Upon event notification, the Processor can read data from any input, perform any manipulation on the data, and write it on any of the outputs.

Note in the figure above that any data a Processor writes on an output is first transformed with an rti::routing::transf::Transformation before passing it to the underlying rti::routing::adapter::StreamWriter.
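The ordering described in the note above (transformation first, then the StreamWriter) can be modeled in a few lines. This is only a self-contained sketch, not the Routing Service API: the sketch::Output and sketch::StreamWriter types, the string samples, and the "/transformed" suffix are hypothetical stand-ins chosen for illustration.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical stand-in for the adapter-side StreamWriter: it only records
// the samples it receives.
struct StreamWriter {
    std::vector<std::string> written;
    void write(const std::string& sample) { written.push_back(sample); }
};

// Hypothetical stand-in for a Route output with an attached transformation.
class Output {
public:
    using Transformation = std::function<std::string(const std::string&)>;

    Output(StreamWriter& writer, Transformation transformation)
        : writer_(writer), transformation_(std::move(transformation)) {}

    // Called by the processor: transform first, then hand off to the writer.
    void write(const std::string& sample) {
        writer_.write(transformation_(sample));
    }

private:
    StreamWriter& writer_;
    Transformation transformation_;
};

// Writes one sample through an output whose transformation appends a suffix
// and returns what the stream writer actually received.
inline std::vector<std::string> run_output_demo() {
    StreamWriter writer;
    Output output(writer, [](const std::string& s) {
        return s + "/transformed";
    });
    output.write("sample");
    return writer.written;
}

}  // namespace sketch
```

In this model the writer never sees the sample exactly as the processor wrote it, which is the point of the note: a Processor should not assume that what it writes is what reaches the StreamWriter.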

An example snippet is shown below. The code reads data from two inputs, merges it into an intermediate data buffer, and writes the result to a single output.

void on_data_available(Route& route)
{
    auto output = route.output<DynamicData>(0);
    auto output_sample = output.create_data();
    // Take every sample available on the first input
    for (const auto& status : route.input<DynamicData>(0).take()) {
        // Read the samples of the same instance from the second input
        auto periodic = route.input<DynamicData>(1)
                .select().instance(status.info().instance_handle()).read();
        // Merge fields from both inputs into the output sample
        output_sample.value<int>(
                "id",
                status.data().get<int>("id"));
        output_sample.value<string>(
                "config",
                status.data().get<string>("config"));
        output_sample.value<int>(
                "latency",
                periodic[0].data().get<int>("latency"));
        output.write(output_sample);
    }
}

The Processor API component model is shown in the figure below.

Processor API component model

Event dispatching

A rti::routing::processor::Route will notify its contained rti::routing::processor::Processor about the events that affect the Route object. Each event kind maps to a virtual operation in the rti::routing::processor::Processor interface:

Threading

An rti::routing::processor::Route dispatches all the events to its contained Processor sequentially, from within one of the parent Session's threads. For any given Route, and hence for its Processor, it is guaranteed that only one event is dispatched at a time. That is, the operations on the Processor are never called concurrently for a given Route.
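The dispatch model described in this section can be pictured with a small self-contained model. This is a sketch under stated assumptions, not the Routing Service headers: every type lives in a hypothetical sketch namespace, on_data_available is the only operation name taken from this page, and on_periodic_action, Event, dispatch, and LoggingProcessor are illustrative names.

```cpp
#include <cassert>
#include <string>
#include <vector>

namespace sketch {

struct Route {};  // hypothetical placeholder for the owner Route

// Hypothetical interface: one virtual operation per event kind.
class Processor {
public:
    virtual ~Processor() = default;
    virtual void on_data_available(Route&) = 0;
    virtual void on_periodic_action(Route&) = 0;  // illustrative name
};

// Empty implementation, so subclasses override only what they need.
class NoOpProcessor : public Processor {
public:
    void on_data_available(Route&) override {}
    void on_periodic_action(Route&) override {}
};

// Reacts to data events only; every other event falls through to the no-op.
class LoggingProcessor : public NoOpProcessor {
public:
    std::vector<std::string> log;
    void on_data_available(Route&) override { log.push_back("data"); }
};

enum class Event { DataAvailable, PeriodicAction };

// Dispatches events one at a time, in order, from the calling thread.
inline void dispatch(Route& route, Processor& processor,
                     const std::vector<Event>& events) {
    for (Event event : events) {
        switch (event) {
        case Event::DataAvailable:
            processor.on_data_available(route);
            break;
        case Event::PeriodicAction:
            processor.on_periodic_action(route);
            break;
        }
    }
}

// Feeds three events to a LoggingProcessor and returns its log.
inline std::vector<std::string> run_dispatch_demo() {
    Route route;
    LoggingProcessor processor;
    dispatch(route, processor,
             {Event::DataAvailable, Event::PeriodicAction,
              Event::DataAvailable});
    return processor.log;
}

}  // namespace sketch
```

Because dispatch runs on a single thread per Route in this model, LoggingProcessor needs no locking around its own log member.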

Each thread of a parent Session processes only one Route at a time, but different threads can dispatch different Routes concurrently. A Session with L threads can therefore dispatch up to L Routes concurrently. Keep this behavior in mind for implementations that share state between Processor objects.
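One way to handle state shared between Processor objects is to guard it with a mutex. The following is a minimal sketch of that idea using only the C++ standard library: the threads stand in for Session threads, and SharedCounter, CountingProcessor, and run_shared_state_demo are hypothetical names, not part of the Processor API.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

namespace sketch {

// State shared by several processors; every access takes the mutex.
class SharedCounter {
public:
    void increment() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++count_;
    }

    long value() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    mutable std::mutex mutex_;
    long count_ = 0;
};

// Hypothetical processor that updates the shared state on each data event.
class CountingProcessor {
public:
    explicit CountingProcessor(SharedCounter& counter) : counter_(counter) {}
    void on_data_available() { counter_.increment(); }

private:
    SharedCounter& counter_;
};

// Simulates a Session with `threads` threads. Each thread dispatches events
// sequentially to its own processor, but the threads run concurrently.
inline long run_shared_state_demo(int threads, int events_per_route) {
    SharedCounter counter;
    std::vector<std::thread> workers;
    for (int i = 0; i < threads; ++i) {
        workers.emplace_back([&counter, events_per_route] {
            CountingProcessor processor(counter);
            for (int e = 0; e < events_per_route; ++e) {
                processor.on_data_available();
            }
        });
    }
    for (auto& worker : workers) {
        worker.join();
    }
    return counter.value();
}

}  // namespace sketch
```

State owned by a single Processor needs no such guard, since its operations are never called concurrently for a given Route.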

Constraints

There are several constraints that affect the behavior and relationship of the Processor API components:

Macro Definition Documentation

◆ RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL

#define RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL(PROCESSOR_CLASS)
Value:
extern "C" RTI_USER_DLL_EXPORT struct RTI_RoutingServiceProcessorPlugin* \
        PROCESSOR_CLASS ## _create_processor_plugin( \
                const struct RTI_RoutingServiceProperties *, \
                RTI_RoutingServiceEnvironment *); \

Utility macro that declares a native extern function that can be used to load a ProcessorPlugin through a shared library.

The prototype of the function is given by a native definition of the create operation.

To register a ProcessorPlugin with RTI Routing Service, you must use the tag <processor_plugin> within <plugin_library>. For example:

<dds>
    ...
    <plugin_library name="MyPluginLib">
        <processor_plugin name="MyProcessorPlugin">
            <dll>myprocessor</dll>
            <create_function>
                MyProcessorPlugin_create_processor_plugin
            </create_function>
        </processor_plugin>
        ...
    </plugin_library>
    ...
    <routing_service>
        ...
    </routing_service>
    ...
</dds>

Once a ProcessorPlugin is registered, a Route can use it to create a Processor.

For example:

<topic_route name="SquareAndCircleToTriangle">
    ...
    <processor plugin_name="MyPluginLib::MyProcessorPlugin">
        <property>
            <value>
                <element>
                    <name>my_property_name</name>
                    <value>my_property_value</value>
                </element>
            </value>
        </property>
    </processor>
    <input participant="DomainIn">
        <topic_name>Square</topic_name>
        <registered_type_name>ShapeType</registered_type_name>
    </input>
    <input participant="DomainIn">
        <topic_name>Circle</topic_name>
        <registered_type_name>ShapeType</registered_type_name>
    </input>
    <output participant="DomainOut">
        <topic_name>Triangle</topic_name>
        <registered_type_name>ShapeType</registered_type_name>
    </output>
</topic_route>

For additional information on configuring Processors, see the RTI Routing Service User's Manual.
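The name/value pairs configured under <property> are what a plug-in implementation would typically look up at creation time. The sketch below assumes, for illustration only, that the properties arrive as a string-to-string map; the sketch::PropertySet alias and the property_or helper are hypothetical and are not taken from the Routing Service headers.

```cpp
#include <cassert>
#include <map>
#include <string>

namespace sketch {

// Assumption: properties are modeled as a map from name to value.
using PropertySet = std::map<std::string, std::string>;

// Returns the value configured for `name`, or `fallback` when the property
// was not set in the XML configuration.
inline std::string property_or(const PropertySet& properties,
                               const std::string& name,
                               const std::string& fallback) {
    auto it = properties.find(name);
    return it == properties.end() ? fallback : it->second;
}

}  // namespace sketch
```

With the configuration shown above, looking up my_property_name would yield my_property_value, while a name that was never configured would yield the fallback.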

Parameters
PROCESSOR_CLASS    Class name of a ProcessorPlugin implementation

◆ RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DEF

#define RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DEF(PROCESSOR_CLASS)
Value:
struct RTI_RoutingServiceProcessorPlugin * PROCESSOR_CLASS ## _create_processor_plugin( \
        const struct RTI_RoutingServiceProperties * native_properties, \
        RTI_RoutingServiceEnvironment *environment) \
{ \
    PropertySet properties; \
    rti::routing::PropertyAdapter::add_properties_from_native( \
            properties, \
            native_properties); \
    try { \
        return rti::routing::processor::detail::ProcessorPluginForwarder::create_plugin( \
                new PROCESSOR_CLASS(properties)); \
    } catch (const std::exception& ex) { \
        RTI_RoutingServiceEnvironment_set_error( \
                environment, \
                "%s", \
                ex.what()); \
    } catch (...) {} \
    \
    return NULL; \
}

Utility macro that implements the ProcessorPlugin entry point declared with RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL.
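The two macros follow a common pattern: token pasting (##) derives the entry point name from the class name, and the generated function converts C++ exceptions into an error report plus a null return so that nothing propagates across the C boundary. The following self-contained sketch mirrors that pattern with hypothetical names (SKETCH_PLUGIN_CREATE_FUNCTION_DECL, SKETCH_PLUGIN_CREATE_FUNCTION_DEF, sketch::Plugin, sketch::Environment); it does not use the Routing Service types.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>

namespace sketch {

// Hypothetical stand-ins for the native plug-in handle and environment.
struct Plugin {
    virtual ~Plugin() = default;
};

struct Environment {
    std::string error;
};

}  // namespace sketch

// Declares an extern "C" entry point named <class>_create_plugin.
#define SKETCH_PLUGIN_CREATE_FUNCTION_DECL(PLUGIN_CLASS) \
    extern "C" sketch::Plugin* PLUGIN_CLASS##_create_plugin( \
            sketch::Environment*);

// Defines the entry point: constructs the plug-in and reports failures
// through the environment instead of letting exceptions escape.
#define SKETCH_PLUGIN_CREATE_FUNCTION_DEF(PLUGIN_CLASS) \
    sketch::Plugin* PLUGIN_CLASS##_create_plugin( \
            sketch::Environment* environment) \
    { \
        try { \
            return new PLUGIN_CLASS(); \
        } catch (const std::exception& ex) { \
            environment->error = ex.what(); \
        } catch (...) { \
        } \
        return nullptr; \
    }

// A plug-in that constructs successfully.
struct GoodPlugin : sketch::Plugin {};

// A plug-in whose constructor fails, for example on a bad configuration.
struct BadPlugin : sketch::Plugin {
    BadPlugin() { throw std::runtime_error("bad configuration"); }
};

SKETCH_PLUGIN_CREATE_FUNCTION_DECL(GoodPlugin)
SKETCH_PLUGIN_CREATE_FUNCTION_DEF(GoodPlugin)
SKETCH_PLUGIN_CREATE_FUNCTION_DECL(BadPlugin)
SKETCH_PLUGIN_CREATE_FUNCTION_DEF(BadPlugin)
```

In this sketch, GoodPlugin_create_plugin returns a valid pointer, while BadPlugin_create_plugin returns a null pointer and leaves the exception message in the environment. The generated symbol name is the class name followed by the fixed suffix, which is the name a loader would have to be given.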