Subscription to all topics

Gan

Using MonitorData.java and other example code, I made a logger that discovers topics and subscribes to them with a DynamicDataReader. All is well until the data reader freaks out over inconsistencies in the QoS between the data reader and the topic. To fix that, I tried copying the topic's QoS when it's discovered and transferring it to the data reader. Unfortunately I get the same error:

on_requested_incompatible_qos: RequestedIncompatibleQosStatus[total_count=1, total_count_change=1, last_policy_id=Ownership, policies=[QosPolicyCount[policy_id=Ownership, count=1]]]

I get the requested incompatible QoS on ownership. On inspection, the topic's ownership QoS is Exclusive. Because of the inconsistency, the data reader receives no data.

 

How do I fix this, so that my data reader can listen to any topic regardless of QoS inconsistencies?

Thanks,

Gan

Attachments: dynamiclistener.java (5.02 KB), logger.java (3.78 KB), topiclistener.java (11.67 KB)

Juanlu

Hi Gan,

A few comments about the code and your questions, after a quick inspection:

1) You cannot make a DataReader talk with an incompatible DataWriter. That breaks the intent and power of QoS settings.

2) In your TopicListener class you copy a lot of QoS settings from a variable called "def" whose definition I couldn't find.

2.1) First, to get QoS settings from the Topic QoS, you can use the method "Subscriber.copy_from_topic_qos()", which copies the settings that are shared between DataReaders and DataWriters via the Topic QoS. I would suggest using this instead of manually copying every field.

2.2) I would suggest that, instead of managing that many QoS settings in your code, you define the QoS settings via XML (see chapter 17 of the RTI Connext Core Libraries and Utilities User's Manual). Then you can define and modify your QoS settings without having to recompile your application code every time.

3) In your code, you copy the ownership QoS not from the Topic QoS but from the "def" variable. That's probably what's causing the Requested vs. Offered semantics to fail. Take a closer look at this class, because it sets many QoS values that you may not need.
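
As a rough illustration of the Requested vs. Offered check for Ownership, here is a minimal sketch in plain Java. It is a model only, not the Connext API: OwnershipKind and isCompatible are made-up names, and the only assumption taken from DDS is that a reader and writer must use the same ownership kind, with SHARED as the default.

```java
// Plain-Java model of DDS requested-vs-offered matching for Ownership.
// These types are hypothetical stand-ins, not the RTI Connext API.
final class OwnershipMatchSketch {

    enum OwnershipKind { SHARED, EXCLUSIVE }

    /** Ownership has no ordering: reader and writer must use the same kind. */
    static boolean isCompatible(OwnershipKind offered, OwnershipKind requested) {
        return offered == requested;
    }

    public static void main(String[] args) {
        OwnershipKind writerOffers = OwnershipKind.EXCLUSIVE;

        // A reader left at the default requests SHARED and does not match.
        System.out.println(isCompatible(writerOffers, OwnershipKind.SHARED));    // prints false

        // A reader that copied the discovered kind matches.
        System.out.println(isCompatible(writerOffers, OwnershipKind.EXCLUSIVE)); // prints true
    }
}
```

So if the reader ends up with an ownership kind other than the one the DataWriter announced, for example because it was taken from "def", the mismatch is reported through on_requested_incompatible_qos.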

Thanks,

Juanlu

Gan

Thanks for the reply.

I'm trying to follow your "Subscriber.copy_from_topic_qos()" suggestion but I can't seem to get the TopicQos. The topic listener gives me a PublicationBuiltinTopicData object that has QoS data inside it but I'm not sure how I can obtain a TopicQos from it and use that to copy the QoS settings to the data reader.

Juanlu

Hi Gan,

Oh, I was suggesting that you use copy_from_topic_qos() for the DataReaderQos once you have filled in the TopicQos instance from the discovery information. That is:

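...
// Start from fresh QoS objects filled with the current defaults rather than
// modifying the shared TOPIC_QOS_DEFAULT / DATAREADER_QOS_DEFAULT constants.
// "participant" is assumed to be your DomainParticipant and "id" the
// discovery data handed to your listener.
TopicQos topicQOS = new TopicQos();
participant.get_default_topic_qos(topicQOS);
topicQOS.deadline.copy_from(id.deadline);
topicQOS.destination_order.copy_from(id.destination_order);
topicQOS.durability.copy_from(id.durability);
topicQOS.durability_service.copy_from(id.durability_service);
topicQOS.latency_budget.copy_from(id.latency_budget);
topicQOS.lifespan.copy_from(id.lifespan);
topicQOS.liveliness.copy_from(id.liveliness);
topicQOS.ownership.copy_from(id.ownership);
topicQOS.reliability.copy_from(id.reliability);

// Copy the policies the Topic shares with DataReaders into the reader QoS
DataReaderQos drQOS = new DataReaderQos();
subscriber.get_default_datareader_qos(drQOS);
subscriber.copy_from_topic_qos(drQOS, topicQOS);

...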
                     

Having looked at your code again, it is strange that you're getting incompatible QoS settings when copying the settings from the discovery data, because Ownership works with Requested vs. Offered semantics. Do you print the value you're getting and the one you're using to create the DataReader and Topic?

rip

As an aside, I would also suggest you reimplement your setup to use waitsets instead of listener callbacks -- you have a lot of stuff going on in those callbacks (including creating entities), which is blocking the receive thread for that resource. 

Waitset example code is available in the online documentation.  Java API > Modules > Programming How Tos > Waitset Use Cases

Gan

Thanks for the Waitset suggestion, I'll give that a go.

Unfortunately I'm still having a QoS problem. I simplified it a bit, here's the data reader creation code:

TopicQos topicQOS = DomainParticipant.TOPIC_QOS_DEFAULT;
DataReaderQos drQOS = Subscriber.DATAREADER_QOS_DEFAULT;

topicQOS.ownership.copy_from(topicData.ownership);

Topic topic = participant.create_topic(topicData.topic_name, topicData.type_name, topicQOS, null, StatusKind.STATUS_MASK_NONE);
if (topic == null) {
    System.err.println("Unable to create topic.");
    return;
}

// Create the data reader using the default publisher
DataReader dataReader = (DataReader) participant.create_datareader(topic, drQOS, new DynamicListener(this), StatusKind.STATUS_MASK_ALL);
dataReader.get_subscriber().copy_from_topic_qos(drQOS, topicQOS);
dataReader.enable();
dataReader.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS);

topicData is the PublicationBuiltinTopicData. I still get the incompatible QoS error on ownership. I thought I was copying the ownership QoS correctly in this chunk of code. What did I do wrong?

rip

You're creating the reader with the default qos, then altering the qos variable. 

But when you created the reader, it copied the QoS into its internal settings.

i.e., changing the value AFTER using the value is not going to work.

Try this:

// Create the data reader using the default subscriber, using the updated qos

drQOS.ownership.copy_from(topicData.ownership);

DataReader datareader = participant.create_datareader(topic, drQOS, 
	new DynamicListener(this), StatusKind.STATUS_MASK_ALL);

// datareader.enable();  // uncomment if necessary, default behavior 
                         // is to create entities enabled, only necessary 
                         // if you have explicitly told the factory to create
                         // entities in a disabled state.
// note that the set_enabled_statuses is also superfluous, since that's what 
// the "StatusKind.STATUS_MASK_ALL" is doing.

 

Note: I have not typed this in Eclipse or another syntax-aware editor, so there may be typos in my code. If so, let me know and I'll update the above to fix them.
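
To illustrate why changing the value after using it has no effect, here is a minimal sketch in plain Java. It only models the behavior described above, an entity that copies its settings when it is created; ReaderSettings and ModelReader are hypothetical classes, not part of the Connext API.

```java
// Plain-Java model of an entity that copies its settings when it is created.
// ReaderSettings and ModelReader are hypothetical, not RTI Connext classes.
final class CopyAtCreationSketch {

    static final class ReaderSettings {
        String ownership = "SHARED";

        ReaderSettings copy() {
            ReaderSettings c = new ReaderSettings();
            c.ownership = this.ownership;
            return c;
        }
    }

    static final class ModelReader {
        private final ReaderSettings settings;

        ModelReader(ReaderSettings requested) {
            // The entity keeps its own snapshot; later edits to 'requested'
            // never reach it.
            this.settings = requested.copy();
        }

        String ownership() {
            return settings.ownership;
        }
    }

    public static void main(String[] args) {
        // Wrong order: create first, change afterwards.
        ReaderSettings late = new ReaderSettings();
        ModelReader tooLate = new ModelReader(late);
        late.ownership = "EXCLUSIVE";
        System.out.println(tooLate.ownership()); // prints SHARED

        // Right order: change first, then create.
        ReaderSettings early = new ReaderSettings();
        early.ownership = "EXCLUSIVE";
        ModelReader inTime = new ModelReader(early);
        System.out.println(inTime.ownership()); // prints EXCLUSIVE
    }
}
```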

 

Gan

No dice, I still receive the on_requested_incompatible_qos error regarding the ownership policy.

Is there other example code that creates a DynamicData reader that copies the QoS settings that I can look at?

rip

This is the waitset I use. It places incoming "Publication" announcements (i.e., a DataWriter has been found) into a local hashmap so that I can avoid reprocessing the same topic when a new DataWriter for that topic appears. This all happens in an application thread. Even the DataReader created for the topic does not use Listeners (I have a thread that loops over the list of DataReaders and polls them for any new data).

(reformatted)

    // defined at wider scope:  
    //    private static Condition ping = new GuardCondition();
    //    private HashMap<String, Topic> topicTable = 
    //        new HashMap<String, Topic>();
    //    private HashMap<String, TypeCode> tcTable = 
    //        new HashMap<String, TypeCode>();


    public void publicationsWaitset () {
        HashMap<String, String> found = new HashMap<String, String>();
        
        // create waitset
        WaitSet waitset = new WaitSet();

        // attach conditions
        Condition ting = reader.create_readcondition (
                SampleStateKind.NOT_READ_SAMPLE_STATE,
                ViewStateKind.ANY_VIEW_STATE,
                InstanceStateKind.ANY_INSTANCE_STATE);

        waitset.attach_condition(ting);
        waitset.attach_condition(ping);

        // wait
        boolean rego = true;
        while (rego) {
            // 1 Second timeout, but should be cl arg
            Duration_t timeout = new Duration_t(1, 0); 
            // List of active conditions
            ConditionSeq active_conditions = new ConditionSeq(); 
            boolean is_ting_triggered = false;
            boolean is_ping_triggered = false;
            try {
                waitset.wait(active_conditions, timeout);
                for (int i = 0; i < active_conditions.size(); ++i) {
                    if (active_conditions.get(i) == ting) {
                        is_ting_triggered = true;
                    }
                    if (active_conditions.get(i) == ping) {
                        is_ping_triggered = true;
                    }
                }
                if (is_ting_triggered) {
                    // local copies
                    PublicationBuiltinTopicDataSeq _dataSeq = 
                            new PublicationBuiltinTopicDataSeq();
                    SampleInfoSeq _infoSeq = new SampleInfoSeq();

                    try {
                        reader.take(
                                _dataSeq, _infoSeq,
                                ResourceLimitsQosPolicy.LENGTH_UNLIMITED,
                                SampleStateKind.ANY_SAMPLE_STATE,
                                ViewStateKind.ANY_VIEW_STATE,
                                InstanceStateKind.ANY_INSTANCE_STATE);

                        for(int i = 0; i < _dataSeq.size(); ++i) {
                            SampleInfo info = (SampleInfo)_infoSeq.get(i);
                            PublicationBuiltinTopicData data = 
                                    (PublicationBuiltinTopicData)_dataSeq.get(i);

                            if (info.valid_data) {
                                if (found.containsKey(data.topic_name)) {
                                    System.out.println("Known: skipped");
                                } else if ((topicRegex != null) 
                                             && (!data.topic_name.matches(topicRegex))) {
                                    System.out.println("Topic: skipped - "
                                        + "Does not match topicRegex (" + topicRegex + ")");
                                    found.put(data.topic_name, data.type_name);
                                } else if ((typeRegex != null) 
                                             && (!data.type_name.matches(typeRegex))) {
                                    System.out.println("Type: skipped - "
                                        + "Does not match typeRegex (" + typeRegex + ")");
                                    found.put(data.topic_name, data.type_name);
                                } else {
                                    found.put(data.topic_name, data.type_name);
                                    // create a DynamicData Reader for this topic
                                    // first register the type
                                    String topicName = data.topic_name;
                                    String typeName = data.type_name;
                                    TypeCode tc = data.type_code;
                                    DynamicDataTypeSupport typeSupport = 
                                        new DynamicDataTypeSupport(
                                            tc, 
                                            DynamicDataTypeSupport.TYPE_PROPERTY_DEFAULT);
                                    typeSupport.register_type(domainParticipant, typeName);
                                            
                                    // then create topic
                                    Topic t = domainParticipant.create_topic(
                                            topicName, typeName, 
                                            DomainParticipant.TOPIC_QOS_DEFAULT,
                                            null, StatusKind.STATUS_MASK_NONE);
                                    topicTable.put(topicName, t);
                                    
                                    // then create dynamic data reader
                                    DataReaderQos drqos = new DataReaderQos();
                                    domainSubscriber.get_default_datareader_qos(drqos);
                                    drqos.liveliness.copy_from(data.liveliness);
                                    drqos.deadline.copy_from(data.deadline);
                                    drqos.destination_order.copy_from(data.destination_order);
                                    drqos.durability.copy_from(data.durability);
                                    drqos.ownership.copy_from(data.ownership);
                                    drqos.reliability.copy_from(data.reliability);
                                    
                                    DataReader dr = domainSubscriber.create_datareader(
                                            t, drqos,
                                            null, StatusKind.STATUS_MASK_NONE);
                                    DynamicDataReader ddr = (DynamicDataReader)dr;
                                    readerTable.put(topicName, ddr);
                                            
                                    System.out.println("OK:Created DataReader for Topic \"" 
                                        + topicName + "\"");                                
                                }
                            }
                        }
                    } catch (RETCODE_NO_DATA noData) {
                        // No data to process
                    } finally {
                        reader.return_loan(_dataSeq, _infoSeq);
                    }
                }
                if (is_ping_triggered) {
                    // ping received, so stop
                    rego = false;
                }
            } catch (RETCODE_TIMEOUT timed_out) {
            } catch (RETCODE_ERROR ex) {
                // ... check for cause of failure
                throw ex;
            }
            // return to wait
        }
        
        reader.delete_readcondition((ReadCondition)ting);
        
        waitset.delete();
        waitset = null;
    }
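
The polling thread mentioned above is not shown in this code. As a rough idea of its shape, here is a minimal sketch in plain Java. It is a model under stated assumptions, not Connext code: PollableReader is a hypothetical stand-in for a DynamicDataReader, and pollOnce is a made-up helper. A real loop would call take() and return_loan() on each reader, as the waitset code does for the builtin reader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Plain-Java model of an application thread that polls a table of readers.
// PollableReader is a hypothetical stand-in for a DynamicDataReader.
final class ReaderPollingSketch {

    /** Stand-in reader: hands back whatever samples have been queued. */
    static final class PollableReader {
        private final Queue<String> pending = new ArrayDeque<>();

        synchronized void deliver(String sample) {
            pending.add(sample);
        }

        synchronized List<String> takeAll() {
            List<String> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }
    }

    /** One pass over every reader; returns "topic: sample" lines. */
    static List<String> pollOnce(Map<String, PollableReader> readerTable) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, PollableReader> e : readerTable.entrySet()) {
            for (String sample : e.getValue().takeAll()) {
                lines.add(e.getKey() + ": " + sample);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, PollableReader> readerTable = new LinkedHashMap<>();
        PollableReader square = new PollableReader();
        readerTable.put("Square", square);
        square.deliver("x=1 y=2");

        // Three polling passes with a short sleep between them.
        for (int pass = 0; pass < 3; pass++) {
            for (String line : pollOnce(readerTable)) {
                System.out.println(line);
            }
            Thread.sleep(100);
        }
    }
}
```

In a real application the reader table is filled by the waitset thread while the polling thread iterates over it, so access to the table would need to be synchronized or the table replaced with a concurrent map.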
    
    



 

Gan

That did the trick, it works. Thanks.