2. Configuration

This section describes how to configure Routing Service Kafka Adapter.

All configuration is specified in Routing Service’s XML configuration file.

2.1. Load the Kafka Adapter Plugin

Routing Service Kafka Adapter must be registered as a Routing Service plugin by using the <adapter_plugin> tag.

Once the dynamic library and constructor function have been registered, Routing Service will create an instance of the plugin during start-up, and you can use the plugin to create one or more connections to a Kafka Broker.

The following snippet demonstrates how to register the plugin in the <plugin_library> section of Routing Service’s XML configuration:

<?xml version="1.0"?>
<dds>
    <plugin_library name="MyPlugins">
        <adapter_plugin name="KafkaAdapter">
            <dll>rtikafkaadapter</dll>
            <create_function>RTI_RS_Kafka_AdapterPlugin_create</create_function>
        </adapter_plugin>
    </plugin_library>
</dds>

Warning

Routing Service must be able to find the Routing Service Kafka Adapter dynamic library (librtikafkaadapter.so on Linux® systems, librtikafkaadapter.dylib on macOS® systems, or rtikafkaadapter.dll on Windows® systems). Make sure to include the library’s directory in the library search path environment variable appropriate for your system (LD_LIBRARY_PATH on Linux systems, RTI_LD_LIBRARY_PATH on macOS systems, or PATH on Windows systems, etc.).

2.2. Register KAFKA Data Types

Routing Service Kafka Adapter exposes data to Routing Service using the DDS Type named: RTI::Kafka::Message.

This data type needs to be registered. It is used to store converted DDS data in a format that a Kafka application can understand (e.g., JSON). The format can be configured with a transformation plugin.

<?xml version="1.0"?>
<dds>
    <types>
        <module name="RTI">
            <module name="Kafka">
                <struct name="MessagePayload" nested="true">
                    <member name="data" sequenceMaxLength="-1" type="byte" />
                </struct>
                <struct name="Message">
                    <member name="payload" type="nonBasic" nonBasicTypeName="RTI::Kafka::MessagePayload" />
                </struct>
            </module>
        </module>
    </types>
</dds>

2.3. Configure QoS

The types used by Routing Service Kafka Adapter use “unbounded” sequences.

The following QoS parameters must be configured in order to enable use of “unbounded” types:

<?xml version="1.0"?>
<dds>
    <qos_library name="MyQosLib">
        <qos_profile name="MyQos" is_default_qos="true">
            <datareader_qos>
                <reader_resource_limits>
                    <dynamically_allocate_fragmented_samples>
                        true
                    </dynamically_allocate_fragmented_samples>
                </reader_resource_limits>
                <property>
                    <value>
                        <element>
                            <name>dds.data_reader.history.memory_manager.fast_pool.pool_buffer_max_size</name>
                            <!-- Modify this value according to your preference -->
                            <value>10485760</value>
                        </element>
                    </value>
                </property>
            </datareader_qos>
            <datawriter_qos>
                <property>
                    <value>
                        <element>
                            <name>dds.data_writer.history.memory_manager.fast_pool.pool_buffer_max_size</name>
                            <!-- Modify this value according to your preference -->
                            <value>10485760</value>
                        </element>
                    </value>
                </property>
            </datawriter_qos>
        </qos_profile>
    </qos_library>
</dds>

2.4. Connect to a Kafka Broker

Once the plugin has been registered with Routing Service, it can be used to create <connection> elements within a <domain_route>.

The <connection>’s configuration must include properties to configure the associated Kafka Client.

The bootstrap.servers property should contains a comma-separated list of host and port pairs that are the addresses of the Kafka Brokers in a “bootstrap” Kafka cluster that a Kafka Client connects to initially to bootstrap itself. A host and port pair uses : as the separator (e.g., “localhost:9092, another.host:9092”).

The following snippet shows an example <connection> that connects to a local Kafka Broker:

<?xml version="1.0"?>
<dds>
    <routing_service>
        <domain_route name="domain_route">
            <participant name="dds">
                <domain_id>0</domain_id>
                <registered_type name="ShapeType" type_name="ShapeType" />
                <registered_type name="RTI::Kafka::Message" type_name="RTI::Kafka::Message" />
            </participant>
            <connection name="kafka" plugin_name="MyPlugins::KafkaAdapter">
                <property>
                    <value>
                        <element>
                            <name>bootstrap.servers</name>
                            <value>localhost:9092</value>
                        </element>
                    </value>
                </property>
                <registered_type name="ShapeType" type_name="ShapeType" />
                <registered_type name="RTI::Kafka::Message" type_name="RTI::Kafka::Message" />
            </connection>
        </domain_route>
    </routing_service>
</dds>

2.5. Route Data from Kafka to DDS

A <connection> created by Routing Service Kafka Adapter can be used to define <input> elements which will subscribe to Kafka Messages from the Kafka Broker, and expose them to the enclosing <route> as DDS Samples of RTI::Kafka::Message.

The specific DDS Type to use is determined from the value of the <registered_type_name> element.

The Kafka Client associated with the <connection> will be used to create a subscription on the Kafka Broker for the topic name defined in the the topic property.

The following snippet demonstrate how to create an <input> which will subscribe to a topic named "kafka_dds" available on the Kafka Broker, and route them to DDS Topic “dds_kafka” using DDS Type RTI::Kafka:Message. The Kafka Messages in this example are formatted in JSON, so we use the JSON transformation plugin to convert the Kafka Messages in JSON to DDS samples.

<?xml version="1.0"?>
<dds>
    <routing_service>
        <session>
            <route>
                <input connection="kafka">
                    <registered_type_name>RTI::Kafka::Message</registered_type_name>
                    <property>
                        <value>
                            <element>
                                <name>topic</name>
                                <value>kafka_dds</value>
                            </element>
                        </value>
                    </property>
                </input>
                <dds_output name="dds_kafka" participant="dds">
                    <registered_type_name>ShapeType</registered_type_name>
                    <transformation plugin_name="plugin_library::json_transformation">
                        <input_type_name>RTI::Kafka::Message</input_type_name>
                        <property>
                            <value>
                                <element>
                                    <name>transform_type</name>
                                    <value>deserialize</value>
                                </element>
                                <element>
                                    <name>buffer_member</name>
                                    <value>payload.data</value>
                                </element>
                            </value>
                        </property>
                    </transformation>
                </dds_output>
            </route>
        </session>
    </routing_service>
</dds>

2.6. Route Data from DDS to Kafka

A <connection> created by Routing Service Kafka Adapter can be used to define <output> elements that will convert DDS Samples into Kafka Messages, and publish them to a topic on the <connection>’s Kafka Broker.

DDS Samples must be of type RTI::Kafka::Message.

The following snippet demonstrate how to configure an <output> to publish the payload of RTI::Kafka::Message DDS Samples published on topic "dds_kafka" to Kafka Topic "kafka_dds". The Kafka Messages in this example are formatted in JSON, so we use the JSON transformation plugin to convert the DDS Samples to Kafka Messages in JSON.

<?xml version="1.0"?>
<dds>
    <routing_service>
        <session>
            <route>
                <dds_input name="dds_kafka" participant="dds">
                    <registered_type_name>ShapeType</registered_type_name>
                </dds_input>
                <output connection="kafka">
                    <registered_type_name>RTI::Kafka::Message</registered_type_name>
                    <property>
                        <value>
                            <element>
                                <name>topic</name>
                                <value>kafka_dds</value>
                            </element>
                        </value>
                    </property>
                    <transformation plugin_name="plugin_library::json_transformation">
                        <input_type_name>ShapeType</input_type_name>
                        <property>
                            <value>
                                <element>
                                    <name>transform_type</name>
                                    <value>serialize</value>
                                </element>
                                <element>
                                    <name>buffer_member</name>
                                    <value>payload.data</value>
                                </element>
                            </value>
                        </property>
                    </transformation>
                </output>
            </route>
        </session>
    </routing_service>
</dds>

2.7. Kafka Configuration properties

This section describes the properties that can be used to configure Routing Service Kafka Adapter.

2.7.1. <connection> Properties

Each <connection> created by the Routing Service Kafka Adapter plugin is associated with a Kafka Client which must be configured using the <properties> tag.

Table 2.1 <connection> Properties

Property

Required

Default

Type

Description

bootstrap.servers

Yes

None

string

Initial list of brokers as a CSV list of broker host or host:port.

2.7.2. <input> Properties

<input> created by the Routing Service Kafka Adapter plugin is associated with a Kafka Consumer which can be configured using the <properties> tag.

Table 2.2 <input> Properties

Property

Required

Default

Type

Description

topic

Yes

None

string

A topic name used by a Kafka Consumer.

2.7.3. <output> Properties

<output> created by the Routing Service Kafka Adapter plugin is associated with a Kafka Producer which can be configured using the <properties> tag.

Table 2.3 <input> Properties

Property

Required

Default

Type

Description

topic

Yes

None

string

A topic name used by a Kafka Producer.

2.7.4. librdkafka Producer/Consumer Properties

Librdkafka supports several configuration properties for Kafka producers and consumers that can be used as Routing Service <output> and <input> properties. The full list of these librdkafka properties can be found in this table.