3.8. Create DataReader
DDS_DataReader *datareader = NULL;
struct DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER;
struct DDS_DataReaderListener dr_listener = DDS_DataReaderListener_INITIALIZER;

/* Configure Reader Qos */
dr_qos.protocol.rtps_object_id = 200;
dr_qos.resource_limits.max_instances = 2;
dr_qos.resource_limits.max_samples_per_instance = 2;
dr_qos.resource_limits.max_samples =
        dr_qos.resource_limits.max_samples_per_instance * dr_qos.resource_limits.max_instances;
dr_qos.reader_resource_limits.max_remote_writers = 10;
dr_qos.reader_resource_limits.max_remote_writers_per_instance = 10;
dr_qos.history.depth = 1;
dr_qos.durability.kind = DDS_VOLATILE_DURABILITY_QOS;
dr_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;

/* Set listener callbacks */
dr_listener.on_data_available = HelloWorldSubscriber_on_data_available;
dr_listener.on_subscription_matched = HelloWorldSubscriber_on_subscription_matched;

datareader = DDS_Subscriber_create_datareader(subscriber,
        DDS_Topic_as_topicdescription(topic),
        &dr_qos,
        &dr_listener,
        DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS);
if (datareader == NULL)
{
    /* failure */
}
The DDS status mask selectively enables the DataReaderListener's callbacks. In the example, the mask sets DDS_SUBSCRIPTION_MATCHED_STATUS and DDS_DATA_AVAILABLE_STATUS, and accordingly the listener's on_subscription_matched and on_data_available fields are assigned callback functions.
void HelloWorldSubscriber_on_subscription_matched(void *listener_data,
        DDS_DataReader *reader,
        const struct DDS_SubscriptionMatchedStatus *status)
{
    if (status->current_count_change > 0)
    {
        printf("Matched a publisher\n");
    }
    else
    {
        printf("Unmatched a publisher\n");
    }
}

void HelloWorldSubscriber_on_data_available(void *listener_data,
        DDS_DataReader *reader)
{
    HelloWorldDataReader *hw_reader = HelloWorldDataReader_narrow(reader);
    DDS_ReturnCode_t retcode;
    struct DDS_SampleInfo *sample_info = NULL;
    HelloWorld *sample = NULL;
    struct DDS_SampleInfoSeq info_seq =
            DDS_SEQUENCE_INITIALIZER(struct DDS_SampleInfo);
    struct HelloWorldSeq sample_seq =
            DDS_SEQUENCE_INITIALIZER(HelloWorld);
    const DDS_Long TAKE_MAX_SAMPLES = 32;
    DDS_Long i;

    retcode = HelloWorldDataReader_take(hw_reader,
            &sample_seq, &info_seq, TAKE_MAX_SAMPLES,
            DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
    if (retcode != DDS_RETCODE_OK)
    {
        printf("failed to take data: %d\n", retcode);
        goto done;
    }

    /* Print each valid sample taken */
    for (i = 0; i < HelloWorldSeq_get_length(&sample_seq); ++i)
    {
        sample_info = DDS_SampleInfoSeq_get_reference(&info_seq, i);
        if (sample_info->valid_data)
        {
            sample = HelloWorldSeq_get_reference(&sample_seq, i);
            printf("\nSample received\n\tmsg: %s\n", sample->msg);
        }
        else
        {
            printf("not valid data\n");
        }
    }

    HelloWorldDataReader_return_loan(hw_reader, &sample_seq, &info_seq);

done:
    HelloWorldSeq_finalize(&sample_seq);
    DDS_SampleInfoSeq_finalize(&info_seq);
}
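The way a status mask gates listener callbacks can be illustrated with a small stand-alone model. This is only a sketch and not how Connext DDS Micro is implemented: every `Sketch`/`SKETCH_` name, the bit positions, and the dispatch function are hypothetical and chosen for illustration. The real status constants come from the DDS headers.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical status bits; the real values are defined by the DDS headers. */
typedef uint32_t SketchStatusMask;
#define SKETCH_DATA_AVAILABLE_STATUS       ((SketchStatusMask)1u << 0)
#define SKETCH_SUBSCRIPTION_MATCHED_STATUS ((SketchStatusMask)1u << 1)
#define SKETCH_LIVELINESS_CHANGED_STATUS   ((SketchStatusMask)1u << 2)

typedef void (*SketchCallback)(void *listener_data);

/* Hypothetical listener: one callback slot per status. */
struct SketchListener {
    void *listener_data;
    SketchCallback on_data_available;
    SketchCallback on_subscription_matched;
    SketchCallback on_liveliness_changed;
};

/* Example callback: counts invocations through listener_data. */
void sketch_count_call(void *listener_data)
{
    int *hits = (int *)listener_data;
    ++(*hits);
}

/* Invokes the callback for `status` only if its bit is set in
 * `enabled_mask` and a callback is assigned.
 * Returns 1 if a callback ran, 0 otherwise. */
int sketch_dispatch(const struct SketchListener *listener,
                    SketchStatusMask enabled_mask,
                    SketchStatusMask status)
{
    SketchCallback callback = NULL;

    if ((enabled_mask & status) == 0)
    {
        return 0; /* status not enabled by the mask */
    }

    if (status == SKETCH_DATA_AVAILABLE_STATUS)
    {
        callback = listener->on_data_available;
    }
    else if (status == SKETCH_SUBSCRIPTION_MATCHED_STATUS)
    {
        callback = listener->on_subscription_matched;
    }
    else if (status == SKETCH_LIVELINESS_CHANGED_STATUS)
    {
        callback = listener->on_liveliness_changed;
    }

    if (callback == NULL)
    {
        return 0; /* nothing assigned for this status */
    }

    callback(listener->listener_data);
    return 1;
}
```

In this model, a listener created with a mask of `SKETCH_DATA_AVAILABLE_STATUS | SKETCH_SUBSCRIPTION_MATCHED_STATUS` never receives the liveliness callback, even if one is assigned.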
3.8.1. DPSE Discovery: Assert Remote Publication
A subscribing application using DPSE discovery must specify the remote DataWriters that its DataReaders are allowed to discover. As with the API for asserting a remote participant, the DPSE API for asserting a remote publication must be called once for each remote DataWriter that a DataReader may discover.
struct DDS_PublicationBuiltinTopicData rem_publication_data =
        DDS_PublicationBuiltinTopicData_INITIALIZER;

/* Set Writer's protocol.rtps_object_id */
rem_publication_data.key.value[DDS_BUILTIN_TOPIC_KEY_OBJECT_ID] = 100;
rem_publication_data.topic_name = DDS_String_dup("Example HelloWorld");
rem_publication_data.type_name = DDS_String_dup("HelloWorld");
rem_publication_data.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;

retcode = DPSE_RemotePublication_assert(participant,
        "Participant_1",
        &rem_publication_data,
        HelloWorld_get_key_kind(HelloWorldTypePlugin_get(),
                NULL));
if (retcode != DDS_RETCODE_OK)
{
    /* failure */
}
Asserting a remote publication requires configuring every QoS policy needed to determine matching. In the example above, that is the reliability kind, set alongside the topic name and type name.
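To see why the asserted values matter, consider a simplified model of endpoint matching. The sketch below assumes matching depends only on topic name, type name, and reliability; real matching takes more QoS policies into account. The types and function are hypothetical stand-ins, not part of the Connext DDS Micro API. The one rule it encodes is the DDS requested/offered rule for reliability: the writer must offer at least the level the reader requests.

```c
#include <string.h>

/* Hypothetical stand-ins for the DDS reliability kinds, ordered so that
 * a stronger guarantee has a larger value. */
enum SketchReliabilityKind {
    SKETCH_BEST_EFFORT = 0,
    SKETCH_RELIABLE = 1
};

/* Hypothetical, reduced view of an endpoint's matching-relevant data. */
struct SketchEndpoint {
    const char *topic_name;
    const char *type_name;
    enum SketchReliabilityKind reliability;
};

/* Returns 1 if the asserted writer is compatible with the local reader,
 * 0 otherwise. */
int sketch_endpoints_match(const struct SketchEndpoint *writer,
                           const struct SketchEndpoint *reader)
{
    if (strcmp(writer->topic_name, reader->topic_name) != 0)
    {
        return 0; /* different topic */
    }
    if (strcmp(writer->type_name, reader->type_name) != 0)
    {
        return 0; /* different type */
    }
    /* Offered reliability must be at least the requested reliability. */
    return writer->reliability >= reader->reliability;
}
```

Under this model, if the remote publication were asserted with best-effort reliability while the local DataReader requests reliable delivery, the two would not match, even though the actual remote DataWriter may be reliable.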
3.8.2. Receiving Samples
Accessing received samples can be done in a few ways:
- Polling. Call read or take within a periodic polling loop.
- Listener. When a new sample is received, the DataReaderListener’s on_data_available is called. Processing is done in the context of the middleware’s receive thread. See the above HelloWorldSubscriber_on_data_available callback for example code.
- Waitset. Create a waitset, attach the DataReader’s status condition to it with the data_available status enabled, and wait for a received sample to trigger the waitset. Processing is done in the context of the user’s application thread. (Note: the code snippet below is taken from the shipped HelloWorld_dpde_waitset example.)
DDS_WaitSet *waitset = NULL;
struct DDS_Duration_t wait_timeout = { 10, 0 }; /* 10 seconds */
DDS_StatusCondition *dr_condition = NULL;
struct DDS_ConditionSeq active_conditions =
        DDS_SEQUENCE_INITIALIZER(struct DDS_ConditionSeq);

if (!DDS_ConditionSeq_initialize(&active_conditions))
{
    /* failure */
}
if (!DDS_ConditionSeq_set_maximum(&active_conditions, 1))
{
    /* failure */
}

waitset = DDS_WaitSet_new();
if (waitset == NULL)
{
    /* failure */
}

dr_condition = DDS_Entity_get_statuscondition(DDS_DataReader_as_entity(datareader));
retcode = DDS_StatusCondition_set_enabled_statuses(dr_condition,
        DDS_DATA_AVAILABLE_STATUS);
if (retcode != DDS_RETCODE_OK)
{
    /* failure */
}

retcode = DDS_WaitSet_attach_condition(waitset,
        DDS_StatusCondition_as_condition(dr_condition));
if (retcode != DDS_RETCODE_OK)
{
    /* failure */
}
retcode = DDS_WaitSet_wait(waitset, &active_conditions, &wait_timeout);
switch (retcode) {
    case DDS_RETCODE_OK:
    {
        /* This WaitSet only has a single condition attached to it
         * so we can implicitly assume the DataReader's status condition
         * to be active (with the enabled DATA_AVAILABLE status) upon
         * successful return of wait().
         * If more than one condition were attached to the WaitSet,
         * the returned sequence must be examined using the
         * commented out code instead of the following.
         */
        HelloWorldSubscriber_take_data(HelloWorldDataReader_narrow(datareader));
        /*
        DDS_Long active_len = DDS_ConditionSeq_get_length(&active_conditions);
        for (i = active_len - 1; i >= 0; --i)
        {
            DDS_Condition *active_condition =
                    *DDS_ConditionSeq_get_reference(&active_conditions, i);
            if (active_condition ==
                    DDS_StatusCondition_as_condition(dr_condition))
            {
                total_samples += HelloWorldSubscriber_take_data(
                        HelloWorldDataReader_narrow(datareader));
            }
            else if (active_condition == some_other_condition)
            {
                do_something_else();
            }
        }
        */
        break;
    }
    case DDS_RETCODE_TIMEOUT:
    {
        printf("WaitSet_wait timed out\n");
        break;
    }
    default:
    {
        printf("ERROR in WaitSet_wait: retcode=%d\n", retcode);
        break;
    }
}
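The polling option listed at the start of this section relies on the difference between read and take: read leaves samples in the DataReader's queue, while take removes them. The sketch below models that difference with a plain array-backed queue of integers. All names are hypothetical, and it deliberately omits what the real API involves (loaned sequences, sample info, return_loan); it shows only the shape of one polling pass that drains the queue in small batches.

```c
#include <stddef.h>

#define SKETCH_QUEUE_CAPACITY 4
#define SKETCH_BATCH_SIZE 2

/* Hypothetical stand-in for a DataReader's sample queue. */
struct SketchReaderQueue {
    int samples[SKETCH_QUEUE_CAPACITY];
    size_t count;
};

/* Like read: copies up to `max` samples, leaving the queue unchanged. */
size_t sketch_read(const struct SketchReaderQueue *queue, int *out, size_t max)
{
    size_t n = (queue->count < max) ? queue->count : max;
    size_t i;

    for (i = 0; i < n; ++i)
    {
        out[i] = queue->samples[i];
    }
    return n;
}

/* Like take: copies up to `max` samples and removes them from the queue. */
size_t sketch_take(struct SketchReaderQueue *queue, int *out, size_t max)
{
    size_t n = sketch_read(queue, out, max);
    size_t i;

    /* Shift the remaining samples to the front of the queue. */
    for (i = n; i < queue->count; ++i)
    {
        queue->samples[i - n] = queue->samples[i];
    }
    queue->count -= n;
    return n;
}

/* One polling pass: take in batches until the queue is empty, adding
 * each sample to *sum. Returns the number of samples processed. */
size_t sketch_poll_once(struct SketchReaderQueue *queue, long *sum)
{
    int batch[SKETCH_BATCH_SIZE];
    size_t total = 0;
    size_t n;
    size_t i;

    while ((n = sketch_take(queue, batch, SKETCH_BATCH_SIZE)) > 0)
    {
        for (i = 0; i < n; ++i)
        {
            *sum += batch[i];
        }
        total += n;
    }
    return total;
}
```

An application would call a function like `sketch_poll_once` from its own periodic loop, so processing runs in the application thread, as with a waitset, but without blocking between samples.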
3.8.3. Filtering Samples
In lieu of supporting Content-Filtered Topics, a DataReaderListener in Connext DDS Micro provides callbacks to do application-level filtering per sample.
- on_before_sample_deserialize. Through this callback, a received sample is presented to the application before it has been deserialized or stored in the DataReader’s queue.
- on_before_sample_commit. Through this callback, a received sample is presented to the application after it has been deserialized but before it has been stored in the DataReader’s queue.
You control the callbacks’ sample_dropped parameter; upon exiting either callback, the DataReader will drop the sample if sample_dropped is true. Consequently, dropped samples are not stored in the DataReader’s queue and are not available to be read or taken.
Neither callback is associated with a DDS Status. Rather, each is enabled when it is assigned a non-NULL callback function.
NOTE: Because it is called after the sample has been deserialized, on_before_sample_commit provides an additional sample_info parameter, containing some of the usual sample information that would be available when the sample is read or taken.
The HelloWorld_dpde example’s subscriber has this on_before_sample_commit callback:
DDS_Boolean HelloWorldSubscriber_on_before_sample_commit(
        void *listener_data,
        DDS_DataReader *reader,
        const void *const sample,
        const struct DDS_SampleInfo *const sample_info,
        DDS_Boolean *dropped)
{
    HelloWorld *hw_sample = (HelloWorld *)sample;

    /* Drop samples with even-numbered count in msg */
    HelloWorldSubscriber_filter_sample(hw_sample, dropped);
    if (*dropped)
    {
        printf("\nSample filtered, before commit\n\tDROPPED - msg: %s\n",
                hw_sample->msg);
    }
    return DDS_BOOLEAN_TRUE;
}
...
dr_listener.on_before_sample_commit =
        HelloWorldSubscriber_on_before_sample_commit;
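The body of HelloWorldSubscriber_filter_sample is not shown above. As a rough idea of what such a filter might look like, the sketch below drops messages whose embedded count is even. It is not the shipped example's code: the function name is hypothetical, it uses plain C types instead of the DDS ones, and it assumes the count is the first run of decimal digits in the message text (for example "Hello World (4)"), which is a guess about the message format.

```c
#include <ctype.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical filter: sets *dropped to 1 if the first decimal number
 * found in `msg` is even, and to 0 otherwise. Messages without any
 * digits (or a NULL message) are kept. */
void sketch_filter_even_count(const char *msg, int *dropped)
{
    const char *cursor = msg;
    long count;

    *dropped = 0;
    if (msg == NULL)
    {
        return;
    }

    /* Skip ahead to the first digit, if any. */
    while (*cursor != '\0' && !isdigit((unsigned char)*cursor))
    {
        ++cursor;
    }
    if (*cursor == '\0')
    {
        return; /* no count present: keep the sample */
    }

    count = strtol(cursor, NULL, 10);
    if (count % 2 == 0)
    {
        *dropped = 1;
    }
}
```

A callback such as on_before_sample_commit would pass the sample's message and its dropped flag to a function like this, then return; the DataReader discards the sample when the flag is set on exit.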
For more information, see the Receiving Data section in the User’s Manual.