Is there a way to change the bandwidth throttling?

7 posts / 0 new
Last post
Offline
Last seen: 11 years 5 months ago
Joined: 05/27/2013
Posts: 8
Is there a way to change the bandwidth throttling?

I understand that there are internal mechanisms to throttle the bandwidth consumed by the sending of the samples to prevent overwhelming any late-joining DataReaders, however, is where a way to modify this settings?

rip
rip's picture
Offline
Last seen: 2 weeks 4 days ago
Joined: 04/06/2012
Posts: 324

If you are using asynchronous writers you have access to flow controllers.  cf Flow Controller

If you are not using asynchronous writers, the only "throttling" that will happen is when for example a reliable writer's History cache is full with un-acked instances, and the application's write() blocks.  Other than that, the data is sent on the execution path and is not buffered in the middleware (it may be buffered in the IP stack's implementation however). 

Giving the writer an unlimited blocking time, history KEEP_LAST depth=N, the writer will block whenever there are N samples that haven't been acknowledeged.  As the reader acknowledges samples, the writer will be freed to write the sample it is currently working on, and then the thread returns to application space.  This will keep the writer in sync with whatever the reader(s) can handle.  See Gerardo's comment below. 

rip

Gerardo Pardo's picture
Offline
Last seen: 1 week 1 day ago
Joined: 06/02/2010
Posts: 602

Hello Kuanyong and Rip,

I have some comments/clarifications to add to Rip's posting and an additional suggestion you.

With regards to blocking. The situation is a bit more involved that Rip described...  Basically there are are two situations under which a reliable DataWriter may block on the write operation:

(1) The sample just written cannot be stored in the DataWriter cache because that would exceed some resource limit as configured by the RESOURCE_LIMITS DataWriter Qos policy.

(2) The sample just written would cause the size of the send window to exceed the current value of the send_window_size. The send_window_size normally self-adjusts in reaction to network conditions and receiver performance, but its limits and adjusting policy can be using using the DATA_WRITER_PROTOCOL Qos Policy, specifically the rtps_reliable_writer attribute of type DDS_RtpsReliableWriterProtocol_t.

The way these Qos interact to cause a the DataWriter write operation to block is described in Section 10.3.2.1 of the Connext DDS 5.0 User's manual titled "Understanding the Send Queue and Setting its Size".

The fundamental intuition is that the DataWriter only blocks when accepting the sample would cause it to violate a QoS policy and the situation can resove itself waiting for some acknowledgment from one of the DataReaders which would free the necessary recource.

Specifically a reliable DataWriter with HISTORY kind set to KEEP_LAST and a specific history depth=N will NOT block when there are N samples for an instance that have not been acknowledged.  This is because the whole purpose of the KEEP_LAST history setting is to specify that the only thjnk the DataWriter needs to keep are the last N samples and that earlier samples are not relevant if yoiu already have the last N. For this reason the DataWriter can and will remove the oldest sample, even if it is not acknowledged by some reliable reader, and replace it with the newly written sample without causing a block.

This does not mean that a write operation on a realible DataWriter with KEEP_LAST hostory will never block. There are other cases which may cause the write to block. These are described in detail in Section 6.3.8.1 titled "Blocking During a write()" of the Connext DDS 5.0.0 User's manual. But these are caused by the way the RESOURCE_LIMITS are configured (or the send window), not by exceeding the history depth.

With regards to your original question of how to throttle bandwith the best approach of you know the ammount of bandwidth you have available and you would like to consume is to configure the use of a FlowController. If you do not know and want it to dtnamically adjust based on network congestion or the actual speed of the DataReaders then I would configure the send_window_size  limits to have a max_send_window_size that is not unlimited. Unfortunately the default out-of-the box setting for the  max_send_window_size is "unlimited" so  this useful thorttling mechanism is disabled by default.

Gerardo

 

rip
rip's picture
Offline
Last seen: 2 weeks 4 days ago
Joined: 04/06/2012
Posts: 324

cf the new, related discussion on gap v. hb behavior

Offline
Last seen: 11 years 5 months ago
Joined: 05/27/2013
Posts: 8

Thanks for your responses, but I am still unable to resolve my issue. I have a few questions:

1) Without batching enabled, am I correct to say that I will be sending 50x10 records per second (assuming that only 1 record can fit into 1024 bytes)?

2) When I tried to reduce my byte_per_token value to 512, I encountered an "inconsistent QoS policy" error. I checked the description of bytes_per_token in the usersmanual.pdf but there was no information on cases where an inconsistent QoS policy error would occur. If it is because the value set was too small to fit one sample, how do I calculate the actual sample size because (assuming 1 char is 1 byte) I calculated the size of ExampleRecord to be 348 bytes.

3) I noticed that my queued sample count on my writer is always 2000 after writing 2000 individual samples of index 0 to 1999. As my max_instances is 2000 and history.depth is 1, If I write another sample to update an existing instance of say index=10, am I right to say that this new sample would replace the old instance in the queue of 2000 samples instead of replacing the oldest sample which is of index=1?

4) With these settings, I am still receiving batches of 300-600 records every few seconds on my C++ receiver on a separate machine from the sender. Am I miscalculating the amount of bandwidth I need to send my records smoothly?


My java writer application's initialization is as follows:

//create participant using default settings;

FlowControllerProperty_t flowControllerProp = new FlowControllerProperty_t();

participant.get_default_flowcontroller_property(flowControllerProp);
flowControllerProp.token_bucket.tokens_added_per_period = 50;
flowControllerProp.token_bucket.max_tokens = 50;
flowControllerProp.token_bucket.tokens_leaked_per_period = 0;
flowControllerProp.token_bucket.period.nanosec = 100000000; //100ms
flowControllerProp.token_bucket.period.sec = 0;
flowControllerProp.token_bucket.bytes_per_token = 1024;

flowController = participant.create_flowcontroller("flowControllerName", flowControllerProp);


//create publisher using default qos settings;

DataWriterQos dataWriterQos = new DataWriterQos();

publisher.get_default_datawriter_qos(dataWriterQos);

dataWriterQos.deadline.period.sec = Duration_t.DURATION_INFINITY_SEC;
dataWriterQos.deadline.kind = ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
dataWriterQos.reliability.max_blocking_time.sec = 60;
dataWriterQos.durability.direct_communication = true;
dataWriterQos.resource_limits.max_samples_per_instance = 1;
dataWriterQos.resource_limits.max_samples = 2000;
dataWriterQos.resource_limits.max_instances = 2000;
dataWriterQos.durability.kind = DurabilityQosPolicyKind.PERSISTENT_DURABILITY_QOS;
dataWriterQos.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;
dataWriterQos.history.depth = 1;
dataWriterQos.ownership.kind = OwnershipQosPolicyKind.SHARED_OWNERSHIP_QOS;
dataWriterQos.publish_mode.kind = PublishModeQosPolicyKind.ASYNCHRONOUS_PUBLISH_MODE_QOS;
dataWriterQos.protocol.rtps_reliable_writer.max_send_window_size = 2000*2;
dataWriterQos.protocol.rtps_reliable_writer.min_send_window_size = 2000;

publisher.set_default_datawriter_qos(dataWriterQos);

//create and register topic

//create writer

ExampleRecord rec = new ExampleRecord();

for(short i=0;i<2000<++i){
rec.index = i; // this value is the key - 2 bytes
rec.strVar1 = "example"; // string<100>
rec.strVar2 = "example"; // string<100>
rec.strVar3 = "example"; // string<40>
rec.strVar4 = "example"; // string<90>
rec.integerVar = i; // 4 bytes
rec.longVar = i; // 8 bytes
rec.shortVar = i; // 2 bytes

InstanceHandle_t handle = writer.register_instance(rec);

writer.write(rec, handle);
}

//while loop to writer.get_datawriter_cache_status and print status.sample_count every second.


The C++ reader application's subscriber initialization is as follows:

DDS_DataReaderQos dataReaderQos;
subscriber->get_default_datareader_qos(dataReaderQos);
dataReaderQos.deadline.period.nanosec = DDS_DURATION_INFINITE_NSEC;
dataReaderQos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
dataReaderQos.durability.direct_communication = true;
dataReaderQos.resource_limits.max_samples_per_instance = 1;
dataReaderQos.resource_limits.max_samples = 2000;
dataReaderQos.resource_limits.max_instances = 2000;
dataReaderQos.durability.kind = DDS_PERSISTENT_DURABILITY_QOS;
dataReaderQos.history.kind = DDS_KEEP_LAST_HISTORY_QOS;
dataReaderQos.history.depth = 1;
dataReaderQos.ownership.kind = DDS_SHARED_OWNERSHIP_QOS;
dataReaderQos.liveliness.lease_duration.sec = DDS_DURATION_INFINITE_SEC;

subscriber->set_default_datareader_qos(dataReaderQos);

 

void on_data_available(DDSDataReader* reader)
{
ExampleRecord_reader = ExampleRecord::narrow(reader);
//null check

retcode = ExampleReader->take(dataSeq, info_seq, DDS_LENGTH_UNLIMITED, DDS_NOT_READ_SAMPLE_STATE, DDS_ANY_VIEW_STATE,DDS_ANY_VIEW_STATE);
//retcode check

for(i=0;i<data_seq.length();++i)
{
if(info_seq[i].valid_data)
{
cout << "index: " << dataSeq[i].iindex << endl;
}
}
}

 

 

 

Offline
Last seen: 11 years 5 months ago
Joined: 05/27/2013
Posts: 8

Will I be able to get some help regarding this issue?

rose's picture
Offline
Last seen: 3 years 5 months ago
Joined: 08/22/2011
Posts: 148

Hello kuanyong, 

1. This is the upper bound of what you should be able to send.  Those 1024 bytes are used for your data, but also for RTPS per-packet overhead, and for reliability overhead (heartbeats and ACKNACKs) 

2. There is a minimum size of 1024 bytes for the bytes_per_token (described here).  I am filing a bug report agaist the error message, since it doesn't point to the actual problem.

3. Since you have PERSISTENT durability, it makes sense that the DataWriter queue will always fill to the maximum available samples.  Those are being stored for late joiners.  You are correct that if you update instance #10, with history = KEEP_LAST / depth = 1, this new sample will overwrite the previous sample for instance #10.  

4. It's hard to be 100% certain of what is happening, but one thing I would start with: the default reliability tuning sends heartbeats every 3 seconds.  If any of these packets are lost, it can take 3+ seconds until it is repaired.  If you set this smaller, there is a tradeoff, because you will be using more of your flow-controlled bandwidth for reliablility. Still, you might want to try setting this to 100 ms.  (This is in dataWriterQos.protocol.rtps_reliable_writer.heartbeat_period). Edit: You will also have to set the fast_heartbeat_period and the late_joiner_heartbeat_period to avoid inconsistent QoS. 

Can you share a WireShark packet?  That might show if a lot of data is being dropped.  Also, are you using a Persistence Service in your testing?

Edit: One thing I forgot to note is that the bandwidth allowed by the flow controller will be divided between all of the DataReader/transport combinations that are receiving that data.  So, if you have a DataReader on your local machine (such as rtiddsspy), some of the flow-controlled bandwidth is being allocated to send data to that local DataReader.  This could also account for you seeing far less throughput than you expect.

Thank you,

Rose