Concurrent threads limitation collecting dynamic data

Ucf bader

Hello,

I have written code that periodically takes a specific number of samples (complex data) each time. I traverse the received samples to collect dynamic data (read only), building a JSON object that is then sent to a db.
I am facing a serious issue with network blocking/delay.

My assumption is that several concurrent threads (the take thread and the conversion threads) are accessing the RTI infrastructure, resulting in some sort of network blocking.

Further info:
I've got several threads:

1) Thread responsible for the take operation.

2) I am using C# TPL (https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/task-parallel-library-tpl) for processing the received data in two phases:

a) Recursively traversing the dynamic data and building a JSON object (uses bind_complex_member, unbind_complex_member, get_member_info_by_index).
b) Asynchronously sending the data to the db.
The max degree of parallelism of the TPL blocks is set to Environment.ProcessorCount.

As time passes, the conversion phase (iterating over the dynamic data) takes longer for newer blocks of samples than for earlier ones: tens of seconds per block, whereas the first blocks took only 1 second each. This ends up blocking the take thread and results in bad performance.

I'm wondering if there is any limitation on the number of concurrent threads accessing the RTI infrastructure. If so, is this configurable?

Are there any best practices or performance tips with respect to dynamic data and arrays of structures?

We are running version 5.3.1 on Windows 10.

Howard

Are you using RTI Connext DDS 5.3.1?

I note that you posted your question in the "RTI Connext DDS Micro" forum...which is for questions about RTI Connext DDS Micro.

In any case, are you using an RTI Connext DDS listener (on_data_available() callback) to call DataReader::take()? How many DDS Topics are involved? How many DataReaders? Or are you using a waitset() so that a user thread calls take()?

If it's the case that you're only talking about a single DataReader for a single topic, how are you using multiple threads to call take() on the DataReader to get data?

As you can imagine, for specific Entities in DDS, such as DataReaders, only a single thread can access and modify the state of the DataReader at a time.  So, all API calls to the same instance of a DDS Entity (Participant, Publisher, Subscriber, DataWriter, DataReader) will be single threaded (fundamentally protected by an internal mutex).

However, for data structures, dynamic data and otherwise, there is no mutex protection mechanism.
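
To make the two points above concrete, here is a minimal sketch in plain Python (standard library only) of one possible arrangement: a single thread calls take(), copies each batch into ordinary objects, and hands the copies to a fixed pool of workers through a bounded queue. Everything here is an assumption for illustration: `FakeReader`, `to_plain`, `run_pipeline`, and the queue and pool sizes are hypothetical names and values, and none of them is part of the RTI API. The lock inside `FakeReader` only mimics the per-entity serialization described above.

```python
import json
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeReader:
    """Hypothetical stand-in for a DataReader: one lock guards every call."""

    def __init__(self, batches):
        self._lock = threading.Lock()
        self._batches = list(batches)

    def take(self):
        # Calls on the same entity are serialized by this single lock.
        with self._lock:
            return self._batches.pop(0) if self._batches else None


def to_plain(sample):
    """Recursively copy a nested sample into new dicts and lists."""
    if isinstance(sample, dict):
        return {key: to_plain(value) for key, value in sample.items()}
    if isinstance(sample, (list, tuple)):
        return [to_plain(item) for item in sample]
    return sample


def run_pipeline(reader, workers=4, max_pending=8):
    """One thread takes and copies; workers convert private copies to JSON."""
    pending = queue.Queue(maxsize=max_pending)  # bounded: applies backpressure
    results = []
    results_lock = threading.Lock()

    def convert():
        while True:
            batch = pending.get()
            if batch is None:  # sentinel: no more work for this worker
                return
            text = json.dumps(batch, sort_keys=True)
            with results_lock:
                results.append(text)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(convert) for _ in range(workers)]
        while (batch := reader.take()) is not None:
            pending.put(to_plain(batch))  # blocks while workers are behind
        for _ in range(workers):
            pending.put(None)  # one sentinel per worker
        for future in futures:
            future.result()  # re-raise any worker exception
    return results
```

In this sketch no worker ever touches the reader or a structure shared with another thread, so the only contention left is on the queue. Because the queue is bounded, the taking thread waits when the workers fall behind instead of letting pending work grow without limit. Whether a similar arrangement helps in your application depends on the answers to the questions above.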