Adapting the C++ monitored_discovered_types example to discover preexisting types (when joining later) Solved

Offline
Last seen: 8 years 11 months ago
Joined: 11/07/2014
Posts: 13

Note: Source code was included in a subsequent comment.  Thanks.

The File Depot at http://community.rti.com/filedepot has a C++ "monitored_discovered_types" example that works great when it is started before any other application on the domain, but it will not discover anything when it is the last application to join.  I changed the participant factory so that participants are not auto-enabled when created, hoping that would keep the app from missing preexisting participants, but that did not solve the problem.  I also changed the following block of code so that it would capture the preexisting ones:

        // We are only interested in new DataReaders which we have not
        // seen before. This is because the data-type will never change for any
        // one DataReader, so other updates indicating a change of QoS or deletion
        // of the DataReader can be safely ignored
        if ( /*info.view_state == DDS_NEW_VIEW_STATE*/ true ) {
           printf("DataReader (New) name: \"%s\"  topic: \"%s\" type: \"%s\"\n" ,
                    subscriptionData.subscription_name.name,
                    subscriptionData.topic_name,
                    subscriptionData.type_name);

            process_type(subscriptionData.type_name, subscriptionData.type_code);
        }

 Lastly, I experimented with the QoS on the publication and subscription data readers, but that did not change the behavior.

 What am I not understanding about this example or DDS?

 Thanks for any help,

 Doug 

PS. It would be nice to have another version of "monitored_discovered_types"  that  could be a late joiner.


If it helps here is the original source code:

 

/* SerializeTypecode_subscriber.cxx
*/

#include <stdio.h>
#include <stdlib.h>

#include <map>
#include <string>

#include "ndds/ndds_cpp.h"


class MonitorDicoveredTypes {

public:
    bool start(int domainId);
    bool wait_for_discovery_data();
    bool shutdown();
    void process_types_in_discovered_data_readers();
    void process_types_in_discovered_data_writers();

protected:
    virtual bool process_type(const char *type_name, DDS_TypeCode * type_code);

private:
    DDSDomainParticipant *participant;
    DDSSubscriber *subscriber;
    DDSTopic *topic;
    DDSDataReader *reader;
    DDS_ReturnCode_t retcode;
    int count;
    DDSWaitSet *discovery_waitset;
    DDSConditionSeq active_condition_seq;
    DDSPublicationBuiltinTopicDataDataReader   *publicationsDR;
    DDSSubscriptionBuiltinTopicDataDataReader  *subscriptionsDR;

    typedef std::map<std::string, DDS_TypeCode *> TypecodeMap;
    TypecodeMap discoveredTypes;
};

void MonitorDicoveredTypes::process_types_in_discovered_data_readers() {
    DDS_SubscriptionBuiltinTopicData subscriptionData;
    DDS_SubscriptionBuiltinTopicData_initialize( &subscriptionData);
    DDS_SampleInfo info;

    //printf("MonitorDicoveredTypes::process_types_in_discovered_data_readers()\n");
    while ( subscriptionsDR->take_next_sample(subscriptionData, info) == DDS_RETCODE_OK ) {

        // We are only interested in new DataReaders which we have not
        // seen before. This is because the data-type will never change for any
        // one DataReader, so other updates indicating a change of QoS or deletion
        // of the DataReader can be safely ignored
        if ( info.view_state == DDS_NEW_VIEW_STATE ) {
           printf("DataReader (New) name: \"%s\"  topic: \"%s\" type: \"%s\"\n" ,
                    subscriptionData.subscription_name.name,
                    subscriptionData.topic_name,
                    subscriptionData.type_name);

           process_type(subscriptionData.type_name, subscriptionData.type_code);
        }
     }
}

void MonitorDicoveredTypes::process_types_in_discovered_data_writers() {
    DDS_PublicationBuiltinTopicData publicationData;
    DDS_PublicationBuiltinTopicData_initialize( &publicationData);
    DDS_SampleInfo info;

    //printf("MonitorDicoveredTypes::process_types_in_discovered_data_writers()\n");
    while ( publicationsDR->take_next_sample(publicationData, info) == DDS_RETCODE_OK ) {

        // We are only interested in new DataWriters which we have not
        // seen before. This is because the data-type will never change for any
        // one DataWriter, so other updates indicating a change of QoS or deletion
        // of the DataWriter can be safely ignored
        if ( info.view_state == DDS_NEW_VIEW_STATE ) {
            printf("DataWriter (New) name: \"%s\"  topic: \"%s\" type: \"%s\"\n" ,
                    publicationData.publication_name.name,
                    publicationData.topic_name,
                    publicationData.type_name);
            process_type(publicationData.type_name, publicationData.type_code);
        }
     }

}

bool MonitorDicoveredTypes::process_type(const char *type_name, DDS_TypeCode *type_code) {
    DDS_TypeCodeFactory *tcFactory = DDS_TypeCodeFactory::get_instance();
    DDS_TypeCode *existing_type = NULL;
    DDS_ExceptionCode_t ex;
    TypecodeMap::iterator iterator;

    printf("MonitorDicoveredTypes::process_type: Discovered type is \"%s\"\n", type_name);
    if ( type_code == NULL ) {
        printf("No type information was supplied for type: \"%s\"\n", type_name);
        return false;
    }

    // See if we already had a type with the same name:
    iterator  = discoveredTypes.find(type_name);
    if( iterator == discoveredTypes.end() ) {
        printf("This type name has not seen before. Its structure is:\n");
        type_code->print_IDL(0, ex);
        discoveredTypes[type_name] = tcFactory->clone_tc(type_code, ex);
        return true;
    }
    else {
        printf("This type name had been seen already. Comparing the type definitions...\n");
        existing_type = iterator->second;
        if ( existing_type->equal(type_code, ex) ) {
            printf("The type matches the existing definition\n");
            return false;
        }
        else {
            printf("The type DOES NOT match the existing definition\n");

            printf("This is the existing definition:\n");
            existing_type->print_IDL(0, ex);

            printf("This is the definition of the type just discovered:\n");
            type_code->print_IDL(0, ex);
            return true;
        }
    }
 }

bool MonitorDicoveredTypes::wait_for_discovery_data() {
    //printf("MonitorDicoveredTypes::wait_for_discovery_data()\n");
    return ( discovery_waitset->wait(active_condition_seq, DDS_DURATION_INFINITE) == DDS_RETCODE_OK);
}

bool MonitorDicoveredTypes::start(int domainId)
{
    /* To customize the participant QoS, use
       the configuration file USER_QOS_PROFILES.xml */
    participant = DDSTheParticipantFactory->create_participant(
        domainId, DDS_PARTICIPANT_QOS_DEFAULT,
        NULL /* listener */, DDS_STATUS_MASK_NONE);
    if (participant == NULL) {
        printf("MonitorDicoveredTypes::start: create_participant error\n");
        shutdown();
        return false;
    }

    publicationsDR  = (DDSPublicationBuiltinTopicDataDataReader *)
            participant->get_builtin_subscriber()->lookup_datareader("DCPSPublication");
    subscriptionsDR = (DDSSubscriptionBuiltinTopicDataDataReader *)
        participant->get_builtin_subscriber()->lookup_datareader("DCPSSubscription");

    if ( publicationsDR == NULL || subscriptionsDR == NULL ) {
        printf("MonitorDicoveredTypes::start: Error retrieving builtin readers\n");
        return false;
    }

    discovery_waitset = new DDSWaitSet();

    // Attach the conditions that would wake up the waitset. In this case the arrival of data on
    // any of the builtin DataReaders
    discovery_waitset->attach_condition(publicationsDR->get_statuscondition());
    publicationsDR->get_statuscondition()->set_enabled_statuses(DDS_DATA_AVAILABLE_STATUS);

    discovery_waitset->attach_condition(subscriptionsDR->get_statuscondition());
    subscriptionsDR->get_statuscondition()->set_enabled_statuses(DDS_DATA_AVAILABLE_STATUS);

    printf("MonitorDicoveredTypes::start: Started on domainId = %d\n", domainId);

    return true;
}

/* Delete all entities */
bool MonitorDicoveredTypes::shutdown()
{
    DDS_ReturnCode_t retcode;
    int status = 0;

    if (participant != NULL) {
        retcode = participant->delete_contained_entities();
        if (retcode != DDS_RETCODE_OK) {
            printf("delete_contained_entities error %d\n", retcode);
            return false;
        }

        retcode = DDSTheParticipantFactory->delete_participant(participant);
        if (retcode != DDS_RETCODE_OK) {
            printf("delete_participant error %d\n", retcode);
            return false;
        }
    }

    retcode = DDSDomainParticipantFactory::finalize_instance();
    if (retcode != DDS_RETCODE_OK) {
        printf("finalize_instance error %d\n", retcode);
        return false;
    }

    return true;
}

 

int main(int argc, char *argv[])
{
    int domainId = 0;

    if (argc >= 2) {
        domainId = atoi(argv[1]);
    }

    MonitorDicoveredTypes *discovery_monitor = new MonitorDicoveredTypes();
    discovery_monitor->start(domainId);

    while ( true ) {
        discovery_monitor->wait_for_discovery_data();
        discovery_monitor->process_types_in_discovered_data_readers();
        discovery_monitor->process_types_in_discovered_data_writers();
    }
}

Gerardo Pardo's picture
Offline
Last seen: 1 day 14 hours ago
Joined: 06/02/2010
Posts: 601

Hello Doug,

The example should work with the default QoS settings independent of whether you start the "monitored_discovered_types" application before or after any other application.

You also should not need to modify the code you commented out:

 if ( /*info.view_state == DDS_NEW_VIEW_STATE*/ true ) {  

The info.view_state refers to the perspective of the particular DataReader you have created inside the "monitored_discovered_types" application. Seen from the point of view of that DataReader, info.view_state will be DDS_NEW_VIEW_STATE independent of whether the application was started before or after the others.
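
A small toy model may help show why view_state is tracked per DataReader. This is only a sketch: ToyReader and ToyViewState are hypothetical names, not part of the Connext API, and the model ignores instance disposal and rebirth. It assumes only that a reader reports "new" the first time it sees an instance and "not new" afterwards.

```cpp
#include <set>
#include <string>

// Hypothetical stand-ins for illustration only; not Connext types.
enum ToyViewState { TOY_NEW_VIEW_STATE, TOY_NOT_NEW_VIEW_STATE };

class ToyReader {
public:
    // Returns the view state this reader would report for a sample of the
    // given instance, then remembers that the instance has been seen.
    ToyViewState take_sample(const std::string &instance_key) {
        // insert() reports whether the key was absent, i.e. whether this
        // is the first time this particular reader sees the instance.
        bool first_time = seen_.insert(instance_key).second;
        return first_time ? TOY_NEW_VIEW_STATE : TOY_NOT_NEW_VIEW_STATE;
    }

private:
    std::set<std::string> seen_;  // instances already seen by THIS reader
};
```

Because each ToyReader keeps its own set, a reader created late still reports TOY_NEW_VIEW_STATE for an instance that an earlier reader has already seen, which is why commenting out the view_state check should not be necessary.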

Basically, what you are seeing is very odd, and I have no explanation for it. What version of the product are you using? What platform is it running on (Unix, Windows, MacOS)? How many applications and DomainParticipant entities are you creating on that particular computer? It could be that on a particular platform the last applications to start get port numbers outside the range that discovery sees...

Maybe you can run rtiddsspy side by side. This pre-built application essentially implements the same logic inside. Does rtiddsspy discover the DataReaders and DataWriters of applications started before it? That is, if you replace the execution of "monitored_discovered_types" with rtiddsspy, do you see the same behavior?

Gerardo

 


Gerardo,

I'm using Connext 5.1 on Linux (Red Hat Enterprise Linux Workstation 6.5) with very few applications and domain participants.  rtiddsspy sees everything when "monitored_discovered_types" sees nothing.   If I start rtiddsspy after "monitored_discovered_types", then "monitored_discovered_types" sees everything.  It seems like rtiddsspy is able to trigger discovery when joining late, but "monitored_discovered_types" cannot, even though it uses the same logic.

Thanks for your help,

Doug


I finally got it working.  When I make the following setting on the participant's QoS, everything works:

transport_builtin.mask = DDS::TRANSPORTBUILTIN_UDPv4

Apparently the default transport is shared memory, and in some cases that is too slow to get set up, which is why it was not discovering the preexisting applications when joining late.

Thanks for the help and I hope this can help others,

Doug


Hello Doug,

I am glad you got it working. However, I do not understand why setting the transport mask made any difference in your situation.

There must be something else, so I am concerned we did not get to the root cause. See below...

First, it makes a difference whether you are running all these applications (DomainParticipants) on the same or on different computers. By default the transport mask enables both SharedMemory and UDPv4.  If you disable SharedMemory (as you did), you would have to do the same on all other applications that run on the same computer; otherwise they will not be able to communicate. This is because if an application has SharedMemory enabled and discovers another one on the same computer, it will automatically try to communicate with the other one using shared memory. So if the second one does not have it enabled, they will not communicate properly.
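
The rule of thumb in the paragraph above can be written down as a small check. This is a sketch under stated assumptions: the TOY_* flags and toy_can_communicate() are hypothetical and do not use the real DDS_TRANSPORTBUILTIN_* values, and the function encodes only the simplified rule as stated in this post, not the full transport selection logic of the middleware.

```cpp
#include <cstdint>

// Hypothetical bit flags for illustration; NOT the real Connext mask values.
const std::uint32_t TOY_UDPV4 = 1u << 0;
const std::uint32_t TOY_SHMEM = 1u << 1;

// Simplified rule from the post: on the same computer, both participants
// must agree on whether shared memory is enabled. On different computers
// only a common UDPv4 transport matters.
bool toy_can_communicate(std::uint32_t mask_a, std::uint32_t mask_b,
                         bool same_host) {
    if (same_host) {
        bool a_shmem = (mask_a & TOY_SHMEM) != 0;
        bool b_shmem = (mask_b & TOY_SHMEM) != 0;
        if (a_shmem != b_shmem) {
            return false;  // one side tries shared memory, the other lacks it
        }
        if (a_shmem) {
            return true;   // both sides use shared memory
        }
    }
    // Otherwise fall back to requiring UDPv4 on both sides.
    return (mask_a & mask_b & TOY_UDPV4) != 0;
}
```

Under this model, a participant with the default mask (both flags) and a UDPv4-only participant on the same computer do not communicate, while two UDPv4-only participants do.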

So the questions are:

(1) Are the other applications you are trying to discover using the monitored_discovered_types example located on the same computer or on a different computer?  

Given that you got it "working" by setting the mask to only include TRANSPORTBUILTIN_UDPv4, I would assume they must be located on another computer...

However, if they are on another computer then the SharedMemory transport setting would have no direct effect. So this would indicate something else is amiss.

(2) Did you modify the transport settings of the other applications you were trying to discover? I assume they had the default settings, which would include the shared memory transport.

(3) Were you running rtiddsspy on the same computer you were running monitored_discovered_types? Were you using the default settings/parameters?

I assume so since you did not mention anything special about the way you run rtiddsspy...

I am especially puzzled by the fact that rtiddsspy does not exhibit the problem. I still have not come up with something that could explain that, assuming you are indeed running it with the default settings on the same computer as you run monitored_discovered_types. Looking at the QoS settings of rtiddsspy (see http://community.rti.com/rti-doc/510/ndds.5.1.0/resource/qos_profiles_5.1.0/xml/RTIDDSSPY_QOS_PROFILES.example.xml ) I do not see any significant differences from those of monitored_discovered_types.

Gerardo

 

 


Hello, Gerardo

Here are the answers to your questions, and thank you for your help:

(1) Are the other applications you are trying to discover using the monitored_discovered_types example located on the same computer or on a different computer?  

On the same computer.

(2) Did you modify the transport settings of the other applications you were trying to discover? I assume they had the default settings which would include the shared memory transport.

A Qt wrapper was written for DDS and the wrapper had TRANSPORTBUILTIN_UDPv4 set, but that was not communicated to other developers, so we caused our own problem there.  However, the developer had a real need to set TRANSPORTBUILTIN_UDPv4, and that was because of a lack of responsiveness (too slow).  We still don't know what was causing that problem though.

(3) Were you running rtiddsspy on the same computer you were running monitored_discovered_types? Were you using the default settings/parameters?

I was running it on the same machine and with default settings, so I guess I don't understand why that was working.  Wouldn't it default to using shared memory on the same machine?

 

Thanks again,

Doug


Hello Doug,

I figured out what was causing the monitored_discovered_types example to not see the applications that were running before it started, and why disabling shared memory made it "work". However, as you will see below, it is not the right way to fix the problem.

What happens is that the builtin readers only get populated with samples corresponding to the things the DomainParticipant discovers *after* the application "retrieves" them by calling the operations:

participant->get_builtin_subscriber()->lookup_datareader("DCPSSubscription");
participant->get_builtin_subscriber()->lookup_datareader("DCPSPublication");

So in the case where an application is already running, discovery is so fast (in most cases) that the monitored_discovered_types DomainParticipant receives the discovery data *before* the application has retrieved the builtin readers, and hence misses that information. When you disabled shared memory, discovery was forced to use UDP, which is slower, and the timing changed enough that discovery data now arrived after the builtin readers had been retrieved. However, there is still a race condition, so it may still miss them sometimes.

The correct way to address this issue is to create the DomainParticipant disabled, so that it does not activate DDS discovery, then retrieve the builtin readers, and after that enable the DomainParticipant. That way, by the time discovery data arrives, the builtin readers are ready to deliver that data to the application. To start the DomainParticipant disabled you must execute this code prior to creating the DomainParticipant:

DDS_DomainParticipantFactoryQos factory_qos;
DDSTheParticipantFactory->get_qos(factory_qos);
factory_qos.entity_factory.autoenable_created_entities = DDS_BOOLEAN_FALSE;
DDSTheParticipantFactory->set_qos(factory_qos);
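
To make the ordering concrete, here is a minimal toy model of the race described above. It is only a sketch: ToyParticipant and its methods are hypothetical stand-ins, not Connext classes, and the model assumes the worst case, in which discovery completes instantly once the participant is enabled. In a real application the corresponding steps would be setting autoenable_created_entities to false as shown above, creating the participant, calling lookup_datareader() on the builtin subscriber, and only then calling participant->enable().

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a DomainParticipant; NOT the Connext API.
// It models one behavior only: discovery data that arrives before the
// builtin reader has been looked up is lost to the application.
class ToyParticipant {
public:
    ToyParticipant(const std::vector<std::string> &existing_types,
                   bool autoenable)
        : existing_types_(existing_types), reader_ready_(false) {
        if (autoenable) {
            enable();  // discovery starts during creation
        }
    }

    // Models get_builtin_subscriber()->lookup_datareader(...).
    void lookup_builtin_reader() { reader_ready_ = true; }

    // Models enabling the participant: discovery runs immediately (assumed
    // worst case) and samples are kept only if the reader is already ready.
    void enable() {
        for (const std::string &type_name : existing_types_) {
            if (reader_ready_) {
                received_.push_back(type_name);
            }
        }
    }

    std::size_t received_count() const { return received_.size(); }

private:
    std::vector<std::string> existing_types_;  // types announced by earlier apps
    bool reader_ready_;
    std::vector<std::string> received_;
};

// Original ordering: create (auto-enabled), then look up the reader.
std::size_t types_seen_when_autoenabled(const std::vector<std::string> &existing) {
    ToyParticipant participant(existing, true);
    participant.lookup_builtin_reader();
    return participant.received_count();
}

// Corrected ordering: create disabled, look up the reader, then enable.
std::size_t types_seen_when_created_disabled(const std::vector<std::string> &existing) {
    ToyParticipant participant(existing, false);
    participant.lookup_builtin_reader();
    participant.enable();
    return participant.received_count();
}
```

With two preexisting types, types_seen_when_autoenabled() returns 0 and types_seen_when_created_disabled() returns 2 in this model. Note that creating the participant disabled is not enough by itself: the participant must still be enabled explicitly after the builtin readers have been retrieved.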

I uploaded a new version of the  monitored_discovered_types example to the Filedepot that includes this change and should work correctly.

Apologies it took so long to figure this out. I should have looked more carefully at the example... Somehow I had assumed that code was correct, as it was me who wrote it in the first place...

Gerardo

 

 


Gerardo.

Thanks for taking the time to analyze the problem and update the example's code.  In my first post I stated that I had already tried having the factory create participants disabled.  The reason I disabled shared memory as a transport mechanism is that the other preexisting apps had done so, and since I was on the same machine, "monitored_discovered_types" could not see them.  I think your point may apply to why the other apps had shared memory disabled.  I was told that shared memory was disabled on them because of performance problems, so it sounds likely.

Thanks,

Doug