Hello.
We recently changed one process that is sending lots of data on different topics on a reliable flashion from synchronous to asynchronous using the "on demand" flow controller.
https://community.rti.com/static/documentation/connext-dds/6.0.1/doc/api/connext_dds/api_cpp2/group__DDSFlowControllerModule.html#ga6e2d5f82bb73055a69f94a67016d7cdc
Data is sent at a fixed frequency in an loop.
Previously on that loop the datawriters were just sending. Now at the end of the loop to have a call to "rti::pub::FlowController::trigger_flow". Of course the QoS is now configured to use the ON_DEMAND flow controller.
What we saw is that by measuring on the caller thread, using async with the flow controller is actually slower; it's blocking the writer thread for more time and we have even more problems to meet the deadline. 
With this change we expected that every write operation would be just placing the items on queues, an then at the end RTI workers would handle sending to various destinations in a more optimal fashion.
From my understanding something configured asynchronously shouldn't block the caller.
Do these results make sense? Is "trigger_flow" a blocking call? Should we look for errors on our end?
 
      
Hi Rafael,
I think that you're on a project that has a support contract with RTI. Your question may be best asked to and answered by RTI's professional support team.
I believe that Johan Schutzer and Kristhoffer Hultenius are supported developers and can submit your issue to RTI support for resolution. However, in your submission to RTI's support team, it would be great if you could answer the following questions to clarify your situation:
1) Why did you want to change your application from using synchronous DataWriters to asynchronous DataWriters with flow controller? What behavior are you trying to avoid to trying to achieve?
2) How many DataWriters are involved? How big is the data for each DataWriter? What is the period at which data is sent? And are you setting all of the DataWriters to be ASYNCHRONOUS Publish Mode with the same flow controller?
3) What time are you measuring with respect to your statement "measuring on the caller thread, using async with the flow controller is actually slower; it's blocking the writer thread for more time"?
I guess your application uses a single thread to call DataWriter::write() on multiple topics and then at the end of the loop
Where is your writer thread being blocked?
trigger_flow() should not block...it's only giving a semaphore under-the-hood.
4) "From my understanding something configured asynchronously shouldn't block the caller." No, whether or not the DataWriter::write() call blocks or not is not related to whether or not the Publish Mode is synchronous or asynchronous.
If the publish mode is synchronous, then the write() call wil pass the data to the network layer before it returns. Asynchronous write() calls only serializes and copies the data to the DataWriter queue. So theoretically, a synchronous write() will take more time to return (executes more code) than an asynchrnous write().
HOWEVER, whether a write() call will block is independent of the publish mode. A DataWrite::write() call will block if there is no space left in the DataWriter queue to store a copy of the data. This will only happen for RELIABLE Reliability DataWriters that have one or more RELIABLE DataReaders, i.e., there is a reliable connection between the DataWriter and one or more DataReaders.
Sent data is stored in the Writer's history queue (aka writer cache) until it's been ACK'ed by all reliable DataReaders. If one or more DataReaders are not acknowledging send data fast enough, then the queue can eventually be filled. How fast depends on
a) how fast data is being sent
b) how big the Send Windows is set for the Writer queue
c) configuration of the parameters that control the reliability protocol
which begs the question, are you using the Generic.StrictReliable builtin QOS profile to create the QOS used for your DataWriter/DataReaders that need to be reliable? If not, I would certainly start there. The out-of-the-box configuration of the reliability protocol is not designed for any sort of high performance situation.
Again, you should work with RTI's support team for them to understand what you're doing and offer suggestions on how to achieve your desired behavior.
Thanks for the valuable insights.
Unfortunately the scenario is maybe too complex to be able to formulate precise things to support. But maybe we should do that.
1) The application couldn't send at the required rate (40Hz) in synchronous mode, so we thought that maybe using the RTI worker thread(s) and letting all messages arrive to the queue before starting sending could reduce the send/receive overhead.
2) There are lots of topics (50-60) send from that process. All of them with very different sizes. A few of them relatively big.
3) Measuring on a the (by instrumenting) that would need to do those calls at 40Hz.
4) I see. What you described is a syncrhonous writer, as it is always blocking; it just happens that on async mode with free queue space it returns sooner than on other configuration modes. That was surprising. I'd expect an asynchronous writer to never block, specially not for that long, or if it can do so to have a per-call timeout parameter. Given the API (no send message events/callbacks/waitsets) I assumed that writers would throw on full internal queues. This should IMO should be clearly documented.
The API would make no difference if we can't keep the rate anyways. So we'll have to look at the QoS configuration.
Probably enabling FlatData would help. The application is big, so there is no single person with all the details of what is going on.
If we don't succeed we might have to contact support.
Are the application sending all of the data from a single thread? Does your CPU/OS support multiple cores?
If not, then unfortunately, the only thing that you can really try is to optimize your code to be more CPU efficient. If all of the data needs to be published at 40 Hz, then there is a certain level of processing, aka, CPU usage, needed by DDS to do that work...whether done as the application calls write() synchronously or at the bottom of the send loop all at once with the asynchronous writer thread...I don't there is there significantly less work one way versus the other way.
In this case, I would use a software profiling tool to see where is most of the CPU being spent. And after knowing that, try to work on optimizing those paths...
On the other hand, if the application is running on a multi-core system, then you can try to divide the work across different threads. Of course, at some point there is a bottleneck, i.e., the network stack. But you can certainly try to send different data using different threads.
It's possible that when sending "large" data...not sure what you consider to be large, that DDS will have to do extra work to fragment that data to send it. In that case, I would certainly try to send that data in a different thread.
Now having said that, you will have to create your DataWriters differently if you want the calls to different DataWriters() to be multi-threaded. By design, the DataWriters created by the same DDSPublisher object all shared a mutex. So write() calls to the DataWriters of the same DDSPublisher will be executed in a single thread manner.
To have the write() calls to different DataWriters be multi-threaded, i.e., simultaneously executable by threads on different cores, you will have to create the DataWriters with different instances of the DDSPublisher. All of the DataWriters that will be sent by the same thread can use the same DDSPublisher.
I'm not sure what you mean by this sentence above. My original question was how were you measuring "time"? How do you know which API calls are taking up more time than others? How are you determining that the write() call is blocking?
While your definition of what is "synchronous" is different, ours comes from the DDS Standard. A write() call only has the potential for blocking when the connection is set to be RELIABLE via the QOS.
In the same Reliability QosPolicy, you can set the max_blocking_time parameter to 0...so that the write() will return if the internal queues are full. DDSDataWriter::write() does have a return value in C and traditional C++ for which you can detect this condition, DDS_RETCODE_TIMEOUT, or throws an exception for the same situation with the Modern C++ API.
Generally, you don't want write() to return an error just because the queue is full...because it's not unusual for this case and is used as a mechanism to throttle the writer when the writer is sending too fast. This is the default behavior of TCP socket for example. When sending a burst of data, a full queue will prevent a sender to flood the network causing packet losses which necessitates repairing lost data at the expense of additional CPU and network bandwidth. So, slowing down a writer, usually for under a millisecond, will allow the system to optimally use the network.
Using FlatData will save a copy of the data in the write() path, however, the application code that access the data structure will have to change. You can't just access elements of a structure directly...instead, you'll have to use assessor methods. And the time savings for small data is minimal. Even for "large" data, it's probably not significant until the data is in the MB range.
I suggest doing some performance tests before refactoring your application to use FlatData just to understand how much savings you can expect.
Starting with some things I disagree..
The suggestion of having a worker thread pool (where each thread has a DataWriter for each Topic to workaround what seems to be an implementation detail) so we can spread the serialization CPU load is something that I think it could happen in the library code when configured as asynchronous.
The TCP throttling example you provided is about a Socket opened on blocking mode. For an asynchronous TCP socket (opened with
SOCK_NONBLOCK) the blocking happens on select/poll/epoll/overlapped IO,etc. at the user's will. So all the sends can be managed by a single user space thread while the Kernel's tcp stack does its job. This is were our misunderstanding of asynchronous comes from.That's why I still think that a word of caution on the documentation would help. This blocking behavior by default is surprising/misleading, even more considering that DDS seems to be a protocol for real-time data. On a (soft) real time application I would assume that the desired behavior should be to fail fast, not to block. Just a personal opinion.
https://community.rti.com/glossary/asynchronous-writer
Leaving my/our misunderstanding about how async should work and coming back to the problem.
We enabled the flow manual controller to trigger at the bottom of each cycle in case the library could coalesce more of the small samples on the same UDP datagrams.
There is not much to optimize on that application, as it is basically a logger service listening on DDS with no own code. We haven't done full profiling, just some instrumentation using NVTX to measure the CPU time on some sections of the code until we pinpointed that most time is spent in this section serializing and sending data. The setup is pretty complex and can't run locally, let's say that we can't attach a traditional profiler.
Good to know about max_blocking_time. This seems to give us exactly the desired behavior. Every cycle/iteration we know how much time we have left, is it possible to change this per-call? (even if mutex/semaphore timeouts and sleep calls on user space are just lower bounds for the OS scheduler). Otherwise we'd have to go with always async.
So we might set max_blocking_time and handle the exception. Which are the parameters to set the bytesize of the library internal DDS async write/read queues? I assume that we have to tweak those both on Readers and Writers after going asynchronous.
I only have found these, but they seem to be specified in samples, but 20x 4MB samples are not the same than a 20x 16 byte ones.
https://community.rti.com/static/documentation/connext-dds/6.0.1/doc/api/connext_dds/api_cpp/structDDS__ResourceLimitsQosPolicy.html
Some samples are MB, but we are keeping FlatData on hold, as as you said it requires extra work.
Thanks for your help!
Do you prefer this to go through support? Things seem to be generic enough that if kept public they might be solve doubts for other people for those rare specimens using the search function...
Sorry, the value of max_blocking_time and the rest of the Reliabilty QOS Policy is fixed once the entity (DataWriter/DataReader) is created.
There is no internal queue/buffer being used by the internal thread used to send data for DataWriters that are configured for ASYNCHRONOUS publish mode. The thread takes the data that is stored with each DataWriter.
Each DataWriter has a send queue (aka writer cache) that is configured physically through the DataWriter's ResourceLimits QOS Policy, although for reliable connections, the configuration of the History QOS Policy as well as the configuration of the send_window for the reliability protocol will affect how the allocated queue is used.
In addition, if you configure the DataWriter to use a FlowController, the internal publish thread will use the FlowController to determine how much data it will send for each DataWriter every time it wakes up.
I'm not sure what you mean by this. The sizes of the queues for the Writers/Readers (aka send queue/receive queue aka writer/reader caches) are certainly something that you may want to configure...but the fact that a Writer is configured to use the ASYNCHRONOUS publish mode is usually not a factor in queue sizing.
Yes, the sizes of queues are specified in the ResourceLimits QOS Policy in terms of data samples, not data bytes.
Generally, if your project has a support contract with RTI's support team, it would be to your advantage to use that resource to answer all of your technical questions about RTI products. The team is dedicated and global...they are able to respond to your questions in your own timezone. Also, your questions are recorded in your account's record so that you have a record of all issues as well as their resolution that would be available to you and your colleagues for review. Your project is paying for the service, you should get something useful from it.
The community forum is completely self-help/self-serve. There is no guarantee of a timely response to a posting or even if a question will be answered. While RTI engineers do monitor and often provide some answers/advice, we're usually doing this in our "spare" time. There's no-one at RTI who has the responsibility to make sure that all posting to the forum are answered. We're hoping that other members of the RTI community are able to help each other via the forum as many of our users don't have a support contract (researchers/university program members/etc).
Here I wrote very vaguely indeed. I wanted to mean that as we have an asynchronous writer and an ON_DEMAND flow controller we'd need to be sure that the queues need to hold at least one Cycle. But at that point in time on my mental model on how things work internally there was a queue on the Publisher that you said it doesn't exist (or at least if it exists it's not for the purpose of holding the sample values).
Then we might be switching to BestEffort, as I think that the most important sample in case the last one and if we have timings so tight we might not be able to affort retransmissions. The problem is that there are different services developed by different teams, so we'd have to be sure before making such a change.
We will do some further testing and try to narrow down the issue. Then we will contact support. Thanks for the valuable info!