DynamicData

12 posts / 0 new
Last post
Offline
Last seen: 10 years 3 days ago
Joined: 11/13/2014
Posts: 11
DynamicData

Hi,
I have a generic recorder with multiple interfaces to several transport protocols that stores data in a byte stream.  The idea is to keep the recorder and it's interfaces to all of the transport protocols generic to any data type.  Is there a way and are their any examples of how to use a generic typecode to keep my generic recorder from being dependent on an idl definaition at build time with a RTI interface?  I am looking at the Hello_dynamic example but it is still dependent on having the HelloWorkType defined at build time.  Any help would be much appreciated.

Thanks

Organization:
Gerardo Pardo's picture
Offline
Last seen: 3 weeks 2 days ago
Joined: 06/02/2010
Posts: 602

Hello,

Yes this is possible. It is what the RTI Recording Service does... So maybe this tool would also be of interest to you...

To implement this functionality in your own code you need to use the DynamicData API in conjunction with some internal APIs. The internal APIs are not documented and may change in the future... To help you out I have uploaded an example called monitor_raw_data into the File Exchange, the use of the internal APIs I mentioned are descriibed in the comments there. You can access the example directly at this link.

Gerardo

Offline
Last seen: 10 years 3 days ago
Joined: 11/13/2014
Posts: 11

Gerardo,

Thank you so much for your help.  When doing the reverse and reading the data out of the octet can I assume that the reverse functionality is available?  What are the RTI internal functions I need to call to get the data from the octet byte stream back to the DynamicData type?  How far back do these internal api calls go?  I appreciate all of your help.

Thanks, Steve

Offline
Last seen: 6 years 10 months ago
Joined: 04/04/2014
Posts: 27

Hello,

you could use the procedure described in a HOWTO: https://community.rti.com/howto/dynamicdata-serialization

Offline
Last seen: 10 years 3 days ago
Joined: 11/13/2014
Posts: 11

Hi, I was hoping that there would be an equivalent method to taking the octet and converting it to a DynamicData type so I could publish the data back out.  My plan is to have the data and it will be identified by the topic which I will find the type-name and typecode via the built-in subscription service just like I have done for receiving the data.  The goal is that I want to be IDL independent.  I will know what topic the data is associated to so I will be able to find a subscriber for that topic that will identify the type-name and typecode.  The data will be in the octet form so I will need to convert it back into a DynamicData form before sending it out.  Any help is much appreciated.

Thanks, Steve

Offline
Last seen: 4 years 6 months ago
Joined: 09/10/2010
Posts: 32

Hello Steve,

     In the example that Sergey sent, at the bottom of the example, there are notes on how to deserialize the data into a dynamic data object.  Use that to convert the octets buffer into the dynamic data object you have setup from the discovery information.

Bert

Offline
Last seen: 10 years 3 days ago
Joined: 11/13/2014
Posts: 11

Hi Bert,

I've tried to use the code from the example and I am getting errors.  I am thinking that maybe I am having errors publishing the data back out because I am using the RTICdrStream to put the data back into the DynamicData object but I created the octet stream on my subscription by using the methods that Gerardo had given to me in his example which I put below:

/* These extern "C" functions are internal to RTI Connext DDS.
 * These internal functions are not part of the public API and may change and/or
 * not be supported in future versions of RTI Connext DDS.
 */
extern "C" struct DDS_DynamicDataBuffer * DDS_DynamicData_get_buffer(
    DDS_DynamicData * dynamic_data);
extern "C" DDS_UnsignedLong DDS_DynamicDataBuffer_get_data_size(
    const struct DDS_DynamicDataBuffer * dynamic_data_buffer);
extern "C" char *DDS_DynamicDataBuffer_get_storage(
    const struct DDS_DynamicDataBuffer * dynamic_data_buffer);

/* Gets a reference to the raw buffer of DynamicData containing the serialized data
 * Accessing the raw buffer of DynamicData is an internal feature which may change
 * or not be supported in future versions of RTI Connext DDS.
 */
const DDS_Octet *get_rawbuffer_from_dynamicdata(int *rawbuffer_len, DynamicData *dynamic_data)
{
    DDS_DynamicDataBuffer *ddBuffer = DDS_DynamicData_get_buffer(dynamic_data);
    DDS_Octet *rawbuffer = (DDS_Octet *)DDS_DynamicDataBuffer_get_storage(ddBuffer);
    *rawbuffer_len = DDS_DynamicDataBuffer_get_data_size(ddBuffer);

    return rawbuffer;
}

I need to convert the data into a char*.  The error I am getting is the following:

prefix: DDS_DynamicDataTypeSupport_print_data:!Output failure
Sending data...
DDS_DynamicDataTypePlugin_cdr_to_parametrized_cdr:deserialization error: sequence length
DDS_DynamicDataTypePlugin_cdr_to_parametrized_cdr:error converting from CDR to extended CDR
DDS_DynamicDataTypePlugin_serialize:error converting from CDR to extended CDR
PRESWriterHistoryDriver_initializeSample:!serialize
WriterHistoryMemoryPlugin_addEntryToSessions:!initialize sample
WriterHistoryMemoryPlugin_getEntry:!add virtual sample to sessions
WriterHistoryMemoryPlugin_addSample:!get entry
PRESWriterHistoryDriver_addWrite:!add_sample
PRESPsWriter_writeInternal:!collator addWrite
! Write error 1length = 1052

Basically I am trying to capture the DynamicData, convert it to a byte stream and store it on the subscription side.  On the publish side I need to take it from the byte stream and convert it back into a DynamicData before sending it out.  Is it possible to convert the RTICdrStream into a char* because that is what I am trying to store the data as?

I appreciate all of your help.  Thanks, Steve

 

Offline
Last seen: 6 years 10 months ago
Joined: 04/04/2014
Posts: 27

It's hard to tell (for me) without seeing you code where the error exactly comes from, but it should be something with incorrectly initializing or modifying DynamicData (e.g. directly modifying internal buffer) so I can only help with the last question:

You don't convert RTICdrStream to char*, but use RTICdrStream as wrapper around your char* buffer:

char buffer[1000];
int bufferLen = 1000;
RTICdrStream serializationStream;
RTICdrStream_init(&serializationStream);
RTICdrStream_set(&serializationStream, buffer, bufferLen);

After this code all operations on  serializationStream are performed with buffer. E.g. following from_stream call will correctly fill  DynamicaData object from  serializationStream wrapped around buffer.

data->from_stream(serializationStream);

 

 

Offline
Last seen: 10 years 3 days ago
Joined: 11/13/2014
Posts: 11

Sergey,  Thank you for your response.  I believe I am doing what you have said.  I have changed the subscription side to use RTICdrStream functions as well in order to be consistent but I am still having the conversion problems when I am trying to publish the data back out.  Here are some code snippets that will show you exactly what I am doing both on the input side (subscription) and the output side (publish).  I appreciate all of your help.

--------------------------------------------------------------------------------------------------------------------------------------------------------------------

This is the subscription side where I am receiving the DynamicData object and converting it to a char * to store.

    DynamicDataReader *typed_reader = NULL;
    DynamicDataSeq data_seq;

    char buffer[BUFFER_SIZE];
    int bufferLen = BUFFER_SIZE;
    memset(buffer, 0, bufferLen);

    RTICdrStream serializationStream;
    RTICdrStream_init(&serializationStream);

    int serializedSize = 0;

    SampleInfoSeq info_seq;
    ReturnCode_t retcode;

    int i;

    /* Use the DynamicData reader since we registered the type using the
       DynaicDataTypeSupport
    */
    typed_reader = DynamicDataReader::narrow(reader);
    if (typed_reader == NULL) {
        printf("DataReader could not be narrowed to DynamicDataReader\n");
        return;
    }

    /* Take all samples. We could take samples one at a time but it is more
       efficient this way because the buffers are accessed without extra copies */
    retcode = typed_reader->take(
            data_seq, info_seq, LENGTH_UNLIMITED,
            ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE);

    if (retcode == RETCODE_NO_DATA) {
        return;
    } else if (retcode != RETCODE_OK) {
        printf("take error %d\n", retcode);
        return;
    }

    /* Process each sample received */
    for (i = 0; i < data_seq.length(); ++i) {

    /* We are only interested in data samples, not notifications of disposal or no writers */
        if (info_seq[i].valid_data) {

        DynamicData *data = &data_seq[i];
        RTICdrStream_set(&serializationStream, buffer, bufferLen);
        data->to_stream(serializationStream);
        serializedSize = RTICdrStream_getCurrentPositionOffset(&serializationStream);
        char* rawbuffer2;
        RTICdrStream_deserializeChar(&serializationStream, rawbuffer2);

        spec.bytes = serializedSize;
       
        //TODO Need to take the bytes here and send to gmlipc
        int32_t passthru = 0;
        gmlipc_dispatch_message(&spec,passthru,0,0,const_cast<char *>(rawbuffer2),GMLIPC_MODE_RTIDDS,0,0);

    }

    /* Return the loan on the samples which were being accessed as zero-copy from the DataReader cache */
    retcode = typed_reader->return_loan(data_seq, info_seq);
    if (retcode != RETCODE_OK) {
        printf("return loan error %d\n", retcode);
    }

---------------------------------------------------------------------------------------------------------------------------------------------------

This is the publish side where I am taking the stored char * and trying to send it out.

    DDSDataWriter *dataWriter            = NULL;
    DDSDynamicDataWriter *dynamicDataWriter    = NULL;
    DDS_DynamicData *deserializedData = NULL;
    RTICdrStream deserializationStream;
    RTICdrStream_init(&deserializationStream);
    DDS_Long i, count;
    DDS_ReturnCode_t rc;


    dataWriter = participant->create_datawriter(
                        topic,
                        DDS_DATAWRITER_QOS_DEFAULT,
                        NULL,           /* listener */
                        DDS_STATUS_MASK_NONE);
    if (dataWriter == NULL) {
        std::cerr << "! Unable to create DDS data writer" << std::endl;
        return false;
    }

    dynamicDataWriter = DDSDynamicDataWriter::narrow(dataWriter);
    if (dynamicDataWriter == NULL) {
        std::cerr << "! Unable to narrow data writer into DDSDynamicDataWriter"
                  << std::endl;
        return false;
    }

    /* Creates an instance of the sparse data we are about to send
     */
    deserializedData = type_support->create_data();
    if (deserializedData == NULL) {
        std::cerr << "! Unable to create an instance of the data" << std::endl;
        std::cerr << "! This problem most likely is caused by out of memory"
                  << std::endl;
        return false;
    }

    RTICdrStream_set(&deserializationStream, (char*)data, length);
    deserializedData->from_stream(deserializationStream);
    // Send the data!
    std::cout << "Sending data..." << std::endl;
    rc = dynamicDataWriter->write(*deserializedData, DDS_HANDLE_NIL);
    if (rc != DDS_RETCODE_OK) {
        std::cerr << "! Write error " <<  rc << std::endl;
    }

Offline
Last seen: 6 years 10 months ago
Joined: 04/04/2014
Posts: 27

Hello Steve,

The code looks ok for me. I would try an example dynamicdata_serialization.cpp to make sure that serialization and deserialization works with you compiler settings. Next I would try replacing typecode in example with your one to make sure that typecode can be handled with this procedure (for example it looks like you are using sparse types and I've not tested sparse data types with this procedure)

Error message "DDS_DynamicDataTypePlugin_cdr_to_parametrized_cdr:deserialization error: sequence length" could mean that it's not possible to deserialize length of sequence, which should only happen if you have not enough data in your buffer, e.g. data is truncated. Is it possible that you use incorrect length, when serializing before sending data?

Offline
Last seen: 10 years 3 days ago
Joined: 11/13/2014
Posts: 11

Hi Sergey,


I am using the size given back to me by the above code:

serializedSize = RTICdrStream_getCurrentPositionOffset(&serializationStream);

Is that not correct?

Thanks, Steve

Offline
Last seen: 6 years 10 months ago
Joined: 04/04/2014
Posts: 27

Hello Steve,


As far as I know this is the correct way to determine serialized size (at least I am using this).  to_stream call moves internal position of the  RTICdrStream during serialization and after the serialization, the internal position points to the next free location, so its offset is equal to the serialized size.

If you are using the same  RTICdrStream object for serialization you should call RTICdrStream_set before every  to_stream call to reset internal position of the RTICdrStream.

  RTICdrStream_set(&serializationStream, buffer, bufferLen);