concurrency about writer and reader

8 posts / 0 new
Last post
Offline
Last seen: 3 years 10 months ago
Joined: 03/29/2020
Posts: 9
concurrency about writer and reader

I am trying to develop some unit tests based on RTI context. And there is a problem related to concurrency, perhaps you can help with that.

the code is like:

....
std::vector<Message> data(3);
Message msg1;
Message msg2;
....
 
// write two messages at first

writer_.write(msg1)

writer_.write(msg2)

// read messages by reader

auto n = reader_.read(data)  // sometimes, n is 1, and sometimes, n is 2

....

I understand that there might be some threads behind writer and reader for sending/receiving data. And the result "n" might depend on OS scheduling ( if writer thread is prioritzed, then, the result "n" could be 2, and if reader thread is priortized, the result "n" might be 1)

Is there any way to get stable result "n"=2 ?   

 

Offline
Last seen: 1 year 1 month ago
Joined: 10/22/2018
Posts: 91

Hi yuanxing,

One way you could accomplish this is to check the DataReader's cache status. The sample_count fields within that status informs you of how many samples are currently in the reader's queue.

I'd also like to point a couple of other things (apologies if you are already aware of them).

You are calling DataReader::read, which leaves the samples in the DataReader's queue. This means you are never removing them. The DataReader::take API does the same operation as the read, but removes them from the queue.

You are currently effectively polling for data, but we provide some other mechanisms to be notified when data is available in the DataReader's queue.  listeners and waitsets can be used to be informed when there is new data available in the DataReader's queue. We ship examples of both of these use-cases alongside Connext DDS.

Hope that helps,
Sam

Offline
Last seen: 3 years 10 months ago
Joined: 03/29/2020
Posts: 9

Hi,

Thanks for quick reply.

Waitset is used in my test, sorry that I did not post all information in question description.

With DataReader's cache status, do you mean that I have to check sample_count in my code?  Is it possible to achieve that without check sample_count?  Can those code(the example code in my previous post) run in sequence? (write twice at first, then reader can read 2 samples by default)

Offline
Last seen: 1 year 1 month ago
Joined: 10/22/2018
Posts: 91

One solution you could use, which would mean you don't have to check the DataReader's cache status is to use reliable communication on both the DataReader and the DataWriter and, after each call to write(), use the DDS_DataWriter_wait_for_acknowldgements API. That API will block until the DataReader has acknowledged that it has received the sample.

Sam

Offline
Last seen: 3 years 10 months ago
Joined: 03/29/2020
Posts: 9

Hi samr,

It works, super helpful, thanks!

 

Offline
Last seen: 3 years 10 months ago
Joined: 03/29/2020
Posts: 9

Hi again,

Is it possible to configure wait_for_acknowldgements for writer in QoS xml? 

 

Offline
Last seen: 1 year 1 month ago
Joined: 10/22/2018
Posts: 91

Hi,

No it is not, it is an API that we provide, not a QoS.

Offline
Last seen: 3 years 10 months ago
Joined: 03/29/2020
Posts: 9

Probably I misunderstood "rti::core::policy::AcknowledgmentKind_def ".   I though that can help to give similar functionality as "wait_for_acknowldgements"