Following on from my previous post re unreliable WiFi connections, I found that the problem was caused by calling DataWriter::write() too soon after creating the DataWriter. Because write() is non-blocking it returned immediately without doing anything. Also, because it is void, there is no error return code.
So the publishing app was not in fact sending anything, and I had no way of knowing.
It obviously needed more time to assemble its plumbing, so I put in a 2 second sleep, after which it worked.
I saw a forum post about this:
https://community.rti.com/forum-topic/dynamic-data-writing-seems-need-setup-time
However, this is not an ideal solution. Two seconds will usually be overkill, but occasionally not enough.
- Is there any way I can test if the DataWriter is ready to write? Is there a status flag to tell me?
- If so, would I need to test before every write, or only the first write after object construction?
His Nerdship,
The assembling of the plumbing you have found is the DataWriter discovering the DataReaders (and vice versa) in the system. This process is known as Discovery and, in a simplified way, is how the DDS Entities in a system locate each other and decide if they can communicate or not.
It is not recommended to use a timer to wait for two DDS Entities to discover each other, as it is not deterministic. Instead, you could use the match notification statuses on the DataWriter and DataReader (e.g., on_publication_matched and on_subscription_matched). You could also access these statuses directly using the matched_publications() and matched_subscriptions() APIs.
We have a Knowledge Base article here which explains this in more detail and includes some example code.
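As a rough illustration of replacing the fixed sleep with a bounded wait on the match count, here is a minimal sketch in plain C++. The names wait_for_matches and matched_count are hypothetical and not part of any DDS API; in a real application the callable would wrap whatever match query your version of the API provides.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Polls `matched_count` until it reports at least `wanted` matches or the
// timeout expires. Returns true if the matches were seen in time.
// `matched_count` is a hypothetical stand-in for a query that returns the
// writer's current number of matched readers.
bool wait_for_matches(const std::function<int()>& matched_count,
                      int wanted,
                      std::chrono::milliseconds timeout,
                      std::chrono::milliseconds poll_interval =
                          std::chrono::milliseconds(10))
{
    assert(wanted > 0);  // waiting for zero matches would be meaningless
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (matched_count() < wanted) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // give up: nothing matched within the timeout
        }
        std::this_thread::sleep_for(poll_interval);
    }
    return true;
}
```

Unlike a fixed two-second sleep, this returns as soon as a match is reported and tells the caller when the timeout was not enough.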
Hope that helps,
Sam
Thanks Sam,
I opted for the on_publication_matched() approach, and created a trivial class derived from NoOpDataWriterListener. I only implemented the on_publication_matched virtual function.
I then attached it to the DataWriter, as per the examples in another RTI page. This class is instantiated, as I can set a breakpoint on the constructor. Alas when the other app (implementing the DataReader) is started, the on_publication_matched() callback is never called. I want that function to call condition_variable::notify_one(), so that the publisher function can wait on it so it knows when the DataWriter is ready to send.
I know the writer does 'discover' the readers, because when I remove the listener and revert to the 2 second delay, it works.
Here is a fragment of the code. Can you see anything obvious here?
std::condition_variable condVar;
...
// Listener class
template <typename T> class DataWriterReadinessListener:
    public dds::pub::NoOpDataWriterListener<T>
{
private:
    condition_variable& m_condVar;
public:
    DataWriterReadinessListener(condition_variable& condVar)
        : m_condVar(condVar) {}
public:
    // on_publication_matched is never called!!
    void on_publication_matched(dds::pub::DataWriter<RequestResID>&,
            const dds::core::status::PublicationMatchedStatus &status)
    {
        std::cout << "on_publication_matched callback" << std::endl;
        m_condVar.notify_one();
    }
};
// Publisher function
void PublishToResource(int domain_id) {
    DataWriterReadinessListener<SupplyResID> drListener(condVar);
    ...
    dds::pub::DataWriter<SupplyResID>
        writer(dds::pub::Publisher(participant), topic,
               dds::pub::qos::DataWriterQos(),
               &drListener);
    ...
    ...
    {
        std::unique_lock<std::mutex> lck(mux);
        printf("%s - waiting for writer to connect\n",ThisFunc);
        condVar.wait(lck); // Wait for listener to notify_one()
    }
    // Writer ready, send data to the matched reader(s)
    ...
}
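Separately from the callback question, the bare condVar.wait(lck) in the fragment above can miss a notification that fires before the wait begins. A minimal sketch of the usual remedy, a flag checked through a wait predicate, is shown here in plain C++. ReadySignal and demo_wait_for_callback are hypothetical names, and an ordinary thread stands in for the middleware's callback thread.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Shared state guarded by `mux`. The flag records that the event happened,
// so a notification sent before the waiter starts waiting is not lost.
struct ReadySignal {
    std::mutex mux;
    std::condition_variable condVar;
    bool ready = false;

    // Called from the callback side (e.g. inside on_publication_matched).
    void set_ready() {
        {
            std::lock_guard<std::mutex> lck(mux);
            ready = true;
        }
        condVar.notify_one();
    }

    // Called from the publishing side. Returns false on timeout.
    bool wait_ready(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lck(mux);
        return condVar.wait_for(lck, timeout, [this] { return ready; });
    }
};

// Simulation only: a plain thread plays the role of the DDS callback thread.
bool demo_wait_for_callback() {
    ReadySignal signal;
    std::thread callback_thread([&signal] { signal.set_ready(); });
    callback_thread.join();  // the "callback" has already fired by now
    // A bare wait() could block indefinitely here; the predicate form
    // sees ready == true and returns immediately.
    return signal.wait_ready(std::chrono::milliseconds(100));
}
```

The predicate also protects against spurious wakeups, and the timeout gives the publisher a way to report that no reader ever matched.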
Hi Sholto,
I think you are overloading on_publication_matched by mistake instead of overriding it. If you add the override specifier to your on_publication_matched the compilation should fail.
The correct signature would be void on_publication_matched(dds::pub::DataWriter<T>&,...), instead of void on_publication_matched(dds::pub::DataWriter<RequestResID>&,...)
Or alternatively don't templatize the listener:
class DataWriterReadinessListener: public dds::pub::NoOpDataWriterListener<SupplyResID>
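To show the overload-versus-override distinction outside DDS, here is a small self-contained sketch. Writer, NoOpListener, on_matched and fire are hypothetical stand-ins, not Connext types; only the C++ rule is the same.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for the DDS types, just to show the C++ rule.
struct RequestResID {};
struct SupplyResID {};
template <typename T> struct Writer {};

template <typename T>
struct NoOpListener {
    virtual ~NoOpListener() = default;
    virtual std::string on_matched(Writer<T>&) { return "base no-op"; }
};

// Wrong parameter type: for T = SupplyResID this declares a new overload that
// hides the base function instead of overriding it. Adding `override` here
// would turn the mistake into a compile error.
template <typename T>
struct OverloadingListener : NoOpListener<T> {
    std::string on_matched(Writer<RequestResID>&) { return "derived"; }
};

// Parameter type matches the base, so `override` is accepted.
template <typename T>
struct OverridingListener : NoOpListener<T> {
    std::string on_matched(Writer<T>&) override { return "derived"; }
};

// The caller only knows the base interface, so it calls through it,
// just as middleware would when dispatching a callback.
template <typename T>
std::string fire(NoOpListener<T>& listener) {
    Writer<T> writer;
    return listener.on_matched(writer);
}
```

Calling fire<SupplyResID> on an OverloadingListener<SupplyResID> reaches the base no-op, while the same call on an OverridingListener<SupplyResID> reaches the derived function. GCC and Clang can also flag the hidden virtual with -Woverloaded-virtual.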
Hey Sam,
Many thanks, that worked nicely. I am more annoyed with myself - that wasn't even a DDS error (for which I could forgive myself), but a C++ one (for which I can't!)
Thanks again for all your help. You have been great.
I have a similar issue. I changed my code to Modern C++, and when I send something, the Listener receives the message "subscription matched" from on_subscription_matched and then "liveliness changed" from on_liveliness_changed. Once all 10 - 15 messages, I receive a message.
Hi Greenbolt,
I'm afraid I don't fully understand your issue.
The on_subscription_matched callback will be called whenever the associated DataReader discovers a matching DataWriter. This matching will only occur if they both have the same Topic and compatible QoS policies.
The on_liveliness_changed callback will be called whenever the liveliness of one of the associated DataReader's matched DataWriters changes. Liveliness is configurable via QoS. Of the various possible changes in liveliness, one is that a new matching entity has been discovered - this would explain why this callback is called at the same time as on_subscription_matched.
You can read about all of the statuses available for DataReaders in our User's Manual (there is also a similar section of the manual for DataWriter statuses).
Please could you explain what you mean by "Once all 10 - 15 messages, I receive a message"?
Regards,
Sam
Hi Sam,
What I probably really meant was that on_data_available is not always called. I am working on a project involving a user interface and a mediation service utilizing DDS. When I press a button in the UI, a message is put together (based on an IDL) and then sent via DDS to the mediation service. However, the message does not arrive at the mediation service every time I press the button. I based my code on the connext_dds/c++11/hello_idl example.
Greenbolt,
I have written a full reply in the thread you opened regarding this problem (Need to send data more than once for data to be sent), but in short, I think you should look into Durability QoS (see my message on your thread for more info).
Sholto,
I am glad you have it working now but you should be aware of a potential race condition that could now occur in your system (apologies for not pointing this out sooner). Currently, when the on_publication_matched callback on the Publisher is fired, we consider the system ready (with all entities ready to go etc.). However, in reality, Discovery is a bidirectional process, and it could happen that the Subscriber has not yet finished its own matching process with the Publisher (and therefore is not ready to begin receiving data).
This is the downfall of the solution I have provided you with.
If you were not using Request/Reply, I would suggest that you use Durability QoS to mitigate this (that way the "late joining" DataReader would still receive missed samples). Unfortunately, the nature of the R/R communication pattern doesn't lend itself that well to this (though this is use-case dependent).
The simplest way to mitigate this race condition is to establish another Topic (with Transient Local Durability) which is written by the Replier-side application. This Topic would simply notify the Requester-side application that Discovery on this end has completed (heavy, but deterministic).
If you have more queries let me know,
Sam
Hey Sam,
Funny you should say that, because I did find a problem when I enacted the on_publication_matched callback. It worked fine when I did it on the Registry app (which receives initial communication from the device app), but when I transferred it to the device, the first message from the device failed. It would only work on the second or third transmission.
Pity the DataReader does not have a similar event to on_publication_matched.
I am not at work now, but will look into your suggestion about a single-use topic.
Hi Sholto,
In the DataReader you can use on_subscription_matched.
Alex
Thanks Alex. I tried this out and it didn't actually help - my fault, mea culpa. I hadn't thought it out, but it's good to know it's there.
Hey Sam,
Re that suggestion of using a separate topic just to check if the Replier is ready, I have a question. To reiterate, I have a central registry app (the Replier in this case) that trades messages with any number of apps representing devices. So the registry and devices act as both publishers and subscribers.
Does the discovery take place before any reading or writing by the publisher/subscriber? In other words does a subscriber know about a publisher (and vice versa) before receiving any message from it, or does the message itself start the discovery process?
If discovery occurs before messages start flying, would this separate topic in the Replier just respond to every new subscriber it discovers with an "I'm ready" notification, or just the first one? I take it each new subscriber would just wait till it had received such a notification before sending its 'real' topic messages?
Hi Sholto,
By default, discovery begins when an Entity (DataWriter, DataReader, etc.) is created. In other words, before the reading and writing of user data topics (i.e., the topics you have defined and created in your application).
Discovery is a per entity process. If there were a single DataWriter and two DataReaders, the DataWriter would have to discover each DataReader separately. So you will need to inform every new entity when discovery has completed.
Sam
Thanks Sam, but if Discovery is per entity, why would the separate 'discovery' topic (with the Transient Local Durability you mentioned in your post of 17 May) be relevant? I assume it would have its own DataReader, in which case it would just tell the Requester that it was ready; it wouldn't say anything about the other DataReaders handling the application data.
Can you see any opportunity for Alex's suggestion above, that I use the on_subscription_matched event? I can see the benefit of the on_publication_matched event in the Writer, but I am not so convinced about the one for the Reader. After all it would have to tell its corresponding writer apps, and what if they weren't ready...?
Hi Sholto,
The issue is that the Devices, which send requests to the Registry, need to ensure that both sides of Discovery have completed before they send their Requests.
On your Device's DataWriter you have the on_publication_matched callback which notifies you when Discovery at the local end has completed. The problem is that you have no way of knowing on the Device's DataWriter when the Registry's DataReader is ready.
If you create a new Transient Local topic and write() it when the Registry's on_subscription_matched callback is fired, you can convey this information to the Device application. Since the new topic's Durability is configured as Transient Local, it doesn't matter if its Discovery has completed yet or not (since the message will be resent to any late joiners). The receipt of a message from this new topic can act as a flag that the DataReader's on_subscription_matched callback has fired.
So, on the Device's DataWriter, once you have received a message from this new Transient Local topic (and once the DataWriter's on_publication_matched callback has fired) you can be sure that Discovery has completed on both ends and begin your communication.
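The two conditions above can be combined into a single gate on the Device side. This is only a sketch in plain C++ under stated assumptions: DiscoveryGate and its member names are hypothetical, and the two setters are assumed to be called from your own callbacks (one from on_publication_matched, the other when a sample arrives on the new Transient Local topic).

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Tracks the two independent events the Device has to see before sending.
// Both setters are meant to be called from middleware callbacks:
//   local_matched(): from the request DataWriter's on_publication_matched
//   remote_ready():  when a sample arrives on the extra "ready" topic
class DiscoveryGate {
public:
    void local_matched() { set(local_matched_); }
    void remote_ready()  { set(remote_ready_); }

    // Blocks until both events have happened; returns false on timeout.
    bool wait_both(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lck(mux_);
        return cond_.wait_for(lck, timeout, [this] {
            return local_matched_ && remote_ready_;
        });
    }

private:
    // Sets one flag under the lock, then wakes any waiter to re-check.
    void set(bool& flag) {
        {
            std::lock_guard<std::mutex> lck(mux_);
            flag = true;
        }
        cond_.notify_all();
    }

    std::mutex mux_;
    std::condition_variable cond_;
    bool local_matched_ = false;
    bool remote_ready_ = false;
};
```

The Device would call wait_both() before its first request. The order in which the two events arrive does not matter, since each one only sets its own flag.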
If you were not using Request Reply (are you using RTI's Requester and Replier entities or implementing your own?) I would simply suggest that you configure the Durability of your DataReaders and DataWriters as Transient Local (as that completely mitigates this race condition). But, due to the nature of Request / Reply it is not currently possible to do this.
Regards,
Sam
Oh heck, I was unaware of RTI's Requester and Replier entities. I didn't see them in the tutorials, and unless you are aware of something's existence you don't go looking for it! As I said, this is my first DDS app, and it seems the requester/replier model is far more suited to what I want - namely for a device to request the Registry to send back a single struct of information. Especially as it seems it would automatically take care of the connection/timing problems that have bedevilled me. In fact even a SimpleReplier would be perfect for this.... if I could get it working.
I used the example Modern C++ SimpleReplier which does not explicitly use a SimpleReplierListener - according to the comment below it, just defining the SimpleReplier with a functor should be enough. However when I run it, it just returns immediately. Shouldn't it block until it receives a request?
Another example suggests that the listener's on_request_available() callback, rather than the SimpleReplier itself, returns the reply to the requester. Unfortunately it shows only the SimpleReplier constructor (with the listener as a parameter), but no method calls.
I have found the examples on the RTI website can be frustratingly inadequate - just fragments of code without any context.
Hi Sholto,
Thanks for your feedback on our examples, I'll follow it up internally and see if there's anything we can do to improve it.
The reason for the difference between the two examples you have linked is that they are from two different versions of Connext DDS (one from 5.1.0 and the other from 5.3.1). Depending on which version of Connext DDS you are using (the latest release is 6.0.0), the best practices can be different. In Connext DDS 6.0.0, the example you have linked uses a lambda expression; it is equivalent to the example from the 5.3.1 documentation.
The installation of Connext DDS should have created a directory called "rti_workspace" for you. Within this directory we ship an example of Request/Reply (rti_workspace/6.0.0/examples/connext_dds/c++11/hello_world_request_reply). All of the code is already written so you will just need to follow the instructions in the README.txt in that directory to build the example. The example is not using SimpleReplier or SimpleRequester but I think it will help you see the intended use of the APIs.
Sam
Thanks Sam,
Looking at the sample source you suggested (.../c++11/hello_world_request_reply), is there a typo in run_example() in PrimeNumberReplier.cxx? The ReplierParams datawriter_qos/datareader_qos both get their values from the RequesterExampleProfile element in the USER_QOS_PROFILES.xml, whereas I see there is also a ReplierExampleProfile element. I presume the latter is correct for the replier, as it reverses the datawriter_qos/datareader_qos values.
I based my code on this sample project but nothing gets past the initial connection loop in run_example() in the requester module (PrimeNumberRequester.cxx). It never gets out of here (line 39 in my copy):
while (rti::request::matched_replier_count(requester) == 0) { ...
In this manual page, it says that the requester and replier will match if they have the same domain and service name:
Well mine do have identical topics, but they still won't connect. My code creating the requester & replier is the same as that from the helloworld sample. I also copied the relevant qos_profile elements into USER_QOS_PROFILES.xml. Is there anything else that must match?
Hi Sholto,
Are you able to successfully run the unmodified example (hello_world_request_reply)? I have tested this and it was successful:
./objs/x64Linux3gcc5.4.0/PrimeNumberReplier 229
PrimeNumberReplier running (on domain 229)
./objs/x64Linux3gcc5.4.0/PrimeNumberRequester 10 1 229
PrimeNumberRequester: Sending a request to calculate the prime numbers <= 10 in sequences of 1 or less elements (on domain 229)
2
3
5
7
DONE
In order for two DDS Entities to match they need to be on the same domain, have the same Topic, compatible QoS policies and a compatible data type. The shipped example meets all of these requirements (providing you supply the same domain ID for both of the entities).
The easiest way to detect matching problems is to use Admin Console's match analysis feature.
Sam
Thanks Sam, that match analysis feature saved the day. Everything works perfectly now.
One last question, though. Is the Request/Reply pattern specific to RTI, or is it standard DDS? Google searches just seem to bring up RTI references.
I'm using RTI for development, but eventually everything should be standard DDS. Would be a pity if this pattern isn't standard as it's been a godsend.
Hi Sholto, I'm glad you got it all working.
Our implementation of a Request/Reply communication pattern preceded the standard, and whilst it's very close it isn't quite standard. Eventually we plan to conform to the standard.
The standardised communication pattern is defined in the DDS-RPC specification (there's also this blog post that covers a little bit about it too).
Sam