.. include:: vars.rst

.. _section-controllingData:

*****************************************
Controlling Data: Processing Data Streams
*****************************************

In chapter :ref:`section-routingData`, we presented how |RS| can easily connect
and scale systems. To do so, data is *forwarded* among systems, generating
data streams that flow from one system to another. The forwarding process is a
basic operation that consists of propagating data streams from the input to
the output.

.. figure:: static/RouterControllingDataStreamForwarding.svg
    :figwidth: 100 %
    :name: FigureRouterControllingDataStreamForwarding
    :align: center

    Basic forwarding of an input data stream

:numref:`FigureRouterControllingDataStreamForwarding` illustrates the
forwarding process of |TOPIC| data. On the publication side, there are ``N``
|DWs|, each producing samples for ``Topic A``. The |RS| has a |TR| with a
single input |DR| and a single output |DW|. On the subscription side, there
are ``M`` |DRs|, all receiving samples from topic ``Topic A'``. All the samples
that the user |DWs| produce on the publication side are received by the input
|DR| and then forwarded through the output |DW| **to all the user** |DRs|
**on the subscription side**.

You can observe that the |TR| has a component that performs the forwarding
logic, which consists of reading from the input |DR| and writing to the output
|DW|. This forwarding logic may be limiting when system connectivity demands
requirements beyond basic data forwarding. For example, simple read-and-write
logic may be inadequate for |TRs| that define multiple |INPUTs| and |OUTPUTs|
whose associated |TOPICs| have different types. These cases require custom
logic to process the data streams, and this is the task of the *Processor*.
:numref:`FigureRouterControllingDataProcessor` shows the concept.

.. figure:: static/RouterControllingDataProcessor.svg
    :figwidth: 100 %
    :name: FigureRouterControllingDataProcessor
    :align: center

    |PROCESSOR| concept

A |PROCESSOR| is a *pluggable* component that allows you to **control the
forwarding process of a** |TR|. You can create your own |PROCESSOR|
implementations based on the needs of your system integration, defining your
own data flows and processing the data streams at your convenience.

A |PROCESSOR| receives notifications from the |TR| about relevant events, such
as the availability of |INPUTs| and |OUTPUTs|, the state of the |TR|, or the
arrival of data. A |PROCESSOR| can then react to any of these events and
perform whatever actions are necessary. The basic forwarding logic presented
above is actually a builtin |PROCESSOR| implementation that is set as the
default in all |TRs|.

The following sections guide you through creating your own |PROCESSOR|,
configuring it, and installing it in |RS|. We show this functionality with
examples of the *Aggregation* and *Splitting* patterns.

.. note::

    The following sections assume that you are familiar with the routing
    concepts explained in section :ref:`section-routingData`. They also
    require programming knowledge in C/C++, an understanding of CMake, and
    familiarity with shared library management.

.. seealso::

    :ref:`section-CoreConcepts-BuiltinPlugins-ForwardingProcessor`
        Details on the default forwarding |PROCESSOR| of the |TRs|.

.. _section-controllingData-DynData:

|DD_HEADING| as a Data Representation Model
===========================================

The architecture of |RS| makes it possible to work with data streams of
different *types*. This demands a strategy for dealing with all possible types
at both compile time and run time. This strategy is provided through |DD|: a
generic container that holds data of any type by keeping a custom and flexible
internal representation of the data.
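
The following plain C++ sketch makes the idea of a generic container more concrete. It stores member values keyed by name and checks each assignment against a type description that is known only at run time. It is **not** the |DD| API. ``TypeDescription``, ``MemberKind``, ``Value``, and ``GenericSample`` are hypothetical names, and ``TypeDescription`` only plays a role similar to the *TypeCode* introduced below. Only three member kinds are modeled, and the sketch requires C++17 for ``std::variant``.

.. code-block:: c++

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <map>
    #include <stdexcept>
    #include <string>
    #include <utility>
    #include <variant>
    #include <vector>

    // Hypothetical stand-in for a TypeCode: names and kinds of a type's members.
    // The enumerator order matches the alternative order of Value below.
    enum class MemberKind { Int32, Double, String };

    struct TypeDescription {
        std::string name;
        std::vector<std::pair<std::string, MemberKind>> members;

        // Returns the kind of a member, or nullptr if the type has no such member.
        const MemberKind *find(const std::string &member) const {
            for (const auto &m : members) {
                if (m.first == member) {
                    return &m.second;
                }
            }
            return nullptr;
        }
    };

    using Value = std::variant<std::int32_t, double, std::string>;

    // Hypothetical generic sample: holds data for any type described at run time.
    class GenericSample {
    public:
        explicit GenericSample(const TypeDescription &type) : type_(type) {}

        // Rejects unknown members and values of the wrong kind.
        void set(const std::string &member, Value value) {
            const MemberKind *kind = type_.find(member);
            if (kind == nullptr) {
                throw std::invalid_argument("unknown member: " + member);
            }
            if (static_cast<std::size_t>(*kind) != value.index()) {
                throw std::invalid_argument("wrong kind for member: " + member);
            }
            values_[member] = std::move(value);
        }

        // Throws std::out_of_range if the member was never set, and
        // std::bad_variant_access if T does not match the stored kind.
        template <typename T>
        const T &get(const std::string &member) const {
            return std::get<T>(values_.at(member));
        }

        const TypeDescription &type() const { return type_; }

    private:
        const TypeDescription &type_;  // must outlive the sample
        std::map<std::string, Value> values_;
    };

The type description is plain data, so code written against ``GenericSample`` can handle whatever type it receives at run time. This is the property that makes a common representation model useful to general-purpose processors.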

|DD| is a feature specific to |CONNEXT| and is part of its core libraries.
:numref:`FigureRouterControllingDataDynData` shows the concept of |DD|.

.. figure:: static/RouterDataIntegrationDynData.svg
    :figwidth: 100 %
    :name: FigureRouterControllingDataDynData
    :align: center

    |DD| concept

|DD| is a container-like structure that holds data of any type. The
description of what that type looks like is given by the *TypeCode*, a
structure that can represent any type. Using the *TypeCode* information, a
|DD| object can contain data for the associated type and behave as if it were
an actual structure of that type. The |DD| class has a rich interface to
access the data members and manipulate their content.

The *Processor* API makes inputs and outputs interface with |DD|. Inputs
return a list of |DD| samples when reading, and outputs expect a |DD| object
in the write operation. This common representation model has two benefits:

- It allows implementations to work without knowing the types beforehand.
  This is very convenient for general-purpose processors, such as data member
  mappers.
- It allows implementations to work independently of the data domain where
  the data streams flow. This is particularly important when a data domain
  other than DDS is used through a custom |ADAPTER|
  (:ref:`section-DataIntegration`).

.. seealso::

    :link_connext_dds_pro_um:`Objects of Dynamically Defined Types <#users_manual/Objects_of_Dynamically_Defined_Types.htm>`
        Section in the |RTI_CONNEXT| User's Manual about |DD| and *TypeCode*.

    :link_connext_dds_api_cpp:`DynamicData C++ API reference `
        Online API documentation for the DynamicData class.

.. _section-ControllingData-Aggregation:

Aggregating Data From Different Topics
======================================

A very common scenario involves defining routing paths to combine data from
two or more input |TOPICs| into a single output |TOPIC|. This pattern is known
as |TOPIC| *aggregation*.
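
The core of an aggregation can be sketched in a few lines of C++ without the |RS| API. The sketch keeps the latest sample received from each input and combines the two into one output sample. The ``ShapeType`` struct and the ``LatestSampleAggregator`` class are hypothetical simplifications, and the sketch requires C++17 for ``std::optional``. The combination rule mirrors the example developed in this section: all fields come from ``Square``, except ``y``, which comes from ``Circle``.

.. code-block:: c++

    #include <cassert>
    #include <optional>
    #include <string>

    // Hypothetical plain struct standing in for the ShapeType used in the example.
    struct ShapeType {
        std::string color;
        int x = 0;
        int y = 0;
        int shapesize = 0;
    };

    // Keeps only the most recent sample from each input and combines them
    // on demand.
    class LatestSampleAggregator {
    public:
        void on_square(const ShapeType &sample) { square_ = sample; }
        void on_circle(const ShapeType &sample) { circle_ = sample; }

        // Produces an output only once both inputs have delivered data:
        // every field is copied from the Square except y, taken from the Circle.
        std::optional<ShapeType> aggregate() const {
            if (!square_ || !circle_) {
                return std::nullopt;
            }
            ShapeType out = *square_;
            out.y = circle_->y;
            return out;
        }

    private:
        std::optional<ShapeType> square_;
        std::optional<ShapeType> circle_;
    };

Keeping only the newest sample per input corresponds to the ``KEEP_LAST`` history of depth one that this example configures for the input |DRs|.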

You can leverage the |PROCESSOR| component to perform the custom |TOPIC|
aggregation that best suits the needs of your system. An example of |TOPIC|
aggregation is shown in :numref:`FigureRouterControllingShapesAggregator`.
There are two input |TOPICs|, ``Square`` and ``Circle``, and a single output
|TOPIC|, ``Triangle``. All |TOPICs| have the same type, ``ShapeType``. The
goal is to produce output samples by combining data samples from the two
inputs.

.. figure:: static/RouterControllingShapesAggregator.svg
    :figwidth: 100 %
    :name: FigureRouterControllingShapesAggregator
    :align: center

    Aggregation example of two |TOPICs|

Let's review the tasks needed to create a custom |PROCESSOR|, using the
example in :ref:`section-Tutorials-ExampleShapesProcessor`. You can run the
example first to see it in action, or run it one step at a time as we explain
each step.

Develop a Custom |PROCESSOR|
----------------------------

Once you know the stream processing pattern you want to perform, including
what your data inputs and outputs are, you can write the custom code of the
|PROCESSOR|. A custom processor must implement the interface
``rti::routing::processor::Processor``, which defines the abstract operations
that the |TR| calls when certain events occur. In our example, we create a
``ShapesAggregator`` class to be our |PROCESSOR| implementation:

.. code-block:: c++

    class ShapesAggregator : public rti::routing::processor::NoOpProcessor {
    public:
        // Notification: at least one input has data available to read
        void on_data_available(rti::routing::processor::Route &route) override;

        // Notification: an output has been enabled and can be written to
        void on_output_enabled(
                rti::routing::processor::Route &route,
                rti::routing::processor::Output &output) override;

        ...
    };

Note how the processor class inherits from ``NoOpProcessor``. This class
inherits from ``rti::routing::processor::Processor`` and implements all its
virtual methods as no-ops. This is a convenience that allows us to implement
only the methods for the notifications of interest. In this example:

- ``on_output_enabled``: Notifies that an output has been enabled and is
  available for writing. In our example, we create a buffer of the output type
  (``ShapeType``) that will hold the aggregated content of the input samples.
- ``on_data_available``: Indicates that at least one input has data available
  to read. In our example, this is where the aggregation logic takes place. It
  simply generates aggregated output samples that contain the same values as
  the ``Square`` samples, except for the field ``y``, which is obtained from
  the ``Circle``.

.. seealso::

    :link_routing_service_api_cpp:`Processor C++ API reference `
        Online API documentation for the Processor interface.

    :ref:`section-CoreConcepts-Rm-Route-States`
        Different states of a |TR| and which |PROCESSOR| notifications are
        triggered in each of them.

Create a Shared Library
-----------------------

Once the |PROCESSOR| implementation is finished, we need to compile it and
generate a *shared library* that |RS| can load. In this example, we use
``CMake`` as the build system to create the shared library. We specify the
generation of a library named ``shapesaggregator``:

.. code-block:: cmake

    ...
    add_library(shapesaggregator SHARED
        "${CMAKE_CURRENT_SOURCE_DIR}/ShapesAggregator.cxx")
    ...

The generated library contains the compiled code of our implementation, which
is contained in a single file, ``ShapesAggregator.cxx``.

A key aspect of the generated library is that it must export an external
function that |RS| calls to instantiate the |PROCESSOR|. This function creates
the plugin object, which in turn instantiates ``ShapesAggregator``. This
external symbol is called the *entry point*, and you can declare it as
follows:

.. code-block:: c++

    RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL(ShapesAggregatorPlugin);

The macro declares an external exported function with the following
signature:

.. code-block:: c++

    struct RTI_RoutingServiceProcessorPlugin *
    ShapesAggregatorPlugin_create_processor_plugin(
            const struct RTI_RoutingServiceProperties *,
            RTI_RoutingServiceEnvironment *);

This is the signature that |RS| requires and assumes for the entry point that
creates a custom |PROCESSOR|.
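
A small C++ sketch of the same factory pattern shows why the entry point has to be a fixed, exported symbol. All names here (``SketchProcessor``, ``SketchProcessorPlugin``, ``SketchShapesAggregator_create_plugin``, and so on) are hypothetical simplifications. They differ from the |RS| types and signature shown above, and a real plugin must use the RTI macros. The sketch makes two points. First, ``extern "C"`` keeps the exported symbol name unmangled, so a loader can find it by the exact name written in the configuration. Second, the function returns a plugin object, and that plugin later creates processors.

.. code-block:: c++

    #include <cassert>
    #include <map>
    #include <memory>
    #include <string>

    // Hypothetical, simplified stand-ins; these are NOT the RTI Routing Service types.
    using Properties = std::map<std::string, std::string>;

    struct SketchProcessor {
        virtual ~SketchProcessor() = default;
        virtual std::string name() const = 0;
    };

    struct SketchProcessorPlugin {
        virtual ~SketchProcessorPlugin() = default;
        // Creates one processor instance, e.g. for each route that uses the plugin.
        virtual std::unique_ptr<SketchProcessor> create_processor(
                const Properties &properties) = 0;
    };

    class SketchShapesAggregator : public SketchProcessor {
    public:
        std::string name() const override { return "ShapesAggregator"; }
    };

    class SketchShapesAggregatorPlugin : public SketchProcessorPlugin {
    public:
        std::unique_ptr<SketchProcessor> create_processor(
                const Properties &) override {
            return std::make_unique<SketchShapesAggregator>();
        }
    };

    // extern "C" disables C++ name mangling, so the exported symbol keeps exactly
    // this name and can be looked up by the string given in a configuration file.
    extern "C" SketchProcessorPlugin *SketchShapesAggregator_create_plugin(
            const Properties * /* properties, unused in this sketch */) {
        return new SketchShapesAggregatorPlugin();
    }

A loader would typically obtain such a symbol by name from the loaded shared library, for example with POSIX ``dlsym``, and then take ownership of the returned plugin object. This sketch does not model how |RS| itself manages that ownership.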

Note that implementing this function requires using the macro
``RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DEF`` in the source file.

Define a Configuration with the Aggregating |TR|
------------------------------------------------

This process is similar to the one we explained in section
:ref:`section-routingData-SingleTopic`. There are two main differences that
are specific to using a |PROCESSOR|.

Configure a plugin library
^^^^^^^^^^^^^^^^^^^^^^^^^^

Within the root element of the XML configuration, you can define a *plugin
library* element that contains the description of all the plugins that |RS|
can load. In our case, we define a plugin library with a single entry for our
aggregation processor plugin:

.. code-block::

    shapesaggregator
    ShapesAggregatorPlugin_create_processor_plugin

The values specified for the ``name`` attributes can be set at your
convenience, and they must uniquely identify each plugin. We will use these
names later within the |TR| to refer to this plugin.

For the definition of our processor plugin, we have specified two elements:

- ``dll``: The name of the shared library, as we specified it in the build
  step. We provide only the library name, so |RS| will try to load it from the
  working directory or assume that the library path is set accordingly.
- Entry point: The name of the entry point (external function) that creates
  the plugin object, exactly as we declared it in code with the
  ``RTI_PROCESSOR_PLUGIN_CREATE_FUNCTION_DECL`` macro.

Once the plugin is defined in the library, we can move to the next step and
define the |TR| with the desired routing paths and our |PROCESSOR| in it.

.. warning:: When a name is specified in the ``dll`` element, |RS|
    automatically appends a **d** suffix to it when running the debug version
    of |RS|.

.. seealso::

    :ref:`section-Config-Plugins`
        Documentation about the plugin configuration elements.

    :ref:`section-Common-PluginManagement`
        For an in-depth understanding of plugins.

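
A small lookup sketch summarizes the naming rules above. A plugin library maps unique plugin names to a library name and an entry point name. The file to load is derived from the ``dll`` value, with the **d** suffix added for debug builds, as the warning above describes. ``PluginEntry``, ``PluginLibrary``, ``find_plugin``, and ``library_file_name`` are hypothetical, and the sketch requires C++17. The ``lib`` prefix and ``.so`` extension assume Linux-style naming. They do not describe how |RS| searches for libraries on every platform.

.. code-block:: c++

    #include <cassert>
    #include <map>
    #include <optional>
    #include <string>

    // Hypothetical model of one processor plugin entry in a plugin library.
    struct PluginEntry {
        std::string dll;              // library name, without prefix or extension
        std::string create_function;  // name of the entry point symbol
    };

    // Hypothetical plugin library: plugin names are unique keys within it.
    using PluginLibrary = std::map<std::string, PluginEntry>;

    std::optional<PluginEntry> find_plugin(const PluginLibrary &library,
                                           const std::string &plugin_name) {
        auto it = library.find(plugin_name);
        if (it == library.end()) {
            return std::nullopt;
        }
        return it->second;
    }

    // Builds the file name a loader would try to open. The "d" suffix for debug
    // builds follows the warning above; the "lib" prefix and ".so" extension are
    // an assumption (Linux-style naming) made only for this sketch.
    std::string library_file_name(const std::string &dll, bool debug_build) {
        return "lib" + dll + (debug_build ? "d" : "") + ".so";
    }
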

Configure a |RS| with the custom routing paths
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In this example, we need to define a |TR| that contains two |INPUTs| to
receive the data streams from the ``Square`` and ``Circle`` |TOPICs|, and a
single |OUTPUT| to write the resulting data stream to the ``Triangle``
|TOPIC|. The key element in our |TR| is the specification of a custom
|PROCESSOR|, which indicates that the |TR| should **use an instance of our
plugin to process the route's events and data**:

.. code-block::

    ...
    Square
    ShapeType
    Circle
    ShapeType
    Triangle
    ShapeType

There are three important aspects in this |TR| configuration:

- The custom |PROCESSOR| is specified with a processor element inside the
  |TR|. Its ``plugin_name`` attribute must contain the qualified name of an
  existing processor plugin within a plugin library. The qualified name is
  built from the values of the ``name`` attributes of the plugin library
  element and the plugin element. Our example does not use them, but you
  could also provide run-time configuration properties to the plugin. These
  optional properties are a set of name-value string pairs that are passed to
  the ``create_processor`` call of the plugin.
- All |INPUT| and |OUTPUT| elements have a ``name`` attribute. This
  configuration name can be used within the |PROCESSOR| to look up an
  individual |INPUT| or |OUTPUT| by its name, as we do in our example. Also
  notice how the names match the names of the associated |TOPICs|. Because we
  do not specify the |TOPIC| names explicitly, |RS| uses the |INPUT| and
  |OUTPUT| names as |TOPIC| names. In our example, this makes it convenient
  to match inputs and outputs 1:1 with their |TOPICs|.
- The input |DRs| are configured with a QoS that sets a ``KEEP_LAST`` history
  of just one sample. This allows our processor to read and aggregate only the
  latest available sample from each input.

.. _section-ControllingData-Splitting:

Splitting Data From a Single Topic
==================================

Another common pattern consists of defining routing paths to divide, or
*split*, data from an input |TOPIC| into several output |TOPICs|. This
mechanism is the reverse of aggregation and is known as |TOPIC| *splitting*.
You can leverage the |PROCESSOR| component to perform the |TOPIC| splitting
that best suits the needs of your system.

An example of |TOPIC| splitting is shown in
:numref:`FigureRouterControllingDataShapesSplitter`. There is a single input
|TOPIC|, ``Square``, and two output |TOPICs|, ``Circle`` and ``Triangle``.
All |TOPICs| have the same type, ``ShapeType``. The goal is to produce output
samples by splitting the content of data samples from the input.

.. figure:: static/RouterControllingDataShapesSplitter.svg
    :figwidth: 90 %
    :name: FigureRouterControllingDataShapesSplitter
    :align: center

    Splitting example of a |TOPIC|

The steps required to create a custom splitting |PROCESSOR| are the same as
those described in the previous section,
:ref:`section-ControllingData-Aggregation`. For this example, we focus only on
the aspects that are different.

Custom |PROCESSOR| implementation
----------------------------------

In this example, we create a ``ShapesSplitter`` class to be our |PROCESSOR|
implementation. Similar to ``ShapesAggregator``, this class reacts to only two
event notifications:

- ``on_input_enabled``: Creates a sample buffer that will hold the split
  content from the input. Because all the inputs and outputs have the same
  type (``ShapeType``), we can use the input type to create the output sample.
- ``on_data_available``: This is where the splitting logic takes place. It
  simply generates split output samples that contain the same values as the
  ``Square`` samples for all fields except ``x`` and ``y``, which are set as
  follows:

  - ``Circle`` output: ``x`` has the same value as in the input ``Square``,
    and ``y`` is set to zero.
  - ``Triangle`` output: ``y`` has the same value as in the input ``Square``,
    and ``x`` is set to zero.

Define a Configuration with the Splitting |TR|
----------------------------------------------

In this example, we need to define a |TR| that contains a single |INPUT| to
receive the data stream from the ``Square`` |TOPIC|, and two |OUTPUTs| to
write the data streams to the ``Circle`` and ``Triangle`` |TOPICs|. The |TR|
specifies a custom processor to be created from our plugin library, and it is
configured to create the ``SplitterProcessor``:

.. code-block::

    ShapeType
    ShapeType
    ShapeType

In this |TR| configuration, the input |DR| and output |DWs| are created with
default QoS. This is an important difference from the configuration of the
aggregation example. The splitting pattern in this case is simpler: there is a
single input, so each received sample can be split individually. Note that the
splitting pattern can include multiple inputs if needed, and generate output
samples based on more complex algorithms in which different content from
different inputs is spread across the outputs.

.. _section-ControllingData-Periodic:

Periodic and Delayed Action
===========================

Processors can react to certain events affecting |TRs|. One special event that
requires attention is the *periodic* event. In addition to asynchronous
events, such as data available or route running, a |TR| can be configured to
provide notifications at a specified periodic rate. The following example
shows the XML that enables periodic event notification with a period of one
second:

.. code-block::

    1
    0
    ...

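
The following sketch shows one way a |PROCESSOR| might combine the two kinds of notifications. Data-available notifications only cache the newest value, while periodic notifications decide what is written. As a result, the output rate follows the period rather than the input rate. ``PeriodicForwarder`` and ``to_period`` are hypothetical. The class only mimics the ``on_data_available`` and ``on_periodic_action`` notification names and does not use the |RS| API. The sketch also assumes that the two values in the configuration above are seconds and nanoseconds, and it requires C++17 for ``std::optional``.

.. code-block:: c++

    #include <cassert>
    #include <chrono>
    #include <cstdint>
    #include <optional>
    #include <vector>

    // Converts a period given as seconds plus nanoseconds into one duration.
    std::chrono::nanoseconds to_period(std::int64_t sec, std::int64_t nanosec) {
        return std::chrono::seconds(sec) + std::chrono::nanoseconds(nanosec);
    }

    // Hypothetical processor-like class, not the RTI API. Data-available events
    // only cache the newest value; periodic events decide what gets written.
    class PeriodicForwarder {
    public:
        void on_data_available(int sample) { latest_ = sample; }

        void on_periodic_action() {
            if (latest_) {
                written_.push_back(*latest_);
                latest_.reset();  // write each cached sample at most once
            }
        }

        const std::vector<int> &written() const { return written_; }

    private:
        std::optional<int> latest_;
        std::vector<int> written_;
    };

With this design, the number of written samples is bounded by the number of periodic notifications, no matter how fast the inputs publish.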

If a |TR| enables the periodic event, your |PROCESSOR| can implement the
``on_periodic_action`` notification and perform any operation of interest,
including reading and writing data. For details on the XML configuration for
periodic action and |TRs| in general, see :numref:`section-Config-Route`.

Note that each |TR| can specify a different period, allowing you to have
different event timing for different routing paths. Similarly, the event
period can be modified at *runtime* through the ``Route::set_period``
operation, which is available as part of the |PROCESSOR| API.

The configuration above generates periodic action events **in addition to
data available events coming from the inputs**. You can disable the data
available notification through a dedicated tag in the |TR| configuration,
causing the |TR| to be periodic-based only.

.. _section-ControllingData-Transformation:

Simple data transformation: introduction to Transformation
==========================================================

There are cases involving basic manipulation of data streams that can be
performed independently on a per-sample basis. That is, for a given input
sample (or set of samples), there is a *transformed* output sample (or set of
samples). For this particular use case, |RS| defines the concept of |TRANSF|,
shown in :numref:`FigureRouterControllingDataTransformationConcept`.

.. figure:: static/RouterControllingDataTransformationConcept.svg
    :figwidth: 80 %
    :name: FigureRouterControllingDataTransformationConcept
    :align: center

    Transformation concept

A |TRANSF| is a pluggable component that receives an input data stream of
type T\ :sub:`in` and produces an output data stream of type T\ :sub:`out`\.
The number of output samples can also differ from the number of input
samples.

This component can be installed in two different entities in |RS|. A |TRANSF|
can process the data stream after it is produced by an |INPUT| |DR|, process a
data stream before it is passed to an |OUTPUT| |DW|, or both.
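
A small C++ interface can sketch the relation between T\ :sub:`in` and T\ :sub:`out`, including a different sample count. ``Transformation``, ``TemperatureF``, ``TemperatureC``, and ``FahrenheitToCelsius`` are hypothetical names and not the |RS| transformation API, which works with |DD|. The example implements the normalizer pattern (Fahrenheit to Celsius). It also drops non-finite readings, so the output batch can be smaller than the input batch.

.. code-block:: c++

    #include <cassert>
    #include <cmath>
    #include <string>
    #include <vector>

    // Hypothetical input (T_in) and output (T_out) sample types.
    struct TemperatureF {
        std::string sensor;
        double fahrenheit;
    };

    struct TemperatureC {
        std::string sensor;
        double celsius;
    };

    // Hypothetical transformation interface: a batch of T_in samples goes in,
    // a batch of T_out samples comes out; the batch sizes may differ.
    template <typename In, typename Out>
    struct Transformation {
        virtual ~Transformation() = default;
        virtual std::vector<Out> transform(const std::vector<In> &input) = 0;
    };

    // Normalizer: same meaning, different unit. Non-finite readings are dropped,
    // so fewer output samples than input samples can be produced.
    class FahrenheitToCelsius : public Transformation<TemperatureF, TemperatureC> {
    public:
        std::vector<TemperatureC> transform(
                const std::vector<TemperatureF> &input) override {
            std::vector<TemperatureC> output;
            output.reserve(input.size());
            for (const auto &sample : input) {
                if (!std::isfinite(sample.fahrenheit)) {
                    continue;
                }
                output.push_back({sample.sensor, (sample.fahrenheit - 32.0) * 5.0 / 9.0});
            }
            return output;
        }
    };
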

:numref:`FigureRouterControllingDataTransformationModel` shows the complete
model and context of this component.

.. figure:: static/RouterControllingDataTransformationModel.svg
    :figwidth: 100 %
    :name: FigureRouterControllingDataTransformationModel
    :align: center

    Transformation model and context

You can observe that each |INPUT| and |OUTPUT| can contain a transformation.
On the input side, the |TRANSF| is applied to the data stream generated by the
input |DR|, and the result is fed to the |PROCESSOR|. On the output side, the
|TRANSF| is applied to the data stream produced by the |PROCESSOR|, and the
result is passed to the output |DW|.

When transformations are used, the type of the samples provided by the input
|DR| must be the same type T\ :sub:`in` expected by the input |TRANSF|.
Similarly, the type T\ :sub:`out`\ of the samples produced by the output
|TRANSF| must be the same as the type of the samples expected by the output
|DW|. As with a |PROCESSOR|, a |TRANSF| is expected to work with |DD|
(:ref:`section-controllingData-DynData`).

You can run :ref:`section-Tutorials-CustomTransf` to see how a transformation
can be used. In this example, the transformation implementation receives and
generates samples of type ``ShapeType``. The output samples are equal to the
input samples except for the field ``x``, which is adjusted to produce only
two possible constant values.

Transformations vs Processors
-----------------------------

A |TRANSF| is fundamentally different from a |PROCESSOR|, and the two
complement each other. A |TRANSF| can be seen as a very simplified version of
a |PROCESSOR| that has a single input and a single output, and in which the
input data stream is processed as it is read. In general, you will find
yourself implementing a |PROCESSOR| to perform the data stream processing
required by your system.
Nevertheless, there are cases where a |TRANSF| is more suitable, for patterns
such as format conversion, content reduction, or normalization (see
:ref:`section-ControllingData-Patterns`).

.. _section-ControllingData-Patterns:

What stream processing patterns can I perform?
==============================================

With |RS|, you can define routing paths with multiple inputs and outputs, and
provide custom processing logic for the data streams of those paths. This
flexibility allows you to perform almost any processing pattern. In addition
to the patterns already presented (aggregation, splitting, and periodic
action), there are other well-known patterns:

- **Data Format Conversion**: the input samples are represented in one format
  and are converted to produce output samples in a different format. For
  example, input samples could be represented as a byte array and converted
  to a JSON string.
- **Content Enrichment**: an output sample is the result of amplifying or
  adding content to an input sample. For example, output samples can be
  enhanced to contain additional fields that represent the results of
  computations performed on input sample fields (e.g., statistical metrics).
- **Content Reduction**: an output sample is the result of attenuating or
  removing content from an input sample. For example, fields that are not
  relevant to the final destination can be removed from the output samples to
  improve bandwidth usage.
- **Normalizer**: output samples are semantically equivalent to the input
  samples, except that the values are adjusted or normalized according to a
  specific convention. For example, input samples may contain a field
  indicating the temperature in Fahrenheit, and the output samples provide
  the same temperature in Celsius.
- **Event-based or Triggered Forwarding**: output samples are generated based
  on the reception and content of specific input samples, some of which act as
  events that indicate which data is forwarded, as well as how and when.

Key Terms
=========

.. glossary::

    Data Stream
        The collection of samples received by a |TR|'s |INPUT| or written to a
        |TR|'s |OUTPUT|.

    DynamicData
        A general-purpose structure that contains data of any type.

    Processor
        Pluggable component to control the forwarding process of a |TR|.

    Shared Library or Module
        An output artifact that contains the implementation of pluggable
        components that |RS| can load at run time.

    Entry Point
        External symbol in a shared library that |RS| calls to instantiate a
        custom plugin.

    Stream Processing Patterns
        Processing algorithms applied to the data streams of a |TR|.

    Periodic action
        |TR| event notification occurring at a configurable period.

    Transformation
        Pluggable component that performs modifications on a forwarded data
        stream.