.. include:: vars.rst
.. _section-controllingData:
*****************************************
Controlling Data: Processing Data Streams
*****************************************
In chapter :ref:`section-routingData` we presented how |RS| can easily connect and
scale systems. In order to do so, data is *forwarded* among systems, thus generating
data streams flowing from one system to another. The forwarding process is a basic
operation that consists of propagating data streams from the input to the output.
.. figure:: static/RouterControllingDataStreamForwarding.svg
:figwidth: 100 %
:name: FigureRouterControllingDataStreamForwarding
:align: center
Basic forwarding of an input data stream
:numref:`FigureRouterControllingDataStreamForwarding` illustrates the forwarding
process of |TOPIC| data. At the publication side, there are ``N`` |DWs| each
producing samples for ``Topic A``. The |RS| has a |TR| with a single input |DR|
and a single output |DW|. At the subscription side there are ``M`` |DRs| all
receiving samples from topic ``Topic A'``. All the samples that the user |DWs|
produce on the publication side are received by the input |DR| and then
forwarded through the output |DW| **to all the user** |DRs| **on the
subscription side**. You can observe that the |TR| has a component that performs
the forwarding logic, which involves reading from the input |DR| and writing to
the output |DW|.
The forwarding logic in the |TR| may be limiting when system connectivity
demands other requirements beyond basic data forwarding. You can anticipate the
simple read-and-write logic may be inadequate in |TRs| that define multiple
|INPUTs| and |OUTPUTs| and in which the types of the associated |TOPICs| are different.
These cases require the use of a custom logic to process the data streams, and
this is the task of the *Processor*.
:numref:`FigureRouterControllingDataProcessor` shows the concept.
.. figure:: static/RouterControllingDataProcessor.svg
:figwidth: 100 %
:name: FigureRouterControllingDataProcessor
:align: center
|PROCESSOR| concept
A |PROCESSOR| is a *pluggable* component that allows you to **control the forwarding
process of a** |TR|. You can create your own |PROCESSOR| implementations
based on the needs of your system integration, defining your own data flows,
and processing the data streams at your convenience.
A |PROCESSOR| receives notifications from the |TR| about relevant events such as
the availability of |INPUTs| and |OUTPUTs|, state of the |TR|, or arrival of
data. A |PROCESSOR| can then react to any of these events and perform whatever
actions are necessary. The basic forwarding logic presented above is actually a
built-in |PROCESSOR| implementation that is set as the default in all
|TRs|.
The following sections will guide you through the process of creating your own
|PROCESSOR|, how to configure it and install it in |RS|. We will show you
this functionality with examples of *Aggregation* and *Splitting* patterns.
.. note::
All the following sections require you to be familiar with the routing concepts
explained in section :ref:`section-routingData`. Also this section requires
software programming knowledge in C/C++, understanding of CMake, and shared
library management.
.. seealso::
:ref:`section-CoreConcepts-BuiltinPlugins-ForwardingProcessor`
Details on the default forwarding |PROCESSOR| of the |TRs|.
.. _section-controllingData-DynData:
|DD_HEADING| as a Data Representation Model
===========================================
The nature of the architecture of |RS| makes it possible to work with data
streams of different *types*. This demands a strategy for dealing with all
the possible types at both compilation time and run time. This is provided through
|DD|.
|DD| is a generic container that holds data for any type by keeping a custom and
flexible internal representation of the data. |DD| is a feature specific to
|CONNEXT| and is part of the core libraries.
:numref:`FigureRouterControllingDataDynData` shows the concept of |DD|.
.. figure:: static/RouterDataIntegrationDynData.svg
:figwidth: 100 %
:name: FigureRouterControllingDataDynData
:align: center
|DD| concept
|DD| is a container-like structure that holds data of any type. The description
of what that type looks like is given by the *TypeCode*, a structure that allows
representing any type. Using the *TypeCode* information, a |DD| object can
then contain data for the associated type and behave as if it was an actual
structure of such type. The |DD| class has a rich interface to access the data
members and manipulate its content.
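
To make the relationship between a type description and a generic container
more concrete, the following is a minimal C++ sketch. It does not use the
|CONNEXT| API: ``MemberKind``, ``TypeDescription``, and ``GenericSample`` are
hypothetical stand-ins that only mimic the roles of *TypeCode* and |DD| for
flat types with integer and string members.

.. code-block:: c++

   #include <map>
   #include <stdexcept>
   #include <string>
   #include <utility>
   #include <variant>

   // Hypothetical stand-in for a TypeCode: member names and their kinds.
   enum class MemberKind { Int32, String };
   using TypeDescription = std::map<std::string, MemberKind>;

   // Hypothetical stand-in for DynamicData: holds values for any type that
   // can be described with a TypeDescription.
   class GenericSample {
   public:
       using Value = std::variant<int, std::string>;

       explicit GenericSample(TypeDescription type) : type_(std::move(type)) {}

       // Stores a value only if the member exists and the kind matches.
       void set(const std::string &member, Value value)
       {
           const auto it = type_.find(member);
           if (it == type_.end()) {
               throw std::invalid_argument("unknown member: " + member);
           }
           const bool expects_int = (it->second == MemberKind::Int32);
           if (expects_int != std::holds_alternative<int>(value)) {
               throw std::invalid_argument("wrong kind for member: " + member);
           }
           values_[member] = std::move(value);
       }

       // Reads a member; throws std::out_of_range if it was never set.
       template <typename T>
       const T &get(const std::string &member) const
       {
           return std::get<T>(values_.at(member));
       }

   private:
       TypeDescription type_;
       std::map<std::string, Value> values_;
   };

The same ``GenericSample`` class can hold a shape, a sensor reading, or any
other type, as long as a matching description is supplied at run time. This is
the property that lets a general-purpose component handle types it was not
compiled against.
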
The *Processor* API makes the inputs and outputs interface with |DD|. Hence
the inputs return a list of |DD| samples when reading, while the outputs
expect a |DD| object on the write operation. This common representation model
has two benefits:
- It allows implementations to work without knowing the types beforehand. This
is very convenient for general-purpose processors, such as data member mappers.
- It allows implementations to work independently from the data domain where
the data streams flow. This is particularly important when a data domain
other than DDS is used through a custom |ADAPTER| (:ref:`section-DataIntegration`).
.. seealso::
:link_connext_users_man_s:`Objects of Dynamically Defined Types `.
Section in |RTI_CONNEXT| User's manual about |DD| and *TypeCode*.
:link_connext_api_s:`DynamicData C++ API reference `
Online API documentation for the DynamicData class.
.. _section-ControllingData-Aggregation:
Aggregating Data From Different Topics
======================================
A very common scenario involves defining routing paths to combine data from two
or more input |TOPICs| into a single output |TOPIC|. This pattern is known as |TOPIC|
*aggregation*. You can leverage the |PROCESSOR| component to perform the custom
|TOPIC| aggregation that best suits the needs of your system.
An example of |TOPIC| aggregation is shown in
:numref:`FigureRouterControllingShapesAggregator`. There are two input |TOPICs|,
``Square`` and ``Circle``, and a single output |TOPIC|, ``Triangle``. All |TOPICs|
have the same type ``ShapeType``. The goal is to produce output samples by combining
data samples from the two inputs.
.. figure:: static/RouterControllingShapesAggregator.svg
:figwidth: 100 %
:name: FigureRouterControllingShapesAggregator
:align: center
Aggregation example of two |TOPICs|
Let's review all the tasks you need to do to create a custom |PROCESSOR| using the
:ref:`section-Tutorials-ExampleShapesProcessor`. You can run it first to see it
in action, or you can follow along one step at a time as we explain each task.
Develop a Custom |PROCESSOR|
----------------------------
Once you know the stream processing pattern you want to perform, including what
your data inputs and outputs are, you can then write the custom code of the
|PROCESSOR|. A custom processor must implement the interface
``rti::routing::processor::Processor``, which defines the abstract operations that the
|TR| calls upon occurrence of certain events.
In our example, we create a ``ShapesAggregator`` class to be our |PROCESSOR|
implementation:
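
.. code-block:: c++

   class ShapesAggregator : public rti::routing::processor::NoOpProcessor {
       // Notified when at least one input has data available to read
       void on_data_available(rti::routing::processor::Route &);
       // Notified when an output has been enabled and is available to write
       void on_output_enabled(
               rti::routing::processor::Route &route,
               rti::routing::processor::Output &output);
       ...
   };
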
Note how the processor class inherits from ``NoOpProcessor``. This class inherits
from ``rti::routing::processor::Processor`` and implements all its virtual methods
as no-ops. This is a convenience that allows us to implement only the
methods for the notifications of interest. In this example:
- ``on_output_enabled``: Notifies that an output has been enabled and it is
available to write. In our example, we create a buffer of the
output type (``ShapeType``) that will hold the aggregated content of the
input samples.
- ``on_data_available``: Indicates that at least one input has data available to
read. In our example, this is where the aggregation logic takes place and
it will simply generate aggregated output samples that contain the same
values as the ``Square`` samples, except for the field ``y``, which is
obtained from the ``Circle``.
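
The aggregation rule described above can be sketched in plain C++ as follows.
This is only an illustration of the logic, not the tutorial code: the ``Shape``
struct is a hypothetical stand-in for the |DD| samples that a real |PROCESSOR|
handles, its ``color`` and ``shapesize`` members are assumed, and the decision
to produce nothing until both inputs have delivered a sample is also an
assumption.

.. code-block:: c++

   #include <optional>
   #include <string>

   // Hypothetical stand-in for a ShapeType sample (members other than
   // x and y are assumed for illustration).
   struct Shape {
       std::string color;
       int x = 0;
       int y = 0;
       int shapesize = 0;
   };

   // Builds the aggregated output: every field is copied from the latest
   // Square sample, except y, which is taken from the latest Circle sample.
   // Returns no value while either input has not produced a sample yet.
   std::optional<Shape> aggregate(
           const std::optional<Shape> &square,
           const std::optional<Shape> &circle)
   {
       if (!square || !circle) {
           return std::nullopt;
       }
       Shape triangle = *square;
       triangle.y = circle->y;
       return triangle;
   }

In an actual |PROCESSOR|, logic of this kind would run inside
``on_data_available``, reading from the two inputs and writing the result to
the output.
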
.. seealso::
:link_router_sdk_api_cpp_s:`Processor C++ API reference `
:ref:`section-CoreConcepts-Rm-Route-States`
Different states of a |TR| and which |PROCESSOR| notifications are triggered
under each of them.
Create a Shared Library
-----------------------
Once the |PROCESSOR| implementation is finished we need to compile it and
generate a *shared library* that |RS| can load. In this example we use ``CMake``
as the build system to create the shared library. We specify the generation of
a library with name ``shapesaggregator``:
.. code-block::
...
add_library(shapesaggregator
"${CMAKE_CURRENT_SOURCE_DIR}/ShapesAggregator.cxx")
...
The generated library contains the compiled code of our implementation, contained
in a single file ``ShapesAggregator.cxx``. A key aspect of the generated library
is that it must export an external function that instantiates the
``ShapesAggregator``; this is the function that |RS| will call to
instantiate the |PROCESSOR|. This external symbol is called the *entry point*, and
you can declare it as follows:
.. code-block::
RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL(ShapesAggregatorPlugin);
The macro declares an external exported function with the following signature:
.. code-block::
struct RTI_RoutingServiceProcessorPlugin*
ShapesAggregatorPlugin_create_processor_plugin(
const struct RTI_RoutingServiceProperties *,
RTI_RoutingServiceEnvironment *);
which is the signature |RS| requires and will assume for the entry point to create
a custom |PROCESSOR|. Note that the implementation of this function requires
using the macro ``RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DEF`` in the source file.
Define a Configuration with the Aggregating |TR|
------------------------------------------------
This process is similar to the one we explained in section
:ref:`section-routingData-SingleTopic`. There are two main differences that are
specific to using a processor.
Configure a plugin library
^^^^^^^^^^^^^^^^^^^^^^^^^^
Within the root element of the XML configuration, you can define a *plugin library*
element that contains the description of all the plugins that can be loaded
by |RS|. In our case, we define a plugin library with a single entry for
our aggregation processor plugin:
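
.. code-block::

   <plugin_library name="...">
       <processor_plugin name="...">
           <dll>shapesaggregator</dll>
           <create_function>
               ShapesAggregatorPlugin_create_processor_plugin
           </create_function>
       </processor_plugin>
   </plugin_library>
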
The values specified for the ``name`` attributes can be set at your convenience
and they shall uniquely identify a plugin instance. We will use these names
later within the |TR| to refer to this plugin. For the definition of our
processor plugin we have specified two elements:
- ``dll``: The name of the shared library as we specified in the build step. We
just provide the library name so |RS| will try to load it from the working
directory, or assume that the library path is set accordingly.
- ``<create_function>``: Name of the entry point (external function) that
creates the plugin object, exactly as we defined in code with the
``RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL`` macro.
Once we have the plugin defined in the library, we can move to the next step
and define the |TR| with the desired routing paths and our |PROCESSOR| in it.
.. warning::
When a name is specified in the ``<dll>`` element, |RS| will automatically append
a **d** suffix when running the debug version of |RS|.
.. seealso::
:ref:`section-Config-Plugins`
Documentation about the ``<plugin_library>`` element.
:ref:`section-Common-PluginManagement`
For in-depth understanding of plugins.
Configure a |RS| with the custom routing paths
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In this example we need to define a |TR| that contains the two |INPUTs| to
receive the data streams from the ``Square`` and ``Circle`` |TOPICs|, and the
single output to write the single data stream to the ``Triangle`` |TOPIC|. The
key element in our |TR| is the specification of a custom |PROCESSOR|, to
indicate that the |TR| should **use an instance of our plugin to process the
route's events and data**:
.. code-block::
...
SquareShapeTypeCircleShapeType
There are three important aspects in this |TR| configuration:
- The custom |PROCESSOR| is specified with the ``<processor>`` tag. The
``plugin_name`` attribute must contain the qualified name of an existing
processor plugin within a plugin library. The qualified name is built using
the values from the ``name`` attributes of the plugin library and plugin
element. Although our example does not make use of it, you could provide
run-time configuration properties to our plugin through an optional
``<property>`` tag. This element represents a set of name-value string pairs
that are passed to the ``create_processor`` call of the plugin.
- |INPUT| and |OUTPUT| elements all have a ``name`` attribute. This is the
configuration name for these elements, and it can be used within the |PROCESSOR| to
look up an individual |INPUT| or |OUTPUT| by its name, as we do in our
example. Also notice how the names match the names of the |TOPICs| with which they
are associated. Because we are not specifying a ``<topic_name>`` element, |RS|
uses the |INPUT| and |OUTPUT| names as |TOPIC| names. In our example this
makes it convenient to map inputs and outputs 1:1 to their topics.
- The input |DRs| are configured with a QoS that sets a ``KEEP_LAST`` history of
just one sample. This allows our processor to just read and aggregate the
latest available sample from each input.
.. _section-ControllingData-Splitting:
Splitting Data From a Single Topic
==================================
Another common pattern consists of defining routing paths to divide or *split* data
from an input |TOPIC| into several output |TOPICs|. This mechanism is the
reverse of aggregation and is known as |TOPIC| splitting. You can leverage
the |PROCESSOR| component to perform the |TOPIC| splitting that best suits the
needs of your system.
An example of |TOPIC| splitting is shown in
:numref:`FigureRouterControllingDataShapesSplitter`. There is a single input |TOPIC|,
``Square``, and two output |TOPICs|, ``Circle`` and ``Triangle``. All |TOPICs|
have the same type ``ShapeType``. The goal is to produce output samples by splitting
the content of data samples from the input.
.. figure:: static/RouterControllingDataShapesSplitter.svg
:figwidth: 90 %
:name: FigureRouterControllingDataShapesSplitter
:align: center
Splitting example of a |TOPIC|
The steps required to create a custom splitting |PROCESSOR| are the same as described
in the previous section :ref:`section-ControllingData-Aggregation`. For this example
we focus only on the aspects that are different.
Custom |PROCESSOR| implementation
----------------------------------
In this example, we create a ``ShapesSplitter`` class to be our |PROCESSOR|
implementation. Similar to ``ShapesAggregator``, this class reacts only to two
event notifications:
- ``on_input_enabled``: Creates a sample buffer that will be used to contain
the split content from the inputs. Because all the inputs and outputs have
the same type (``ShapeType``), we can use the input type to create
the output sample.
- ``on_data_available``: This is where the splitting logic takes place and it
will simply generate split output samples that contain the same values as
the ``Square`` samples for all fields except ``x`` and ``y``, which are set
as follows:
- ``Circle`` output: the ``x`` field has the same value as the input ``Square``,
and ``y`` is set to zero.
- ``Triangle`` output: the ``y`` field has the same value as the input ``Square``,
and ``x`` is set to zero.
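
The splitting rule above can be sketched in plain C++ as follows. As with any
simplified illustration, this is not the tutorial code: ``Shape`` is a
hypothetical stand-in for the |DD| samples, and its ``color`` and ``shapesize``
members are assumed.

.. code-block:: c++

   #include <string>
   #include <utility>

   // Hypothetical stand-in for a ShapeType sample (members other than
   // x and y are assumed for illustration).
   struct Shape {
       std::string color;
       int x = 0;
       int y = 0;
       int shapesize = 0;
   };

   // Splits one Square sample into a {Circle, Triangle} pair.
   // Both outputs copy every field of the Square, then the Circle
   // zeroes y (keeping x) and the Triangle zeroes x (keeping y).
   std::pair<Shape, Shape> split(const Shape &square)
   {
       Shape circle = square;
       circle.y = 0;

       Shape triangle = square;
       triangle.x = 0;

       return {circle, triangle};
   }

Because each input sample is split on its own, the function needs no state
between calls, in contrast with aggregation, where the latest sample of each
input has to be combined.
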
Define a Configuration with the Splitting |TR|
----------------------------------------------
In this example we need to define a |TR| that contains the single |INPUT| to receive
the data streams from the ``Square`` |TOPIC|, and the two |OUTPUTs| to write
the data streams for the ``Circle`` and ``Triangle`` |TOPICs|. The |TR| specifies
a custom processor to be created from our plugin library, and it's configured
to create the ``SplitterProcessor``:
.. code-block::
ShapeType
In this |TR| configuration, the input |DR| and output |DWs| are created with
default QoS. This is an important difference with respect to the configuration
of the aggregation example. The splitting pattern in this case is simpler since
there's a single input and each received sample can hence be split individually.
Note that the splitting pattern can include multiple inputs if needed, and
generate output samples based on more complex algorithms in which different
content from different inputs is spread across the outputs.
.. _section-ControllingData-Periodic:
Periodic and Delayed Action
===========================
Processors can react to certain events affecting |TRs|. One special event that
requires attention is the *periodic* event. In addition to events of asynchronous
nature such as data available or route running, a |TR| can be configured to also
provide notifications occurring at a specified periodic rate.
The example below shows the XML that enables the periodic event notification with
a period of one second:

.. code-block::

   <periodic_action>
       <sec>1</sec>
       <nanosec>0</nanosec>
   </periodic_action>
   ...

If a |TR| enables the periodic event, then your |PROCESSOR| can implement
the ``on_periodic_action`` notification and perform any operation of interest,
including reading and writing data. For details on the XML configuration for
periodic action and |TRs| in general, see :numref:`section-Config-Route`.
Note that each |TR| can specify a different period, allowing you to have different
event timing for different routing paths. Similarly, the event period can
be modified at *run time* through the ``Route::set_period`` operation that is available
as part of the |PROCESSOR| API.
The configuration above will generate periodic action events **in addition to
data available events coming from the inputs**. You could disable the
notification of data available using the tag ``<enable_data_on_inputs>``,
causing the |TR| to be periodic-based only.
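
One way to combine both kinds of events is to let the data-available
notification only record the most recent sample and to defer the write to the
periodic notification. The sketch below illustrates this delayed-action idea in
plain C++. The class ``LatestValuePublisher`` is hypothetical, samples are
reduced to ``int`` values, and the method signatures are simplified: the
notifications of the actual |PROCESSOR| API receive the route as an argument.

.. code-block:: c++

   #include <optional>
   #include <vector>

   // Hypothetical processor-like class that writes at most one sample
   // per period: the most recent one received since the last period.
   class LatestValuePublisher {
   public:
       // Data-available notification: remember only the newest sample.
       void on_data_available(int sample)
       {
           latest_ = sample;
       }

       // Periodic notification: write the pending sample, if there is one.
       void on_periodic_action()
       {
           if (latest_) {
               written_.push_back(*latest_);
               latest_.reset();
           }
       }

       // Samples written so far (stands in for the route output).
       const std::vector<int> &written() const
       {
           return written_;
       }

   private:
       std::optional<int> latest_;
       std::vector<int> written_;
   };

With this arrangement the output rate is bounded by the configured period,
regardless of how fast the input produces data.
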
.. _section-ControllingData-Transformation:
Simple data transformation: introduction to Transformation
==========================================================
There are cases involving basic manipulation of data streams that can be performed
independently on a per-sample basis. That is, for a given input sample (or set of
them) there's a *transformed* output sample (or set of them). For this particular
use case, |RS| defines the concept of |TRANSF|, shown in
:numref:`FigureRouterControllingDataTransformationConcept`.
.. figure:: static/RouterControllingDataTransformationConcept.svg
:figwidth: 80 %
:name: FigureRouterControllingDataTransformationConcept
:align: center
Transformation concept
A |TRANSF| is a pluggable component that receives an input data stream of
type T\ :sub:`in` and produces an output data stream of type T\ :sub:`out`\.
The relation between the number of input samples and output samples can also be
different.
This component can be installed in two different entities in |RS|. A |TRANSF| can
appear to process the data stream after it is produced by an |INPUT| |DR| and/or
to process a data stream before it is passed to an |OUTPUT| |DW|.
:numref:`FigureRouterControllingDataTransformationModel` shows the complete model and
context of this component.
.. figure:: static/RouterControllingDataTransformationModel.svg
:figwidth: 100 %
:name: FigureRouterControllingDataTransformationModel
:align: center
Transformation model and context
You can observe that each |INPUT| and |OUTPUT| can contain a transformation. On
the input side, the |TRANSF| is applied to the data stream generated by the input
|DR| and the result is fed to the |PROCESSOR|. Alternatively, on the output
side the |TRANSF| is applied to the data stream produced by the |PROCESSOR| and the
result is passed to the output |DW|.
When transformations are used it's a requirement that the type of the samples
provided by the input |DR| is the same type T\ :sub:`in` expected by the input
|TRANSF|. Similarly, the type T\ :sub:`out`\ of the samples produced by the output
|TRANSF| must be the same as the type of the samples expected by the output |DW|.
As with a |PROCESSOR|, a |TRANSF| is expected to work with |DD|
(:ref:`section-controllingData-DynData`).
You can run :ref:`section-Tutorials-CustomTransf` to see how a transformation can
be used. In this example, the transformation implementation receives and
generates samples of type ``ShapeType``. The output samples are equal to the
input samples except for the field ``x``, which is adjusted to produce only
two possible constant values.
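
A per-sample transformation of this kind can be sketched in plain C++ as
follows. The sketch is not the tutorial implementation: ``Shape`` is a
hypothetical stand-in for the |DD| samples, and the threshold and the two
constant values (``kThreshold``, ``kLowX``, ``kHighX``) are assumed purely for
illustration.

.. code-block:: c++

   #include <string>

   // Hypothetical stand-in for a ShapeType sample (members other than
   // x and y are assumed for illustration).
   struct Shape {
       std::string color;
       int x = 0;
       int y = 0;
       int shapesize = 0;
   };

   // Assumed values: the text only states that x ends up being one of
   // two constants, not which ones or how they are chosen.
   constexpr int kThreshold = 125;
   constexpr int kLowX = 50;
   constexpr int kHighX = 200;

   // Returns a copy of the input sample in which x is snapped to one of
   // the two constants; all other fields are left unchanged.
   Shape transform(Shape sample)
   {
       sample.x = (sample.x < kThreshold) ? kLowX : kHighX;
       return sample;
   }

Each output sample depends only on the corresponding input sample, which is
what makes this logic a good fit for a |TRANSF| rather than a |PROCESSOR|.
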
Transformations vs Processors
-----------------------------
A |TRANSF| is fundamentally different from a |PROCESSOR|. Moreover, they
complement each other. A |TRANSF| can be seen as a very simplified version of a
|PROCESSOR| that has a single input and a single output and in which the
input data stream is processed as it is read.
In general you will find yourself implementing a |PROCESSOR| to perform the data
stream processing required by your system. Nevertheless there are cases where a
|TRANSF| is more suitable for certain patterns such as format conversion, content
reduction, or normalization (see :ref:`section-ControllingData-Patterns`).
.. _section-ControllingData-Patterns:
What stream processing patterns can I perform?
==============================================
With |RS| you have the ability to define routing paths with multiple inputs and
outputs, and provide custom processing logic for the data streams of those
paths. This provides a great degree of flexibility that allows you to implement
almost any processing pattern.
In addition to the presented patterns of aggregation, splitting, and periodic
action, there are other well-known patterns:
- **Data format conversion**: this is the case where the input samples are
represented in one format and are converted to produce output samples of a
different format. For example, input samples could be represented as a byte
array and converted to a JSON string.
- **Content Enrichment**: an output sample is the result of amplifying or adding
content to an input sample. For example, output samples can be enhanced to
contain additional fields that represent the results of computations performed on
input sample fields (e.g., statistical metrics).
- **Content Reduction**: an output sample is the result of attenuating or
removing content from an input sample. For example, output samples can have
some fields removed that are not relevant to the final destination, to
improve bandwidth usage.
- **Normalizer**: output samples are semantically equivalent to the input
samples except the values are adjusted or normalized according to a specific
convention. For example, input samples may contain a field indicating the
temperature in Fahrenheit and the output samples provide the same temperature
in Celsius.
- **Event-based or Triggered Forwarding**: output samples are generated based
on the reception and content of concrete input samples in which some of them
act as events indicating which, how, and when data is forwarded.
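
As an illustration of the normalizer pattern, the following plain C++ sketch
converts a temperature reading from Fahrenheit to Celsius. The ``ReadingF`` and
``ReadingC`` types and their members are hypothetical; a real implementation
would operate on |DD| samples inside a |TRANSF| or a |PROCESSOR|.

.. code-block:: c++

   #include <string>

   // Hypothetical input sample: temperature expressed in Fahrenheit.
   struct ReadingF {
       std::string sensor_id;
       double temperature_f;
   };

   // Hypothetical output sample: same reading, expressed in Celsius.
   struct ReadingC {
       std::string sensor_id;
       double temperature_c;
   };

   // Produces a semantically equivalent sample in the target convention,
   // using C = (F - 32) * 5 / 9.
   ReadingC normalize(const ReadingF &in)
   {
       return ReadingC{in.sensor_id, (in.temperature_f - 32.0) * 5.0 / 9.0};
   }

The identifying field is copied unchanged, and only the value that depends on
the unit convention is recomputed.
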
Key Terms
=========
.. glossary::
Data Stream
The collection of samples received by a |TR|'s |INPUT| or written to a
|TR|'s |OUTPUT|.
DynamicData
A general purpose structure that contains data of any type.
Processor
Pluggable component to control the forwarding process of a |TR|.
Shared Library or Module
An output artifact that contains the implementation of pluggable components
that |RS| can load at run-time.
Entry Point
External symbol in a shared library that |RS| calls to instantiate a custom
plugin instance.
Stream Processing Patterns
Processing algorithms applied to the data streams of a |TR|.
Periodic action
|TR| event notification occurring at a configurable period.
Transformation
Pluggable component that performs modifications of a forwarded data stream.