DynamicData API usage for receiving arbitrary data on a topic

10 posts / 0 new
Last post
Offline
Last seen: 9 years 9 months ago
Joined: 03/08/2013
Posts: 8
DynamicData API usage for receiving arbitrary data on a topic

Hi,


I am trying to solve the following problem in C++ and would need some input on how to do this:
I want to create a data reader which listens on a specific topic name for arbitrary data coming in on this topic during runtime. Then I want to access a specific field of the received message (if it exists). I believe that this should be possible via the DynamicData API, but I just cannot figure out how to use it. A stripped down example would be like this (please consider it pseudo-code, it´s just there to illustrate the points I don´t get):

class MyListener : public DDSDataReaderListener {
public:
virtual void on_data_available(DDSDataReader* reader) {

DDSDynamicDataReader* dynamic_data_reader = NULL;
DDSDynamicData* data = NULL;
DDS_SampleInfo info;
DDS_ReturnCode_t retcode;
 
dynamic_data_reader = DDSDynamicDataReader::narrow(reader);
retcode = dynamic_data_reader->take_next_sample (data, info);

data->get_double(val, fieldName.c_str(), What is this here and how do I get it?);
printf("%lf\n", val);

}
}

void printData(const std::string& topicName, const std::string& fieldName) {

DDSDomainParticipant *participant = NULL;
DDSSubscriber *subscriber = NULL;
DDSTopic *topic = NULL;
DDSDynamicDataReader* reader = NULL;
MyListener reader_listener* = new MyListener();
DDS_SampleInfo sample_info;
double val;

participant = DDSTheParticipantFactory->create_participant(domainId,DDS_PARTICIPANT_QOS_DEFAULT,NULL , DDS_STATUS_MASK_NONE);
subscriber = participant->create_subscriber(DDS_SUBSCRIBER_QOS_DEFAULT, NULL , DDS_STATUS_MASK_NONE);

type_name = What do I have to do here ???
topic = participant->create_topic(topicNamed.c_str(),type_name,DDS_TOPIC_QOS_DEFAULT, NULL ,DDS_STATUS_MASK_NONE);
reader = subscriber->create_datareader(topic,DDS_DATAREADER_QOS_DEFAULT, reader_listener, DDS_STATUS_MASK_ALL);


// ... spin() ...

}

The question boils down to how can I instantiate a topic on arbitrary data and what do I have to do to access a field in a dynamic data type? I would be most thankful for some example code.

Thanks a lot in advance!

Organization:
Keywords:
Gerardo Pardo's picture
Offline
Last seen: 3 weeks 16 hours ago
Joined: 06/02/2010
Posts: 602

Hi,

I think there is a good example of this in the File Exchange.

Specifically the file called MonitorData.java found under the the Code Snippets/Java folder. It is a Java example but it should not be too hard to map it to the corresponding C++ code.

Gerardo

 

Offline
Last seen: 9 years 9 months ago
Joined: 03/08/2013
Posts: 8

Hi Gerardo,
thanks for the quick reply, this example actually helped. However now I'm stuck with an error message that i get.
 The code looks like follows:

#include <string>

#include <ndds/ndds_cpp.h>
#include <iostream>


class MyPublicationBuiltinTopicDataListener : public DDSDataReaderListener {
  public:

    virtual void on_data_available(DDSDataReader *reader) {
    std::cout << "Data available on topic " << reader->get_topicdescription()->get_name() << std::endl;
    DDS_PublicationBuiltinTopicData data;
    DDS_SampleInfo info;
    DDS_ExceptionCode_t exceptionCode;
         
    DDSPublicationBuiltinTopicDataDataReader* _reader = DDSPublicationBuiltinTopicDataDataReader::narrow( reader );
    if (_reader == NULL) { printf("#WARNING: reader == NULL\n"); return; }
    try {
        _reader->read_next_sample(data, info);
        data.type_code->print_IDL(0, exceptionCode);
    } catch (const std::exception& e) {
        std::cout << e.what() << std::endl;
    }
    };
};

int main(int argc, char *argv[])
{
    
    std::string library = "VW_Library";
    std::string profile = "vw_base_profile";
    
    NDDSConfigLogger::get_instance()->set_verbosity(NDDS_CONFIG_LOG_VERBOSITY_WARNING );
    
    DDSDomainParticipantFactory *factory = NULL;
    DDSDomainParticipant * participant = NULL;
    DDSSubscriber* builtin_subscriber = NULL;
    DDSPublicationBuiltinTopicDataDataReader* publications_reader = NULL;
    MyPublicationBuiltinTopicDataListener* builtin_publications_listener = NULL;
    DDS_DomainParticipantQos qos;
    DDS_ReturnCode_t retcode;
    
    factory = DDSDomainParticipantFactory::get_instance();
    retcode = factory->get_participant_qos_from_profile (qos, library.c_str(), profile.c_str());
    
    participant = DDSTheParticipantFactory->create_participant( 0, qos, NULL, DDS_STATUS_MASK_NONE);
    if (participant == NULL) { exit(-1); }
    
    builtin_subscriber = participant->get_builtin_subscriber();
    if (builtin_subscriber == NULL) { exit (-1); }

    printf("Trying to look up publications ...\n");
    builtin_publications_listener = new MyPublicationBuiltinTopicDataListener();
    publications_reader = DDSPublicationBuiltinTopicDataDataReader::narrow(  builtin_subscriber->lookup_datareader(DDS_PUBLICATION_TOPIC_NAME ));
    publications_reader->set_listener (builtin_publications_listener, DDS_STATUS_MASK_ALL);
    
    DDS_Duration_t loopDuration = {1, 0};
    while (1) { NDDSUtility::sleep(loopDuration); }

    /// clean up
    delete builtin_publications_listener;
    
    return 0;
}

The output that I get when I start the program and publish some data from another one is:

Trying to look up publications ...
Data available on topic DCPSPublication
DDS_PublicationBuiltinTopicData_copy:ERROR: Bad parameter: topic_name
DDS_PublicationBuiltinTopicDataPlugin_copy:ERROR: Failed to copy publication built-in topic data
PRESCstReaderCollator_readOrTakeNextSample:!copy
PRESPsReader_readNextSample:!collator readNextSample


Any suggestions why this happens?

Thanks

Gerardo Pardo's picture
Offline
Last seen: 3 weeks 16 hours ago
Joined: 06/02/2010
Posts: 602

Hi,

This error happens because the type  DDS_PublicationBuiltinTopicData is actually a "C" structure data-type (as indicated by the leading "DDS_" prefix.  The C++ classes have no "_" following the "DDS".   The consequence is that the declaration:

    DDS_PublicationBuiltinTopicData data;

Does not do a deep memory allocation of the members within the data. These include string members which are initialized to "NULL" pointers and hence the Error PRESCstReaderCollator_readOrTakeNextSample:!copy when trying to do the copy.

To solve this error you can do one of 3 things:

(1) Initialize the data after constructing it in the stack.

Use DDSPublicationBuiltinTopicDataTypeSupport::initialize_data() to initialize the data after constructing it in the stack. That is:

 DDS_PublicationBuiltinTopicData data; 
 DDSPublicationBuiltinTopicDataTypeSupport::initialize_data(&data);  

If you do this, then you must call  DDSPublicationBuiltinTopicDataTypeSupport::finalize_data()  before you return from the function so that the memory of the internal strings is freed.

(2) Create the data in the heap

Use   DDSPublicationBuiltinTopicDataTypeSupport::create_data(). This function will do a deep allocation.

  DDS_PublicationBuiltinTopicData *data = DDSPublicationBuiltinTopicDataTypeSupport::create_data();
  // do the logic
  DDSPublicationBuiltinTopicDataTypeSupport::delete_data(data);

(3) Use the sequence read/take API.

That way the data is read using zero copy and you get a reference to the memory inside the DataReader cache In this case there is no copy happening so you do not need to do allocations/de-allocations.

In other words:

    DDS_PublicationBuiltinTopicDataSeq data_seq;
    DDS_SampleInfoSeq info_seq;
    if (_reader == NULL) { printf("#WARNING: reader == NULL\n"); return; }

	try {
	    _reader->take(data_seq, info_seq,  DDS_LENGTH_UNLIMITED,
				    DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, 
				    DDS_ANY_INSTANCE_STATE);
	    for (int i=0; i< info_seq.length(); ++i ) {
		// Check the sample has associated data
		if ( info_seq[i].valid_data ) {
		    data_seq[i].type_code->print_IDL(0, exceptionCode);
		}
	    }
	} catch (const std::exception& e) {
	    std::cout << e.what() << std::endl;
	}

	_reader->return_loan(data_seq, info_seq);

Any of the three approaches above should solve your error.

I notice a couple more potential problems in your code:

(1) You do not check whether the sample returned has data. 

When callng read or take you should always check the DDS_SampleInfo::valid_data flag.  If this flag is false then the data has no valid value and in the case of the sequence will be a NULL pointer.  This is because the DataReader is alsused to notify of other events, such as the fact that the data was deleted or there are no writers, which have a SampleInfo but no associated data.

  if ( info.valid_data ) {
      // OK to access the data here because we know it is valid
  } 

(2) You are using the "read" API instead of "take".

This is not a problem per se. It is the right thing to do in many cases. But keep in mind that when you do this the data remains in the DataReader cache and the memory will not be reclaimed.  As this DataReader has a key and the HISTORY is set to KEEP_LAST with a depth of 1 you will only have one sample per key. That is per remote discovered entity. So this DataReader cache will just keep growing. But if the application is running a long time, while other applications appear and dissapear the DataReader cache will accumulate some information for each entity that was ever discovered. So it might be a good thing to "take" the data evey now and again.

Gerardo

 

Offline
Last seen: 9 years 9 months ago
Joined: 03/08/2013
Posts: 8

Hi Gerardo,

incredible how fast you answer, I really appreciate that! I think I´m 99% there, just three last questions:

  1. Is it true, that one cannot create a dynamic data reader on a topic without retrieving the discovery information about the type from the publisher on the other side first? Since create_topic (const char *topic_name, const char *type_name, const DDS_TopicQos &qos, DDSTopicListener *listener, DDS_StatusMask mask)=0 takes the type_name as argument.
  2. What happens if the DynamicData subscriber that relies in the broadcasted type information joins later? Is the type information frequently broadcasted?
  3. When accessing information from DDSDynamicData, for example the function get_float (DDS_Float &value_out, const char *member_name, DDS_DynamicDataMemberId member_id) takes a DDS_DynamicDataMemberId as an input. Unfortunately I have found nothing in the API about how to retrieve this information. Could you give me a hint there?

Thanks, Ulrich

 

Gerardo Pardo's picture
Offline
Last seen: 3 weeks 16 hours ago
Joined: 06/02/2010
Posts: 602

Hello Ulrich,

You are welcome. Glad that my answers are helping...

I will answer each question separately:

1. Is it true, that one cannot create a dynamic data reader...? 

Getting  information via discovery is not the only way. But it is true that  you must get the information about the Topic name, the type name, and actally the data-type structure itself from somewhere...

The dependencieas are as this:

In order to create a DDS DataReader you need to first have a DDS Topic.

To create the DDS Topic you need to have both a topic_name as well as a the type_name of a previously-registered type.  

To register a data-type you need to tell the DDS DomainParticipant how to deal with the data-type (marshal, unmarshall, determine the memory needed to store data of that type, compare the keys, etc.). This is done calling  the register_type operation on the corresponding DDS TypeSupport object. Which in this case is the DynamicDataTypeSupport.  Another option is to use the TypeSuport objects generated by rtiddsgen from an IDL or XML file that describe the data-type.

The DynamicDataTypeSupport must be initialized so that it has information about the stucture of the data-type.  This is done by passing the TypeCode object that describes the data-type. The TypeCode can be obtained in two ways:

  • You can get it via DDS discovery from  a remote DataWriter or DataReader. This is what the example in this thread does.
  • You can construct it using the operations in the TypeCodeFactory.

As an alternative, you can use the new XML Application Creation Feature to define the DomainParticipant, Data-Types and Topics in an XML file and create them all with a simgle operation that parses the file.

2. What happens if the DynamicData subscriber that relies in the broadcasted type information joins later? Is the type information frequently broadcasted?

This is not a problem. As soon as any DomainParticipant joins the network DDS will automatically propage all the discovery information to that DomainParticipant. It does not matter in which order applications appears or when they create their DataWriters and DataReaders. It is not broadcasted sent periodically. It is sent reliably whenever anything changes. The only thin that is "broadcasted" (actually multicasted) is the announcment that stablishes the presence of the DomainParticipant on the network. 

3. When accessing information from DDSDynamicData, for example the function get_float (DDS_Float &value_out, const char *member_name, DDS_DynamicDataMemberId member_id) takes a DDS_DynamicDataMemberId as an input. Unfortunately I have found nothing in the API about how to retrieve this information. 

There is a few things you can do. 

Gerardo

 

Offline
Last seen: 10 years 10 months ago
Joined: 10/10/2012
Posts: 24

Hi, I'll use this thread, since I want to use dynamic data in a similar way.

At the moment I started out simple, by taking the publisher from the HelloWorld example. Instead of reading user input, it just sends a String message every 2 seconds. My first goal is to create a dynamic subscriber, so I took the MonitorData example and didn't modify it yet.


When I start both, the publisher sends it's first message and MonitorData seems to discover it fine:

waitForDiscoveredDataWriters
processDiscoveredDataWriters
DataWriter (New) name: "null" topic: "Hello, World" type: "DDS::String"
Discovered topic: "Hello, World"  with type: "DDS::String"
This type had not seen before. Its structure is:
struct DDS::String{
    string<1024> value;
};
//@Extensibility EXTENSIBLE_EXTENSIBILITY
This topic "Hello, World" had not seen before. Creating it.
waitForDiscoveredDataWriters

 

When the published continues to send samples, MonitorData throws the following error code for each sample:

Exception in thread "Thread-4" com.rti.dds.infrastructure.RETCODE_PRECONDITION_NOT_MET
    at com.rti.dds.util.Utilities.rethrow(Unknown Source)
    at com.rti.dds.infrastructure.RETCODE_ERROR.check_return_codeI(Unknown Source)
    at com.rti.dds.dynamicdata.DynamicData.copy_native(Unknown Source)
    at com.rti.dds.dynamicdata.DynamicData.pull_from_nativeI(Unknown Source)
    at com.rti.dds.dynamicdata.DynamicDataTypeSupport$DynamicDataReaderDelegate.pull_data_sample_from_native(Unknown Source)
    at com.rti.dds.topic.AbstractDataReaderDelegate.pull_data_sequence_from_native(Unknown Source)
    at com.rti.dds.subscription.DataReaderImpl.read_or_take_untypedI(Unknown Source)
    at com.rti.dds.subscription.DataReaderImpl.read_or_take_untypedI(Unknown Source)
    at com.rti.dds.subscription.DataReaderImpl.take_untyped(Unknown Source)
    at com.rti.dds.dynamicdata.DynamicDataReader.take(Unknown Source)
    at MonitorData$PrintDynamicDataListener.on_data_available(MonitorData.java:44)
    at com.rti.dds.subscription.DataReaderListenerImpl.on_data_available(Unknown Source)

 

So this happens, when new data is available apparently, but what is the error here?

 

Also, is MonitorData the right approach here? Basically, I want to create a dynamic subscriber which prints out the data it receives. There will be other things to consider later on, but for now I just start with a basic scenario.

 

EDIT 2: I've tried MonitorData together with Shapes Demo and it runs just fine. So I guess the problem is in the publisher. I also tried the usual Hello_simple example (with the user key input) and while discovery is fine and the type is recognized, there is the same exception thrown when the publisher sends samples. Any idea what the problem could be?

 

Best regards, Arthur.

Gerardo Pardo's picture
Offline
Last seen: 3 weeks 16 hours ago
Joined: 06/02/2010
Posts: 602

Hello Arthur,

I think the problem is occurring because these examples are using built-in types, such as the DDS::String.  DDS does not propagate via discovery the TypeCode associated with these builtin types.  As a consequence the type_code member that appears in the PublicationBuiltinTopicData is NULL.  This, by itself, should not be a problem. But the MonitorData.java example  (and rtiddsspy for that matter) are not prepared to handle that situation and produce the error you are seeing.

This problem does not occur for the types you define using IDL and generate with rtiddsgen or the one used by the shapes demo. Those propagate their TypeCode. It also should not be a problem for most of the types your application uses, however, see what I say about TypeCode propagation below.

I will file an internal issue to enhance our processing of builtin-types to resolve the TypeCode for those types that are already known. In the meantime what you should do it detect whether the type_code you get from the is PublicationBuiltinTopicData nil/NULL and be robust to it.  This is something that should be done in general because is it not guaranteed that a TypeCode will always be propagated even for IDL-defined types.  There are QoS settings that can be used to disable type-code propagation or it may also happen if the TypeCode is too large. So applications that construct types dynamically should be written to be robust to discovering a NULL/nil TypeCode.

Gerardo

 

Offline
Last seen: 10 years 10 months ago
Joined: 10/10/2012
Posts: 24

Alright, thank you very much. The built-in types should be no problem then, because we will be using generated code from an IDL on the sending server side.

Best regards,

Arthur

Offline
Last seen: 8 years 7 months ago
Joined: 04/21/2016
Posts: 1

Hi,

I try to make the example from this thread to work with modern C++.

Because I found it difficult to create new style BuiltinTopicDataListener myself, I decided to use  MyPublicationBuiltinTopicDataListener written by uschwes. It discovers existing topics and types fine. Now, I'd like to register the type via DDSDynamicDataTypeSupport::register_type, as Gerardo Pardo suggested, but it fails with dds::domain::DomainParticipant. Is it possible to register a type obtained by this listener to  dds::domain::DomainParticipant instead of old school DDSDomainParticipant ?

Regards,Greg