Create DataReader ================= .. highlight:: c :: DDS_DataReader *datareader = NULL; struct DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; struct DDS_DataReaderListener dr_listener = DDS_DataReaderListener_INITIALIZER; /* Configure Reader Qos */ dr_qos.protocol.rtps_object_id = 200; dr_qos.resource_limits.max_instances = 2; dr_qos.resource_limits.max_samples_per_instance = 2; dr_qos.resource_limits.max_samples = dr_qos.resource_limits.max_samples_per_instance * dr_qos.resource_limits.max_instances; dr_qos.reader_resource_limits.max_remote_writers = 10; dr_qos.reader_resource_limits.max_remote_writers_per_instance = 10; dr_qos.history.depth = 1; dr_qos.durability.kind = DDS_VOLATILE_DURABILITY_QOS; dr_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS; /* Set listener callbacks */ dr_listener.on_data_available = HelloWorldSubscriber_on_data_available; dr_listener.on_subscription_matched = HelloWorldSubscriber_on_subscription_matched; datareader = DDS_Subscriber_create_datareader(subscriber, DDS_Topic_as_topicdescription(topic), &dr_qos, &dr_listener, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS); if (datareader == NULL) { /* failure */ } The DataReaderListener_ has its callbacks selectively enabled by the DDS status mask. In the example, the mask has set the DDS_SUBSCRIPTION_MATCHED_STATUS_ and DDS_DATA_AVAILABLE_STATUS_ statuses, and accordingly the DataReaderListener_ has its on_subscription_matched_ and on_data_available_ assigned to callback functions. 
:: void HelloWorldSubscriber_on_subscription_matched(void *listener_data, DDS_DataReader * reader, const struct DDS_SubscriptionMatchedStatus *status) { if (status->current_count_change > 0) { printf("Matched a publisher\n"); } else { printf("Unmatched a publisher\n"); } } :: void HelloWorldSubscriber_on_data_available(void* listener_data, DDS_DataReader* reader) { HelloWorldDataReader *hw_reader = HelloWorldDataReader_narrow(reader); DDS_ReturnCode_t retcode; struct DDS_SampleInfo *sample_info = NULL; HelloWorld *sample = NULL; struct DDS_SampleInfoSeq info_seq = DDS_SEQUENCE_INITIALIZER(struct DDS_SampleInfo); struct HelloWorldSeq sample_seq = DDS_SEQUENCE_INITIALIZER(HelloWorld); const DDS_Long TAKE_MAX_SAMPLES = 32; DDS_Long i; retcode = HelloWorldDataReader_take(hw_reader, &sample_seq, &info_seq, TAKE_MAX_SAMPLES, DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE); if (retcode != DDS_RETCODE_OK) { printf("failed to take data: %d\n", retcode); goto done; } /* Print each valid sample taken */ for (i = 0; i < HelloWorldSeq_get_length(&sample_seq); ++i) { sample_info = DDS_SampleInfoSeq_get_reference(&info_seq, i); if (sample_info->valid_data) { sample = HelloWorldSeq_get_reference(&sample_seq, i); printf("\nSample received\n\tmsg: %s\n", sample->msg); } else { printf("not valid data\n"); } } HelloWorldDataReader_return_loan(hw_reader, &sample_seq, &info_seq); done: HelloWorldSeq_finalize(&sample_seq); DDS_SampleInfoSeq_finalize(&info_seq); } DPSE Discovery: Assert Remote Publication ----------------------------------------- A subscribing application using DPSE_ discovery must specify the other *DataWriters* that its *DataReaders* are allowed to discover. Like the API for asserting a remote participant, the DPSE_ API for asserting a remote publication must be called for each remote *DataWriter* that a *DataReader* may discover. 
:: struct DDS_PublicationBuiltinTopicData rem_publication_data = DDS_PublicationBuiltinTopicData_INITIALIZER; /* Set Writer's protocol.rtps_object_id */ rem_publication_data.key.value[DDS_BUILTIN_TOPIC_KEY_OBJECT_ID] = 100; rem_publication_data.topic_name = DDS_String_dup("Example HelloWorld"); rem_publication_data.type_name = DDS_String_dup("HelloWorld"); rem_publication_data.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS; retcode = DPSE_RemotePublication_assert(participant, "Participant_1", &rem_publication_data, HelloWorld_get_key_kind(HelloWorldTypePlugin_get(), NULL)); if (retcode != DDS_RETCODE_OK) { /* failure */ } Asserting a remote publication requires configuration of all QoS policies necessary to determine matching. Receiving Samples ----------------- Accessing received samples can be done in a few ways: - **Polling**. Do read or take within a periodic polling loop. - **Listener**. When a new sample is received, the DataReaderListener_'s on_data_available_ is called. Processing is done in the context of the middleware's receive thread. See the above HelloWorldSubscriber_on_data_available callback for example code. - **Waitset**. Create a waitset, attach to it a status condition with the data_available status enabled, and wait for a received sample to trigger the waitset. Processing is done in the context of the user's application thread. (Note: the code snippet below is taken from the shipped HelloWorld_dpde_waitset example). 
:: DDS_WaitSet *waitset = NULL; struct DDS_Duration_t wait_timeout = { 10, 0 }; /* 10 seconds */ DDS_StatusCondition *dr_condition = NULL; struct DDS_ConditionSeq active_conditions = DDS_SEQUENCE_INITIALIZER(struct DDS_ConditionSeq); if (!DDS_ConditionSeq_initialize(&active_conditions)) { /* failure */ } if (!DDS_ConditionSeq_set_maximum(&active_conditions, 1)) { /* failure */ } waitset = DDS_WaitSet_new(); if (waitset == NULL ) { /* failure */ } dr_condition = DDS_Entity_get_statuscondition(DDS_DataReader_as_entity(datareader)); retcode = DDS_StatusCondition_set_enabled_statuses(dr_condition, DDS_DATA_AVAILABLE_STATUS); if (retcode != DDS_RETCODE_OK) { /* failure */ } retcode = DDS_WaitSet_attach_condition(waitset, DDS_StatusCondition_as_condition(dr_condition)); if (retcode != DDS_RETCODE_OK) { /* failure */ } retcode = DDS_WaitSet_wait(waitset, &active_conditions, &wait_timeout); switch (retcode) { case DDS_RETCODE_OK: { /* This WaitSet only has a single condition attached to it * so we can implicitly assume the DataReader's status condition * to be active (with the enabled DATA_AVAILABLE status) upon * successful return of wait(). * If more than one condition were attached to the WaitSet, * the returned sequence must be examined using the * commented out code instead of the following. 
*/ HelloWorldSubscriber_take_data(HelloWorldDataReader_narrow(datareader)); /* DDS_Long active_len = DDS_ConditionSeq_get_length(&active_conditions); for (i = active_len - 1; i >= 0; --i) { DDS_Condition *active_condition = *DDS_ConditionSeq_get_reference(&active_conditions, i); if (active_condition == DDS_StatusCondition_as_condition(dr_condition)) { total_samples += HelloWorldSubscriber_take_data( HelloWorldDataReader_narrow(datareader)); } else if (active_condition == some_other_condition) { do_something_else(); } } */ break; } case DDS_RETCODE_TIMEOUT: { printf("WaitSet_wait timed out\n"); break; } default: { printf("ERROR in WaitSet_wait: retcode=%d\n", retcode); break; } } Filtering Samples ----------------- In lieu of supporting Content-Filtered Topics, a DataReaderListener_ in |rti_me| provides callbacks to do application-level filtering per sample. - **on_before_sample_deserialize**. Through this callback, a received sample is presented to the application before it has been deserialized or stored in the *DataReader*'s queue. - **on_before_sample_commit**. Through this callback, a received sample is presented to the application after it has been deserialized but before it has been stored in the *DataReader*'s queue. You control the callbacks' sample_dropped parameter; upon exiting either callback, the *DataReader* will drop the sample if sample_dropped is true. Consequently, dropped samples are not stored in the *DataReader*'s queue and are not available to be read or taken. Neither callback is associated with a DDS Status. Rather, each is enabled when assigned to a non-NULL callback. NOTE: Because it is called after the sample has been deserialized, on_before_sample_commit_ provides an additional sample_info_ parameter, containing some of the usual sample information that would be available when the sample is read or taken. 
The HelloWorld_dpde example's subscriber has this on_before_sample_commit_ callback: :: DDS_Boolean HelloWorldSubscriber_on_before_sample_commit( void *listener_data, DDS_DataReader *reader, const void *const sample, const struct DDS_SampleInfo *const sample_info, DDS_Boolean *dropped) { HelloWorld *hw_sample = (HelloWorld *)sample; /* Drop samples with even-numbered count in msg */ HelloWorldSubscriber_filter_sample(hw_sample, dropped); if (*dropped) { printf("\nSample filtered, before commit\n\tDROPPED - msg: %s\n", hw_sample->msg); } return DDS_BOOLEAN_TRUE; } ... dr_listener.on_before_sample_commit = HelloWorldSubscriber_on_before_sample_commit; For more information, see the :doc:`../usersmanual/receiving` section in the User's Manual.