Missing Thousands of Messages using DynamicData Listener

2 posts / 0 new
Last post
Offline
Last seen: 2 years 5 months ago
Joined: 04/18/2022
Posts: 8
Missing Thousands of Messages using DynamicData Listener
Hi All, I am experiencing an issue where my Listener is missing messages when I use DynamicData and configuring QOS seems to have no effect. I have an application that subscribes to many topics with the purpose of reading all the messages and persisting an XML and binary version for each (dont ask why). We have a publisher pushing a test set of 69,000 messages into the DDS for us to consume. Each message is on one of ten Topics each with a single Type. We have subscribed to the Topics, registered Types, created Topics and subscribed to them with a DataReader and Listener. If all I do is record the count of times on_data_available() is invoked we get 69,000
public void on_data_available( DataReader reader ) {
    // record count
    // DO NOTHING MORE -- ALL OK
}

If I use a DynamicDataReader to take messages and get the DynamicData from the sequence and stop there, we get 69,000 messages coming through.
public void on_data_available( DataReader reader ) {

    // record count

    DynamicDataReader dynamicDataReader = (DynamicDataReader) reader;
    try {
        dynamicDataReader.take(dynamicDataSeq, sampleInfoSeq, 
                ResourceLimitsQosPolicy.LENGTH_UNLIMITED, 
                SampleStateKind.ANY_SAMPLE_STATE, 
                ViewStateKind.ANY_VIEW_STATE, 
                InstanceStateKind.ANY_INSTANCE_STATE);
        
        for (int i = 0; i < dynamicDataSeqSize; ++i) {

            SampleInfo info = (SampleInfo) sampleInfoSeq.get(i);
            if ( info.valid_data ) {

                DynamicData dynamicData = (DynamicData) dynamicDataSeq.get(i);

                // DO NOTHING MORE -- ALL OK 
            }
        }

    } catch ( RETCODE_NO_DATA noData ) {
        reuturn;

    } finally {
        dynamicDataReader.return_loan(dynamicDataSeq, sampleInfoSeq);
    }
}
As soon as I try to get the Type from the DynamicData and read the name or do stuff with it the number of messages we get, and the number of times on_data_available() is called, drops to ~500 messages of the 69,000
public void on_data_available( DataReader reader ) {

    // record count

    DynamicDataReader dynamicDataReader = (DynamicDataReader) reader;
    try {
        dynamicDataReader.take(dynamicDataSeq, sampleInfoSeq, 
                ResourceLimitsQosPolicy.LENGTH_UNLIMITED, 
                SampleStateKind.ANY_SAMPLE_STATE, 
                ViewStateKind.ANY_VIEW_STATE, 
                InstanceStateKind.ANY_INSTANCE_STATE);
        
        for (int i = 0; i < dynamicDataSeqSize; ++i) {

            SampleInfo info = (SampleInfo) sampleInfoSeq.get(i);
            if ( info.valid_data ) {

                DynamicData dynamicData = (DynamicData) dynamicDataSeq.get(i);

                TypeCode typeCode = dynamicData.get_type();   // <--- THIS

                String typeName = typeCode.name();

                // get XML and binary
            }
        }

    } catch ( RETCODE_NO_DATA noData ) {
        reuturn;

    } finally {
        dynamicDataReader.return_loan(dynamicDataSeq, sampleInfoSeq);
    }
}
We noted that the Default TopicQos and DataReaderQos was configured as
Reliability 'BEST_EFFORT_RELIABILITY_QOS'
Acknowledgment 'PROTOCOL_ACKNOWLEDGMENT_MODE'
So we have changed that to 
TopicQos topicQos = DomainParticipant.TOPIC_QOS_DEFAULT;
topicQos.reliability.kind = ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
topicQos.reliability.acknowledgment_kind = ReliabilityQosPolicyAcknowledgmentModeKind.APPLICATION_AUTO_ACKNOWLEDGMENT_MODE;
and
DataReaderQos dataReaderQos = Subscriber.DATAREADER_QOS_DEFAULT;
dataReaderQos.reliability.kind = ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
dataReaderQos.reliability.acknowledgment_kind = ReliabilityQosPolicyAcknowledgmentModeKind.APPLICATION_AUTO_ACKNOWLEDGMENT_MODE;
With no effect... still ~500 messages. Clearly the amount of time it takes to call dynamicData.get_type() is impacting on the performance of the on_data_available() method and we're missing messages. If setting the QOS doesnt resolve this what can? Any assistance you can offer will be gratefully recieved Cheers Al  
Howard's picture
Offline
Last seen: 15 hours 15 min ago
Joined: 11/29/2012
Posts: 608

Hi,

So as I responded in my reply to your other inquiry, QoS policies must be confgured both on the DataWriter and DataReader in order to affect end-to-end behavior.  (and the Topic QoS actually does not configure any behavior by default...so your configuration of the Topic QoS is a noop unless you're explicitly creating DataReaders and DataWriters to use the QoS of the Topic that they are created with)  So to configure a strictly reliable connection (aka TCP-like reliability), you need to configure the Reliability and HIstory QoS settings of the DataReaderQos and DataWriterQos and use them to create your DataReader and DataWriter.

Or...a better practice is to use a builtin QoS Profile to create your DataReader and DataWriter since the builtin profiles are preconfigured to enable different behaviors.

You can read more abut QoS Profiles in these articles:

https://community.rti.com/examples/built-qos-profiles

https://community.rti.com/kb/configuring-qos-built-profiles