.. include:: vars.rst

.. _section-gsg_intro_cpp11:

Publish/Subscribe
*****************

.. list-table::
    :name: TableContentFiltersPrerequisites
    :widths: 20 80
    :header-rows: 0

    * - Prerequisites
      - * `Install git `__
        * Install |CONNEXT_DRIVE| (see :ref:`section-gsg_before`)
        * Install `CMake `_
    * - Time to complete
      - 45 minutes
    * - Concepts covered in this module
      - * Introduction to publish/subscribe
        * Introduction to |DWs|, |DRs|, and *Topics*
        * Using the code generator
        * Using Waitsets
        * Viewing your data in *RTI Admin Console*
        * Working with the safety-critical SDK

The most basic communication pattern supported by *RTI* |CONNEXT_DRIVE| is the publish/subscribe model. Publish/Subscribe is a communications model where data producers "publish" data and data consumers "subscribe" to data. These publishers and subscribers don’t need to know about each other ahead of time; they discover each other dynamically at runtime. The data they share is described by a "topic," and publishers and subscribers send and receive data only for the topics they are interested in. In this pattern, many publishers may publish the same topic, and many subscribers may subscribe to the same topic. Subscribers receive data from all of the publishers that they share a topic with. Publishers send data directly to subscribers, with no need for a broker or centralized application to mediate communications.

Introduction to DataWriters, DataReaders, and Topics
====================================================

In DDS, the objects that actually publish data are called |DWs|, and the objects that subscribe to data are |DRs|. Each |DW| and |DR| is associated with a single *Topic* object that describes that data. (DDS also has *Publisher* and *Subscriber* objects, but we will talk about them later.) An application typically has a combination of |DWs| and |DRs|.

.. 
figure:: static/writer_reader_topic.png :scale: 50 % :alt: Writers Readers Overview :name: FigureWritersReaders :align: center |DWs| write data and |DRs| read data of a *Topic*. |DWs| of the “ChocolateTemperature” *Topic* communicate with |DRs| of the “ChocolateTemperature” *Topic*. |DWs| of the “ChocolateLotState” *Topic* communicate with |DRs| of the “ChocolateLotState” *Topic*. In a chocolate factory, for example, there might be a sensor that measures and publishes the current temperature of the tempering machine. Other applications monitor the temperature by subscribing to it. In this example, your *Topic* might be “ChocolateTemperature.” The sensor’s |DW| will be associated with the “ChocolateTemperature” *Topic*. In a similar way, other |DWs| and |DRs| share different types of data using additional *Topics*. |CONNEXT| is responsible for discovering |DWs| and |DRs| in a system, checking if they have a matching *Topic* (and compatible quality of service, which we will discuss later) and then providing the communication between those |DWs| and |DRs|. Logically, this means you can visualize your applications as having |DWs| and |DRs| that connect to a “databus,” because your applications are not specifying exactly which other applications they communicate with – they only specify which *Topics* they read from and write to the databus, and |CONNEXT| sets up the communication. Note that there is no "databus" object in your system – it is a logical way to visualize systems in which you don’t have to configure each communication path. .. note:: We recommend completing the following examples in the order presented, starting with :ref:`section-gsg_intro_handson1`, then :ref:`section-gsg_view_data`, and finally :ref:`section-gsg_safety_critical_sdk`. Alternatively, you can complete :ref:`section-gsg_intro_handson1` and move directly to :ref:`section-gsg_safety_critical_sdk` if you aren't interested in the data visualization tools covered in :ref:`section-gsg_view_data`. 
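The matching rule described in this section (a |DW| communicates with every |DR| that shares its *Topic*) can be illustrated with a toy model. The following is only a sketch in plain C, not the |CONNEXT| API: every name in it (``toy_endpoint``, ``toy_endpoints_match``, ``toy_count_matched_readers``) is hypothetical, and it deliberately ignores discovery, quality-of-service compatibility, and networking.

```c
/* Toy model of topic-based matching. NOT the Connext API:
 * all types and functions below are hypothetical illustrations. */
#include <assert.h>
#include <stddef.h>
#include <string.h>

enum toy_kind { TOY_WRITER, TOY_READER };

struct toy_endpoint {
    enum toy_kind kind;
    const char *topic; /* name of the Topic this endpoint is associated with */
};

/* Returns nonzero when one endpoint is a writer, the other is a reader,
 * and both are associated with the same topic name. */
static int toy_endpoints_match(const struct toy_endpoint *a,
                               const struct toy_endpoint *b)
{
    assert(a != NULL && b != NULL);
    return a->kind != b->kind && strcmp(a->topic, b->topic) == 0;
}

/* Counts how many readers in `endpoints` would receive data
 * published by `writer`. */
static size_t toy_count_matched_readers(const struct toy_endpoint *writer,
                                        const struct toy_endpoint *endpoints,
                                        size_t count)
{
    size_t i;
    size_t matched = 0;

    assert(writer != NULL && writer->kind == TOY_WRITER);
    for (i = 0; i < count; ++i) {
        if (endpoints[i].kind == TOY_READER &&
            toy_endpoints_match(writer, &endpoints[i])) {
            ++matched;
        }
    }
    return matched;
}
```

For example, given one writer and two readers on “ChocolateTemperature” plus one reader on “ChocolateLotState”, ``toy_count_matched_readers`` returns 2 for the writer: the “ChocolateLotState” reader is never considered, even though nothing in the code names specific applications.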
.. _section-gsg_intro_handson1:

.. include:: ../../getting_started/intro_pubsub_cpp.rst
    :start-after: .. _section-gsg_intro_handson1:
    :end-before: .. _section-gsg_intro_next:

.. _section-gsg_safety_critical_sdk:

Hands-On 3: Getting Started with the Safety-Critical SDK
========================================================

Now that you've created some basic applications with |CONNEXT| *Professional*, you can create applications with |CONNEXT_MICRO|, using libraries that are safety-certified to automotive standards.

.. note::

    We recommend that you complete :ref:`section-gsg_intro_handson1` before working on this example. The instructions for Hands-On 1 cover some basic concepts that are not explained in detail in this example.

.. _section-ho3_build_libs:

Build the libraries
-------------------

If you installed a |CONNEXT_DRIVE| LM (license-managed) bundle, the libraries do not need to be built. As explained in :ref:`section-safety_certified_libraries`, an LM bundle (usually used for evaluation purposes) provides prebuilt libraries for two target architectures:

* ``armv8Linux4gcc7.3.0_certprofile``
* ``x64Linux4gcc7.3.0_certprofile``

These are prebuilt |CONNEXT_MICRO| libraries that only use features available in |CONNEXT_CERT|. We refer to these as *cert-profile libraries*, and will use them in this hands-on section.

If you installed a non-license-managed |CONNEXT_DRIVE| bundle, you will need to compile your own cert-profile libraries. To do so, follow the instructions in the :link_micro_cert_compatibility:`Building and Porting chapter of the Connext Micro 2.4.14 User's Manual<>`.

.. note::

    The User's Manual for |CONNEXT_MICRO| 2.4.14 refers to |CONNEXT_MICRO| as *Connext DDS Micro*, and to |CONNEXT_CERT| as *Connext DDS Micro Cert*. These are simply legacy names and do not denote different products.
Define a Data Type
------------------

To distribute data using |CONNEXT_DRIVE| cert-profile libraries, you must first define a data type as described in this section, then run the RTI Code Generator (*rtiddsgen*) utility as described in :ref:`section-ho3_run_codegen` below. This utility will generate the type-specific support code that the cert-profile libraries need and the code that makes calls to publish and subscribe to that data type.

*rtiddsgen* accepts type definitions in Interface Definition Language (IDL) format, and generates type-support code from those type definitions.

For instance, to define a simple type called HelloWorld which contains a string “msg” with a maximum length of 128 characters, create a file called “HelloWorld.idl” with the following content:

.. code-block:: c

    struct HelloWorld
    {
        string<128> msg;
    };

.. _section-ho3_run_codegen:

Run Code Generator
------------------

To generate an example for |CONNEXT_MICRO|, run the following command:

.. code-block:: console

    rtiddsgen -micro -example -exampleTemplate cert -language C

In the above command, we passed *rtiddsgen* the following options:

* ``-micro``: Necessary to generate type-support code that is specific to the cert-profile libraries.
* ``-example``: Generates type files and example files.
* ``-exampleTemplate cert``: Generates example files from the cert template, only using APIs compatible with |CONNEXT_CERT|.
* ``-language C``: Generates C code.

You can create your own type-definition file, modeled after the one described in `Define a data type`_. The type-definition file name must end in “.idl”.

How to compile the generated examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. note::

    This section assumes that a `CMake `_ bundle is available. Make sure that CMake is installed and available before continuing.

The |CONNEXT_DRIVE| bundle includes *rtime-make*. This script is a convenient way to build the examples with the correct options.
Before calling *rtime-make*, make sure that ``/resource/scripts`` is in your ``PATH`` environment variable. For example:

.. code-block:: console

    cd ""
    rtime-make --config Release --build --target --source-dir . \
        -G "Unix Makefiles" --delete [-DRTIME_IDL_ADD_REGENERATE_TYPESUPPORT_RULE=true]

For ````: use either of the two architectures included in the LM bundle, or another architecture that you have built yourself (see :ref:`section-ho3_build_libs`):

* ``armv8Linux4gcc7.3.0_certprofile``
* ``x64Linux4gcc7.3.0_certprofile``

.. note::

    Since ``--name`` is not specified, the value for ``--target`` will be used as the name.

Since ```` has the suffix “``_certprofile``” in the name, the *rtime-make* build system will automatically set the CMake flag ``RTIME_CERT`` when compiling. The CMake flag ``RTIME_CERT`` instructs the build system to build |CONNEXT_MICRO| with |CONNEXT_CERT| compatibility.

The executable can be found in the directory ‘objs’.

How to run the generated examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

By default, the example uses all available interfaces to receive samples. This can cause communication problems if the number of available interfaces is greater than the maximum number of interfaces supported by the cert-profile libraries. For this reason, we recommend that you restrict the number of interfaces used by the application: pass the ``-udp_intf`` option when running the example to restrict it to a single, specified network interface.

For example, if the example has been compiled for ``x64Linux4gcc7.3.0_certprofile``, run the subscriber with this command:

.. code-block:: console

    objs/x64Linux4gcc7.3.0_certprofile/_subscriber [-domain ] [-peer
] \
        [-sleep ] [-count ] [-udp_intf ]

and run the publisher with this command:

.. code-block:: console

    objs/x64Linux4gcc7.3.0_certprofile/_publisher [-domain ] [-peer
] \ [-sleep ] [-count ] [-udp_intf ] Configure UDP Transport ----------------------- You may need to configure the UDP transport component that is pre-registered by the cert-profile libraries. To change the properties of the UDP transport: unregister the UDP component, then update the properties and reregister it. Example code: * Unregister the pre-registered UDP component: .. code-block:: c /* Unregister the pre-registered UDP component */ if (!RT_Registry_unregister(registry, "_udp", NULL, NULL)) { /* failure */ } * Configure UDP transport properties: .. code-block:: c struct UDP_InterfaceFactoryProperty *udp_property = NULL; udp_property = (struct UDP_InterfaceFactoryProperty *) malloc(sizeof(struct UDP_InterfaceFactoryProperty)); if (udp_property != NULL) { *udp_property = UDP_INTERFACE_FACTORY_PROPERTY_DEFAULT; /* allow_interface: Names of network interfaces allowed to send/receive. * Allow one loopback (lo) and one NIC (eth0). */ REDA_StringSeq_set_maximum(&udp_property->allow_interface,2); REDA_StringSeq_set_length(&udp_property->allow_interface,2); *REDA_StringSeq_get_reference(&udp_property->allow_interface,0) = DDS_String_dup("lo"); *REDA_StringSeq_get_reference(&udp_property->allow_interface,1) = DDS_String_dup("eth0"); } else { /* failure */ } * Re-register UDP component with updated properties: .. code-block:: c if (!RT_Registry_register(registry, "_udp", UDP_InterfaceFactory_get_interface(), (struct RT_ComponentFactoryProperty*)udp_property, NULL)) { /* failure */ } Create a DomainParticipant and Topic with a registered type ----------------------------------------------------------- A DomainParticipantFactory creates *DomainParticipants*, and a *DomainParticipant* itself is the factory for creating *Publishers*, *Subscribers*, and *Topics*. When creating a *DomainParticipant*, you may need to customize DomainParticipantQos, notably for: * **Resource limits**. Default resource limits are set at minimum values. * **Initial peers**. 
  A list of known remote entities that your application should connect to automatically. Entities that are not in this list will still be found using the Discovery process.
* **Discovery**. The name of the registered discovery component (typically “dpde” or “dpse”) must be assigned to DiscoveryQosPolicy’s name. Please note that in |CONNEXT_CERT|, only the DPSE discovery plugin is supported.
* **Participant Name**. By default, every *DomainParticipant* is given the same name. When using DPSE discovery, each participant name must be unique, so set it explicitly.

Example code:

* Create a *DomainParticipant* with configured DomainParticipantQos:

  .. code-block:: c

      DDS_DomainParticipant *participant = NULL;
      struct DDS_DomainParticipantQos dp_qos = DDS_DomainParticipantQos_INITIALIZER;

      /* DDS domain of DomainParticipant */
      DDS_Long domain_id = 0;

      /* Name of your registered Discovery component.
       * DPSE is the only discovery plugin supported by Connext Cert. */
      if (!RT_ComponentFactoryId_set_name(&dp_qos.discovery.discovery.name, "dpse"))
      {
          /* failure */
      }

      /* Initial peers: use only default multicast peer */
      DDS_StringSeq_set_maximum(&dp_qos.discovery.initial_peers, 1);
      DDS_StringSeq_set_length(&dp_qos.discovery.initial_peers, 1);
      *DDS_StringSeq_get_reference(&dp_qos.discovery.initial_peers, 0) =
          DDS_String_dup("239.255.0.1");

      /* Resource limits */
      dp_qos.resource_limits.max_destination_ports = 32;
      dp_qos.resource_limits.max_receive_ports = 32;
      dp_qos.resource_limits.local_topic_allocation = 1;
      dp_qos.resource_limits.local_type_allocation = 1;
      dp_qos.resource_limits.local_reader_allocation = 1;
      dp_qos.resource_limits.local_writer_allocation = 1;
      dp_qos.resource_limits.remote_participant_allocation = 8;
      dp_qos.resource_limits.remote_reader_allocation = 8;
      dp_qos.resource_limits.remote_writer_allocation = 8;

      /* Participant name */
      strcpy(dp_qos.participant_name.name, "Participant_1");

      participant = DDS_DomainParticipantFactory_create_participant(factory,
                                                                    domain_id,
                                                                    &dp_qos,
                                                                    NULL,
                                                                    DDS_STATUS_MASK_NONE);
      if (participant == NULL)
      {
          /* failure */
      }

Register type
^^^^^^^^^^^^^

Your data types that
have been generated from IDL need to be registered with the *DomainParticipants* that will be using them. Each registered type must have a unique name, preferably the same as its IDL defined name. .. code-block:: c DDS_ReturnCode_t retcode; retcode = DDS_DomainParticipant_register_type(participant, "HelloWorld", HelloWorldTypePlugin_get()); if (retcode != DDS_RETCODE_OK) { /* failure */ } Create Topic of registered type ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ DDS *Topics* encapsulate the types being communicated, and you can create *Topics* for your type once your type is registered. A topic is given a name at creation (e.g. “Example HelloWorld”). The type associated with the *Topic* is specified with its registered name. .. code-block:: c DDS_Topic *topic = NULL; topic = DDS_DomainParticipant_create_topic(participant, "Example HelloWorld", "HelloWorld", &DDS_TOPIC_QOS_DEFAULT, NULL, DDS_STATUS_MASK_NONE); if (topic == NULL) { /* failure */ } DPSE Discovery: assert remote participant ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ DPSE Discovery relies on the application to specify the other, or remote, *DomainParticipants* that its local *DomainParticipants* are allowed to discover. Your application must call a DPSE API for each remote participant to be discovered. The API takes as input the name of the remote participant. .. code-block:: c /* Enable discovery of remote participant with name Participant_2 */ retcode = DPSE_RemoteParticipant_assert(participant, "Participant_2"); if (retcode != DDS_RETCODE_OK) { /* failure */ } For more information, see the :link_um:`Working with DDS Domains in the Core Libraries User's Manual`. Create Publisher ---------------- A publishing application needs to create a DDS *Publisher* and then a |DW| for each *Topic* it wants to publish. In |CONNEXT_DRIVE|, PublisherQos in general contains no policies that need to be customized, while DataWriterQos does contain several customizable policies. * Create *Publisher*: .. 
code-block:: c DDS_Publisher *publisher = NULL; publisher = DDS_DomainParticipant_create_publisher(participant, &DDS_PUBLISHER_QOS_DEFAULT, NULL, DDS_STATUS_MASK_NONE); if (publisher == NULL) { /* failure */ } For more information, see the :link_um:`Sending Data section in the Core Libraries User’s Manual`. Create DataWriter ----------------- .. code-block:: c DDS_DataWriter *datawriter = NULL; struct DDS_DataWriterQos dw_qos = DDS_DataWriterQos_INITIALIZER; struct DDS_DataWriterListener dw_listener = DDS_DataWriterListener_INITIALIZER; /* Configure writer Qos */ dw_qos.protocol.rtps_object_id = 100; dw_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS; dw_qos.resource_limits.max_samples_per_instance = 2; dw_qos.resource_limits.max_instances = 2; dw_qos.resource_limits.max_samples = dw_qos.resource_limits.max_samples_per_instance * dw_qos.resource_limits.max_instances; dw_qos.history.depth = 1; dw_qos.durability.kind = DDS_VOLATILE_DURABILITY_QOS; dw_qos.protocol.rtps_reliable_writer.heartbeat_period.sec = 0; dw_qos.protocol.rtps_reliable_writer.heartbeat_period.nanosec = 250000000; /* Set enabled listener callbacks */ dw_listener.on_publication_matched = HelloWorldPublisher_on_publication_matched; datawriter = DDS_Publisher_create_datawriter(publisher, topic, &dw_qos, &dw_listener, DDS_PUBLICATION_MATCHED_STATUS); if (datawriter == NULL) { /* failure */ } The DataWriterListener has its callbacks selectively enabled by the DDS status mask. In the example, the mask has set the ``on_publication_matched`` status, and accordingly the DataWriterListener has its ``on_publication_matched`` assigned to a callback function. .. 
code-block:: c

    void HelloWorldPublisher_on_publication_matched(void *listener_data,
                                                    DDS_DataWriter *writer,
                                                    const struct DDS_PublicationMatchedStatus *status)
    {
        /* Print on match/unmatch */
        if (status->current_count_change > 0)
        {
            printf("Matched a subscriber\n");
        }
        else
        {
            printf("Unmatched a subscriber\n");
        }
    }

DPSE Discovery: assert remote subscription
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A publishing application using DPSE discovery must specify the other |DRs| that its |DWs| are allowed to discover. Like the API for asserting a remote participant, the DPSE API for asserting a remote subscription must be called for each remote |DR| that a |DW| may discover.

Whereas asserting a remote participant requires only the remote *Participant*’s name, asserting a remote subscription requires more configuration, as all QoS policies of the subscription necessary to determine matching must be known and thus specified.

.. code-block:: c

    struct DDS_SubscriptionBuiltinTopicData rem_subscription_data =
        DDS_SubscriptionBuiltinTopicData_INITIALIZER;

    /* Set Reader's protocol.rtps_object_id */
    rem_subscription_data.key.value[DDS_BUILTIN_TOPIC_KEY_OBJECT_ID] = 200;

    rem_subscription_data.topic_name = DDS_String_dup("Example HelloWorld");
    rem_subscription_data.type_name = DDS_String_dup("HelloWorld");

    rem_subscription_data.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;

    retcode = DPSE_RemoteSubscription_assert(participant,
                                             "Participant_2",
                                             &rem_subscription_data,
                                             HelloWorld_get_key_kind(HelloWorldTypePlugin_get(), NULL));
    if (retcode != DDS_RETCODE_OK)
    {
        /* failure */
    }

Writing samples
^^^^^^^^^^^^^^^

Within the generated type support code are declarations of the type-specific |DW|. For the HelloWorld type, this is the HelloWorldDataWriter.

Writing a HelloWorld sample is done by calling the write API of the HelloWorldDataWriter.

.. 
code-block:: c HelloWorldDataWriter *hw_datawriter; DDS_ReturnCode_t retcode; HelloWorld *sample = NULL; /* Create and set sample */ sample = HelloWorld_create(); if (sample == NULL) { /* failure */ } sprintf(sample->msg, "Hello World!"); /* Write sample */ hw_datawriter = HelloWorldDataWriter_narrow(datawriter); retcode = HelloWorldDataWriter_write(hw_datawriter, sample, &DDS_HANDLE_NIL); if (retcode != DDS_RETCODE_OK) { /* failure */ } For more information, see the :link_um:`Sending Data section in the Core Libraries User’s Manual`. Create Subscriber ----------------- A subscribing application needs to create a DDS *Subscriber* and then a |DR| for each *Topic* to which it wants to subscribe. In the cert profile of |CONNEXT_DRIVE|, SubscriberQos in general contains no policies that need to be customized, while DataReaderQos does contain several customizable policies. .. code-block:: c DDS_Subscriber *subscriber = NULL; subscriber = DDS_DomainParticipant_create_subscriber(participant, &DDS_SUBSCRIBER_QOS_DEFAULT, NULL, DDS_STATUS_MASK_NONE); if (subscriber == NULL) { /* failure */ } For more information, see the :link_um:`Receiving Data section in the Core Libraries User’s Manual`. Create DataReader ----------------- .. 
code-block:: c DDS_DataReader *datareader = NULL; struct DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; struct DDS_DataReaderListener dr_listener = DDS_DataReaderListener_INITIALIZER; /* Configure Reader Qos */ dr_qos.protocol.rtps_object_id = 200; dr_qos.resource_limits.max_instances = 2; dr_qos.resource_limits.max_samples_per_instance = 2; dr_qos.resource_limits.max_samples = dr_qos.resource_limits.max_samples_per_instance * dr_qos.resource_limits.max_instances; dr_qos.reader_resource_limits.max_remote_writers = 10; dr_qos.reader_resource_limits.max_remote_writers_per_instance = 10; dr_qos.history.depth = 1; dr_qos.durability.kind = DDS_VOLATILE_DURABILITY_QOS; dr_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS; /* Set listener callbacks */ dr_listener.on_data_available = HelloWorldSubscriber_on_data_available; dr_listener.on_subscription_matched = HelloWorldSubscriber_on_subscription_matched; datareader = DDS_Subscriber_create_datareader(subscriber, DDS_Topic_as_topicdescription(topic), &dr_qos, &dr_listener, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS); if (datareader == NULL) { /* failure */ } The DataReaderListener has its callbacks selectively enabled by the DDS status mask. In the example, the mask has set the DDS_SUBSCRIPTION_MATCHED_STATUS and DDS_DATA_AVAILABLE_STATUS statuses, and accordingly the DataReaderListener has its on_subscription_matched and on_data_available assigned to callback functions. .. code-block:: c void HelloWorldSubscriber_on_subscription_matched(void *listener_data, DDS_DataReader * reader, const struct DDS_SubscriptionMatchedStatus *status) { if (status->current_count_change > 0) { printf("Matched a publisher\n"); } else { printf("Unmatched a publisher\n"); } } .. 
code-block:: c

    void HelloWorldSubscriber_on_data_available(void *listener_data,
                                                DDS_DataReader *reader)
    {
        HelloWorldDataReader *hw_reader = HelloWorldDataReader_narrow(reader);
        DDS_ReturnCode_t retcode;
        struct DDS_SampleInfo *sample_info = NULL;
        HelloWorld *sample = NULL;

        struct DDS_SampleInfoSeq info_seq =
            DDS_SEQUENCE_INITIALIZER(struct DDS_SampleInfo);
        struct HelloWorldSeq sample_seq =
            DDS_SEQUENCE_INITIALIZER(HelloWorld);

        const DDS_Long TAKE_MAX_SAMPLES = 32;
        DDS_Long i;

        retcode = HelloWorldDataReader_take(hw_reader,
            &sample_seq, &info_seq, TAKE_MAX_SAMPLES,
            DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);

        if (retcode != DDS_RETCODE_OK)
        {
            printf("failed to take data: %d\n", retcode);
            goto done;
        }

        /* Print each valid sample taken */
        for (i = 0; i < HelloWorldSeq_get_length(&sample_seq); ++i)
        {
            sample_info = DDS_SampleInfoSeq_get_reference(&info_seq, i);

            if (sample_info->valid_data)
            {
                sample = HelloWorldSeq_get_reference(&sample_seq, i);
                printf("\nSample received\n\tmsg: %s\n", sample->msg);
            }
            else
            {
                printf("not valid data\n");
            }
        }

        HelloWorldDataReader_return_loan(hw_reader, &sample_seq, &info_seq);

    done:
        HelloWorldSeq_finalize(&sample_seq);
        DDS_SampleInfoSeq_finalize(&info_seq);
    }

DPSE Discovery: assert remote publication
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A subscribing application using DPSE discovery must specify the other |DWs| that its |DRs| are allowed to discover. Like the API for asserting a remote participant, the DPSE API for asserting a remote publication must be called for each remote |DW| that a |DR| may discover.

.. 
code-block:: c

    struct DDS_PublicationBuiltinTopicData rem_publication_data =
        DDS_PublicationBuiltinTopicData_INITIALIZER;

    /* Set Writer's protocol.rtps_object_id */
    rem_publication_data.key.value[DDS_BUILTIN_TOPIC_KEY_OBJECT_ID] = 100;

    rem_publication_data.topic_name = DDS_String_dup("Example HelloWorld");
    rem_publication_data.type_name = DDS_String_dup("HelloWorld");

    rem_publication_data.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;

    retcode = DPSE_RemotePublication_assert(participant,
                                            "Participant_1",
                                            &rem_publication_data,
                                            HelloWorld_get_key_kind(HelloWorldTypePlugin_get(), NULL));
    if (retcode != DDS_RETCODE_OK)
    {
        /* failure */
    }

Asserting a remote publication requires configuration of all QoS policies necessary to determine matching.

Receiving samples
^^^^^^^^^^^^^^^^^

Accessing received samples can be done in a few ways:

* **Polling**. Do read or take within a periodic polling loop.
* **Listener**. When a new sample is received, the DataReaderListener’s on_data_available is called. Processing is done in the context of the middleware’s receive thread. See the above HelloWorldSubscriber_on_data_available callback for example code.
* **Waitset**. Create a waitset, attach to it a status condition that has the data_available status enabled, and wait for a received sample to trigger the waitset. Processing is done in the context of the user’s application thread. (Note: the code snippet below is taken from the shipped HelloWorld_dpde_waitset example).

.. 
code-block:: c

    DDS_WaitSet *waitset = NULL;
    struct DDS_Duration_t wait_timeout = { 10, 0 }; /* 10 seconds */
    DDS_StatusCondition *dr_condition = NULL;
    struct DDS_ConditionSeq active_conditions =
        DDS_SEQUENCE_INITIALIZER(struct DDS_ConditionSeq);

    if (!DDS_ConditionSeq_initialize(&active_conditions))
    {
        /* failure */
    }

    if (!DDS_ConditionSeq_set_maximum(&active_conditions, 1))
    {
        /* failure */
    }

    waitset = DDS_WaitSet_new();
    if (waitset == NULL)
    {
        /* failure */
    }

    dr_condition = DDS_Entity_get_statuscondition(DDS_DataReader_as_entity(datareader));

    retcode = DDS_StatusCondition_set_enabled_statuses(dr_condition,
                                                       DDS_DATA_AVAILABLE_STATUS);
    if (retcode != DDS_RETCODE_OK)
    {
        /* failure */
    }

    retcode = DDS_WaitSet_attach_condition(waitset,
                                           DDS_StatusCondition_as_condition(dr_condition));
    if (retcode != DDS_RETCODE_OK)
    {
        /* failure */
    }

    /* wait() takes a pointer to the sequence that receives the active conditions */
    retcode = DDS_WaitSet_wait(waitset, &active_conditions, &wait_timeout);

    switch (retcode) {
        case DDS_RETCODE_OK:
        {
            /* This WaitSet only has a single condition attached to it
             * so we can implicitly assume the DataReader's status condition
             * to be active (with the enabled DATA_AVAILABLE status) upon
             * successful return of wait().
             * If more than one condition were attached to the WaitSet,
             * the returned sequence must be examined using the
             * commented out code instead of the following.
*/ HelloWorldSubscriber_take_data(HelloWorldDataReader_narrow(datareader)); /* DDS_Long active_len = DDS_ConditionSeq_get_length(&active_conditions); for (i = active_len - 1; i >= 0; --i) { DDS_Condition *active_condition = *DDS_ConditionSeq_get_reference(&active_conditions, i); if (active_condition == DDS_StatusCondition_as_condition(dr_condition)) { total_samples += HelloWorldSubscriber_take_data( HelloWorldDataReader_narrow(datareader)); } else if (active_condition == some_other_condition) { do_something_else(); } } */ break; } case DDS_RETCODE_TIMEOUT: { printf("WaitSet_wait timed out\n"); break; } default: { printf("ERROR in WaitSet_wait: retcode=%d\n", retcode); break; } } Filtering samples ^^^^^^^^^^^^^^^^^ In lieu of supporting Content-Filtered Topics, a DataReaderListener in |CONNEXT_DRIVE| provides callbacks to do application-level filtering per sample. * **on_before_sample_deserialize**. Through this callback, a received sample is presented to the application before it has been deserialized or stored in the |DR|’s queue. * **on_before_sample_commit**. Through this callback, a received sample is presented to the application after it has been deserialized but before it has been stored in the |DR|’s queue. You control the callbacks’ ``sample_dropped`` parameter; upon exiting either callback, the |DR| will drop the sample if ``sample_dropped`` is true. Consequently, dropped samples are not stored in the |DR|’s queue and are not available to be read or taken. Neither callback is associated with a DDS Status. Rather, each is enabled when assigned to a non-NULL callback. .. note:: Because it is called after the sample has been deserialized, ``on_before_sample_commit`` provides an additional sample_info parameter, containing some of the usual sample information that would be available when the sample is read or taken. The HelloWorld_dpde example’s subscriber has this ``on_before_sample_commit`` callback: .. 
code-block:: c DDS_Boolean HelloWorldSubscriber_on_before_sample_commit( void *listener_data, DDS_DataReader *reader, const void *const sample, const struct DDS_SampleInfo *const sample_info, DDS_Boolean *dropped) { HelloWorld *hw_sample = (HelloWorld *)sample; /* Drop samples with even-numbered count in msg */ HelloWorldSubscriber_filter_sample(hw_sample, dropped); if (*dropped) { printf("\nSample filtered, before commit\n\tDROPPED - msg: %s\n", hw_sample->msg); } return DDS_BOOLEAN_TRUE; } ... dr_listener.on_before_sample_commit = HelloWorldSubscriber_on_before_sample_commit; For more information, see the :link_um:`Receiving Data section in the Core Libraries User’s Manual`. Next Steps ========== Congratulations! You’ve written your first DDS applications, which publish HelloWorld data. In this exercise, you’ve experienced a quick overview of the development process from defining a data type and using the code generator, to building an example application and using |CONNEXT| tools to see that data is being published. We’ll continue to build on these skills and to use these tools in more depth in subsequent exercises. The next section takes a deeper dive into building a successful Next Generation E/E Architecture for automotive systems with |CONNEXT_DRIVE|. This includes a hands-on example and advice for integration with major industry standards, such as ROS 2, AUTOSAR Classic, and AUTOSAR Adaptive.