Keyed topic - data writer blocks when samples for multiple instances are published in parallel

Joined: 05/18/2018
Posts: 4

I'm trying to implement a strictly reliable data stream based on a keyed topic. Multiple instances on the topic should transfer samples in parallel, so that each instance can be treated as a separate data stream.

The idea of using a keyed topic to implement independent streams is based on the VideoStreaming example.

The data type is defined by the following IDL file:

    struct Data {
        string uid; //@key
        unsigned long long size;
        unsigned long long error;
        unsigned long long packageNumber;
        string testData;
    };

For each instance, an independent data writer (with its own publisher) is used.

Within the QoS profile of the data writer, the resource_limits value for max_samples is set to 1000 and max_samples_per_instance is set to 100. (The complete QoS profile is attached below.)
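
For readers without access to the attachment, the settings described above would look roughly like the fragment below. This is only a sketch, not the attached profile: the resource limit values come from this post, while the RELIABLE and KEEP_ALL settings are inferred from the "strict reliable" requirement and from the KEEP_ALL remark in the reply further down. Everything else in the real profile is unknown here.

```xml
<datawriter_qos>
    <!-- Assumed: strict reliability means RELIABLE plus KEEP_ALL history -->
    <reliability>
        <kind>RELIABLE_RELIABILITY_QOS</kind>
    </reliability>
    <history>
        <kind>KEEP_ALL_HISTORY_QOS</kind>
    </history>
    <!-- Values taken from the description in this post -->
    <resource_limits>
        <max_samples>1000</max_samples>
        <max_samples_per_instance>100</max_samples_per_instance>
    </resource_limits>
</datawriter_qos>
```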

In my test program I start 8 data writers in parallel. Each data writer publishes samples for a different instance (different key).

To receive the samples, separate readers on the keyed topic are used. Each reader is configured to read samples from one instance.

Expectation:

If no samples are read, I would expect each writer to be able to write 100 samples before it blocks.

If I read data from one instance, I expect the writer of the corresponding instance to be able to publish new samples, so that each instance is handled independently.

Problem:

When the data writers publish data, each data writer is able to publish a different number of samples before it blocks. The numbers seem to have nothing in common with the limits configured via the QoS profile.

If I start reading from one instance, the data writer for this particular instance stays blocked.

Even if I read from all instances with parallel readers, it sometimes happens that the data writers block before all data is transferred.

Currently I have no idea where the problem comes from. Is my understanding of the QoS profile configuration wrong or did I forget to configure some values?

Best regards

Olav

Attachment: streamqosprofile.txt (2.88 KB)
Gerardo Pardo
Joined: 06/02/2010
Posts: 581

How are you creating the DataReaders?

Normally a DataReader subscribes to a Topic and therefore to all instances in the Topic. The fact that it only calls "read/take" on a single instance does not change that. DDS still assumes you are interested in all the other instances and sends them to the DataReader, where they will just sit in the DataReader cache.

Once the DataReader cache cannot fit any more samples, it will reject samples from the DataWriters. And since the DataWriters are configured with KEEP_ALL, this will block the DataWriters.

I think this may be the reason for what you observe.

If you want the DataReader to only receive samples for one instance then you could use a ContentFilteredTopic (see DomainParticipant::create_contentfilteredtopic) and put an expression that only matches that one instance.

-Gerardo

Joined: 05/18/2018
Posts: 4

Hi Gerardo,

Thanks for the reply. In our example we are already using content-filtered topics, but we still observe the described behaviour. We created an example that does not use our abstractions and still see the same behaviour. I will attach the code later. It would be great if you could take a look at it.

Best regards

Olav

Joined: 05/18/2018
Posts: 4

Okay, here is a small sample program triggering the unexpected behaviour. (See attached file)

The program creates one domain participant and one keyed topic.

There are two data-writers (each with its own publisher).
For each data-writer, a thread publishes samples of a distinct instance every 10 milliseconds.

For each of the two topic instances, there is a data-reader (each with its own subscriber and a filtered-topic).
For each data-reader there is a thread that takes the new samples.
The reader-thread for instance 1 reads all samples immediately.
The reader-thread for instance 2 sleeps 300 seconds after it has taken all samples currently in the reader.

## Expected behaviour
I expected instance 1 to be published and received by Reader 1 all the time without interruptions,
since that reader does not block (sleep).

The writer thread of instance 2 should block once max-samples-per-instance (=100) is reached, because the reader thread of the
second instance is sleeping.

## Observed behaviour
It seems that max-samples-per-instance is ignored completely and the instances are somehow coupled:
Reader 1 gets all samples of Writer 1 immediately.
Reader 2 gets all samples of Writer 2 in a period of 2 seconds.
After some time, both of the writers are blocked until the Reader 2 takes the samples after the sleep operation.


Additional:
If reader-thread 2 is not started at all, both writers publish about 300 samples.
Then both writers throw timeout errors in the write call.

Gerardo Pardo
Joined: 06/02/2010
Posts: 581

Hi,

Thank you for posting the example program. I was able to reproduce the behavior you described.

The problem is happening because the two DataReaders are being created in the same DomainParticipant. In this case the default behavior is that they will use the same UDP port to receive the messages and the DataWriters will send the samples for either DataReader to that same port.

When this happens there is some resource contention. The details are a bit involved. I will try to explain below. But the bottom line is that in this situation the DataWriter can be blocked by the DataReader that is only interested in the other instance. 

I verified this situation does not occur if the two DataReaders are in different DomainParticipants (e.g. different applications/processes).

I agree this is not intuitive and I think there are some things that we could do better to avoid the contention... I have created an internal RFE to look into this.

A potential workaround is to specify a different port to be used by each DataReader. This can be done via the TransportUnicastQosPolicy (the "unicast" field) in the DataReaderQos. For example:

    <datareader_qos>
        <unicast>
            <value>
                <element>
                    <receive_port>10001</receive_port>
                </element>
            </value>
        </unicast>
    </datareader_qos>

Another way to do this, if your pattern is to use separate DataWriters and DataReaders for each stream, would be to use Partitions (one per stream) so that each DataWriter only matches the DataReaders that are interested in that stream.
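
As a rough sketch of the partition approach: the PARTITION policy lives on the Publisher and Subscriber QoS, so a profile for one stream could look like the fragment below. The profile name "Stream1Profile" and the partition name "stream_1" are made up for illustration. The Publisher and Subscriber of a stream would both use the same partition name, and each stream would get a different one.

```xml
<!-- Hypothetical profile for one stream; the names are illustrative only -->
<qos_profile name="Stream1Profile">
    <publisher_qos>
        <partition>
            <name>
                <element>stream_1</element>
            </name>
        </partition>
    </publisher_qos>
    <subscriber_qos>
        <partition>
            <name>
                <element>stream_1</element>
            </name>
        </partition>
    </subscriber_qos>
</qos_profile>
```

If the number of streams is not known in advance, the partition name would more likely be set in code on the PublisherQos and SubscriberQos before creating each Publisher and Subscriber, rather than through one XML profile per stream.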

As I mentioned, the root cause is that the samples are sent to the common port. These samples are tagged to indicate the filter they pass, so the DataReader with the filter that does not match should ignore them.

For example, samples from DW1 are tagged to indicate that they only pass DR1's filter. They are sent to the common UDP port and both DR1 and DR2 process them: DR1 will accept them, and DR2 will see they are tagged as not matching its filter and drop them. However, the way our internal logic works, DR2 does not "evaluate" whether a sample is relevant to it unless it has space to "accept the sample". Because of this, if DR2 runs out of the max_samples resource it will not detect that the samples DW1 sends to it are "not relevant" to DR2. When this happens DR2 does not acknowledge the samples and causes DW1 to block, as you observed.

-Gerardo

Joined: 05/18/2018
Posts: 4

Hi Gerardo,

Thank you for the quick reply. This behavior was unexpected for me.

If I understand you correctly, the best way to achieve my use case would be to use a separate partition for every stream instance. Defining different ports for each data reader would be difficult in my use case, because the number of stream instances, and therefore the number of data readers per participant, changes depending on the program flow.

Best regards

Olav