Improve performance accessing types containing sequences and arrays in RTI Connector

Due to the way in which RTI Connector handles data internally, some applications may see slower performance when setting or getting a type that contains a sequence or an array. This is a known issue, tracked internally as CON-42. This article describes a workaround for it.

The workaround provides a major performance improvement by calling the C DynamicData APIs directly, bypassing the conversion to Python or JavaScript lists. Connector's design allows you to extend its API and call into the Connext DDS C API.

To understand how we can accomplish this, it is first necessary to understand how Connector performs native function calls. Connector for Python uses the ctypes package, while Connector for JavaScript uses the ffi package. Both packages work in a similar way: you load the native library that contains the API, you specify the argument types and return type of each function, and then you can call the native function. This article describes this process and provides an example of how to call the native DynamicData APIs to efficiently get or set a sequence or array.
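
As a minimal sketch of that load, declare, and call pattern, independent of Connector, the following uses ctypes with strlen from the C standard library. It assumes a POSIX system, where ctypes.CDLL(None) exposes the symbols already loaded into the process; on Windows you would pass the path to a DLL instead.

```python
import ctypes

# 1. Load the native library. Assumption: POSIX, where CDLL(None) gives
#    access to symbols already loaded in the process, including libc.
libc = ctypes.CDLL(None)

# 2. Declare the argument and return types of:
#       size_t strlen(const char *s)
strlen = libc.strlen
strlen.argtypes = [ctypes.c_char_p]
strlen.restype = ctypes.c_size_t

# 3. Call the native function like a regular Python function.
length = strlen(b"Connector")
print(length)
```

The DynamicData bindings later in this article follow the same three steps, with the library object supplied by Connector.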

In the example presented here, we will use the following type (representing image data):

<struct name="ImageType">
  <member name="r_data" type="byte" arrayDimensions="1024"/>
  <member name="g_data" type="byte" arrayDimensions="1024"/>
  <member name="b_data" type="byte" arrayDimensions="1024"/>
</struct>

The first step is to identify which native APIs we would like to use. Connector is built upon the DynamicData API: under the hood, the inputs are DynamicDataReaders and the outputs are DynamicDataWriters. This means that the APIs we need to call are DDS_DynamicData_set_octet_array (to set the instance on the output) and DDS_DynamicData_get_octet_array (to get the sample on the input). The APIs required depend on the element type of the sequence or array.

Note: If the sequence were contained within a nested structure (for example, if ImageType contained another non-basic type, which in turn contained the RGB data), we would first need to use the DDS_DynamicData_bind_complex_member API.

Using the Python Connector as an example, we can add these APIs to the binding as follows:

import ctypes
import rticonnextdds_connector as rti

# DDS_ReturnCode_t DDS_DynamicData_set_octet_array(
#       DDS_DynamicData *self,
#       const char *member_name,
#       DDS_DynamicDataMemberId member_id,
#       DDS_UnsignedLong length,
#       const DDS_Octet *array)
# On Windows we need to explicitly load nddsc.dll:
# nddsc_library = ctypes.cdll.LoadLibrary("C:\\Path\\To\\nddsc.dll")
# and then use the nddsc_library variable above instead of
# rti.connector_binding.library
DDS_DynamicData_set_octet_array = rti.connector_binding.library.DDS_DynamicData_set_octet_array
DDS_DynamicData_set_octet_array.restype = ctypes.c_int
DDS_DynamicData_set_octet_array.argtypes = [
    ctypes.c_void_p,   # self
    ctypes.c_char_p,   # member_name
    ctypes.c_long,     # member_id
    ctypes.c_ulong,    # length
    ctypes.c_void_p]   # array


# DDS_ReturnCode_t DDS_DynamicData_get_octet_array(
#       DDS_DynamicData *self,
#       DDS_Octet *array,
#       DDS_UnsignedLong *length,
#       const char *name,
#       DDS_DynamicDataMemberId member_id)
# On Windows, look the function up in the explicitly loaded nddsc_library
# (see the comment on the previous binding) instead of
# rti.connector_binding.library
DDS_DynamicData_get_octet_array = rti.connector_binding.library.DDS_DynamicData_get_octet_array
DDS_DynamicData_get_octet_array.restype = ctypes.c_int
DDS_DynamicData_get_octet_array.argtypes = [
    ctypes.c_void_p,   # self
    ctypes.c_void_p,   # array (output buffer)
    ctypes.c_void_p,   # length (pointer, in/out)
    ctypes.c_char_p,   # name
    ctypes.c_long]     # member_id (signed, as in the set binding)

NOTE: On Windows, the symbols for APIs that are not used by Connector are not contained within the library loaded by Connector. This means that you have to explicitly load nddsc.dll in your program (using ctypes.cdll.LoadLibrary).
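
One way to keep this platform difference in a single place is a small selection helper. The sketch below is only an illustration: select_dds_library and its parameters are hypothetical names, connector_library stands for rti.connector_binding.library, and nddsc_path is the full path to nddsc.dll in your own installation. The optional extra_dll_dirs parameter reflects the fact that, from Python 3.8 on, PATH is no longer searched for the dependencies of a DLL loaded through ctypes.

```python
import ctypes
import os
import sys


def select_dds_library(connector_library, nddsc_path=None,
                       extra_dll_dirs=(), platform=sys.platform):
    """Return the library object used to look up DDS_DynamicData_* symbols.

    Hypothetical helper, not part of the Connector API.
    """
    if platform != "win32":
        # Elsewhere, the library already loaded by Connector is used.
        return connector_library
    if nddsc_path is None:
        raise ValueError("nddsc_path is required on Windows")
    # Python 3.8+ does not search PATH for a DLL's dependencies, so any
    # additional directories they live in must be registered explicitly.
    if hasattr(os, "add_dll_directory"):
        for directory in extra_dll_dirs:
            os.add_dll_directory(directory)
    return ctypes.CDLL(nddsc_path)
```

The bindings could then be looked up on the object returned by select_dds_library(rti.connector_binding.library, "C:\\Path\\To\\nddsc.dll") rather than on rti.connector_binding.library directly.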

To obtain the DDS_DynamicData *self argument, we use the native properties provided by Connector: output.instance.native on an output, and sample.native on each sample of an input.

With the native functions successfully “imported” to Connector, we can write the necessary code to call the APIs, and set/get the sequence data using the native DynamicData function calls.

On an output (file also attached to this article):

 

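import os
from time import sleep

# Assumed definitions: the array length matches arrayDimensions in ImageType,
# and file_path is the directory that contains this script
ARRAY_LEN = 1024
file_path = os.path.dirname(os.path.realpath(__file__))

def setSequenceNatively(output, field_name, python_arr):
    # We need access to the native DynamicData sample, which we can supply
    # to the DDS C APIs
    native_dynamic_data = output.instance.native

    # We then need to build the array such that it is compatible with C.
    # This is accomplished using ctypes
    # Unsigned long for the length of the array
    array_length = ctypes.c_ulong(len(python_arr))
    # Create a new array type of the required length
    in_array_type = ctypes.c_char * len(python_arr)
    # Set the member ID to 0 (unspecified in the DDS C DynamicData API),
    # so that the member is looked up by name
    member_id = 0
    # Create an instance of our new type from the Python list
    in_array = in_array_type(*python_arr)
    # Here we set the values into the member
    retcode = DDS_DynamicData_set_octet_array(
            ctypes.cast(native_dynamic_data, ctypes.c_void_p),
            field_name.encode("utf8"),
            member_id,
            array_length,
            ctypes.byref(in_array))
    if retcode != 0:  # 0 is DDS_RETCODE_OK
        raise RuntimeError("set_octet_array failed with retcode %d" % retcode)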

with rti.open_connector(
        config_name="MyParticipantLibrary::MyPubParticipant",
        url=file_path + "/../Example.xml") as connector:

    output = connector.get_output("MyPublisher::MyImageWriter")
    output.wait_for_subscriptions()

    r_data = [b'f'] * ARRAY_LEN
    setSequenceNatively(output, "r_data", r_data)
    g_data = [b'4'] * ARRAY_LEN
    setSequenceNatively(output, "g_data", g_data)
    b_data = [b'0'] * ARRAY_LEN
    setSequenceNatively(output, "b_data", b_data)

    print("Writing...")
    for i in range(1, 100):
        output.write()
        sleep(0.5)
    print("Exiting...")
    output.wait()
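
setSequenceNatively builds the C array by unpacking a Python list of one-byte values. If the image data is already held in a bytes or bytearray object, the ctypes array can instead be created in a single copy. The sketch below shows only that conversion step, using standard ctypes calls; the helper name to_octet_array is hypothetical, and its result would be passed to DDS_DynamicData_set_octet_array in place of in_array.

```python
import ctypes


def to_octet_array(data):
    """Copy a bytes-like object into a new ctypes array of unsigned bytes."""
    array_type = ctypes.c_ubyte * len(data)
    # from_buffer_copy copies the whole buffer at once instead of
    # converting the elements one by one.
    return array_type.from_buffer_copy(data)


r_data = bytes([0x66]) * 1024   # 1024 bytes with value 0x66 (ASCII 'f')
r_array = to_octet_array(r_data)
print(len(r_array), r_array[0])
```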

 

On an input (file also attached to this article):

import os
import numpy as np

# Assumed definitions: the array length matches arrayDimensions in ImageType,
# and file_path is the directory that contains this script
ARRAY_LEN = 1024
file_path = os.path.dirname(os.path.realpath(__file__))

def getSequenceNatively(native_dynamic_data, field_name):
    # Now obtain the array requested (described by field_name)
    # Unsigned long for the expected length of the array
    array_length = ctypes.c_ulong(ARRAY_LEN)
    # Create a new "type" compatible with the array we are obtaining
    in_array_type = ctypes.c_char * ARRAY_LEN
    # Create an instance of that type, which we will pass to the native API
    in_array = in_array_type()
    retcode = DDS_DynamicData_get_octet_array(
            ctypes.cast(native_dynamic_data, ctypes.c_void_p),
            ctypes.byref(in_array),
            ctypes.byref(array_length),
            field_name.encode("utf8"),
            0)
    if retcode != 0:  # 0 is DDS_RETCODE_OK
        raise RuntimeError("get_octet_array failed with retcode %d" % retcode)
    # This uses numpy. If numpy isn't available, you could do
    # return [in_array[i] for i in range(ARRAY_LEN)]
    return np.ndarray((ARRAY_LEN, ), np.ubyte, in_array, order='C')

with rti.open_connector(
        config_name="MyParticipantLibrary::MySubParticipant",
        url=file_path + "/../Example.xml") as connector:

    input = connector.get_input("MySubscriber::MyImageReader")
    input.wait_for_publications()

    for i in range(1, 500):
        input.wait()
        input.take()
        for sample in input.samples.valid_data_iter:
            r_data = getSequenceNatively(sample.native, "r_data")
            g_data = getSequenceNatively(sample.native, "g_data")
            b_data = getSequenceNatively(sample.native, "b_data")
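
If numpy is not available, the filled ctypes buffer can be converted with the standard library alone. The sketch below stands in for the native call by copying a few bytes into the buffer with ctypes.memmove, so it runs without Connector. It assumes, as the in/out length pointer in the get_octet_array signature suggests, that the native call leaves the number of elements it copied in array_length.

```python
import ctypes

ARRAY_LEN = 1024

in_array = (ctypes.c_char * ARRAY_LEN)()
array_length = ctypes.c_ulong(ARRAY_LEN)

# Stand-in for DDS_DynamicData_get_octet_array: fill the buffer and
# record how many elements were written.
payload = b"\x10\x20\x30"
ctypes.memmove(in_array, payload, len(payload))
array_length.value = len(payload)

# .raw returns the buffer contents as bytes; slice it to the reported length.
data = in_array.raw[:array_length.value]
values = list(data)  # list of ints in the range 0-255
print(values)
```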

 


Comments

Hello! Are you still working on this? I'm trying to use your code, but I'm failing miserably. Could you help me? On the writer side I'm getting this:

DDS_DynamicData2_unbind_complex_member: Unbinding cached member failed.      
DDS_DynamicData2_unbind_complex_member: Unbinding cached member failed.      
DDS_DynamicData2_unbind_complex_member: Unbinding cached member failed.
DDS_DynamicData2_unbind_complex_member: Unbinding cached member failed.      
DDS_DynamicData2_set_octet_array: Unbinding cached member failed.
DDS_DynamicData2_set_octet_array: Unbinding cached member failed.
DDS_DynamicData2_unbind_complex_member:ERROR: Bad parameter: self has no bound member

On the reader side the following:

OSError: exception: access violation reading 0x0000000000000000

 

Hi garzob,

Did you make any modifications to the attached example?
As far as I am aware this example still works without issue.

Sam

Hi Samr!

Thank you for the reply. I'm using Windows, so I had to use the ctypes.cdll.LoadLibrary function, which is giving me all kinds of problems. That's the only modification I made.

Something is up with my build and I'm not sure what it is. Can you tell me the specs you are running the example with, such as the Python version, DDS version, etc.?

My installation of RTI Connext DDS is on my D: drive, and Python etc. is on C:. I have the x64Win64VS2017 DDS library and it's in my PATH environment variable. I have my NDDSHOME variable set as well.

When I download your program from scratch, I change the following:

nddsc_library = ctypes.CDLL("D:\\RTI-Connext\\rti_connext_dds-6.0.0\\lib\\x64Win64VS2017\\nddsc.dll")
DDS_DynamicData_set_octet_array = nddsc_library.DDS_DynamicData_set_octet_array
 
I change nothing else and I get the following error:
 
OSError: [WinError 126] The specified module could not be found
 
I read on the internet that this should be fixed by doing an os.chdir to where the rest of the DLLs are. I also tried copying all the files from x64Win64VS2017 to my project directory, but that didn't fix it either.
I just got a WinError 127 instead.