waiting on multiple file descriptors/objects

Offline
Last seen: 9 years 9 months ago
Joined: 03/01/2015
Posts: 8

Hi,

I'm thinking about jumping on the DDS ship, but as someone who regularly does integration there's one thing I just can't get past - I'm hoping I missed something, or that this not-so-unique case can be addressed.

So a WaitSet is said to function like POSIX select/poll and Windows WaitForMultipleObjects. The functions it is compared to allow a program/thread to block and react in a low-latency way to multiple devices, sockets, signalfds, pipes, and eventfds. They are the glue that holds together the complex loops regularly encountered in integration.

However, I cannot find a way, through either the proprietary API or the standard API, to get this functionality - that is, how would I get a WaitSet to pay attention to sockets, signalfds, pipes, and eventfds on Linux, and to all the event objects on Windows? The only thing I can find is supplying a timeout, which is an ancient, substandard way of programming on any OS: it wastes CPU, causes more context switches, increases latency significantly, and greatly complicates the polling loop with draining/starvation handling.

Not having that functionality would be an incredibly painful dealbreaker for me, given all the problems DDS is solving. I thought about using non-blocking timeouts on WaitSets and blocking for 1 ms on poll, or vice versa, but the context switches and the additional complexity of dealing with draining/starvation - on top of many programs/threads each adding this millisecond of latency - got unattractive very fast.

In a related question, how does DDS on linux react to signals being blocked?  In particular I block SIGINT, SIGQUIT, SIGTERM to let the applications handle them correctly.

rip
Offline
Last seen: 3 weeks 1 day ago
Joined: 04/06/2012
Posts: 324

Hi Jason,

"So a WaitSet says it functions like Posix select/poll and windows WaitForMultipleObjects." 

It does.  But it is not a Posix select call.  It is a WaitSet.  The 'functions like a' is there so that if you know what a Select is, you can correctly slot the WaitSet into what kind of functionality it provides, conceptually.

The "multiple objects" that it allows you to wait for are DataReader* instances (actually, specific conditions existing on those DataReader instances), as well as user-defined 'guard' conditions.

If you need to wait on non-DDS devices, sockets, signals, window events, etc., use a select statement. If you want to coordinate something that uses a select in your code, I suppose you could use a guard condition and set it to true when the result of the select handling should be "release the WaitSet from its block".

rip

edit 1: I knew what I meant.

Offline
Last seen: 9 years 9 months ago
Joined: 03/01/2015
Posts: 8

> It does.  But it is not a Posix select call.  It is a WaitSet.  The 'functions like a' is there so that if you know what a Select is, you can correctly slot the WaitSet into what kind of functionality it provides, conceptually.

Right, I think it was clear that it's not a replacement and is geared towards DDS-specific problems. But integration complicates things when you actually need to do both in the same loop. Basically I need a select/poll-able, DDS-compatible construct, and it needs to be robust and offer timely (ms-sensitive) servicing.

> The "multiple objects" that it allows you to wait for are DataWriter instances (actually, specific conditions existing on those DataWriter instances, as well as User-defined 'guard' conditions.

> if you need to wait on non-DDS devices, sockets, signals, window events, etc ... use a Select statement.   If you want to be coordinating something that uses a Select in your code, I suppose you could use a guard condition and set it to True if you want the result of the Select handling to be "release the WaitSet from its block".

To do that would require multiple threads - and multiple threads in this type of loop complicate things a lot and should be avoided. In particular, the poll/select loop would need access to the guard to set it true - and there could be more than one condition for the different cases the poll/select loop reacts to, in addition to difficulties in shutting down properly. Also, the poll/select loop is usually the guy that is arbitrating or calculating the data I'd want to pass from/to DDS.


Another way of maybe doing this is to sort of half-use listeners, but this approach doesn't always work - more on that in a bit. With listeners, or some way of listening to all events, we can basically see when we would need to call wait on the WaitSet and be guaranteed not to block, as events would be pending. Inside the listener instance we'd have a file descriptor, such as a pipe or eventfd, which we'd write to when there's something we should wake up on. Then we'd poll on those file descriptors along with the rest in the big integration loops, and call wait (assuming the wait won't block for any significant amount of time - more than a ms or 2) and do the DDS reads/writes after that. This relies on the following assumptions:

1) You can simply catch all events / wakeups you need to react to in the listeners and direct them to 1 or more (not too many..) file descriptors

2) You don't have to complete the IO in the callback and can follow up with a waitset wait/datareader read/datawriter write later.


I'm a complete DDS newbie, so I'm not sure whether either of these assumptions is met.

rip
Offline
Last seen: 3 weeks 1 day ago
Joined: 04/06/2012
Posts: 324

Your description confuses me.  Probably I've not worked on the type of system you seem to have, where there are single big integration loops instead of a distributed system.

You have three ways to react to the possible existence of new data:

1) A DataReader listener (which is a callback into your application on a DDS participant thread -- so don't block it). Specifically, writing to an fd? Contra-indicated, as I believe that is a blocking write unless you set the correct bits. I tend to avoid those types of requirements -- it's too easy to forget both that it needs to be done and that there is no inherent reason (outside of the DDS knowledge) why it needs to be done. So somebody will come along at some point, say "that's not efficient", and unset the bit. And everyone will lose their hair as they try to figure out why DDS stops accepting new samples.

As long as your code is running in that DataReader Listener, the thread will no longer be processing incoming data.

2) A WaitSet, with one or more Conditions (Read, Query or Guard) attached.  These run on application threads, but will probably require a task switch or two when they are unblocked.

Writing to a pipe or eventfd implies that you are signalling some other application, you are not signaling internally.  Signaling internally is what a GuardCondition is designed for.  A GuardCondition does not need to be attached to a WaitSet to have utility -- it's just a simple non-blocking binary semaphore.  Indeed, it can be used as a condition flag (has an event happened or not?) because it supplies get_trigger_value(), so a thread that knows of that GuardCondition can use it to influence its own behavior.  Alas, you can not use it as a synchronization method, for that you would need to use a Posix semaphore for example.  I would suggest these are lighter weight than writing to a fd pipe or event.

3) Polling: Keep in mind that DDS is still happening under the hood as it were.  You could just poll and treat an empty inout sample buffer as an indication that there is no data to be read.  That won't require any context switches so if you are trying to minimize or control those, that's your best bet.

And point 3 touches on another possible disconnect.  A participant has several threads running, if your goal is to minimise threads, you'd best take a look into the DDS threading model. 

If your main application loop is doing filtering, mediation and choreography... some of that can be done by DDS. Look at the content-based filter mechanisms. This will also keep data off the wire, since a CFT is propagated to the writer side when possible. The writer then knows when a reader isn't interested in a given sample, and so does not send it.

If you /do/ have multiple applications that you are trying to coordinate... I just use DDS for that. 

Maybe someone else here has a better grasp of the type of system you are implementing. 

 

Offline
Last seen: 9 years 9 months ago
Joined: 03/01/2015
Posts: 8

Sorry about the confusion, let's try to hash it out. The system is actually locally distributed (on the machine, across a variety of processes and threads) with lots of IPC mechanisms (pipes, sockets, eventfds, large shm blocks/framebuffers) - a few other machines are involved for control, however. By the way, this set of issues also arises when trying to integrate two event-reactor patterns - this is actually the reason Qt on Linux depends on GLib for the big main loop. ZeroMQ also supplies a poll function that is integratable (though I wish they went the extra mile and supported ppoll to deal with this issue).

> 1) A DataReader listener (which is a callback into your application on a DDS participant thread -- so don't block it.  Specifically, writing to an fd?  Contra-indicated as that I believe is a blocking write unless you set the correct bits.  I tend to avoid those types of requirements -- it's too easy to forget both that it needs to be done, and there is no inherent reason (outside of the DDS knowledge) why it needs to be done.  So somebody will come along at some point, say "that's not efficient" and unset the bit.  And everyone will lose their hair as they try to figure out why DDS stops accepting new samples

eventfds have non-blocking writes, but yes, usually you'd put the file descriptor in non-blocking mode. The only way a write would block on a pipe (the universally compatible way of doing this on POSIX) is if it's full and in blocking mode - but if it's just simple event delivery, a full pipe means events are waiting and haven't been consumed, and non-blocking is the way to go. It does involve a syscall (>1 to drain a pipe) either way, but the latency is fixed/bounded, and it has no problem satisfying deterministic timing requirements down in the 10s-100s of usecs on RT kernels. It can get better on the carrier-grade stuff/QNX.

Re listeners - understood about not blocking them, but if I used listeners to dispatch non-blocking fd IO as per the above, could the loops using poll/select pick up on those events and drain the DDS-ready data? Sometimes, when having both listeners and queued constructs like WaitSets, it's completely mutually exclusive - if you don't finish processing in the callback, that data is gone. My idea above was, instead of fully processing, to merely redirect status changes to fds and leave it to threads using WaitSets/DataReaders to process. Is that even practical?

 

Re 2) eventfds can be cross-process or cross-thread. Unlike pthread condition variables, they are cancelable, react to signals, and are integratable with the other IPC mechanisms above via poll/select, which is why it is preferable in some spots to use them. Also, the mechanism of polling you mention is the more traditional one, where somebody checks a value. That is not the same thing: for one, you must poll at a fixed rate or busy-loop, both of which have their undesirables. Another problem is that it only takes events from poll and delivers them to DDS, not the other way around - taking DDS events and delivering them to poll.

Re 3) As far as I can tell, this is the non-blocking poll method. I'd need to put a small (1 ms or so) timeout on the ppoll syscall, which would be the minimum latency DDS would see (assuming no other fds are ready) - a latency DDS would see even when it has events ready.

On minimizing threads: I don't care too much about what the DDS middleware itself is doing (within reason - I can give it a fairly wide range, as long as it's timely and not using more than 10% of a laptop-grade Intel i7); I realize it needs threads down there to solve some of its problems. In time-sensitive loops I don't want too many context switches, or calls to things with poor latency/determinism. What I meant with the threads statement is that the big integration loops would get harder, because now you need to deal with thread communication to separate the DDS loops from the integration loops. That ranges from being of great error-reducing convenience to a necessity (for atomic things) to make sure things are submitted right where you want them.


I definitely want to use DDS for all the coordination I can, but down at the bottom, or where third-party (proprietary) software not using DDS is involved, we still need these universally compatible mechanisms to integrate with DDS.

Offline
Last seen: 1 week 5 days ago
Joined: 06/02/2010
Posts: 602

Hi Jason,

If I understood you correctly, then the DDS GuardCondition is the construct intended to support the kind of use-case you describe. That is, allowing the integration of the DDS WaitSet with other platform or toolkit wait/notification mechanisms.

As you noted, depending on the platform there are multiple ways in which a thread can be suspended and notified. So rather than creating special constructs for each, DDS provides the GuardCondition, which an application can use to integrate the desired mechanisms.

The application controls the "trigger" status of the GuardCondition by calling the set_trigger_value() operation. So in addition to the regular DDS statuses you are interested in reacting to, your application can also add one (or more) GuardCondition objects to the WaitSet. If you then want to monitor a set of sockets, signalfds, pipes, eventfds, etc., you would need a separate thread that uses select() or some other platform-specific mechanism to wait on those events. When that thread wakes up, it can call set_trigger_value() on the GuardCondition you attached to the WaitSet, which will cause the WaitSet to be woken up.

I realize that having to start a separate thread for this is inconvenient, but at the same time this is a very generic mechanism that will work even when there is no single OS/platform construct that can monitor all the events you are interested in. It is also better than a timeout because it will have much lower interrupt latency.

Gerardo

Offline
Last seen: 9 years 9 months ago
Joined: 03/01/2015
Posts: 8

Hi Gerardo,

That is indeed a very generic solution, and fairly low latency, but I disagree that it's a good one... especially if WaitForMultipleObjects or epoll/poll/select is being used in the guts of the middleware. I think embracing a very small set of platform-specific APIs (specific mainly to POSIX and Windows) gets most of the coverage needed without bringing in the complexity of threads - other platforms might have just chosen a different name for the same thing. The additional threads mean more buffers, more chances of blocking, or potentially skipped data before it makes it to DDS, which is part of why that isn't completely equivalent. As I hope you saw above, this is not an uncommon problem, and several projects "see the light" on it - not everyone, though. Throwing threads at it gets in the way of adopting DDS and significantly complicates code/increases bug counts.

You can expose this functionality in a way that hides the platform dependence through source-code compatibility (platform-dependent typedefs). Say, if we choose DDS WaitSets as the place where poll/select drives the application, an object could take a pointer to handles/ints and a count, or maybe an array of structs like the zmq_poll method (recall ZeroMQ is also cross-platform - that function works as intended on Windows too). Going this route, it would still be a very good idea to pay attention to the problem ppoll solves on POSIX, but providing this would significantly simplify integration where you're doing more than one thing (and of course it makes sense in that context not to use additional threads/processes for that part of the problem).

Here are the results of a few minutes of googling, so you fully understand that this is a widespread problem that deserves a proper solution - one that many projects have devoted proper or better solutions to (hopefully this isn't annoying to anybody; I just don't want you thinking "oh hey, it's only this guy who has that problem" - it's a real, bona fide integration issue that is generally solvable given some minor cooperation):

  • http://jefftrull.livejournal.com/4361.html
  • http://libusb.org/static/api-1.0/mtasync.html
  • http://damien.douxchamps.net/ieee1394/libdc1394/faq/#How_to_cleanly_stop_a_blocking_capture
  • http://docs.enlightenment.org/auto/eio/group__Ecore__Main__Loop__Group.html#ga111f77f35bf6f6065357dd0033d75e5c
  • http://www.wangafu.net/~nickm/libevent-book/Ref4_event.html (http://libevent.org/ provides a cross platform implementation of poll/select although through callback interfaces)
  • https://rodgert.github.io/2014/12/24/boost-asio-and-zeromq-pt1/
  • http://stackoverflow.com/questions/12829005/using-zeromq-together-with-boostasio
  • http://stackoverflow.com/questions/24449936/integrate-boostasio-into-file-descriptor-based-eventloops-select-poll
  • Also I wanted to point out that boost asio lets one get the native handles of each socket/serialport/whatever and exposes some OS specific APIs (look at bottom) as posix::stream_descriptor.
  • http://www.boost.org/doc/libs/1_57_0/doc/html/boost_asio/example/cpp03/chat/posix_chat_client.cpp
  • http://stackoverflow.com/questions/8862004/how-to-add-xorg-event-handing-in-g-main-loop
  • http://nikhilm.github.io/uvbook/utilities.html#external-i-o-with-polling
  • https://github.com/jonnydee/nzmqt
  • http://stackoverflow.com/questions/17834696/qt-and-rti-dds-interaction-need-some-guidance (note that guy isn't happy either)
  • "How can I integrate ZeroMQ sockets with normal sockets? Or with a GUI event loop?" @ http://zeromq.org/area:faq
  • http://nanomsg.org/v0.5/nn_poll.3.html
rip
Offline
Last seen: 3 weeks 1 day ago
Joined: 04/06/2012
Posts: 324

Also,

> Sometimes when having both listeners and queued constructs like wait-sets, it's completely mutually exclusive - if you don't finish processing in the callback, that data is gone.

That may be true for non-DDS constructions. In DDS, that's why there's a read() or take() available on the readers. The data may still be gone -- QoS is like that -- but it won't be because you decided not to read it on this go-around. The minimum expression of this is setting the length of the inout buffer to 1 (or using read_next_sample() or take_next_sample()) on the read/take call.

And you can poll, use a WaitSet, or use a Listener on a given reader at the same time (there is a defined priority: Listener, then WaitSet). I'm thinking of one setup I've seen that used (on a single Topic) a take_next_instance() inside a Listener to pull a specific thread of commands out of a stream, while a WaitSet-coordinated take() was used for less time-sensitive 'mass transit' stuff on everything else received on the topic.

 

Offline
Last seen: 1 week 5 days ago
Joined: 06/02/2010
Posts: 602

Hello Jason,

Thank you for the additional explanation and the references. I took a look at them and I think I have a much better idea of what you are after...

First, one clarification: the implementation of the DDS WaitSet waits on a plain binary semaphore, so it does not use a (Unix) select/epoll.

This is not just for operating-system portability. It turns out that the kinds of DDS events that wake up the DDS WaitSet are things like the arrival of data at specific DataReaders, internal notifications of QoS changes and statuses, etc., which are not directly tied to any operating-system file descriptor. For example, DDS uses only a few (normally 3) UDP sockets to receive data and metadata (discovery, reliability acks/heartbeats, data, data fragments, liveliness data, etc.). The application may be waiting for data on a specific DataReader or a match event on a DataWriter, and there is no one-to-one correspondence between these events and the underlying sockets: data for multiple DataReaders may arrive over the same socket, and similarly, multiple sockets may be involved in the reception of information (e.g. data, discovery data, liveliness data, reliability traffic) for the same DataReader.

So the DDS internal threads process the messages and perform the periodic event checking, and when they detect something that affects a particular WaitSet, they wake it up by signalling the corresponding semaphore.

In the case of Windows, the semaphores are waitable objects, so WaitSet::wait() ends up blocking on a WaitForSingleObject(). But even in this situation a WaitSet may consume multiple "semaphore signals" before returning from WaitSet::wait(), because there are configuration settings that allow the wait to "batch the wakeup events" for increased event-processing throughput.

So, at least on Unix, the operation of the WaitSet is completely independent of anything you can capture with epoll(), which is one reason we did not try to offer a way to integrate the two, leaving it as something the application could do.

But I do see your point that the application may want to weave in events coming from all these sources (e.g. DDS and other I/O sockets, files, etc.) and it is not so easy to do with the APIs we provide. I agree asking the user to create additional threads to handle this common case is not "friendly". 

Reading the references you provided got me thinking about what we could do to ease this task. I would like to describe some of these ideas and hear your comments on whether they would address this use-case, or hear any better suggestions:

I see two use-cases:

  1. The application is using its own construct to have its thread wait for events, and it wants to add the DDS WaitSet to the list of conditions that would wake up its thread.
  2. The application uses DDS WaitSet::wait(), but has extra events it wants to monitor and wants the DDS WaitSet::wait() to also wake up on those events.

Are these equally important/useful or do you see one more useful than the other?

For the first use-case, we could add an API like DDS WaitSet::get_os_waitable_handle() that would return the kind of OS-native waitable handle used by the "natural" wait mechanism on that platform. For example, on Unix we would return a file descriptor that can be used within select() or epoll(), whereas on Windows we would return a HANDLE that can be used with WaitForMultipleObjects(). When the user does this, the DDS internal logic would signal that additional "native handle" whenever it wants to wake up the WaitSet, so the application can wait on that handle rather than use WaitSet::wait().

Note that on some platforms, like Unix, we would need to create this underlying file descriptor, because the WaitSet does not have one (it only has a semaphore). Depending on the specific platform we can try to be efficient, e.g. by using eventfd() on Linux... but I am not sure equally efficient handles can be created on all platforms...

For the second use-case, we could create a new kind of DDS Condition: an OSWaitableCondition that can be initialized with the corresponding OS-native waitable handle (e.g. a file descriptor on Unix). The user can then attach this OSWaitableCondition to the WaitSet, and this would cause DDS to monitor the handle using the proper native operation (e.g. epoll on Linux). This requires an internal DDS thread, but a single "internal waiting thread" could be multiplexed to watch over multiple handles, as epoll() does. Also, having the internal DDS thread do this saves users from having to create their own thread just to demultiplex on the handles. When the handle is signalled by the OS, the internal thread enables the OSWaitableCondition associated with it, and that in turn wakes up the DDS WaitSet. In a sense, my first suggestion was that an application could build this OSWaitableCondition itself using a DDS GuardCondition; but having it built in would simplify what the application needs to do.

Would this fit the use-case you are describing? Or would you suggest a different way to do this?

Thanks!

Gerardo

 

Offline
Last seen: 9 years 9 months ago
Joined: 03/01/2015
Posts: 8

Hi Gerardo!


I'm happy to see your understanding of the issue.


I read the implementations of OpenDDS's and OpenSplice's wait sets, and I did see you were doing a bit more than just file descriptors - the implementations boiled down to semaphores & conditions - so I figured the answer would end at the approaches we've outlined earlier.


Re the Windows semaphores being waitable: yes, that is something I'm always envious of - Windows has everything under the sun compatible with its wait system. I've actually petitioned The Open Group to make the POSIX thread & IPC APIs integrate with POSIX poll - apparently I need to get sponsorship for it, or go through IEEE, to get it on their todo list. I was not aware of the batching of wakeup events, however.

I think the first use-case is the best both for RTI and integrators.  Here's why:

As with the ppoll issue I mentioned earlier, the calling of that mechanism can change subtly depending on the OS (+version) and a particular integrator's needs. I do think you would want to choose the most appropriate file-descriptor type on each platform. This approach lets integrators choose exactly what works best for their situations (select, for instance, works on files where poll does not; ppoll, epoll, and kqueue are other choices that can be made), versions, et al., without having to come back to you for support, and it frees you from working out all the subtleties of multi-platform poll support. It allows them to act as the top-level reactor (organization and management of file descriptors can be more complicated than a simple array you are handed one time). In contrast, the other approach leaves the WaitSet, rather than the caller of poll/select, to deal with all the situations a user would have.

Both approaches provide the capabilities needed, but the first one is probably more efficient, more flexible, and probably easier to maintain. I think it could be done by inserting checks for whether one of these waitable descriptors is valid in a WaitSet, and if valid, posting the event on that descriptor - just after you are sure the WaitSet will wake up (poll guarantees read will not block after a POLLIN wakeup) - probably at the tail of your wakeup function. From there, the user's poll returns with an event pending on that waitable descriptor, and then the user calls wait so they can demultiplex the WaitSet case they need to handle.


edit: You might want to let people specify what type of waitable-descriptor implementation to use - for instance a pipe or eventfd - or at least have the type/platform documented. I think it would also be a good idea to supply an API routine that clears this opaque file-descriptor type's pending events: clearing a pipe's pending events is a bit different from an eventfd's, and eventfd actually returns the number of events it has accumulated so far.

Thanks in kind!

-Jason

Offline
Last seen: 1 week 5 days ago
Joined: 06/02/2010
Posts: 602

Hi Jason,

Thank you for the detailed comments. I agree the first approach seems simpler and more general for the reasons you describe. I have entered a feature request (CORE-6707) that captures the feature. Hopefully we can get this added in one of the upcoming releases...

Regards,

Gerardo

Offline
Last seen: 9 years 9 months ago
Joined: 03/01/2015
Posts: 8

Gerardo,


How long do you think a feature of this sort will take to add/release? Since I have to implement programs with it immediately (and have them be usable), I'm seeing if I can basically code to the feature now - and develop/test against OpenSplice (to which I've got a WIP patch, see attachment) + the SIMD C++ bindings. Otherwise I would have to use the methods we've already covered...

However I still would like to pursue usage of RTI Connext because of the low bandwidth plugins and some other reasons I wont get into.

I noticed https://www.rti.com/downloads/rti-dds.html with the "Open Source" core version; if it's made available, I might be able to add something complying with the interface now as well... but I doubt the practicality of mixing that with the prebuilt Connext DDS Professional (low-bandwidth plugins + other tools). If it might speed up your work on it, I can check out what it takes to add this to the open-source version and fling you that patch. I know it must go through proper cycles at your own pace, but let me know what you think about that as a jump start, in case your worker bees are backlogged. If it's worthwhile, to get access to the core source, is that something you can provide me with through a PM, or do I need to send a request to info@rti.com?

Small detour: which C++ wrapper library (template-based, and hopefully not ignoring C++11) is best suited for RTI? Just want to make sure I'm not making a mistake with SIMD.


-Jason

Offline
Last seen: 5 years 9 months ago
Joined: 06/30/2016
Posts: 3

Hi. Any news about this functionality? Integration to platform native waiting mechanisms would be an extremely useful feature.

Offline
Last seen: 1 week 5 days ago
Joined: 06/02/2010
Posts: 602

Hello,

I am sorry to say we have not made any progress on this... We agree it would be a very useful feature, and also not too hard to implement. But for the last year we have been working against some tight deadlines to deliver security and a number of new features addressing mobility, cloud, and other scenarios that were on the critical path of some large deployments. We just took another look at our schedule to see if there was a way to slip it into our next release cycle, but currently it does not seem like it would fit, unfortunately.

I will keep looking to see if we can somehow get it in as some sort of experimental feature, but we cannot commit to a timeline at this point...

Gerardo

Offline
Last seen: 5 years 9 months ago
Joined: 06/30/2016
Posts: 3

Ok, thanks for your answer. Well, maybe I can implement this myself with one extra thread, WaitSet and eventfd().

Offline
Last seen: 2 years 6 months ago
Joined: 04/20/2020
Posts: 24

Any updates on this feature?