Hi guys, I am working on a distributed app and I have three types of behavior which I do not seem to get the fully working using QoS.
1. A transient persistant topic from which any new endpoind will receive the latest needed data from a specific topic (topic is keyed so there are the latest state from a specific entity)
2. A fast topic which can send large arrays of samples but with the assurance that all samples from that call are received and in proper order (like a Query)
3. A topic where samples do not need hight troughput but only speed and the assurance that the last sample is received by the subscribers. (Data is only send periodicaly on request or by a specific interval one sample at the time)
I am using 5.2.0 with unbound support. Atached is the current mocked QoS
I would be most greatfull for the help
P.S. - I am not allowed to do changes due to coding. Only XML
Attachment | Size |
---|---|
ndds_qos_profiles.xml | 13.28 KB |
Hi again,
What I need to implement is a wrapper that abstracts all dds comunication under a functional interface. This has to be able to handle multiple connections (50 - 100) simultaniosly and be able to process large amount of smaples or large data that is send on to it.
The Ideea is that I would like to generilze most of the dds under an interface interface and have minimal footprint in implementation.
I have reduced my need to 4 basic patterns :
- Batching to handle large amount of data (50-100 MB)
- Async that has to handle samples with large content inside
- Persistence over a transient model
- Fast flow topic to handle large amount of smaller samples ( 5000 - 50000 samples)
I would be greatful if you can give me a few starting pointers.(how many participants to use, Would it be faster with WaitSet or Listener, security isuues and acknowledgements for packages)
If anyone is interested in helping me I would gladly provide source code and Ideas in ho I have thought the system so far.
Thank you
Hello,
I will try to answer your questions. But there are a few things I did not fully understand...
What do you mean by the sentence: "This has to be able to handle multiple connections (50 - 100) simultaneously"? What kind of connections are these?
Do you mean that there would be many separate applications (processes) producing the data and that your application that uses the wrapper must consume from all of them?
Note that in DDS the communication follows a pub-sub model and normally connectionless trasports are used. There is no real concept of a transport connection, even if you can think of a DataWriter as having some "logical connection" to its matched DataReaders this is more of a logical concept, not a traditional TCP-like connection...
Answering your specific bullets:
Generic.StrictReliable.LargeData.MediumFlow
Generic.KeepLastReliable.TransientLocal
- Fast flow topic to handle large amount of smaller samples ( 5000 - 50000 samples)
Better to use WaitSets if you are going to spend any significant time processing the received data (or you have a lot of volume) that way the processing happens in your own application threads and you free the middleware receive threads to prepare the messages. This will also give you more concurrency. Listeners are fine for quick processing of infrequent messages, not for heavy loads.
This merits a whose separate discussion! It is not a trivial matter. If you need security you probably need the Connext DDS secure product. But better start a separate thread to discuss what the security requirements are...
I am not sure what you mean by this... Do you mean how to configure the reliable protocol?
Gerardo
Thank you werry much for your answer.
I have started documenting your reply and get back to you with updates and maybe more questions about security issues.
Hi again Gerardo and thank you for taking the time to answer my question
I have ran into a small issue .
I have an async publisher which sends 100000 messages on a topic. The administration console is already subscribed to the topic and I can see the samples as soon as they are published. The problem is that I am not receiving all samples. They tend to be missing from the end and some times from the front. I am uploading the QoS, an exported file with data and the Log from dds.
I am using async publishing mode with reliable qos and keep all history . Profile name is "Custom.Async"
Thank You
Hi Christian,
Regarding the Admin Console issue...are you using that same QoS (Custom.Async) when subscribing to the topic in Admin Console? Also, in smartdoseddslog.txt, I see 121 error messages reading "MIGGenerator_addDataBatch:serialize buffer too small." Are those from your publishing application? If so, that could be the source of the issue as it would appear those messages are not being sent.
About the test_topic_export.xml; how was this exported? Admin Console supports CSV and TXT formats but not XML. Line 7520 contains this:
Which I find really confusing because I don't know what could have written that during one of our exports. I also wrote a little parser for the output and found that there are 66642 missing IDs in this output. This seems like a lot of missing data. I'd love to try to reproduce what you're seeing here on my machine if you don't mind sharing your publisher.
Thanks,
Ken
Hi Ken ,
I have exported the file as a CSV and then made it a xml because the uploader on the forum does not support csv. I have deleted between those Id's and prechecked if all ID's were not interrupted. Those between the text that I wrote were good and can be substracted from the missing number. I will prepare a demo of the project because right now is tight up with our framework. I will extract the main part and have it ready in a coule of hours.
Thank you for your help
Hi again Ken,
I have put togheder a project based on my implementation and my needs. Feel free to make some improvements suggestions.
At some tests I am able to get the whole 100 000 samples.
Hi Christian,
Thank you for the follow up. Unfortunately, I'm having trouble building your reproducer. I'm getting the following errors:
Error 5 Metadata file 'C:\ken\dev\workspaces\support\ac_dv_strict_reliable_comm_post\bin\Debug-VS2013\MessageSystem.dll' could not be found C:\ken\dev\workspaces\support\ac_dv_strict_reliable_comm_post\MessageSystem.Test\CSC MessageSystem.Test
Error 3 Metadata file 'C:\ken\dev\workspaces\support\ac_dv_strict_reliable_comm_post\bin\Debug-VS2013\Test_type.dll' could not be found C:\ken\dev\workspaces\support\ac_dv_strict_reliable_comm_post\Interop\CSC Interop
Error 4 Metadata file 'C:\ken\dev\workspaces\support\ac_dv_strict_reliable_comm_post\bin\Debug-VS2013\Test_type.dll' could not be found C:\ken\dev\workspaces\support\ac_dv_strict_reliable_comm_post\MessageSystem\CSC MessageSystem
Error 6 Metadata file 'C:\ken\dev\workspaces\support\ac_dv_strict_reliable_comm_post\bin\Debug-VS2013\Test_type.dll' could not be found C:\ken\dev\workspaces\support\ac_dv_strict_reliable_comm_post\MessageSystem.Test\CSC MessageSystem.Test
But, looking at your screenshot, I think I see part of the problem. You've set the *administration* QoS to "UserQosProfilesLibrary::Test." That QoS will be used when creating DomainParticipants and other DDS entities for the administration functions, not for Data Visualization. What you want to do is set the QoS when you subscriber to a topic using the Subscription Dialog (please see attached screenshot). You can either set reliable and/or transient local using the combo boxes or pick your QoS from the "Select QoS profile" combo box. Also, I recommend setting the administration QoS back to its default (it is clearly labeled).
Thanks,
Ken
Hi again Christian,
I created a quick test of your use case to see if there was any underlying issue. My publisher QoS has only reliability and transient local durability. In the Admin Console's Subscription Dialog, I opened up the advanced settings and chose transient local durability (which automatically selects reliable). I publish 10,000 samples per second and got all 100,000 instances in the Instance Table. I did experience sample loss at the writer (see the screenshot), but the system is able to recover from the loss and still get me all of the instances/samples. Please let me know if you're able to replicate these results and/or if you need any further assistance.
Thanks,
Ken
Hi Ken.
I was able to get the the system working with a strict reliable large data QoS. The idea behind my issue is that I do not want to have behind this behaviour a transient storage. It's more of a fire and forget type of communication with large data. I need from this topic to be able to send up 100 000 messages with a strict reliable and async publication.
Today I got the first results based on bigger data packages and unbounded support. I still need to optimize my test framework to be able to get better readings and timing result. What I have managed so far is to have the 100 000 packages of small data send with 55 000 / second.
Tommorow I will have another question regarding optimization and CPU balancing after I will have a new set of results and data to compare them for a deeper and proper configuration. Regarding sample lost, I have that on my side and it is between 20 - 60 000 but so far those samples were recovered and succesfully received in the other end.
.
Thank you for yout time, Christian
P.S. I do not now why but the administration console keeps crashing every time there is a bigger load of data without transient storage which requires more memory. It reaches really fast the 2GB Java limitation aldough I have the 64 bit version of connext dds
Hey Christian,
I'm really glad you were able to get things working! I'm also glad to see that you're doing real-world tests of your use case and tweaking as needed from there. That's a great way to get the most out of your design. We have a LOT of capability/QoS and I've always found it better to start with the defaults and tweak from there with the actual use case. :-)
Regarding the need for more memory, you could edit the bin/rtiadminconsole[.bat] file to give the Java VM more memory. There's a line there with "-vmargs" where you could add something like -Xmx4g to give the VM 4GB of memory. Please let me know if that doesn't work and we'll figure it out.
Thanks,
Ken
Hi again,
I have tested the uploaded project that I have made and all you need to have is VS 2013, DDS 5.2 and make sure that you build the test type project before building the solution. So far my conclusions are that the QoS that are out of the box do not work as expected. I am getting inconclusive results on my tests meaning that the same qos, same topic, same entity sometimes works, sometimes works partialy and sometimes not at all. I will start to build a Qos from scratch based on my needs from participant to the writers and readers.
Is there a comprehemsive guide to the QoS because the online documentation is pretty thin... I mean something where all properties are explained and how they can be placed together to achieve a particular behaviour. ?
Thank you
Christian,
I failed to get your project working because I had (mistakenly) been using VS2012. I now have it built with VS2013. Sorry for the confusion.
Regarding the QoS; I recommend starting with the QoS reference (cheat sheet) and then moving on to the User's Manual.
You said that you're getting different results for your test. Could that be caused by startup/discovery sequencing? This can happen if you're not using a durability higher than the default of volatile. So, samples you publish in a DataWriter are lost if there have not been any DataReaders discovered for that Topic. Transient-local durability (which requires reliability) will address this problem.
Thanks,
Ken
Hi Ken,
Got another question to which I don't seem to find the exact answer. I am using an inheritance from Generic strict reliable QoS for the reader and writer. I am usin a wait set to retrieve the data (a lot faster than DataReaderListener )
Hi Ken,
Got another question to which I don't seem to find the exact answer. I am using an inheritance from Generic strict reliable QoS for the reader and writer. I am usin a wait set to retrieve the data (a lot faster than DataReaderListener )
the problem is that no matter what I set or change it makes the system slower or maybe faster in writing and reading data but I do not get more than 1024 samples on the reader side. I have overwritten the Max_Samples for the reader which were 256 in default.
I got a felling that the issue is maybe from to many NACK's or hartbeat but no matter what was on the watermark, hartbeat and window size I stll do not receive more than 1024 samples readed by the DataReader.
Thank you
Cristian
Hey Christian,
The 1024 limit per read is caused by DataReaderQoS.reader_resource_limits.max_samples_per_read. This policy limits how many samples you can get back at once from a single read/take operation. I've been tripped up by this one as well. You can increase this limit and/or add logic to keep trying to read as long as there may be samples. I tend to use the second approach since it doesn't use more resources.
Thanks,
Ken
Hi guys,
I have build a Message bus wrapper over DDS which translates that for each endpoint I am using an interface for each topic. Each endpoint works on it's own process and for data reading I am using waitSets. I have encountered two issues on my system which maybe can be better explained by someone with more expirience with DDS.
1. If I am using WaitSets for almoust all topics and for one I am using DataReaderListener the listener doue not recive any more data. on_data_available does not get triggered.
2. I am using a WaitSet for each interface (Topic) to read the data which is translated that I have around 100 WaitSets that work in the thread pool to retrieve data. I have a good felling that I am doing something wrong aldough I was not able to find more detailed data about waitSet. I have complet generic implementation of the DDS Wrapping. The system works but I think is to slow due to context switching but I want to have waitSet(s) because I am sending large amount o samples as burst data.
If anyone can help with an Idea or a pointer ;) would be great.
P.S - Later today I will post an example with the direction I am going.
Thank You
Hello,
Can you explain a bit more how your wrapper works. How many threads it has and what the "thread pool" does?
It would appear your design uses too many WaitSet objects. Normally the number of WaitSet should correlate to the number of insependent thread you have processing the messages/events. So unless you have on the order of 100 threads having all these WaitSet objects would seem unnecessary.
A WaitSet is essentialy a semaphone that is associated with a list of conditions (WatSet::attach_condition()). Whenever any of the conditions became active the Semaphore is signalled so that any therad that is waiting on the WaitSet (WaitSet::wait()) will be woken up. Because of this normally for each WaitSet there should be just one thread used to just wait on it. If you had multiple they would all wake up and likely fight each other to get the data...
In general if you want to wait for data in multiple Topics you are effectively expecting to receive data in the corresponding DataReader entities. For this you do not need a separate thread for each Topic/DataReader. Rather a single thread can wait for multiple DataReaders. This is done by creating a WaitSet dedicated to that thread and attaching the StatusCondition ( DataReader::get_statuscondition() ) of each of the DataReader entities to the WaitSet.
You can also use a leader-follower pattern where you create a single WaitSet to be notified of data at any of the DataReader (i.e. it has attached all the StatusConditions) entities and elect a single thread to wait on this WaitSet (the leader). When the "leader" thread wakes up it (1) takes the data from whichever DataReader activated the WaitSet, (2) elects a "follower" thread to be the next "leader" to wait on the WaitSet, and (3) goes ahead to processes the Data it had taken. The "follower" threads could be waiting on separate WaitSets (one per thread) but this could be done using GuardConditions (a different GuardCondition per WaitSet and per thread) so the leader decides which one to wake up an signals that one GuardCondition...
Gerardo
Thank you Gerardo,
I have started looking as of yesterday on this and my conclusion was to use only one waiting thread to read the data, check the triggered topic and pass the data to the coresponding reader to be procesed an during this time have another waitset to as the "leader". If you can point me to some more complex examples on using this scenario it would be great.
I will post updates regarding this matter.