2. Configuration
This section describes how to configure Routing Service Kafka Adapter.
All configuration is specified in Routing Service’s XML configuration file.
2.1. Load the Kafka Adapter Plugin
Routing Service Kafka Adapter must be registered as a Routing Service plugin by using the <adapter_plugin>
tag.
Once the dynamic library and constructor function have been registered, Routing Service will create an instance of the plugin during start-up, and you can use the plugin to create one or more connections to a Kafka Broker.
The following snippet demonstrates how to register the plugin in the
<plugin_library>
section of Routing Service’s XML configuration:
<?xml version="1.0"?>
<dds>
<plugin_library name="MyPlugins">
<adapter_plugin name="KafkaAdapter">
<dll>rtikafkaadapter</dll>
<create_function>RTI_RS_Kafka_AdapterPlugin_create</create_function>
</adapter_plugin>
</plugin_library>
</dds>
Warning
Routing Service must be able to find the Routing Service Kafka Adapter dynamic library
(librtikafkaadapter.so
on Linux® systems,
librtikafkaadapter.dylib
on macOS® systems,
or rtikafkaadapter.dll
on Windows® systems). Make
sure to include the library’s directory in the library search
path environment variable appropriate for your system
(LD_LIBRARY_PATH
on Linux systems, RTI_LD_LIBRARY_PATH
on
macOS systems, or PATH
on Windows systems, etc.).
2.2. Register KAFKA Data Types
Routing Service Kafka Adapter exposes data to Routing Service using the DDS Type named:
RTI::Kafka::Message
.
This data type needs to be registered. It is used to store converted DDS data in a format that a Kafka application can understand (e.g., JSON). The format can be configured with a transformation plugin.
<?xml version="1.0"?>
<dds>
<types>
<module name="RTI">
<module name="Kafka">
<struct name="MessagePayload" nested="true">
<member name="data" sequenceMaxLength="-1" type="byte" />
</struct>
<struct name="Message">
<member name="payload" type="nonBasic" nonBasicTypeName="RTI::Kafka::MessagePayload" />
</struct>
</module>
</module>
</types>
</dds>
2.3. Configure QoS
The types used by Routing Service Kafka Adapter use “unbounded” sequences.
The following QoS parameters must be configured in order to enable use of “unbounded” types:
<?xml version="1.0"?>
<dds>
<qos_library name="MyQosLib">
<qos_profile name="MyQos" is_default_qos="true">
<datareader_qos>
<reader_resource_limits>
<dynamically_allocate_fragmented_samples>
true
</dynamically_allocate_fragmented_samples>
</reader_resource_limits>
<property>
<value>
<element>
<name>dds.data_reader.history.memory_manager.fast_pool.pool_buffer_max_size</name>
<!-- Modify this value according to your preference -->
<value>10485760</value>
</element>
</value>
</property>
</datareader_qos>
<datawriter_qos>
<property>
<value>
<element>
<name>dds.data_writer.history.memory_manager.fast_pool.pool_buffer_max_size</name>
<!-- Modify this value according to your preference -->
<value>10485760</value>
</element>
</value>
</property>
</datawriter_qos>
</qos_profile>
</qos_library>
</dds>
2.4. Connect to a Kafka Broker
Once the plugin has been registered with Routing Service, it can be used to create
<connection>
elements within a <domain_route>
.
The <connection>
’s configuration must include properties to configure the
associated Kafka Client.
The bootstrap.servers
property should contains a comma-separated list of
host and port pairs that are the addresses of the Kafka Brokers in a
“bootstrap” Kafka cluster that a Kafka Client connects to initially to
bootstrap itself. A host and port pair uses : as the separator
(e.g., “localhost:9092, another.host:9092”).
The following snippet shows an example <connection>
that connects to
a local Kafka Broker:
<?xml version="1.0"?>
<dds>
<routing_service>
<domain_route name="domain_route">
<participant name="dds">
<domain_id>0</domain_id>
<registered_type name="ShapeType" type_name="ShapeType" />
<registered_type name="RTI::Kafka::Message" type_name="RTI::Kafka::Message" />
</participant>
<connection name="kafka" plugin_name="MyPlugins::KafkaAdapter">
<property>
<value>
<element>
<name>bootstrap.servers</name>
<value>localhost:9092</value>
</element>
</value>
</property>
<registered_type name="ShapeType" type_name="ShapeType" />
<registered_type name="RTI::Kafka::Message" type_name="RTI::Kafka::Message" />
</connection>
</domain_route>
</routing_service>
</dds>
2.5. Route Data from Kafka to DDS
A <connection>
created by Routing Service Kafka Adapter can be used to define <input>
elements which will subscribe to Kafka Messages from the Kafka Broker, and
expose them to the enclosing <route>
as DDS Samples of
RTI::Kafka::Message
.
The specific DDS Type to use is determined from the value of the
<registered_type_name>
element.
The Kafka Client associated with the <connection>
will be used to create
a subscription on the Kafka Broker for the topic name defined in the the
topic
property.
The following snippet demonstrate how to create an <input>
which will
subscribe to a topic named "kafka_dds"
available on the Kafka Broker, and
route them to DDS Topic “dds_kafka” using DDS Type RTI::Kafka:Message
. The
Kafka Messages in this example are formatted in JSON, so we use the JSON
transformation plugin to convert the Kafka Messages in JSON to DDS samples.
<?xml version="1.0"?>
<dds>
<routing_service>
<session>
<route>
<input connection="kafka">
<registered_type_name>RTI::Kafka::Message</registered_type_name>
<property>
<value>
<element>
<name>topic</name>
<value>kafka_dds</value>
</element>
</value>
</property>
</input>
<dds_output name="dds_kafka" participant="dds">
<registered_type_name>ShapeType</registered_type_name>
<transformation plugin_name="plugin_library::json_transformation">
<input_type_name>RTI::Kafka::Message</input_type_name>
<property>
<value>
<element>
<name>transform_type</name>
<value>deserialize</value>
</element>
<element>
<name>buffer_member</name>
<value>payload.data</value>
</element>
</value>
</property>
</transformation>
</dds_output>
</route>
</session>
</routing_service>
</dds>
2.6. Route Data from DDS to Kafka
A <connection>
created by Routing Service Kafka Adapter can be used to define <output>
elements that will convert DDS Samples into Kafka Messages, and publish
them to a topic on the <connection>
’s Kafka Broker.
DDS Samples must be of type RTI::Kafka::Message
.
The following snippet demonstrate how to configure an <output>
to
publish the payload of RTI::Kafka::Message
DDS Samples published
on topic "dds_kafka"
to Kafka Topic "kafka_dds"
. The Kafka Messages
in this example are formatted in JSON, so we use the JSON transformation plugin
to convert the DDS Samples to Kafka Messages in JSON.
<?xml version="1.0"?>
<dds>
<routing_service>
<session>
<route>
<dds_input name="dds_kafka" participant="dds">
<registered_type_name>ShapeType</registered_type_name>
</dds_input>
<output connection="kafka">
<registered_type_name>RTI::Kafka::Message</registered_type_name>
<property>
<value>
<element>
<name>topic</name>
<value>kafka_dds</value>
</element>
</value>
</property>
<transformation plugin_name="plugin_library::json_transformation">
<input_type_name>ShapeType</input_type_name>
<property>
<value>
<element>
<name>transform_type</name>
<value>serialize</value>
</element>
<element>
<name>buffer_member</name>
<value>payload.data</value>
</element>
</value>
</property>
</transformation>
</output>
</route>
</session>
</routing_service>
</dds>
2.7. Kafka Configuration properties
This section describes the properties that can be used to configure Routing Service Kafka Adapter.
2.7.1. <connection> Properties
Each <connection>
created by the Routing Service Kafka Adapter plugin is associated with a
Kafka Client which must be configured using the <properties>
tag.
Property |
Required |
Default |
Type |
Description |
---|---|---|---|---|
|
Yes |
None |
string |
Initial list of brokers as a CSV list of broker host or host:port. |
2.7.2. <input> Properties
<input>
created by the Routing Service Kafka Adapter plugin is associated with a Kafka Consumer
which can be configured using the <properties>
tag.
Property |
Required |
Default |
Type |
Description |
---|---|---|---|---|
|
Yes |
None |
string |
A topic name used by a Kafka Consumer. |
2.7.3. <output> Properties
<output>
created by the Routing Service Kafka Adapter plugin is associated with a Kafka Producer
which can be configured using the <properties>
tag.
Property |
Required |
Default |
Type |
Description |
---|---|---|---|---|
|
Yes |
None |
string |
A topic name used by a Kafka Producer. |
2.7.4. librdkafka Producer/Consumer Properties
Librdkafka supports several configuration properties for Kafka producers and
consumers that can be used as Routing Service <output>
and <input>
properties. The
full list of these librdkafka
properties can be found
in this table.