Example Code - Aggregation
Note: Applies to RTI Data Distribution Service 4.1, 4.2; While the design pattern also works for RTI Data Distribution Service 4.3 and above, its batching feature is a better way to achieve aggregation.
Purpose
Publisher
This example shows how to create an application that aggregates data. This means that instead of sending each sample out on the wire with its own header, we emulate an aggregation feature by storing the data in a batch until it is full and then sending the entire batch as one data packet. This saves on overhead taken up by headers and such because all the data in the batch shares the same header.
The aggregation DataWriter sends data out on the wire as soon as the batch is full. Of course, if the batch never becomes full, then nothing is sent unless a periodic flush is implemented.
Because RTI Data Distribution Service tags each data packet sent on the wire with a sequence number, the built-in sequence number corresponds to each batch sent out over the wire. This is different than the non-batching mechanism, where the sequence number corresponds to each data sample. To address this issue and also to show that all data samples are indeed sent regardless of the use of batching, the data type includes its own sequence number that can be tracked on the subscriber side.
Subscriber
No change from default behavior except for a few modifications to print statements.
Building
Make sure you are using one of the relevant RTI Data Distribution Service versions, as specified at the top of the Solution.
Before compiling or running the example, make sure the environment variable NDDSHOME
is set to the directory where your version of RTI Data Distribution Service is installed.
Run rtiddsgen with the -example
option and the target architecture of your choice (such as i86Win32VS2005, as seen in the following example command). Do not use the -replace
option. The RTI Data Distribution Service Getting Started Guide describes the process for generating the code and building the example applications.
After running rtiddsgen like this...
rtiddsgen -language C++ -example i86Win32VS2005 equities.idl
...you will see messages that look like this:
File C:\local\Aggregation\c++\equities_subscriber.cxx already exists and will not be replaced with updated content. If you would like to get a new file with the new content, either remove this file or supply -replace option. File C:\local\Aggregation\c++\equities_publisher.cxx already exists and will not be replaced with updated content. If you would like to get a new file with the new content, either remove this file or supply -replace option.
This is normal and is only informing you that the subscriber/publisher code has not been replaced, which is fine since all the source files for the example are already provided.
Running
In two separate command prompt windows for the publisher and subscriber, navigate to the objs/<arch> directory and run the following commands.
On Windows systems:
equities_publisher.exe <domain#> 4 equities_subscriber.exe <domain#>
On UNIX systems:
./equities_publisher <domain#> 4 ./equities_subscriber <domain#>
The publishing application accepts two arguments:
- The <
domain #>
. Both applications must use the same domain # in order to communicate. The default is 0. - The
batch size
. The default is 10.
The subscribing application accepts two arguments:
- The <
domain #>
. Both applications must use the same domain # in order to communicate. The default is 0. - How long the subscriber should run, measured in seconds. A value of '0' instructs the application to run forever; this is the default.
While generating the output below, we used values that would capture the most interesting behavior.
Publisher Output
It runs in an infinite loop, but we halted execution after 9 batches.
Writing EquitiesDataSequence, count 0 Writing EquitiesDataSequence, count 1 Writing EquitiesDataSequence, count 2 Writing EquitiesDataSequence, count 3 Writing EquitiesDataSequence, count 4 Writing EquitiesDataSequence, count 5 Writing EquitiesDataSequence, count 6 Writing EquitiesDataSequence, count 7 Writing EquitiesDataSequence, count 8 Writing EquitiesDataSequence, count 9
Subscriber Output
After halting an infinite loop.
received new batch elem_seq: elem_seq[0]: sequence_number: 0 value: 0 symbol: "IBM" elem_seq[1]: sequence_number: 1 value: 1 symbol: "IBM" elem_seq[2]: sequence_number: 2 value: 2 symbol: "IBM" elem_seq[3]: sequence_number: 3 value: 3 symbol: "IBM" received new batch elem_seq: elem_seq[0]: sequence_number: 4 value: 0 symbol: "IBM" elem_seq[1]: sequence_number: 5 value: 1 symbol: "IBM" elem_seq[2]: sequence_number: 6 value: 2 symbol: "IBM" elem_seq[3]: sequence_number: 7 value: 3 symbol: "IBM"