DDS and Load Sharing with Multiple Subscribers

Dallas

A question...

Is it possible to publish one or more samples for a topic for which there are multiple subscribers, and guarantee that any given sample is received by ONE AND ONLY ONE subscriber? I don't care which subscriber receives which sample. I only care that when multiple samples are published, they are shared out approximately equally among the subscribers.

A common problem in my line of work is CPU load sharing among multiple processes and processors. Currently this is managed using point-to-point socket comms, where a master process shares the work out to multiple slave worker-bees.

If I were able to simply publish a bunch of processing requests, knowing that each would be picked up by exactly one subscriber worker-bee, it would simplify the architecture significantly. Scaling a system up to meet higher demand would then be trivial.

I suspect that this may go against the grain of the whole Publish-Subscribe ethos of DDS, but I'll never know if I don't ask, right?

Dallas

rip

Hi Dallas,

I spent several years working in tech support for an RTOS vendor, and later for a company doing an "IDE for XML". 

One of the things I noticed is that ideas or requests for capabilities are bursty in nature, and load balancing using DDS appears to be the current burst -- it has crossed my desk at least three times in the past couple of weeks.  With this sudden flurry of interest, I spent some time considering how I'd do it using Connext and the various tools available.  This post is a summation of what I learned and describes one proof of concept (PoC) I built.

If asked to look into load balancing, I would first ask what the requirements are.  In your case, you are specifically targeting "exactly once" delivery, which is the point-to-point data integration pattern and requires that each work package get delivered to one, and only one, downstream processing "worker bee".  I'll call this "Hard" point-to-point. The issue here is that RTI Connext Messaging does not yet support this integration pattern natively, which means it would have to be implemented at the application layer.

The main assumption that I had for the PoC was that I would not have influence on the data Type descriptions (if you do have this influence, I touch on what it means below).  Without influence on the Types, the load balancing has to happen at the layer of a 'service', or by manipulating QoS. 

In comparison to Hard point-to-point, there is also "Soft".  Other load balancing patterns may be loose and allow multiple worker-bees to see a work package (if the process they implement is idempotent, so that each worker-bee will return the same result or complete the same actions, and these actions or results don't produce side effects or other follow-on processing), or the data may be best-effort, where having a data instance be lost en route is not the end of the world.

The design I came up with has the worker-bee Subscribers (S) reporting their current load to a small service application that runs in parallel with the Publisher(s).  The service application maintains state for the system, and determines which S is currently the least loaded and most able to process new work packages.  The actual routing of those work packages is then done by manipulating Routing Service routing tables.  The service is aware enough that when a new S' appears (by publishing its loading), the service will dynamically add the new available route to the RS's table.  When the load on an S drops and becomes the lowest within the system of S*, the service will either

  • pause the current route and resume the new "low" route, OR
  • resume the new "low" route and then pause the old route.

The service would then wait a (configurable) length of time before again changing the route, in order to prevent thrashing if two or more S are dancing around the same absolute loading.  The PoC works but I found out during the implementation that this is Soft point-to-point, due to how the Routing Service handles history queues on routes.
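The hold-down described above can be sketched as a small guard object. This is a minimal, DDS-free Python illustration (class and method names are my own, not part of any RTI API); it only switches the active route when the requested route differs and enough time has passed since the last switch.

```python
import time

class RouteSwitcher:
    """Anti-thrash guard: only re-route when the hold-down window
    since the last switch has elapsed. Illustrative sketch only."""

    def __init__(self, hold_down_s=5.0, clock=time.monotonic):
        self.hold_down_s = hold_down_s
        self.clock = clock            # injectable for testing
        self.active_route = None
        self._last_switch = float("-inf")

    def request_switch(self, new_route):
        """Return True if the switch was actually performed."""
        if new_route == self.active_route:
            return False
        if self.clock() - self._last_switch < self.hold_down_s:
            return False              # still inside the hold-down window
        self.active_route = new_route
        self._last_switch = self.clock()
        return True
```

With a 5-second hold-down, two subscribers trading the "lowest load" spot every second would cause at most one route change per 5 seconds.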

Which order the pause and resume actions happen in determines how the looseness shows up.  Because the pause and the resume are non-atomic, there is a race condition inside the process: if the system pauses one route and then resumes the other, data may be lost while there is no active route, if it is arriving fast enough to swamp the history keep_last buffer.  Conversely, if the system resumes one and then pauses the other, both routes may receive an instance (and, because of how RS DataReader history is implemented, this is exactly what happens in all cases; changing the history queue depth determines how many instances are duplicated.  I can explain why this is if needed).

The PoC implementation does not need to be a standalone service.  If the Publisher is a single instance, it can manage this itself on a different thread.  If the architecture in use is one of the ones we support for Routing Service as a linked object to an application, this presents some additional techniques that could be leveraged, but now we're no longer talking about a "service" but a full-blown application with many moving parts. 

Other techniques or objects that could be leveraged to do load balancing (each with various trade-offs, and for the most part Soft) include

  • Partitions (QoS) within the Domain (if the data isn't keyed)
  • if the Type definition supports it, using Content Filtered Topics
  • Routing Service loopback adapter that maintains the load balancer state 

With a bit more coding, it would be possible to build a loopback adapter for Routing Service that manages the state and determines where to forward each work package.  This solves the multiple DataReader history queue issues in Routing Service when implemented like my PoC, and gains you all the Goodness that RS brings to the table (see the Admin Console integration in Connext Tools for 5.0.0 for example).

As a second PoC, I would use Routing Service and a purpose-built adapter as the means towards a Hard point-to-point end, since that's the tool I have the most experience with, and I like the fact that I can manipulate its behavior from within the Admin Console. This is still assuming that I don't have influence on the Type descriptions, and in the "when your toolbox contains only a hammer, every problem looks like a nail" mode.

Indeed, the "closest" approximation of point-to-point could be implemented using CFTs and a bit of state maintained in the publisher.  Work packages could then be routed by changing the field value that determines where a work package will go.  Alternately, the state maintenance can be moved out into a service, at the cost of each work package being published (generally) and then republished (specifically), but with the benefit of supporting multiple, independent Publishers.

If I can influence the Type descriptions, then I'd use Content Filtering on a load-balance specific field:

struct WorkPackageType {
    long workerId;
    ...
};

Each worker-bee DataReader would then CFT on the workerId, watching for its unique identifier.  The unique identifiers could then be configured statically or dynamically; the worker-bee would report via a control topic that its id was now accepting work packages, and the state maintainer (either a service or a thread in the Publisher application) would then know it could be used.  Determining which workerId gets the next package would be whatever algorithm suits: round-robin, lowest reported load, etc.
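The state maintainer's bookkeeping is straightforward. Here is a minimal, DDS-free Python sketch (names are mine, not an RTI API): workers announce themselves on the control topic, and the publisher asks for the next workerId by round-robin or by lowest reported load.

```python
class WorkerRegistry:
    """Sketch of the state maintainer: tracks which workerIds have
    announced themselves and their last reported load (0..100)."""

    def __init__(self):
        self._loads = {}   # workerId -> last reported load
        self._rr = []      # round-robin order of known workerIds

    def report(self, worker_id, load=0):
        """Called when a worker announces itself or updates its load."""
        if worker_id not in self._loads:
            self._rr.append(worker_id)
        self._loads[worker_id] = load

    def next_round_robin(self):
        """Cycle through known workers in announcement order."""
        worker_id = self._rr.pop(0)
        self._rr.append(worker_id)
        return worker_id

    def next_least_loaded(self):
        """Pick the worker with the lowest reported load."""
        return min(self._loads, key=self._loads.get)
```

The publisher would then stamp each outgoing work package's workerId field with the id this registry hands back.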

As you can see from the scattershot response, the question of Load Balancing currently requires some trade-offs between what can be influenced and what makes sense in the System you're designing. If you'd like to continue the discussion, feel free to narrow() by listing the influencers and system design you have to work with, and I can respond accordingly.

Regards,

rip

Dallas

Hello rip,

Thanks for your very detailed response to my question.

In the system on which I work, I have a high degree of influence over the type descriptions. Chatting with some colleagues, we think that the last scenario that you described would be the most suitable in our situation.

We have quite a bit of control over software implementation and data typing, but little over network and router configuration; especially at runtime. So to use your analogy, our toolbox also contains one tool, but ours is a screwdriver, so all our problems look like a screw. :-)

Dallas

rip

Excellent.

module demo {
  module rti {
    module loadBalance {

      struct Load {
        string<128> id;  //@key
        long load;       // 0 <= load <= 100
      };

    };
  };
};

This is the "load" reporting Type I used in my PoC. Generally I work with strings for my keys, since for a demo I don't need to worry about being bound by CPU cycles, and a stringy key means I can name the parts and see who is talking to whom without having to look at a spreadsheet. I'm lazy that way. The load is an int between 0 and 100, meaning 0% to 100% load.  How the load is generated (i.e., what the limiting factor is, which will probably be the CPU load) is entirely up to you; just make sure that both sides have the same semantic understanding of what it means (100% loaded? or 100% unloaded?) :)

The service then listens for publications on topic "Loading" with the Load type. Keep_last depth is 1 and I read() so that the keyed values are all kept in the DR cache for me. Whenever an update is received I simply read() everything out of the cache, determine which one has the lowest load in the batch and switch the routes to make that one live.
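That DataReader-cache behavior (keep_last depth 1, keyed, read() rather than take()) can be modeled with a plain dict: the newest sample per key overwrites the previous one, and the service scans the whole cache for the minimum. A minimal Python sketch, with names of my own choosing:

```python
class LoadCache:
    """Models the service's view of the 'Loading' topic: with
    keep_last depth 1 and read(), the reader cache holds exactly
    the latest Load sample per key. Illustrative sketch only."""

    def __init__(self):
        self._cache = {}  # id -> latest reported load

    def on_load_update(self, worker_id, load):
        # depth-1 history: a newer sample replaces the older one
        self._cache[worker_id] = load

    def least_loaded(self):
        """'read() everything' and return the id with the lowest load."""
        return min(self._cache, key=self._cache.get)
```

On each update, the service would call least_loaded() and switch the routes if the answer has changed (subject to the hold-down discussed earlier).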

In a CFT version, your worker-bees create and use a SQL-like filter string very similar to "workerId = %0" and set the parameter to whatever their personal ID is.

Keep in mind I've not built one of these, so you'll get to discover all the little things.  If you get stuck, feel free to post what you've run into.

Have fun!  And sorry about the delay in responding.  Had a hurricane parked over my head the past couple of days :)

rip

rip

Load balancing using routing service might be called "passive", in that it doesn't require that the publisher be aware that its load is being balanced.  Technically, you could have a plug-in into RS that manages both the load and the balancing without requiring that the subscribers be aware of the effort, either.

Active load balancing, where the subscribers report their willingness to accept traffic and publishers have a module to decide where to send the data, is much easier, but assumes that you have green-field development.

One additional option is to roll the below method into the plug-in for RS, which gives you all the capabilities of the RS (including management via the Admin Console in 5.0.0) as well as the load-balancing. 

Start with a type with a field that is to be used as the slot for a token.  It is possible to either insert the field directly, or to overload an existing field.  Let's use the ShapeType (which the Shapes Demo uses), with the color field as our overloaded slot:

struct ShapeType {
    string<128> color; //@key
    long x;
    long y;
    long shapesize;
};

The tokens then would be the eight values "understood" by the Shapes front end, being RED, GREEN, BLUE, CYAN, MAGENTA, ORANGE, PURPLE and YELLOW.

When a subscriber publishes its Loading using the Load type described above, it will populate the "id" field with one of RED, GREEN, etc.  When it subscribes to Circle, it subscribes using a content-filtered topic with the filter string "(color = 'RED')" or whatever the "id" field is.

On the publisher side, I have an application that continually publishes a Circle, in a manner similar to the crawling circles that the normal demo has (because I'm odd that way, I use polar coords and convert them to x, y values just before publication.  My shapes move in arcs, not straight lines).  Also part of that application is a subscriber that listens for Load instances on the Loading topic and keeps an array of them, sorted by the values in the "load" field.  As new Loading instances arrive, I update the array.

When the publisher is just about to publish its next ShapeType on Circle, it requests a Token from the Loading module.  The module returns the "id" value of whatever Loading is in slot 0.  Remember that the "id" values are one of RED, GREEN, etc.  The publisher takes the returned token, sets the color field of its instance to that value, and writes it out.  The middleware then processes the filters and sends the instance to only the subscriber whose token was used.
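The net effect of the content filtering can be simulated without DDS at all. This Python sketch (my own names; field names follow the ShapeType IDL above) stamps a sample with a token and lets only the matching subscriber's filter pass it:

```python
def matches_filter(sample, token):
    """Mimics a subscriber's CFT "(color = 'RED')": the sample is
    visible only when its color field equals the subscriber's token."""
    return sample["color"] == token

# Publisher side: stamp the instance with the token it was handed.
sample = {"color": "CYAN", "x": 10, "y": 20, "shapesize": 30}

# Middleware side: each subscriber's filter is evaluated independently;
# only the worker whose token matches receives the instance.
subscribers = {"RED": [], "CYAN": [], "BLUE": []}
for token, inbox in subscribers.items():
    if matches_filter(sample, token):
        inbox.append(sample)
```

Here only the CYAN worker's inbox ends up with the sample, which is the "one and only one subscriber" behavior Dallas asked for (as long as no two workers claim the same token).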

1) A publishing application does not need to getToken() every single time it is going to write an instance; the token can instead be treated as a session token.  Affinity can be per instance published, per session, or per runtime life; that is up to the system design criteria.

2) The algorithm for deciding which Token to hand to the publishing application might be by-least-loaded, or round robin, or whatever.  I have one that uses the least loaded, but each time it gives that Token out it increases the load for that token by an arbitrary amount, based on its distance from 100.  Eventually, each Load (assuming that no Load is updated by its subscriber) seeks to '98', at which point all Tokens are handed out in round robin.  This third algorithm was developed to minimize the amount of "Loading" traffic on the wire: with it you can reduce the number of Load updates, or even stop a subscriber from receiving new instances entirely by having it report a load > 98.

3) Multiple publishing applications can have their own Token management, and each can have their own getToken() behavior.  Alternately you could go all-in for Pub-Sub and have a single Loading subscriber that then publishes Tokens on a different topic.  Applications would then subscribe to that new topic, and update its downstream affinity based on that subscription.
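The load-inflation dispenser from point 2 can be sketched as follows. This is a DDS-free Python illustration with invented names; the inflation fraction is my guess, not a value from the PoC. Ties are broken least-recently-used, which (per the video note below) yields round robin once loads converge.

```python
class TokenDispenser:
    """Hand out the least-loaded token, then inflate its locally
    tracked load by a fraction of its distance from 100, so that
    absent fresh Load updates all tokens drift toward 98 and the
    dispenser degrades to round robin. Illustrative sketch only."""

    CEILING = 98

    def __init__(self):
        self._loads = {}  # token -> locally tracked load

    def update_load(self, token, load):
        """A fresh Load report from a subscriber resets our estimate."""
        self._loads[token] = load

    def get_token(self):
        # min() over a dict scans keys in insertion order, so popping
        # and re-inserting the chosen token moves it behind any token
        # with an equal load -> round robin among ties.
        token = min(self._loads, key=self._loads.get)
        load = self._loads.pop(token)
        if load < self.CEILING:
            # inflate by an arbitrary fraction of the distance to 100
            load = min(self.CEILING, load + (100 - load) // 4)
        self._loads[token] = load
        return token
```

A token reporting a load above the ceiling is never the minimum while any other token sits at or below it, which is how a subscriber can opt out of new work.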

It's a pretty flexible technique.

http://youtu.be/KFhNXSmhers

Right at the end you'll see a blue-cyan-blue-cyan-blue alternation; this is because their loads were the same.  When this is the case, the sorting algorithm moves the "one just used" to the end of that loading group.  The result is round robin at a given loading, even when the algorithm is by-least-loaded.

Rip

Gerardo Pardo

Hello Dallas,

I realize this is a pretty old thread, but I wanted to point out that RTI introduced the RTI Queuing Service product in 2015 to address exactly the "load sharing" use case you describe. See https://www.rti.com/products/dds/queuing-service.html

Gerardo