Late joiner history discrepancy between C++ vs. Java, is there a race condition?

Joined: 09/09/2018
Posts: 4

Setup

  • 80 participants
  • 13 Topics on domain
  • Topic A contains 14,000 unique key values
    • Topic A’s history is KEEP_LAST with depth 1
    • Topic A’s durability is TRANSIENT_LOCAL
    • Topic A’s reliability is RELIABLE
    • All of Topic A’s sample-related resource limits are set to UNLIMITED
  • Network is 100 Mb/s
  • The late-joining Java code doesn’t receive all the history for Topic A
  • I created a standalone C++ app that uses the exact same QoS, and it consistently receives more history than the Java app
  • Both the C++ and Java apps discover the same number of participants and subscription matches
  • When I look at a missing key, the participant that produces it is publishing 12 key values, but the Java app is missing 1 of the 12
  • While the Java app runs for 6 minutes, I print the lost and rejected statuses every 30 seconds; at the end of the 6 minutes both are still zero
    • SampleLostStatus::total_count_change = 0
    • SampleLostStatus::total_count = 0
    • SampleLostStatus::last_reason = NOT_LOST
    • SampleRejectedStatus::total_count_change = 0
    • SampleRejectedStatus::total_count = 0
    • SampleRejectedStatus::last_reason = NOT_REJECTED
  • I don’t see drops in /proc/net/udp for any of the sockets

How is it possible that I discover a participant, get a subscription match and receive only 11 out of the 12 keys it is publishing?

 

Howard's picture
Joined: 11/29/2012
Posts: 679

I don't fully understand your scenario.

Is Topic A the only one with TRANSIENT_LOCAL?   How many participants publish TOPIC A?

You wrote that a participant has 12 key values, I guess you mean the DataWriter in that participant published 12 key values.   Do all participants (80) publish 12 unique key values (different from each other)?  (I assume that a "key value" is a unique value for the key, so 12 key values == 12 instances)

You wrote that Topic A has 14,000 instances (aka key values).  But 80 participants * 12 key values per participant = 960 key values (instances).

How many total instances are there?

What is your Java code for receiving Data from the DataReader?

Does "more history" mean all values that you expect to receive, i.e., one value for each instance that was published for Topic A by a participant?   or is the C++ app also missing data, but just getting "more" than the Java app?

 

Joined: 09/09/2018
Posts: 4

Is Topic A the only one with TRANSIENT_LOCAL?

No. I counted wrong: there are 16 Topics, of which 7 are TRANSIENT_LOCAL; the others are VOLATILE.

 How many participants publish TOPIC A?

 80 participants

What is your Java code for receiving Data from the DataReader?

I can't post it but it is using a waitset.

Does "more history" mean ...

I am still trying to confirm the total number I should expect to receive. The C++ app seems to always get more than the Java app, and the C++ app gets the same number every time it is run.

Howard's picture
Joined: 11/29/2012
Posts: 679

What is the code that calls the DataReader object to get the data?

And how do you loop through the returned data?

Is it exactly the same in C++ and Java...or are you using different APIs?

Joined: 09/09/2018
Posts: 4
 
I double-checked the Java code: it is not using a waitset but a looping thread.

void periodicLoop() {
    for (;;) {
        try {
            // Take only unread samples whose instance is still alive
            reader.take_untyped(data, info_seq, ResourceLimitsQosPolicy.LENGTH_UNLIMITED,
                                SampleStateKind.NOT_READ_SAMPLE_STATE, ViewStateKind.ANY_VIEW_STATE,
                                InstanceStateKind.ALIVE_INSTANCE_STATE);
            for (Object obj : data) {
                DynamicData dataElem = (DynamicData) obj;
                // Process the data
            }
        } catch (RETCODE_NO_DATA ex) {
            // Do nothing
        } catch (Exception ex) {
            // log something
        } finally {
            // Always hand the loaned sequences back to the reader
            reader.return_loan_untyped(data, info_seq);
        }
        try {
            // Poll again after 100 ms
            Thread.sleep(100);
        } catch (InterruptedException ex) {
            // log something
        }
    }
}

The C++ standalone app uses dds::sub::DataReaderListener.

void on_data_available(dds::sub::DataReader<dds::core::xtypes::DynamicData>& reader) {
    // take() with no arguments applies no state filtering
    dds::sub::LoanedSamples<dds::core::xtypes::DynamicData> samples = reader.take();
    for (const auto& sample : samples) {
        if (sample.info().valid()) {
            // Process data
        } else {
            // Received metadata
        }
    }
}

This is the distribution of the keys published per node.
Two participants are missing because they were offline.

Node: count of keys published
---------------------------------------------
 0:  342    20:  254    40:  269    60:   13
 1:  204    21:  152    41:  226    61:   14
 2:  235    22:  244    42:  313    62:   14
 3:  465    23:  164    43:  248    63:   14
 4:  145    24:  204    44:  282    64:   14
 5:  277    25:  439    45:  147    65:   14
 6:  219    26:  264    46:   13    66:   14
 7:  210    27:  211    47:    7    67:   14
 8:  286    28:  186    48:   14    68:   13
 9:  210    29:  283    49:   14    69:   13
10:  319    30:  187    50:   14    70:   13
11:  245    31:  219    51:   14    71:   13
12:  287    32:  216    52:   14    72:   13
13:  434    33:  352    53:   14    73:   13
14:  341    34:  269    54:   14    74:   13
15:  215    35:  308    55:   14    75:  347
16:  181    36:  168    56:   15    76:  583
17:  212    37:  182    57:   14    77: 1604
18:  273    38:  143    58:   14
19:  195    39:  152    59:   14
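
As a sanity check on the total a late joiner should expect, the per-node counts can simply be summed. This is a minimal sketch: the numbers are copied from the distribution above, the class name KeyCountCheck is made up for illustration, and it assumes no key is published by more than one node.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the original application.
class KeyCountCheck {
    // Keys published per node, copied from the distribution above.
    // Array index == node number (0..77); two nodes were offline.
    static final int[] KEYS_PER_NODE = {
        342, 204, 235, 465, 145, 277, 219, 210, 286, 210,   // nodes 0-9
        319, 245, 287, 434, 341, 215, 181, 212, 273, 195,   // nodes 10-19
        254, 152, 244, 164, 204, 439, 264, 211, 186, 283,   // nodes 20-29
        187, 219, 216, 352, 269, 308, 168, 182, 143, 152,   // nodes 30-39
        269, 226, 313, 248, 282, 147,  13,   7,  14,  14,   // nodes 40-49
         14,  14,  14,  14,  14,  14,  15,  14,  14,  14,   // nodes 50-59
         13,  14,  14,  14,  14,  14,  14,  14,  13,  13,   // nodes 60-69
         13,  13,  13,  13,  13, 347, 583, 1604             // nodes 70-77
    };

    // Total number of keys (instances) across all reporting nodes.
    static int totalKeys() {
        return Arrays.stream(KEYS_PER_NODE).sum();
    }

    public static void main(String[] args) {
        System.out.println("nodes reporting: " + KEYS_PER_NODE.length);
        System.out.println("total keys:      " + totalKeys());
    }
}
```

The counts add up to 14,302 keys across the 78 reporting nodes, which is in line with the roughly 14,000 key values mentioned for Topic A, so that is the number each late joiner can be compared against.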

Howard's picture
Joined: 11/29/2012
Posts: 679

So, in your Java app, I see

        reader.take_untyped(data, info_seq, ResourceLimitsQosPolicy.LENGTH_UNLIMITED,
                            SampleStateKind.NOT_READ_SAMPLE_STATE, ViewStateKind.ANY_VIEW_STATE,
                            InstanceStateKind.ALIVE_INSTANCE_STATE);

and in C++

    dds::sub::LoanedSamples<dds::core::xtypes::DynamicData> samples = reader.take();

 

And these are different in terms of the parameters to take().  In Modern C++, calling take() without arguments is equivalent to

        reader.take_untyped(data, info_seq, ResourceLimitsQosPolicy.LENGTH_UNLIMITED,
                            SampleStateKind.ANY_SAMPLE_STATE, ViewStateKind.ANY_VIEW_STATE,
                            InstanceStateKind.ANY_INSTANCE_STATE);

You can try to change your Java code to invoke take() the same way.
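
To illustrate why the masks matter, here is a toy model of a reader queue. It is only a sketch: the types below are made up for illustration and are not the RTI API, and it is not a claim about what is actually happening in your system. It just shows that a take() restricted to NOT_READ and ALIVE returns fewer samples than an unrestricted take(), and leaves the rest sitting in the queue.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Toy model only: these types are hypothetical and are NOT the RTI Connext API.
class TakeMaskDemo {
    enum SampleState { READ, NOT_READ }
    enum InstanceState { ALIVE, NOT_ALIVE_DISPOSED, NOT_ALIVE_NO_WRITERS }

    static final class Sample {
        final String key;
        final SampleState sampleState;
        final InstanceState instanceState;

        Sample(String key, SampleState sampleState, InstanceState instanceState) {
            this.key = key;
            this.sampleState = sampleState;
            this.instanceState = instanceState;
        }
    }

    // Removes and returns the samples whose states are in both masks.
    // Samples that do not match stay in the queue.
    static List<Sample> take(List<Sample> queue,
                             Set<SampleState> sampleMask,
                             Set<InstanceState> instanceMask) {
        List<Sample> taken = new ArrayList<>();
        Iterator<Sample> it = queue.iterator();
        while (it.hasNext()) {
            Sample s = it.next();
            if (sampleMask.contains(s.sampleState) && instanceMask.contains(s.instanceState)) {
                taken.add(s);
                it.remove();
            }
        }
        return taken;
    }

    // Three samples: one plain, one whose instance is no longer alive, one already read.
    static List<Sample> exampleQueue() {
        List<Sample> q = new ArrayList<>();
        q.add(new Sample("k1", SampleState.NOT_READ, InstanceState.ALIVE));
        q.add(new Sample("k2", SampleState.NOT_READ, InstanceState.NOT_ALIVE_NO_WRITERS));
        q.add(new Sample("k3", SampleState.READ, InstanceState.ALIVE));
        return q;
    }

    // Mirrors the masks used in the Java loop (NOT_READ + ALIVE).
    static int narrowTakeCount() {
        return take(exampleQueue(), EnumSet.of(SampleState.NOT_READ),
                    EnumSet.of(InstanceState.ALIVE)).size();
    }

    // Mirrors an unrestricted take (ANY + ANY).
    static int wideTakeCount() {
        return take(exampleQueue(), EnumSet.allOf(SampleState.class),
                    EnumSet.allOf(InstanceState.class)).size();
    }

    public static void main(String[] args) {
        System.out.println("NOT_READ + ALIVE masks take " + narrowTakeCount() + " of 3 samples");
        System.out.println("ANY + ANY masks take        " + wideTakeCount() + " of 3 samples");
    }
}
```

If you do widen the masks in the real code, the processing loop would also need the equivalent of the `sample.info().valid()` check from your C++ listener, since some of the returned entries may be metadata only.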

 

BTW, when the app is not receiving all instances, is it always from the same publishing participant?  That would be a clue that this isn't a random event.

 

 

Joined: 09/09/2018
Posts: 4

is it always from the same publishing participant?

No, it seems to be from random nodes.
I hope to run the test with the different parameters today.