Create DataReader
=================

.. highlight:: c

::

    DDS_DataReader *datareader = NULL;
    struct DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER;
    struct DDS_DataReaderListener dr_listener = DDS_DataReaderListener_INITIALIZER;

    /* Configure Reader Qos */
    dr_qos.protocol.rtps_object_id = 200;
    dr_qos.resource_limits.max_instances = 2;
    dr_qos.resource_limits.max_samples_per_instance = 2;
    dr_qos.resource_limits.max_samples =
        dr_qos.resource_limits.max_samples_per_instance * dr_qos.resource_limits.max_instances;
    dr_qos.reader_resource_limits.max_remote_writers = 10;
    dr_qos.reader_resource_limits.max_remote_writers_per_instance = 10;
    dr_qos.history.depth = 1;
    dr_qos.durability.kind = DDS_VOLATILE_DURABILITY_QOS;
    dr_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;

    /* Set listener callbacks */
    dr_listener.on_data_available = HelloWorldSubscriber_on_data_available;
    dr_listener.on_subscription_matched = HelloWorldSubscriber_on_subscription_matched;

    datareader = DDS_Subscriber_create_datareader(subscriber,
                                                  DDS_Topic_as_topicdescription(topic),
                                                  &dr_qos,
                                                  &dr_listener,
                                                  DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS);
    if (datareader == NULL)
    {
        /* failure */
    }

The DDS status mask selects which DataReaderListener_ callbacks are enabled.
In this example, the mask sets the DDS_SUBSCRIPTION_MATCHED_STATUS_
and DDS_DATA_AVAILABLE_STATUS_ statuses, and the DataReaderListener_ accordingly has its
on_subscription_matched_ and on_data_available_ members assigned to callback functions.

::

   void HelloWorldSubscriber_on_subscription_matched(void *listener_data,
                                                     DDS_DataReader * reader,
                                                     const struct DDS_SubscriptionMatchedStatus *status)
   {
       if (status->current_count_change > 0)
       {
           printf("Matched a publisher\n");
       }
       else if (status->current_count_change < 0)
       {
           printf("Unmatched a publisher\n");
       }
   }

::

   void HelloWorldSubscriber_on_data_available(void* listener_data,
                                               DDS_DataReader* reader)
   {
       HelloWorldDataReader *hw_reader = HelloWorldDataReader_narrow(reader);
       DDS_ReturnCode_t retcode;
       struct DDS_SampleInfo *sample_info = NULL;
       HelloWorld *sample = NULL;

       struct DDS_SampleInfoSeq info_seq =
           DDS_SEQUENCE_INITIALIZER(struct DDS_SampleInfo);
       struct HelloWorldSeq sample_seq =
           DDS_SEQUENCE_INITIALIZER(HelloWorld);

       const DDS_Long TAKE_MAX_SAMPLES = 32;
       DDS_Long i;

       retcode = HelloWorldDataReader_take(hw_reader,
          &sample_seq, &info_seq, TAKE_MAX_SAMPLES,
          DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);

       if (retcode != DDS_RETCODE_OK)
       {
           printf("failed to take data: %d\n", retcode);
           goto done;
       }

       /* Print each valid sample taken */
       for (i = 0; i < HelloWorldSeq_get_length(&sample_seq); ++i)
       {
           sample_info = DDS_SampleInfoSeq_get_reference(&info_seq, i);

           if (sample_info->valid_data)
           {
               sample = HelloWorldSeq_get_reference(&sample_seq, i);
               printf("\nSample received\n\tmsg: %s\n", sample->msg);
           }
           else
           {
               printf("not valid data\n");
           }
       }

       HelloWorldDataReader_return_loan(hw_reader, &sample_seq, &info_seq);

    done:
       HelloWorldSeq_finalize(&sample_seq);
       DDS_SampleInfoSeq_finalize(&info_seq);
   }
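
The way a status mask gates listener callbacks can be illustrated with a
small, self-contained model.  This is only a sketch, not the middleware's
implementation: the ``Ex``-prefixed types, the bit values, and
``ex_dispatch`` are hypothetical stand-ins for the DDS status constants and
listener structure.  In this model a callback runs only when its status bit
is set in the mask and its function pointer is non-NULL.

```c
#include <stddef.h>
#include <stdint.h>

/* Illustrative status bits; hypothetical stand-ins for DDS_*_STATUS values. */
typedef uint32_t ExStatusMask;
#define EX_DATA_AVAILABLE_STATUS       ((ExStatusMask)1u << 0)
#define EX_SUBSCRIPTION_MATCHED_STATUS ((ExStatusMask)1u << 1)

/* Hypothetical listener: one function pointer per status. */
struct ExListener {
    void (*on_data_available)(void *listener_data);
    void (*on_subscription_matched)(void *listener_data);
    void *listener_data;
};

/* Invokes the callback for a single status bit if that bit is enabled in
 * the mask and the callback is assigned.  Returns 1 if a callback ran,
 * 0 otherwise. */
int ex_dispatch(const struct ExListener *l, ExStatusMask mask,
                ExStatusMask status)
{
    if ((mask & status) == 0)
    {
        return 0; /* status not enabled in the mask */
    }
    if (status == EX_DATA_AVAILABLE_STATUS && l->on_data_available != NULL)
    {
        l->on_data_available(l->listener_data);
        return 1;
    }
    if (status == EX_SUBSCRIPTION_MATCHED_STATUS &&
        l->on_subscription_matched != NULL)
    {
        l->on_subscription_matched(l->listener_data);
        return 1;
    }
    return 0;
}

/* Example callback: counts invocations through listener_data. */
void ex_count_call(void *listener_data)
{
    ++*(int *)listener_data;
}
```

With a mask containing only ``EX_DATA_AVAILABLE_STATUS``, a
subscription-matched event is ignored in this model even though a callback
is assigned for it.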

DPSE Discovery: Assert Remote Publication
-----------------------------------------

A subscribing application using DPSE_ discovery must specify the other
*DataWriters* that its *DataReaders* are allowed to discover.  Like the API
for asserting a remote participant, the DPSE_ API for asserting a remote
publication must be called for each remote *DataWriter* that a *DataReader*
may discover.

::

    struct DDS_PublicationBuiltinTopicData rem_publication_data =
        DDS_PublicationBuiltinTopicData_INITIALIZER;

    /* Set Writer's protocol.rtps_object_id */
    rem_publication_data.key.value[DDS_BUILTIN_TOPIC_KEY_OBJECT_ID] = 100;

    rem_publication_data.topic_name = DDS_String_dup("Example HelloWorld");
    rem_publication_data.type_name = DDS_String_dup("HelloWorld");

    rem_publication_data.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;

    retcode = DPSE_RemotePublication_assert(participant,
                                            "Participant_1",
                                            &rem_publication_data,
                                            HelloWorld_get_key_kind(HelloWorldTypePlugin_get(),
                                                                    NULL));
    if (retcode != DDS_RETCODE_OK)
    {
        /* failure */
    }

Asserting a remote publication requires setting every QoS policy that is
needed to determine matching.  In this example, the reliability kind is set
to match the reliable *DataReader* created above.
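
To illustrate why the asserted publication data must carry the topic name,
type name, and reliability kind, the following sketch models a simplified
matching check.  The ``Ex``-prefixed types and ``ex_endpoints_match`` are
hypothetical and are not part of the DPSE_ API; the sketch assumes only the
general DDS requested/offered rule that a reliable reader does not match a
best-effort writer, and it ignores every other policy that real matching
may consider.

```c
#include <string.h>

/* Hypothetical stand-in for the reliability kind.  Values are ordered so
 * that a larger value is a stronger guarantee. */
enum ExReliability { EX_BEST_EFFORT = 0, EX_RELIABLE = 1 };

/* Hypothetical, reduced view of an endpoint's discovery data. */
struct ExEndpoint {
    const char *topic_name;
    const char *type_name;
    enum ExReliability reliability;
};

/* Returns 1 when the asserted writer and the local reader match in this
 * simplified model, 0 otherwise. */
int ex_endpoints_match(const struct ExEndpoint *writer,
                       const struct ExEndpoint *reader)
{
    if (strcmp(writer->topic_name, reader->topic_name) != 0)
    {
        return 0;
    }
    if (strcmp(writer->type_name, reader->type_name) != 0)
    {
        return 0;
    }
    /* The writer must offer at least what the reader requests. */
    return writer->reliability >= reader->reliability;
}
```

In this model, asserting the remote writer as best-effort would prevent a
match with the reliable reader from the example, even with identical topic
and type names.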

Receiving Samples
-----------------

Accessing received samples can be done in a few ways:

- **Polling**.  Call read or take from within a periodic polling loop.

- **Listener**.  When a new sample is received, the DataReaderListener_'s
  on_data_available_ is called.  Processing is done in the context of the
  middleware's receive thread.  See the above
  HelloWorldSubscriber_on_data_available callback for example code.

- **Waitset**.  Create a waitset, attach to it a status condition with
  the data_available status enabled, and wait for a received sample to
  trigger the waitset.  Processing is done in the context of the user's
  application thread.  (Note: the code snippet below is taken from the shipped
  HelloWorld_dpde_waitset example.)

::

    DDS_WaitSet *waitset = NULL;
    struct DDS_Duration_t wait_timeout = { 10, 0 }; /* 10 seconds */
    DDS_StatusCondition *dr_condition = NULL;
    struct DDS_ConditionSeq active_conditions =
        DDS_SEQUENCE_INITIALIZER(struct DDS_ConditionSeq);

    if (!DDS_ConditionSeq_initialize(&active_conditions))
    {
        /* failure */
    }

    if (!DDS_ConditionSeq_set_maximum(&active_conditions, 1))
    {
        /* failure */
    }

    waitset = DDS_WaitSet_new();
    if (waitset == NULL )
    {
        /* failure */
    }

    dr_condition = DDS_Entity_get_statuscondition(DDS_DataReader_as_entity(datareader));

    retcode = DDS_StatusCondition_set_enabled_statuses(dr_condition,
                                                       DDS_DATA_AVAILABLE_STATUS);
    if (retcode != DDS_RETCODE_OK)
    {
        /* failure */
    }

    retcode = DDS_WaitSet_attach_condition(waitset,
                                           DDS_StatusCondition_as_condition(dr_condition));
    if (retcode != DDS_RETCODE_OK)
    {
        /* failure */
    }

    retcode = DDS_WaitSet_wait(waitset, &active_conditions, &wait_timeout);

    switch (retcode) {
        case DDS_RETCODE_OK:
        {
            /* This WaitSet only has a single condition attached to it
             * so we can implicitly assume the DataReader's status condition
             * to be active (with the enabled DATA_AVAILABLE status) upon
             * successful return of wait().
             * If more than one condition were attached to the WaitSet,
             * the returned sequence must be examined using the
             * commented out code instead of the following.
             */

             HelloWorldSubscriber_take_data(HelloWorldDataReader_narrow(datareader));

             /*
             DDS_Long active_len = DDS_ConditionSeq_get_length(&active_conditions);
             for (i = active_len - 1; i >= 0; --i)
             {
                 DDS_Condition *active_condition =
                     *DDS_ConditionSeq_get_reference(&active_conditions, i);

                 if (active_condition ==
                         DDS_StatusCondition_as_condition(dr_condition))
                 {
                     total_samples += HelloWorldSubscriber_take_data(
                                    HelloWorldDataReader_narrow(datareader));
                 }
                 else if (active_condition == some_other_condition)
                 {
                     do_something_else();
                 }
             }
             */
             break;
        }
        case DDS_RETCODE_TIMEOUT:
        {
            printf("WaitSet_wait timed out\n");
            break;
        }
        default:
        {
            printf("ERROR in WaitSet_wait: retcode=%d\n", retcode);
            break;
        }
    }
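
The polling approach listed above has no snippet in this section, so the
following sketch shows its general shape with a self-contained model.  The
queue type and the ``ex_take`` and ``ex_poll_once`` functions are
hypothetical stand-ins for a *DataReader* and its take operation, with
plain integers in place of samples; a real application would call the
generated take function and return the loan, as in the
on_data_available callback shown earlier.

```c
#include <stddef.h>

#define EX_QUEUE_CAPACITY 8

/* Hypothetical stand-in for a DataReader's queue of received samples. */
struct ExReaderQueue {
    int samples[EX_QUEUE_CAPACITY];
    size_t count;
};

/* Removes up to max_samples from the front of the queue, copies them to
 * out, and returns the number taken.  Like take, it consumes the samples. */
size_t ex_take(struct ExReaderQueue *q, int *out, size_t max_samples)
{
    size_t n = (q->count < max_samples) ? q->count : max_samples;
    size_t i;

    for (i = 0; i < n; ++i)
    {
        out[i] = q->samples[i];
    }
    /* Shift the remaining samples to the front of the queue. */
    for (i = n; i < q->count; ++i)
    {
        q->samples[i - n] = q->samples[i];
    }
    q->count -= n;
    return n;
}

/* One polling iteration: drains the queue in batches of at most
 * batch_size, adds each sample to *sum, and returns the number processed. */
size_t ex_poll_once(struct ExReaderQueue *q, size_t batch_size, long *sum)
{
    int batch[EX_QUEUE_CAPACITY];
    size_t total = 0;
    size_t n;
    size_t i;

    if (batch_size == 0)
    {
        return 0;
    }
    if (batch_size > EX_QUEUE_CAPACITY)
    {
        batch_size = EX_QUEUE_CAPACITY;
    }
    while ((n = ex_take(q, batch, batch_size)) > 0)
    {
        for (i = 0; i < n; ++i)
        {
            *sum += batch[i];
        }
        total += n;
    }
    return total;
}
```

An application thread would call ``ex_poll_once`` repeatedly, sleeping for
the polling period between calls; a call that finds the queue empty simply
returns 0.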

Filtering Samples
-----------------

In lieu of supporting Content-Filtered Topics, a DataReaderListener_ in |me|
provides callbacks to do application-level filtering per sample.

- **on_before_sample_deserialize**. Through this callback, a received
  sample is presented to the application before it has been deserialized or
  stored in the *DataReader*'s queue.

- **on_before_sample_commit**.  Through this callback, a received sample
  is presented to the application after it has been deserialized but before
  it has been stored in the *DataReader*'s queue.

You control each callback's sample_dropped parameter (named ``dropped`` in
the example below); when either callback returns, the *DataReader* drops
the sample if sample_dropped is true.  Dropped samples are not stored in
the *DataReader*'s queue and are not available to be read or taken.

Neither callback is associated with a DDS Status.  Rather, each is enabled
when it is assigned a non-NULL callback.

NOTE: Because it is called after the sample has been deserialized, on_before_sample_commit_
provides an additional sample_info_ parameter, containing some of the usual sample
information that would be available when the sample is read or taken.
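
The order in which the two filtering callbacks run relative to
deserialization and queueing can be sketched with a small model.  This is an
illustration under stated assumptions, not the middleware's receive path:
all ``Ex``-prefixed names are hypothetical, a "serialized sample" is a
single byte, and deserialization is just a conversion to ``int``.  A NULL
callback is treated as disabled, mirroring the rule that each callback is
enabled by assignment.

```c
#include <stddef.h>

/* Hypothetical filter callback: sets *dropped to nonzero to discard. */
typedef void (*ExFilterFn)(const void *data, int *dropped);

struct ExFilters {
    ExFilterFn before_deserialize; /* NULL means disabled */
    ExFilterFn before_commit;      /* NULL means disabled */
};

enum ExOutcome {
    EX_DROPPED_BEFORE_DESERIALIZE,
    EX_DROPPED_BEFORE_COMMIT,
    EX_COMMITTED
};

/* Runs one received sample through the modeled pipeline.  The caller
 * guarantees that queue has room for one more element. */
enum ExOutcome ex_receive(const struct ExFilters *f, const unsigned char *raw,
                          int *queue, size_t *queue_len)
{
    int dropped = 0;
    int value;

    if (f->before_deserialize != NULL)
    {
        f->before_deserialize(raw, &dropped); /* sees serialized data */
        if (dropped)
        {
            return EX_DROPPED_BEFORE_DESERIALIZE;
        }
    }

    value = (int)raw[0]; /* "deserialize" the one-byte sample */

    if (f->before_commit != NULL)
    {
        f->before_commit(&value, &dropped); /* sees deserialized data */
        if (dropped)
        {
            return EX_DROPPED_BEFORE_COMMIT;
        }
    }

    queue[(*queue_len)++] = value; /* commit to the reader's queue */
    return EX_COMMITTED;
}

/* Example filter on serialized data: drop a zero byte. */
void ex_drop_zero_byte(const void *data, int *dropped)
{
    *dropped = (*(const unsigned char *)data == 0);
}

/* Example filter on deserialized data: drop odd values. */
void ex_drop_odd(const void *data, int *dropped)
{
    *dropped = (*(const int *)data % 2 != 0);
}
```

Dropping in the earlier callback avoids the cost of deserialization, while
the later callback can inspect typed fields; in both cases the dropped
sample never reaches the queue.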

The HelloWorld_dpde example's subscriber has this on_before_sample_commit_
callback:

::

    DDS_Boolean HelloWorldSubscriber_on_before_sample_commit(
        void *listener_data,
        DDS_DataReader *reader,
        const void *const sample,
        const struct DDS_SampleInfo *const sample_info,
        DDS_Boolean *dropped)
    {
        HelloWorld *hw_sample = (HelloWorld *)sample;

        /* Drop samples with even-numbered count in msg */
        HelloWorldSubscriber_filter_sample(hw_sample, dropped);

        if (*dropped)
        {
            printf("\nSample filtered, before commit\n\tDROPPED - msg: %s\n",
                   hw_sample->msg);
        }

        return DDS_BOOLEAN_TRUE;
    }

    ...

    dr_listener.on_before_sample_commit =
        HelloWorldSubscriber_on_before_sample_commit;
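
The body of HelloWorldSubscriber_filter_sample is not shown above.  As a
rough idea of what an "even-numbered count" filter could look like, the
sketch below uses a hypothetical helper, ``ex_filter_even_count``, and
assumes that the message text contains its count as a decimal number (for
instance ``"Hello World (4)"``).  Both the helper name and the message
format are assumptions made for illustration; the shipped example may
differ.

```c
#include <ctype.h>
#include <stdlib.h>

/* Hypothetical filter: sets *dropped to 1 when the first decimal number
 * found in msg is even, and to 0 otherwise (including when msg contains
 * no digits). */
void ex_filter_even_count(const char *msg, int *dropped)
{
    const char *p = msg;

    *dropped = 0;

    /* Skip ahead to the first digit, if any. */
    while (*p != '\0' && !isdigit((unsigned char)*p))
    {
        ++p;
    }
    if (*p == '\0')
    {
        return; /* no count found: keep the sample */
    }

    *dropped = (strtol(p, NULL, 10) % 2 == 0);
}
```

A callback such as the one above could pass its sample's ``msg`` field and
its ``dropped`` pointer straight to a helper of this kind.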

For more information, see the :doc:`../usersmanual/receiving` section in the User's Manual.