Finding existing topics on the network (Java)

Joined: 11/12/2015
Posts: 8

I've currently got a Java application which displays DDS information on topics by subscribing to found topics. This works perfectly on topics created after the discover-er, but any existing publisher isn't picked up. Any ideas?

I based my code off of the example provided here: https://community.rti.com/index.php?q=filedepot_download/1655/20

The above code is a dynamic topic listener, which finds publishers and prints the IDL of any newly published topics if the type is unseen (i.e. it can take any message and return the IDL).

I've included the code used to start my network monitor and discover publishers. Any help would be really appreciated! The place where it is "failing" is marked "FAILURE HERE". If it actually calls "processTypesInDiscoveredDataWriters()", it happily creates the listener and reader.

CODE (text editor code option didn't work for me unfortunately):

private boolean start(int theDomainId)
{
    discoveredTypes = new ConcurrentSkipListMap<String, TypeCode>();
    domainId = theDomainId;
    DomainParticipantFactory factory = DomainParticipantFactory.get_instance();
    DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();

    // This instructs the DomainParticipantFactory to not enable the
    // DomainParticipant entities it creates automatically. This is needed so
    // that we have a chance to retrieve the builtin data-readers before the
    // participant starts receiving discovery data. Later it is explained why
    // this is needed.
    factoryQos.entity_factory.autoenable_created_entities = false;

    DomainParticipantQos pQos = new DomainParticipantQos();
    factory.get_default_participant_qos(pQos);
    pQos.participant_name.name = "dynamically subbing";
    try
    {
        participant = factory.create_participant(domainId, pQos, // DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
        null, // listener
        StatusKind.STATUS_MASK_NONE);
    } catch (Exception e)
    {
        String lastStartError = "Error creating the DDS domain. Common causes are:"
        + "\n - Lack of a network. E.g. disconnected wireless."
        + "\n - A network interface that does not bind multicast addresses. On some platforms, using the TUN interface "
        + "\n for (Open)VPN causes this. If this is your situation, try configuring (Open)VPN to use TAP instead.";

        System.out.println(lastStartError);
        return false;
    }

    // We count ourselves as a participant that is present.
    // The "lookup_xxx" operations not only retrieve the built-in data-readers
    // but also activate the caching of discovered types. To save resources,
    // discovered types are only saved in the built-in reader cache, which is
    // only active after calling the corresponding "lookup_xxx" operation.
    // It is for this reason that we instructed the DomainParticipantFactory to
    // not automatically enable the DomainParticipant. This gives us the
    // opportunity to retrieve the built-in entities and activate the caching of
    // discovered types before we receive any discovery information. If we did
    // not do this we may miss the data-types of the first few entities
    // discovered.
    publicationsDR = (PublicationBuiltinTopicDataDataReader) participant.get_builtin_subscriber()
    .lookup_datareader("DCPSPublication");

    // Enable the participant. This causes it to start receiving discovery
    // traffic.
    // Note: enable() fails if there is no network or if any interfaces are
    // not multicast enabled. On some platforms (e.g. Mac OS X) having the VPN
    // running causes enable() to fail.
    try
    {
        participant.enable();
    } catch (Exception e)
    {    
        String lastStartError = "Error enabling the DDS domain. Common causes are:"
        + "\n - Lack of a network. E.g. disconnected wireless."
        + "\n - A network interface that does not bind multicast addresses. On some platforms, using the TUN interface "
        + "\n for (Open)VPN causes this. If this is your situation, try configuring (Open)VPN to use TAP instead.";

        System.out.println(lastStartError);
        return false;
    }

    this.processDiscoveredDataWriters();
    // Create a WaitSet object that can be used to block the calling thread
    // until there is discovery data to read. This avoids having to poll and
    // thus use CPU continuously.
    discoveryWaitSet = new WaitSet();
    // Attach the conditions that would wake up the WaitSet. In this case the
    // arrival of data on any of the built-in datareaders.
    discoveryWaitSet.attach_condition(publicationsDR.get_statuscondition());
    publicationsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS);

    activeConditionSeq = new ConditionSeq(MAX_ACTIVE_CONDITIONS);

    return true;
}

private void waitForDiscoveredDataWriters(Duration_t waitTime)
{
    try
    {    
        discoveryWaitSet.wait(activeConditionSeq, waitTime);
    } catch (RETCODE_TIMEOUT timeoutRetcode)
    {
        // no publisher found within the time
    } catch (Exception ex)
    {
        // publisher found OR other error has occured while waiting
    }
}

private void processDiscoveredDataWriters()
{
    System.out.println("processDiscoveredDataWriters");

// API reference
// http://community.rti.com/rti-doc/510/ndds/doc/html/api_java/interfacecom_1_1rti_1_1dds_1_1infrastructure_1_1StatusCondition.html
// not entirely sure what it checks about the publication status,
// possibly checks for "bad" publication?
    if (publicationsDR.get_statuscondition().get_trigger_value()) // <------ FAILURE HERE (get_trigger_value() returns false)
    {
        processTypesInDiscoveredDataWriters();
    }
}

rip
Joined: 04/06/2012
Posts: 324

My comments assume that you didn't supply all the code you have.  For example, activeConditionSeq is never shown being defined, so it must be defined as a class variable elsewhere. 

private void waitForDiscoveredDataWriters(Duration_t waitTime)
{
    try
    {    
        discoveryWaitSet.wait(activeConditionSeq, waitTime);
    } catch (RETCODE_TIMEOUT timeoutRetcode)
    {
        // no publisher found within the time
    } catch (Exception ex)
    {
        // publisher found OR other error has occured while waiting
    }
}

1) Doesn't do anything, and

2) is never actually called in the code you've shown.

Here it is doing something after the waitset is unblocked:

private void waitForDiscoveredDataWriters(Duration_t waitTime)
{
    try
    {    
        discoveryWaitSet.wait(activeConditionSeq, waitTime);

        // a bunch of code that determines if the ON_DATA_AVAILABLE condition exists
        // and if so:
            processDiscoveredDataWriters();
        // probably more code

    } catch (RETCODE_TIMEOUT timeoutRetcode)
    {
        // no publisher found within the time
    } catch (Exception ex)
    {
        // publisher found OR other error has occured while waiting
    }
}

The "// publisher found OR other error has occured..." comment is probably not correct (it might just be comment rot, but it would be better to simply remove it). A new publisher triggering the condition change that releases the wait isn't an exception, since that is, by definition, the purpose of the wait.
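As a rough illustration of that point, here is a plain-Java analogy. It does not use the RTI WaitSet API at all: `WaitLoopDemo`, `announce` and `waitAndProcess` are hypothetical names, and a `BlockingQueue` stands in for the built-in reader. The only thing it is meant to show is the shape of the handler: the wake-up is the normal path where processing happens, and the timeout is the only case handled in a catch block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java analogy only; none of these names come from the RTI API.
class WaitLoopDemo {
    private final BlockingQueue<String> discovered = new LinkedBlockingQueue<>();
    private final List<String> processed = new ArrayList<>();

    // Called by the "discovery" side when a writer is found.
    void announce(String writerName) {
        discovered.add(writerName);
    }

    // Blocks until something is discovered. A timeout is reported by
    // exception, loosely mirroring the RETCODE_TIMEOUT catch in the thread.
    private String waitForDiscovered(long timeoutMs)
            throws TimeoutException, InterruptedException {
        String writer = discovered.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (writer == null) {
            throw new TimeoutException("nothing discovered in " + timeoutMs + " ms");
        }
        return writer;
    }

    // One loop iteration. Returns true if something was processed.
    boolean waitAndProcess(long timeoutMs) {
        try {
            String writer = waitForDiscovered(timeoutMs);
            // Normal path: the wait returned because there is work to do.
            processed.add(writer);
            return true;
        } catch (TimeoutException timeout) {
            // Nothing arrived in time. Not an error, just try again later.
            return false;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        WaitLoopDemo demo = new WaitLoopDemo();
        demo.announce("ExistingWriter");
        System.out.println(demo.waitAndProcess(50)); // one entry is queued
        System.out.println(demo.waitAndProcess(50)); // queue is empty, so this times out
        System.out.println(demo.processed());
    }
}
```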

 example code for WaitSets

 

rip

Joined: 11/12/2015
Posts: 8

Ah sorry, should've said more clearly that this isn't all of the code; I included what I thought would be relevant to the issue (code that creates and starts my participant, and a few called functions)

Currently, waitForDiscoveredDataWriters stops the current thread until it is interrupted, and is from the example. It's not the best practice, but it's used in a loop, roughly pseudocoded:

while (keepSearching)
{
    wait()
    processinput() <- can set keepSearching
}

I'll read the documentation for WaitSets and modify it. Despite being an example I found posted here, it's got a few bits of dodgy practice as is.

 

If you check the example HERE to get more context, it will hopefully make more sense.

I still have the original problem, however I think I've found a possible, more likely cause; I'm not being sent builtin topics for publishers that already exist because for whatever reason my domain participant isn't visible over the network. Locally all the RTI tools pick up the participant but on other machines it's not seen. However, once created I can "see" new publishers by receiving their "i'm a new publisher on the network" message (built in publisher topic). So if you have any advice on that, I'd appreciate it. I'll post back if I make any progress.

Fernando Garcia
Joined: 05/18/2011
Posts: 200

Hi Mike,

I noticed in your snippet that you are not setting the DomainParticipantFactoryQos that we modify to create a disabled DomainParticipant. We recently realized that we were not doing this in the original example and posted a response with the solution and an updated version of the example.

In a nutshell, since we were not setting the DomainParticipantFactoryQos, the DomainParticipant was enabled upon creation. When this happens, by the time you retrieve the built-in entities, discovery information may already have been received and we may miss the data types of the first discovered entities. This should solve your problem:

        DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
        
        // This instructs the DomainParticipantFactory to not enable the DomainParticipant
        // entities it creates automatically. This is needed so that we have a chance to
        // retrieve the builtin data-readers before the participant starts receiving
        // discovery data. Later it is explained why this is needed
        factory.get_qos(factoryQos);
        factoryQos.entity_factory.autoenable_created_entities = false;
        factory.set_qos(factoryQos);

Please, let me know if this works,
Fernando.
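To make the ordering argument above concrete, here is a toy model in plain Java. It is only a sketch of the reasoning and does not use or imitate the real RTI classes: `ToyParticipant`, `lookupBuiltinReader`, `enable` and `onAnnouncement` are hypothetical stand-ins, and the assumption that announcements are dropped unless caching is already active is taken from the explanation in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical and are not part of the RTI API.
class ToyParticipant {
    private boolean enabled = false;
    private boolean cacheActive = false;
    private final List<String> cachedTypes = new ArrayList<>();

    // Stands in for looking up the built-in reader, which activates caching.
    void lookupBuiltinReader() {
        cacheActive = true;
    }

    // Stands in for enabling the participant: discovery data can arrive from now on.
    void enable() {
        enabled = true;
    }

    // Stands in for a discovery announcement arriving from the network.
    void onAnnouncement(String typeName) {
        if (enabled && cacheActive) {
            cachedTypes.add(typeName);
        }
        // Otherwise the announcement is dropped, since nothing is caching it yet.
    }

    List<String> cachedTypes() {
        return cachedTypes;
    }
}

class DiscoveryOrderDemo {
    // Participant enabled at creation: data arrives before the lookup.
    static List<String> autoEnabled() {
        ToyParticipant p = new ToyParticipant();
        p.enable();
        p.onAnnouncement("ExistingType");
        p.lookupBuiltinReader();
        p.onAnnouncement("NewType");
        return p.cachedTypes();
    }

    // Participant created disabled: lookup first, then enable.
    static List<String> createdDisabled() {
        ToyParticipant p = new ToyParticipant();
        p.lookupBuiltinReader();
        p.enable();
        p.onAnnouncement("ExistingType");
        p.onAnnouncement("NewType");
        return p.cachedTypes();
    }

    public static void main(String[] args) {
        System.out.println("auto-enabled:     " + autoEnabled());
        System.out.println("created disabled: " + createdDisabled());
    }
}
```

In this model the auto-enabled case only ever sees the type announced after the lookup, which matches the symptom of picking up new publishers but missing existing ones.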

rip
Joined: 04/06/2012
Posts: 324

Locally all the RTI tools pick up the participant but on other machines it's not seen. However, once created I can "see" new publishers by receiving their "i'm a new publisher on the network" message (built in publisher topic). So if you have any advice on that, I'd appreciate it. I'll post back if I make any progress.


"Locally", so you mean via shmem.  Interesting.  The behavior you describe is correct if outgoing is blocked somehow from reaching the other machines (unidirectional multicast permitted across a router? no multicast permitted, but the other participants have your IP address in their initial hosts list? Unexpected TTL?)

I'd be running Wireshark both locally and remote, looking for discovery packets going out locally and being received on the remote.  If you see them going out but not coming in, then the problem is unrelated to DDS, and I would start looking at network topology (routers, switches, TTL settings, etc).
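The reason for capturing on both ends can be shown with a small plain-Java sketch: a UDP send completes locally whether or not anything receives the datagram. This has nothing DDS-specific in it, and the class name, payload and the trick of probing for an unused loopback port are all assumptions made for the illustration.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Sketch only: port choice and payload are arbitrary assumptions.
class UdpSendDemo {
    // Sends one datagram to a loopback port that has no listener.
    // Returns true if send() completed without throwing.
    static boolean sendToNobody() {
        try {
            int unusedPort;
            // Bind and immediately close a socket to obtain a port nobody is listening on.
            try (DatagramSocket probe = new DatagramSocket(0, InetAddress.getLoopbackAddress())) {
                unusedPort = probe.getLocalPort();
            }
            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
            try (DatagramSocket sender = new DatagramSocket()) {
                sender.send(new DatagramPacket(payload, payload.length,
                        InetAddress.getLoopbackAddress(), unusedPort));
            }
            // The sender gets no indication that the datagram was never received.
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("send completed: " + sendToNobody());
    }
}
```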

Left field:  if the packets are being seen on the remote machine, check the remote participant's accept_unknown_peers QoS (or look for <participant>.ignore_participant(pHandle,...) in the remote machine's code).  Stranger things have happened...

 

 

 

Joined: 11/12/2015
Posts: 8

Thanks for both responses! After roughly isolating the main issue, adding participant tear-down, combined with Fernando's response, has got me to the conclusion: I need to change the QoS on either the subscriber or the publisher. If I wait for an arbitrary amount of time (which I suspect is probably very similar to the QoS liveliness timeout on either end) and then run the program, it works perfectly! All existing topics are found. And thus, problem solved!

Now I just need to figure out how to change QoS without having to write an XML for it... Since some discovery settings are final, I can't work out how to change them, but I've only been trying for a short while so I'll keep at it.

Thanks for all the help thus far. I did run Wireshark just to be sure what was happening; messages seem to be going in and out of my machine, so I believe visibility wasn't the issue. I can send you the Wireshark logs if you fancy a look, rip. You were likely on the right track about the remote machine not accepting my machine's discovery request; it likely thought it wasn't a new participant! As far as being only on shared memory, that was my eventual conclusion. Cheers for the feedback!

rip
Joined: 04/06/2012
Posts: 324

"on my machine", in the singular, is problematic. Specifically, outgoing traffic may be seen, but you can't guarantee that it is received, because it is UDP and might be dropped by an intermediate router or switch. You need to be able to run Wireshark on both the source and target machines (not necessarily at the same time). You need to be able to see the incoming packets on the machine that isn't responding. I.e. Proof of Sending is not Proof of Receipt.

Sorry, I'm kind of a pedant.

"... thought it wasn't a new participant."  This will happen if the restarting participant ends up with the same Participant ID, which can happen on deterministic RTOS boards. There is some randomness in the ID selection, but if the board and its OS are deterministic, then the same random decisions are made at the same exact times each time you boot, so the same paths are followed and you end up with the same Participant ID. The receiver assumes that this is the same participant entity and ignores it.

1) Why can't you edit the XML? That's the simplest and best solution, since it means you won't have to recompile everything.

2) There are examples for the various languages in the sidebar, under Documentation, find the API link for your language.  For Java, for example, it would be in Modules -> Programming How-Tos -> Participant Use-Cases ... BUT:  There isn't a sub-header for "how to change qos", because it's covered in the PDF documentation.  I use those API pages and read between the lines (grab all the boilerplate code from the examples, but ignore the example QoS changes as not relevant to what I'm trying to do, and substitute in the qos changes I need).

On the Participant Use-Cases link there is something relevant: how to disable auto-enabling of child entities. In this case, you need that code: disable the auto-enable, make the changes you need in the Participant Factory QoS, and then create the Participant entities. After that, edit the QoS there (for those settings that can be edited after creation, but before enable), and then enable the participant.

r

 

Joined: 11/12/2015
Posts: 8

No problem, if people weren't pedantic then we'd have even more buggy software out there than we do! I will run WS to confirm it's received.

I'll write an XML; I'll look up how to do that. Rebuilding my application isn't a laborious process at present, and I'd still need to tell it to load the new QoS (versus the factory default), but I'll allow the user to change the XML after changing the appropriate code.

Cool, I'm already reading the boilerplate for it. At present I do turn off auto-enable, and apply the default QoS before I then enable the participant, so this is fine. The values I think I need to change are finals, so can't be edited after creation, regardless of whether it is enabled.

 

Cheers for all the help thus far