Proper use of WaitSets and Conditions
I have noticed in our support forums a common question regarding the use of WaitSets and the fact that sometimes they don't behave as people initially expect. Often the question is posed along the lines of "Why does my WaitSet not block? Why does my subscriber using WaitSets consume so much CPU?"
As background, WaitSets provide an alternative mechanism to listeners, allowing the application to "wait" for specific sets of events (e.g. the arrival of data on a DataReader, the match of a DataWriter to a DataReader, etc.). Listeners provide equivalent facilities, but they are called in the context of a middleware thread, which implies certain limitations (e.g. you are not allowed to do things that might block or consume a lot of time). The Listener and WaitSet mechanisms are equivalent in that any event that can cause notification to a listener can also be used to awaken a WaitSet. In fact you can do both, get called in the listener and awaken out of a WaitSet, but that is not relevant to this discussion.
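The following code snippet shows a typical use of a WaitSet to wait for a DDS_SUBSCRIPTION_MATCHED_STATUS, a DDS_LIVELINESS_CHANGED_STATUS, or a DDS_DATA_AVAILABLE_STATUS. The code assumes you have already created a DataReader, referenced as "reader" and narrowed to "myDataReader", and that the loop variables "count" and "sample_count" are declared elsewhere.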
    DDS::WaitSet *myWaitSet = new DDS::WaitSet();
    HelloStructDataReader *myDataReader = HelloStructDataReader::narrow(reader);
    DDS::StatusCondition *myCondition = myDataReader->get_statuscondition();
    myCondition->set_enabled_statuses(
            DDS_SUBSCRIPTION_MATCHED_STATUS |
            DDS_LIVELINESS_CHANGED_STATUS |
            DDS_DATA_AVAILABLE_STATUS);
    myWaitSet->attach_condition(myCondition);

    // Now you can wait for any events on the DataReader
    DDSConditionSeq active_conditions;
    DDS_Duration_t timeout = { 30, 0 }; // 30 seconds
    DDS_SubscriptionMatchedStatus subsMatchedStatus;
    DDS_LivelinessChangedStatus livelinessChangedStatus;

    /* Create a helper class to process the data */
    HelloDataProcessor *data_processor = new HelloDataProcessor();

    // Dedicate the thread to handle the events as they come
    for (count = 0; (sample_count == 0) || (count < sample_count); ++count) {
        DDS::ReturnCode_t retcode = myWaitSet->wait(active_conditions, timeout);

        if (retcode == DDS_RETCODE_TIMEOUT) {
            // Handle timeout
            printf("Wait TIMEOUT\n");
        } else if (retcode == DDS_RETCODE_OK) {
            // Since this example has only one condition this will always be
            // myDataReader, but in the more general case of multiple conditions
            // we can use the code below to access the DataReader associated with
            // each active condition returned by myWaitSet->wait() in the output
            // parameter 'active_conditions'
            DDS::DataReader *activeDataReader =
                    (DDS::DataReader *)myCondition->get_entity();

            // IMPORTANT: must call dataReader->get_***_status() for all the
            // triggered statuses on the StatusCondition myCondition that caused
            // the WaitSet to wake up. Otherwise the trigger status of the
            // condition is not cleared and the next time we call the 'wait'
            // operation on the WaitSet it will wake up immediately
            DDS_StatusMask triggered_statuses =
                    activeDataReader->get_status_changes()
                    & myCondition->get_enabled_statuses();

            if (triggered_statuses & DDS_SUBSCRIPTION_MATCHED_STATUS) {
                // Handle DDS_SUBSCRIPTION_MATCHED_STATUS
                // WARNING. Must call
                // activeDataReader->get_subscription_matched_status() to clear it
                printf("Got: DDS_SUBSCRIPTION_MATCHED_STATUS for DataReader: %p "
                       "on topic \"%s\"\n",
                       (void *)activeDataReader,
                       activeDataReader->get_topicdescription()->get_name());
                activeDataReader->get_subscription_matched_status(subsMatchedStatus);
            }

            if (triggered_statuses & DDS_LIVELINESS_CHANGED_STATUS) {
                // Handle DDS_LIVELINESS_CHANGED_STATUS
                // WARNING. Must call
                // activeDataReader->get_liveliness_changed_status() to clear it
                printf("Got: DDS_LIVELINESS_CHANGED_STATUS for DataReader: %p "
                       "on topic \"%s\"\n",
                       (void *)activeDataReader,
                       activeDataReader->get_topicdescription()->get_name());
                activeDataReader->get_liveliness_changed_status(livelinessChangedStatus);
            }

            if (triggered_statuses & DDS_DATA_AVAILABLE_STATUS) {
                // Handle DDS_DATA_AVAILABLE_STATUS
                // WARNING. Must call activeDataReader->read() or similar to clear it
                printf("Got: DDS_DATA_AVAILABLE_STATUS for DataReader: %p "
                       "on topic \"%s\"\n",
                       (void *)activeDataReader,
                       activeDataReader->get_topicdescription()->get_name());
                data_processor->data_available(myDataReader);
            }

            // WARNING. Make sure you have 'reset' the trigger statuses on
            // myCondition, that is, all the bits in triggered_statuses; otherwise
            // the next call to myWaitSet->wait() will wake up immediately
        }
    }
As long as any enabled status remains triggered (that is, it has a change the application has not yet acknowledged), the DDS::StatusCondition also remains triggered, which causes the DDS::WaitSet::wait() call to return immediately. Therefore it is important that the application "resets" the DDS::StatusCondition to the "untriggered" state, and this is accomplished by calling myDataReader->get_***_status() for each of the triggered statuses. The one exception to this is the DDS_DATA_AVAILABLE_STATUS (which the example above also enables). This status is reset by calling 'read' or 'take' or one of their variants on the corresponding DataReader.
If the application code forgets to do this, then the WaitSet will wake up immediately, and the observable effect will be that CPU is burned in the loop because the thread is continually awakened despite there being no new events.
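To make the mechanism concrete, here is a small self-contained toy model of the trigger logic. It is only a sketch: ToyReader, raise(), acknowledge(), and count_immediate_wakeups() are hypothetical names invented for illustration and are not part of the RTI Connext API. The model assumes that the condition is triggered whenever an enabled status has a pending change, and that acknowledge() stands in for get_***_status() or read()/take().

```cpp
#include <cassert>
#include <cstdint>

// Toy model only: these names mimic, but are NOT, the RTI Connext API.
using StatusMask = std::uint32_t;
constexpr StatusMask SUBSCRIPTION_MATCHED = 1u << 0;
constexpr StatusMask LIVELINESS_CHANGED   = 1u << 1;
constexpr StatusMask DATA_AVAILABLE       = 1u << 2;

class ToyReader {
public:
    void set_enabled_statuses(StatusMask mask) { enabled_ = mask; }
    StatusMask get_enabled_statuses() const { return enabled_; }

    // "Middleware" side: record that an event happened.
    void raise(StatusMask status) { changes_ |= status; }
    StatusMask get_status_changes() const { return changes_; }

    // Assumed rule: triggered while any enabled status has a pending change.
    bool trigger_value() const { return (changes_ & enabled_) != 0; }

    // Stand-in for get_***_status() or read()/take(): clears the pending bits.
    void acknowledge(StatusMask status) { changes_ &= ~status; }

private:
    StatusMask enabled_ = 0;
    StatusMask changes_ = 0;
};

// Simulates up to max_waits calls to wait() after ONE event and returns how
// many of them would have returned immediately instead of blocking.
int count_immediate_wakeups(bool clear_statuses, int max_waits) {
    assert(max_waits >= 0);
    ToyReader reader;
    reader.set_enabled_statuses(SUBSCRIPTION_MATCHED | DATA_AVAILABLE);
    reader.raise(SUBSCRIPTION_MATCHED);  // a single event arrives

    int wakeups = 0;
    for (int i = 0; i < max_waits; ++i) {
        if (!reader.trigger_value()) {
            break;  // a real wait() would block here until the next event
        }
        ++wakeups;
        if (clear_statuses) {
            StatusMask triggered =
                    reader.get_status_changes() & reader.get_enabled_statuses();
            reader.acknowledge(triggered);  // reset every triggered status
        }
    }
    return wakeups;
}
```

In this model, count_immediate_wakeups(true, 1000) handles the single event once and then would block, while count_immediate_wakeups(false, 1000) wakes up on every iteration, which is the busy loop described above.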
Comments
caleb_r_miller
Tue, 10/02/2012 - 09:28
Is this example still valid? Some of the methods referenced above don't seem to appear in the latest release (for example get_subscription_matched_status()).
It would be nice if the examples provided in the distribution contained at least one example of using a WaitSet.
Gerardo Pardo
Tue, 10/02/2012 - 20:44
Hi,
Thank you for bringing up this point.
Yes, the example was still 'valid' but incorrectly written. Still valid in the sense that things have not changed relative to the version for which the example was written. Incorrect in that, as you observed, operations such as the get_subscription_matched_status() you mentioned are not on the DDS::StatusCondition. These operations are on the DDS::DataReader instead.
I updated the example in the posting above to be correct. To avoid similar bugs I have created a C++ code snippet that shows how to use a WaitSet in a complete example program which you can compile. You can find the code attached to the original posting and also at this link: http://community.rti.com/filedepot/folder/5?fid=12