Listening to all DDS messages being published

15 posts / 0 new
Last post
fps
fps's picture
Offline
Last seen: 6 years 7 months ago
Joined: 01/17/2013
Posts: 16
Listening to all DDS messages being published

Hi,

Rtiddsspy has the ability to discover topics on the network as well as display the contents of the samples. And not just simple messages (strings, ints) but ones that have hierarchy. I'm interested to know how it does it.

  1. How does it discover all topics in the current domain? get_discovered_topics()?
  2. How does it retrieve the structure of a particular topic? get_discovered_topic_data()?
  3. How should I then subscribe to all the topics?
  4. How should I then print out the contents of the sample?

I know there are existing tools like rtiddsspy and DDS Wireshark, but they only do a subset of what I'm looking to do.

Many thanks,
Fidel

Gerardo Pardo's picture
Offline
Last seen: 3 months 1 week ago
Joined: 06/02/2010
Posts: 602

Hello Fidel,

I will try to answer your questions in order:

1) How does it discover all topics in the current domain? get_discovered_topics()?

To do this you use the so called Discovery Builtin Readers. The basic idea is that DDS propagates discovery data as any other kind of data only that every application automatically publishes and subscribes that data and that the topic names and associated data-types are pre-defined in the system. This is all part of the DDS specification.

You can access the builtin DataReaders and 'listen' and read these DataReaders as any other DataReader. When you read data on this topics you get the information about remote entities that have been discovered (DomainParticipants, DataWriters, DataReaders).

For example the information on a remote DataWriter is conveyed as a sample with type PublicationBuiltinTopicData 

There you can see members for the name of the Topic, the name of the Type, the different QoS Policies, and even a description of the type itself. I will elaborate on this further on the answer to your second question below.

There is a HOWTO titled Detect the presence of DomainParticipants, DataWriters and DataReaders in the DDS Domain  that describes everything in detail and has some working sample code as well. Here is a Java example that shows how to monitor the discovery data. It is in the file MonitorDiscoveryInformation.java

2) How does it retrieve the structure of a particular topic? get_discovered_topic_data()?

The description of the data-type associated with a DataWriter (or a DataReader) is also propagated via Discovery, it is one of the attributes in the data that you read using the builtin DataReaders mentioned in (1) above.

Note that these API's will change a little bit in future versions because we are rolling out support for the DDS Extensible Types specification which defines slightly different ways to do this. But the ideas and approach are very similar.

The type information is carried in the field called type_code which appears both in the PublicationBuiltinTopicData  (referring to a discovered remote DataWriter) and in the SubscriptionBuiltinTopicData  (referring to a discovered remote DataReader). These fields are of type TypeCode  Using the operations supported by the TypeCode you can extract all the information about the data-type.For example, if it was a structure you can get its name, the number of members, name of each member, TypeCode of each member, etc. So you can recursively navigate the whole type. There is even an operation on the TypeCode called printIDL which will print the IDL representation of that data-type.

In the File Exchange there is a Java example that shows how to monitor the discovery data. It is in the file MonitorDiscoveryTypes.java

3) How should I then subscribe to all the topics?

Once you have discovered a DataWriter you have the name of the Topic and the Associated data-type. At this point what you need to do is to

  1. Register the data-type into your DomainParticipant, if this is the first time you see that data-type name
  2. Create a Topic with that associated data-type, if this is the first time you see that Topic name.
  3. Create a DataReader for that Topic, agan if this is the first time you see that Topic name.

The easiest way to do this is with an example. I will post one when get a chance. But in the meantime I will give you some pointers to the steps and API calls you need to use.

a) Register the data-type into your DomainParticipant, if this is the first time you see that data-type name

This is done using the DynamicDataTypeSupport  Once you discover the TypeCode that describes a new data-type you have discovered you can construct a new DynamicDataTypeSupport using its constructor which takes the TypeCode as a parameter.

Once you have a DynamicDataTypeSupport. You call its register_type operation to register that data-type in the DomainParticipant with a type-name that matches the one in the DataWriter you discovered.

b) Create a Topic with that associated data-type, if this is the first time you see that Topic name.

After (a) the DomainParticipant knows about the data-type so create the Topic using the regular create_topic operation on the DomainParticipant 

c) Create a DataReader for that Topic, again if this is the first time you see that Topic name.

After (b) you have a Topic so you create a DataReader using the regular create_datareader() operation on a Subscriber.

The DataReader you will get is of type DynamicDataReader  Which means that when you read from it you will get samples with the type DynamicData 

In the File Exchange there is a Java example that shows how to perform the above steps to dynamically register a type that matches whet was discovered, create a Topic, a DynamicDataReader and read the data. It is in MonitorData.java

4) How should I then print out the contents of the sample?

The DynamicData object provides methods to access the members, iterate over them, etc. One convenient method is print  which will prints the contents of the sample to a file or console. This is what is used in the It is in MonitorData.java example.

Regards,

Gerardo

fps
fps's picture
Offline
Last seen: 6 years 7 months ago
Joined: 01/17/2013
Posts: 16

Gerardo - wow! That is one fantastic response. It's got the perfect level of detail. Thank you very much for taking the time to respond, I am extremely grateful.

Kind regards,
Fidel

Gerardo Pardo's picture
Offline
Last seen: 3 months 1 week ago
Joined: 06/02/2010
Posts: 602

Hi,

Glad the answer helps! I updated my original answer to point to an example I created that monitors the data values.

Gerardo

Offline
Last seen: 8 years 6 months ago
Joined: 07/22/2014
Posts: 9

Hi Gerardo,

Prior to your step by step descriptions above, I got a question. I only need to create 1 participant to subscribe to many topics of different typecodes right?

Jeslene

Gerardo Pardo's picture
Offline
Last seen: 3 months 1 week ago
Joined: 06/02/2010
Posts: 602

Hi Jeslene,

Yes this is right a single DomainParticipant is all you should need. You only need to create mutiple DomainParticipant entities in a single process/application if you want to join different domain IDs which represent separate isolated data-spaces. This does not sound like it would be your use-case... so a single DomainParticipant is enough.

Gerardo

Offline
Last seen: 8 years 3 months ago
Joined: 09/07/2015
Posts: 4

Hi Gerardo,

The monitorData.java example is great.

My question is about properly stoping the wait set.

How can we properly stop the wait set at the end of the application ?

I didn't find any stop or interupt method.

Thanks

rip
rip's picture
Offline
Last seen: 10 hours 5 min ago
Joined: 04/06/2012
Posts: 324

You can use what amounts to a software signal to trigger the WaitSet, and then treat the signal as a "we're done" in the condition handling.

See the documentation for "GuardCondition".

Offline
Last seen: 8 years 3 months ago
Joined: 09/07/2015
Posts: 4

Thanks rip.

To summuarise, here is the code I use to create the WaitSet:

 		publicationsDR = (PublicationBuiltinTopicDataDataReader) participant.get_builtin_subscriber().lookup_datareader("DCPSPublication");
		subscriptionDR = (SubscriptionBuiltinTopicDataDataReader) participant.get_builtin_subscriber().lookup_datareader("DCPSSubscription");

		publicationsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS);
		subscriptionDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS);
		guardCondition = new GuardCondition();

		discoveryWaitSet = new WaitSet();
		discoveryWaitSet.attach_condition(publicationsDR.get_statuscondition());
		discoveryWaitSet.attach_condition(subscriptionDR.get_statuscondition());
		discoveryWaitSet.attach_condition(guardCondition);

 And here the loop :

		Duration_t waitDuration = new Duration_t(Duration_t.DURATION_INFINITE_SEC, Duration_t.DURATION_INFINITE_NSEC);
		// We always listen for Data Writer/Reader events or guard condition (to exit)
		while (true) {
			try {
				// Wait for Data Writer/Reader events or guard condition (to exit)
				discoveryWaitSet.wait(activeConditionSeq, waitDuration);
				System.out.println("--> discoveryWaitSet triggered");

				if (publicationsDR.get_statuscondition().get_trigger_value()) {
					processDiscoveredDataWriters();
				}
				if (subscriptionDR.get_statuscondition().get_trigger_value()) {
					processDiscoveredDataReaders();
				}
				if (guardCondition.get_trigger_value()) {
					break;
				}
			} catch (RETCODE_TIMEOUT timeoutRetcode) {
				break; // can't be reach due to the infinite duration
			}
		}

 Thanks

Offline
Last seen: 8 years 3 months ago
Joined: 09/07/2015
Posts: 4

Hello again,

To complete this example, I try to detect unsubscibed data readers/writters. I use the info.view_state to differenciate new writer from deleted one as descibe in MonotiorData.java. Here is my code sample :

 				publicationsDR.take_next_sample(publicationData, info);
				if (info.view_state == ViewStateKind.NEW_VIEW_STATE) {
					System.out.println("Added Writer : " + publicationData.publication_name.name
							+ " type_code: " + publicationData.type_name
							+ " topic_name: " + publicationData.topic_name);
				} else {
					System.out.println("Removed Writer : " + publicationData.publication_name.name
							+ " type_code: " + publicationData.type_name
							+ " topic_name: " + publicationData.topic_name);
				} 

Unfortunatly I've got empty names when unsubscribing. So I don't know who has left :

Added Writer : null type_code: ShapeType topic_name: Square
Removed Writer :  type_code:  topic_name: 

How can I know the type name, topic name and writer name that has left ?

Thanks

rip
rip's picture
Offline
Last seen: 10 hours 5 min ago
Joined: 04/06/2012
Posts: 324

This is covered on other posts on the forum.  Here's a place to start:  Detecting Leavers

Summary:

Knowing who has left wasn't supported as of 5.1.x.  It might have been added for 5.2, however I don't have the release notes in front of me.  If not, you have to use a method to track who has joined so that you can match who has left with your cached material.  The signal you receive is simply "entity with UUID xyz has left", so you have to track back to what information you collected when the entity joined.

The reasoning is that, because it has left, its information is not relevant going forward.  That does make sense as a standard design criteria in a system (it's gone, so why will we (in the future) care?), but there are legitimate cases where it would be interesting to know who left (the Security domain for example...it would certainly be helpful to the application to know that authenticated user Foo has disconnected, so if we get more information from Foo, we need to be really skeptical about its provenance).

hth,

rip

Offline
Last seen: 8 years 3 months ago
Joined: 09/07/2015
Posts: 4

Hi,

Here is how I do to detect leavers :

First you need to suibscribe for all kind of notifications :

participantDR.get_statuscondition().set_enabled_statuses(StatusKind.STATUS_MASK_ALL);

Then you can detect if it is a new data or not:

	private void processDiscoveredParticipant() {
		ParticipantBuiltinTopicData participantData = new ParticipantBuiltinTopicData();
		SampleInfo info = new SampleInfo();

		try {
			while (true) {
				participantDR.take_next_sample(participantData, info);
				InstanceHandle_t handle = InstanceHandle_t.createI();
				handle.copy_from(info.instance_handle);
				if (info.view_state == ViewStateKind.NEW_VIEW_STATE) {
					// new participant : handle.toString()
				} else if(!info.valid_data) {
					// removed participant : handle.toString()
				}
			}
		} catch (RETCODE_NO_DATA noData) {
			// exit the loop
		}
	}

Hi hope that will help people.


Concerning detection of already existing publishers and subscribers I've few difficulties: 

If I set domainParticipantQos.entity_factory.autoenable_created_entities = false; and do participant.enable(); then nothing works.

If I also try to enable the buildin subscriber, I get the following error : 

PRESPsService_enableLocalEndpointWithCursor:!topic enabled
PRESPsService_enableAllLocalEndpointsInGroupWithCursor:!enable endpoint
PRESPsService_enableLocalEndpointWithCursor:!topic enabled
PRESPsService_enableAllLocalEndpointsInGroupWithCursor:!enable endpoint
PRESPsService_enableLocalEndpointWithCursor:!topic enabled
PRESPsService_enableAllLocalEndpointsInGroupWithCursor:!enable endpoint
PRESPsService_enableGroupWithCursor:!enableAllLocalEndpointsInGroupWithCursor
PRESPsService_enableGroup:!enableGroupWithCursor

How can I be sure to properly enable the participant and it's built in topics ?

Thanks

ps: here is my initialisation code:

	private DomainParticipant participant;
	private PublicationBuiltinTopicDataDataReader publicationsDR; // notified when new publisher
	private SubscriptionBuiltinTopicDataDataReader subscriptionDR; // notified when new subscriber
	private ParticipantBuiltinTopicDataDataReader participantDR; // notified when new publisher
	private WaitSet discoveryWaitSet; // Wait for data to be discovered
	private GuardCondition guardCondition; // condition to trigger to turn of the wait set
	
	public void runDds() throws Exception {
		DomainParticipantQos domainParticipantQos = new DomainParticipantQos();
		domainParticipantQos.entity_factory.autoenable_created_entities = false;
		
		// Create domain participant
		try {
			participant = factory.create_participant(0, domainParticipantQos, null, StatusKind.STATUS_MASK_NONE);
		} catch (Exception e) {
			String lastStartError = "Error creating the DDS domain. Common causes are:" + "\n   - Lack of a network. E.g disconected wireless."
					+ "\n   - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface "
					+ "\n      for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead.";
			LOGGER.error(lastStartError, e);
			throw e;
		}
		
		com.rti.dds.subscription.Subscriber builtinSubscriber = participant.get_builtin_subscriber();
		participantDR = (ParticipantBuiltinTopicDataDataReader) builtinSubscriber.lookup_datareader(
				ParticipantBuiltinTopicDataTypeSupport.PARTICIPANT_TOPIC_NAME);
		publicationsDR = (PublicationBuiltinTopicDataDataReader) builtinSubscriber.lookup_datareader(
				PublicationBuiltinTopicDataTypeSupport.PUBLICATION_TOPIC_NAME);
		subscriptionDR = (SubscriptionBuiltinTopicDataDataReader) builtinSubscriber.lookup_datareader(
				SubscriptionBuiltinTopicDataTypeSupport.SUBSCRIPTION_TOPIC_NAME);
		guardCondition = new GuardCondition();

		discoveryWaitSet = new WaitSet();
		discoveryWaitSet.attach_condition(publicationsDR.get_statuscondition());
		discoveryWaitSet.attach_condition(subscriptionDR.get_statuscondition());
		discoveryWaitSet.attach_condition(participantDR.get_statuscondition());
		publicationsDR.get_statuscondition().set_enabled_statuses(StatusKind.STATUS_MASK_ALL);
		subscriptionDR.get_statuscondition().set_enabled_statuses(StatusKind.STATUS_MASK_ALL);
		participantDR.get_statuscondition().set_enabled_statuses(StatusKind.STATUS_MASK_ALL);
		discoveryWaitSet.attach_condition(guardCondition);

		domainParticipantQos.entity_factory.autoenable_created_entities = true;
		participant.set_qos(domainParticipantQos);
		
		participant.enable();
		builtinSubscriber.enable(); // error here
		
		Duration_t waitDuration = new Duration_t(Duration_t.DURATION_INFINITE_SEC, Duration_t.DURATION_INFINITE_NSEC);
		ConditionSeq activeConditionSeq = new ConditionSeq(MAX_ACTIVE_CONDITIONS);
		
		// We always listen for Data Writer/Reader/Participant events or guard condition (to exit)
		while (true) {
			try {
				discoveryWaitSet.wait(activeConditionSeq, waitDuration);

				if(publicationsDR.get_statuscondition().get_trigger_value()) {
					processDiscoveredDataWriters();
				}
				if(subscriptionDR.get_statuscondition().get_trigger_value()) {
					processDiscoveredDataReaders();
				}
				if(participantDR.get_statuscondition().get_trigger_value()) {
					processDiscoveredParticipant();
				}
				if(guardCondition.get_trigger_value()) {
					break;
				}
			} catch (RETCODE_TIMEOUT timeoutRetcode) {
				break; // can't be reach due to the infinite duration
			}
		}
	}
	
	private void processDiscoveredParticipant() {
		ParticipantBuiltinTopicData participantData = new ParticipantBuiltinTopicData();
		SampleInfo info = new SampleInfo();

		try {
			while (true) {
				participantDR.take_next_sample(participantData, info);
				InstanceHandle_t handle = InstanceHandle_t.createI();
				handle.copy_from(info.instance_handle);
				if (info.view_state == ViewStateKind.NEW_VIEW_STATE) {
					// new participant : handle.toString()
				} else if(!info.valid_data) {
					// removed participant : handle.toString()
				}
			}
		} catch (RETCODE_NO_DATA noData) {
			// exit the loop
		}
	}
Offline
Last seen: 6 years 2 months ago
Joined: 06/25/2018
Posts: 2

Hi there, I'm currently getting the same error as Yoann above. I am trying to enable the builtin subscriber after creating a participant in disabled mode.

This seems to be the approach suggested for detecting existing participants upon creating a participant: https://community.rti.com/forum-topic/discovering-subscription-and-publication-data

Enabling just the participant does nothing, as Yoann mentions above.

Thanks

Offline
Last seen: 8 months 5 days ago
Joined: 02/11/2016
Posts: 144

Hello,

This may help:

https://community.rti.com/examples/builtin-topics

There's code attached and you may use it to set up your solution.

Good luck,

Roy.

Offline
Last seen: 6 years 2 months ago
Joined: 06/25/2018
Posts: 2

Ah thanks Roy!

For anyone here in the future: The key to being able to enable the participant, the builtin subscriber, and any data readers created from it is to use the DomainParticipantFactory.TheParticipantFactory to create your participant, instead of one that you instantiate yourself.