Check if a DataWriter is ready to write

22 posts / 0 new
Last post
His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18
Check if a DataWriter is ready to write

Following on from my previous post re unreliable WiFi connections, I found that the problem was caused by calling DataWriter::write()too soon after creating the DataWriter.  Because write()is non-blocking it returned immediately without doing anything.  Also, because it is void, there is no error return code.

So the publishing app was not in fact sending anything, and I had no way of knowing.

It obviously needed more time to assemble its plumbing, so I put in a 2 second sleep, after which it worked.

I saw a forum post about this:

https://community.rti.com/forum-topic/dynamic-data-writing-seems-need-setup-time

However, this is not an ideal solution.  Two seconds will usually be overkill, but occasionally not enough.

  1. Is there any way I can test if the DataWriteris ready to write?  Is there a status flag to tell me?
  2. If so, would I need to test before every write, or only the first write after object construction?
Organization:
Offline
Last seen: 3 months 1 day ago
Joined: 10/22/2018
Posts: 91

His Nerdship,

The assmebling of the plumbing you have found is a the DataWriter discovering the DataReaders (and vice versa) in the system. This process is known as Discovery and, in a simplified way, is how the DDS Entities in a system locate each other and decide if they can communicate or not.

It is not recommended to use a timer to wait for 2 DDS Entities to discovery each other as it is not deterministic. Instead, you could use the match notification statuses on the DataWriter and DataReader (e.g., on_publication_matched and on_subscription_matched). You could also access these statuses directly using the matched_publications() and matched_subscriptions() APIs.

We have a Knowledge Base article here which explains this in more detail and includes some example code.

Hope that helps,

Sam

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Thanks Sam,

I opted for the  on_publication_matched() approach, and created a trivial class derived from NoOpDataWriterListener.  I only implemented the  on_publication_matched virtual function.

I then attached it to the DataWriter, as per the examples in another RTI page.  This class is instantiated, as I can set a breakpoint on the constructor.  Alas when the other app (implementing the DataReader) is started, the  on_publication_matched() callback is never called.  I want that function to call  condition_variable::notify_one(), so that the publisher function can wait on it so it knows when the DataWriter is ready to send.

I know the writer does 'discover' the readers, because when I remove the listener and revert to the 2 second delay, it works.

Here is a fragment of the code.  Can you see anything obvious here? 


 

std::condition_variable condVar;
...
// Listener class
template <typename T> class DataWriterReadinessListener:
               public dds::pub::NoOpDataWriterListener<T>
{
private:
   condition_variable& m_condVar;
public:
   DataWriterReadinessListener(condition_variable& condVar)
             : m_condVar(condVar) {}
public:
   // on_publication_matched is never called!!
   void on_publication_matched(dds::pub::DataWriter<RequestResID>&,
            const dds::core::status::PublicationMatchedStatus &status)
   {
      std::cout << "on_publication_matched callback" << std::endl;
      m_condVar.notify_one();
   }
};

// Publisher function
void PublishToResource(int domain_id) {
   DataWriterReadinessListener<SupplyResID> drListener(condVar);
   ...
   dds::pub::DataWriter<SupplyResID>
   writer(dds::pub::Publisher(participant), topic,
            dds::pub::qos::DataWriterQos(),
            &drListener);
   ...
   ...

   {
      std::unique_lock<std::mutex> lck(mux);
      printf("%s - waiting for writer to connect\n",ThisFunc);
      condVar.wait(lck); // Wait for listener to notify_one()
   }
   // Writer ready, send data to the matched reader(s)
   ...
}

Offline
Last seen: 3 months 1 week ago
Joined: 04/02/2013
Posts: 196

Hi Sholto,

I think you are overloading on_publication_matched by mistake instead of overriding it. If you add the override specifier to your on_publication_matched the compilation should fail.

The correct signature would be  void on_publication_matched(dds::pub::DataWriter<T>&,...), instead of  void on_publication_matched(dds::pub::DataWriter<RequestResID>&,...)

Or alternatively don't templatize the listener:

class DataWriterReadinessListener: public dds::pub::NoOpDataWriterListener<SupplyResID>

 

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Hey Sam,

Many thanks, that worked nicely.  I am more annoyed with myself - that wasn't even a DDS error (for which I could forgive myself), but a C++ one (for which I can't!)

Thanks again for all your help.  You have been great.

Offline
Last seen: 2 years 5 months ago
Joined: 05/12/2019
Posts: 9

I have a similar issue. I changed my code to Modern C++, and when I send something, the Listener receives the message from on_subscription matched "subscription matched" and then from on_liveliness_changed "liveliness changed". Once all 10 - 15 messages, I receive a message.

Offline
Last seen: 3 months 1 day ago
Joined: 10/22/2018
Posts: 91

Hi Greenbolt,

I'm afraid I don't fully understand your issue.

The on_subscription_matched callback will be called whenever the associated DataReader discovers a matching DataWriter. This matching will only occur if they both have the same Topic and compatible QoS policies.

The on_liveliness_changed callback will be called whenever the liveliness of one of the associated DataReader's matched DataWriters changes. Liveliness is configurable via QoS. Of the various possible changes in liveliness, one is that a new matching entity has been discovered - this would explain why this callback is called at the same time as on_subscription_matched.

You can read about all of the stauses available for DataReaders in our User's Manual (their is also a similar section of the manual for DataWriter's statuses).

Please could you explain what you mean by "Once all 10 - 15 messages, I receive a message"?

Regards,
Sam

Offline
Last seen: 2 years 5 months ago
Joined: 05/12/2019
Posts: 9

Hi Sam,

What I probably really meant that on_data_available is not always called. I am workign on project involving some User Interface and some mediation service utilizing DDS. When I press a button in the UI, a message is put together (based on an IDL) and then sent via DDS to the mediation service. However, not every time I press the button the message arrives at the mediation service. I oriented myself on the connext_dds/c++11/hello_idl example.

 

Offline
Last seen: 3 months 1 day ago
Joined: 10/22/2018
Posts: 91

Greenbolt,

I have written a full reply in the thread you opened regarding this problem (Need to send data more than once for data to be sent), but in short, I think you should look into Durability QoS (see my message on your thread for more info).

Sholto,

I am glad you have it working now but you should be aware of a potential race condition that could now occur in your system (apologies for not pointing this out sooner). Currently, when the on_publication_matched callback on the Publisher is fired, we consider the system ready (with all entities ready to go etc.). However, in reality, Discovery is a bidirectional process, and it could happen that the Subscriber has not yet finished its own matching process with the Publisher (and therefore is not ready to begin receiving data).

This is the downfall of the solution I have provided you with.
If you were not using Request/Reply, I would suggest you to use Durability QoS to mitigate this (and that way the "late joining" DataReader would still receive missed samples). Unfortunately, the nature of the R/R communication pattern doesn't lend itself that well to this (though this is use-case dependent).

The simplest way to mitigate this race condition is to establish another Topic (with Transient Local Durability) which is written by the Replier-side application. This Topic would simply notify the Requester-side application that Discovery on this end has completed (heavy, but deterministic).

If you have more queries let me know,
Sam

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Hey Sam,

Funny you should say that, because I did find a problem when I enacted the  on_publication_matched callback.  It worked fine when I did it on the Registry app (which receives initial communication from the device app), but when I transferred it to the device, the first message from the device failed.  It would only work on the second or third transmission.

Pity the DataReader does not have a similar event to on_publication_matched.

I am not at work now, but will look into your suggestion about a single-use topic.

Offline
Last seen: 3 months 1 week ago
Joined: 04/02/2013
Posts: 196

Hi Sholto,

In the DataReader you can use on_subscription_matched.

Alex

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Thanks Alex.  I tried this out and it didn't actually help - my fault, mea culpa.  I hadn't thought it out, but it's good to know it's there.

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Hey Sam,

Re that suggestion of using a separate topic just to check if the Replier is ready, I have a question.  To reiterate, I have a central registry app (the Replier in this case) that trades messages with any number of apps representing devices.  So the registry and devices act as both publishers and subscribers.

Does the discovery take place before any reading or writing by the publisher/subscriber?  In other words does a subscriber know about a publisher (and vice versa) before receiving any message from it, or does the message itself start the discovery process?

If discovery occurs before messages start flying, would this separate topic in the Replier just respond to every new subscriber it discovers with an "I'm ready" notification, or just the first one?  I take it each new subscriber would just wait till it had received such a notification before sending its 'real' topic messages?

Offline
Last seen: 3 months 1 day ago
Joined: 10/22/2018
Posts: 91

Hi Sholto,

By default, discovery begins when an Entity (DataWriter, DataReader, etc.) is created. In other words, before the reading and writing of user data topics (i.e., the topics you have defined and created in your application).

Discovery is a per entity process. If there were a single DataWriter and two DataReaders, the DataWriter would have to discover each DataReader separately. So you will need to inform every new entity when discovery has completed.

Sam

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Thanks Sam, but if Discovery is per entity, why would the separate 'discovery' topic (with Transient Local Durability you mentioned in your post of 17 May) be relevant?  I assume it would have its own DataReader, in which case it would just tell the Requester that it was ready, it wouldn't say anything about the other DataReader's handling the application data.

Can you see any opportunity for Alex's suggestion above, that I use the on_subscription_matched event? I can see the benefit of the  on_publication_matched event in the Writer, but not so convinced about the one for the Reader.  After all it would have to tell its corresponding writer apps, and what if they weren't ready...?

Offline
Last seen: 3 months 1 day ago
Joined: 10/22/2018
Posts: 91

Hi Sholto,

The issue is that the Devices, which send requests to the Registry, need to ensure that both sides of Discovery have completed before they send their Requests.

On your Device's DataWriter you have the on_publication_matched callback which notifies you when Discovery at the local end has completed. The problem is that you have no way of knowing on the Device's DataWriter when the Registry's DataReader is ready.

If you create a new Transient Local topic and write() it when the Registry's on_subscription_matched callback is fired, you can convey this information to the Device application. Since the new topic's Durability is configured as Transient Local, it doesn't matter if its Discovery has completed yet or not (since the message will be resent to any late joiners). The receipt of a message from this new topic can act as a flag that the DataReader's on_subscription_matched callback has fired.

So, on the Device's DataWriter, once you have received a message from this new Transient Local topic (and once the DataWriter's on_publication_matched callback has fired) you can be sure that Discovery has completed on both ends and begin your communication.

If you were not using Request Reply (are you using RTI's Requester and Replier entities or implementing your own?) I would simply suggest that you configure the Durability of your DataReaders and DataWriters as Transient Local (as that completely mitigates this race condition). But, due to the nature of Request / Reply it is not currently possible to do this.

Regards,

Sam

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Oh heck, I was unaware of RTI's Requester and Replier entities.  I didn't see them in the tutorials, and unless you are aware of something's existence you don't go looking for it!  As I said, this is my first DDS app, and it seems the requester/replier model is far more suited to what I want - namely for a device to request the Registry to send back a single struct of information.  Especially as it seems it would automatically take care of the connection/timing problems that have bedevilled me.  In fact even a SimpleReplier would be perfect for this.... if I could get it working.

I used the example Modern C++ SimpleReplier which does not explicitly use a SimpleReplierListener - according to the comment below it, just defining the SimpleReplier with a functor should be enough.  However when I run it, it just returns immediately.  Shouldn't it block until it receives a request?

Another example suggests that the listener's on_request_available() callback, rather than the SimpleReplier itself, returns the reply to the requester.  Unfortunately it shows only the SimpleReplier constructor (with the listener as a parameter), but no method calls.

I have found the examples on the RTI website can be frustratingly inadequate - just fragments of code without any context.

 

 

Offline
Last seen: 3 months 1 day ago
Joined: 10/22/2018
Posts: 91

Hi Sholto,

Thanks for your feedback on our examples, I'll follow it up internally and see if there's anything we can do to improve it.

The reason for the difference between the two examples you have linked is that they are from two different versions of Connext DDS (one from 5.1.0 and the other from 5.3.1). Depending which version of Connext DDS you are using (the latest release is 6.0.0) the best practices can be different. In Connext DDS 6.0.0, the example you have linked uses a lambda expression, it is equivalent to the example from the 5.3.1 documentation.

The installation of Connext DDS should have created a directory called "rti_workspace" for you. Within this directory we ship an example of Request/Reply (rti_workspace/6.0.0/examples/connext_dds/c++11/hello_world_request_reply). All of the code is already written so you will just need to follow the instructions in the README.txt in that directory to build the example. The example is not using SimpleReplier or SimpleRequester but I think it will help you see the intended use of the APIs.

Sam

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Thanks Sam,

Looking at the sample source you suggested (.../c++11/hello_world_request_reply), is there a typo in  run_example() in PrimeNumberReplier.cxx?  The ReplierParams datawriter_qos/datareader_qosboth get their values from the  RequesterExampleProfile element in the USER_QOS_PROFILES.xml, whereas I see there is also a ReplierExampleProfile element.  I presume the latter is correct for the replier, as it reverses the datawriter_qos/datareader_qos values.

I based my code on this sample project but nothing gets past the intial connection loop in run_example()in the requester module (PrimeNumberRequester.cxx).  It never gets out of here (line 39 in my copy):

   while (rti::request::matched_replier_count(requester) == 0) { ...

In this manual page, it says that the requester and replier will match if they have the same domain and service name:

Well mine do have identical topics, but they still won't connect.  My code creating the request & replier is the same as that from the helloworld sample.  I also copied the relevant  qos_profile's into USER_QOS_PROFILES.xml.  Is there anything else that must match?

Offline
Last seen: 3 months 1 day ago
Joined: 10/22/2018
Posts: 91

Hi Sholto,

Are you able to successfully run the unmodified example (hello_world_request_reply)? I have tested this and it was successful:

./objs/x64Linux3gcc5.4.0/PrimeNumberReplier 229
PrimeNumberReplier running (on domain 229)

./objs/x64Linux3gcc5.4.0/PrimeNumberRequester 10 1 229
PrimeNumberRequester: Sending a request to calculate the prime numbers <= 10 in sequences of 1 or less elements (on domain 229)
2
3
5
7
DONE

In order for two DDS Entities to match they need to be on the same domain, have the same Topic, compatible QoS policies and a compatible data type. The shipped example meets all of these requirements (providing you supply the same domain ID for both of the entities).
The easiest way to detect matching problems is to use Admin Console's match analysis feature.

Sam

His Nerdship's picture
Offline
Last seen: 5 years 6 months ago
Joined: 05/08/2019
Posts: 18

Thanks Sam, that match analysis feature saved the day.  Everything works perfectly now.

One last question, though.  Is the Request/Reply pattern specific to RTI, or is it standard DDS?  Google searches just seem to bring up RTI references.

I'm using RTI for development, but eventually everything should be standard DDS.  Would be a pity if this pattern isn't standard as it's been a godsend.

Offline
Last seen: 3 months 1 day ago
Joined: 10/22/2018
Posts: 91

Hi Sholto, I'm glad you got it all working.

Our implementation of a Request/Reply communication pattern preceeded the standard, and whilst its very close it isn't quite standard. Eventually we plan to conform to the standard.

The standardised communication pattern is defined in the DDS-RPC specification (there's also this blog post that covers a little bit about it too).

Sam