RTI Connext Modern C++ API Version 7.2.0
|
Allows you to receive samples from a SharedReaderQueue. More...
#include <rti/queuing/QueueConsumer.hpp>
Public Member Functions | |
QueueConsumer (const QueueConsumerParams ¶ms, bool is_enabled=true, Listener *consumer_listener=NULL, const rti::core::Guid &consumer_guid=rti::core::Guid::unknown()) | |
Creates a queue consumer. More... | |
QueueConsumer (const QueueConsumerParams ¶ms, bool is_enabled, std::shared_ptr< Listener > consumer_listener, const rti::core::Guid &consumer_guid=rti::core::Guid::unknown()) | |
Creates a QueueConsumer with parameters. More... | |
void | listener (Listener *the_listener, const dds::core::status::StatusMask &event_mask=dds::core::status::StatusMask::none()) |
Sets the listener associated with this consumer. More... | |
Listener * | listener () const |
Returns the listener currently associated with this Consumer. More... | |
std::shared_ptr< Listener > | get_listener () const |
Gets the listener. More... | |
void | set_listener (std::shared_ptr< Listener > the_listener) |
Sets a listener to be notified of status updates. More... | |
void | enable () |
Enables the QueueConsumer to receive data and listener notifications. More... | |
void | acknowledge_sample (const dds::sub::SampleInfo &sample_info, bool is_positive_acknowledgment=true) |
Explicitly acknowledges a single sample. More... | |
void | acknowledge_all (bool is_positive_acknowledgment=true) |
Acknowledge all previously accessed samples. More... | |
dds::sub::DataReader< T > | reader () const |
Retrieves the underlying dds::sub::DataReader. More... | |
rti::core::Guid | guid () const |
Returns the GUID of this QueueConsumer. More... | |
dds::sub::LoanedSamples< T > | receive_samples (const dds::core::Duration &max_wait) |
Waits for multiple samples and provides a loaned container to access them. More... | |
dds::sub::LoanedSamples< T > | receive_samples (int min_count, int max_count, const dds::core::Duration &max_wait) |
Waits for multiple samples and provides a loaned container to access them. More... | |
dds::sub::LoanedSamples< T > | take_samples () |
Provides a loaned container to access the existing samples. More... | |
dds::sub::LoanedSamples< T > | take_samples (int max_count) |
Provides a loaned container to access the existing samples. More... | |
dds::sub::LoanedSamples< T > | read_samples () |
Provides a loaned container to access the existing samples. More... | |
dds::sub::LoanedSamples< T > | read_samples (int max_count) |
Provides a loaned container to access the existing samples. More... | |
bool | wait_for_samples (const dds::core::Duration &max_wait) |
Waits for samples. More... | |
bool | wait_for_samples (int min_count, const dds::core::Duration &max_wait) |
Waits for samples. More... | |
void | send_availability (ConsumerAvailabilityParams parameters) |
Sends the QueueConsumer availability to Queuing Service. More... | |
bool | has_matching_reader_queue () |
Checks whether this QueueConsumer has matched at least one SharedReaderQueue. More... | |
Allows you to receive samples from a SharedReaderQueue.
A QueueConsumer is an entity that allows you to receive samples from a SharedReaderQueue hosted by Queuing Service. A QueueConsumer has one underlying dds::sub::DataReader to communicate with a SharedReaderQueue.
Valid types for the topic of the DataReader (T
) are those generated by rtiddsgen, the DDS built-in types, and dds::core::xtypes::DynamicData.
To receive samples from a SharedReaderQueue, a QueueConsumer must set its topic name (see QueueConsumerParams::queue_topic_name) equal to the SharedReaderQueue topic name (set with the XML tag <topic_name> under <shared_reader_queue>).
In addition, the QueueConsumer must set its SharedSubscriber name (QueueConsumerParams::shared_subscriber_name) equal to the name of the SharedSubscriber hosting the SharedReaderQuueue.
The QueueConsumer and the SharedReaderQueue must also be in the same DDS domain (that is, they must have the same domain ID).
A QueueConsumer has an associated dds::domain::DomainParticipant, which can be shared with other QueueConsumers or RTI Connext routines. All the other DDS entities required for queuing interaction, including the dds::sub::DataReader to receive samples, are automatically created when the QueueConsumer is constructed.
Quality of Service (QoS) for the underlying DataReader is configurable (see QueueConsumerParams::qos_profile and QueueConsumerParams::datareader_qos).
If no QoS is specified in the QueueConsumerParams, the default DataReaderQos will be used.
A QueueConsumer must always be reliable in order to match and communicate with the SharedReaderQueue. In addition, it shall use application acknowledgement to notify Queuing Service that a sample has been consumed successfully. Hence, whichever DataReaderQos is selected, the following QoS settings are always overriden:
The underlying DataReader is created with an internal dds::sub::DataReaderListener, which is required to provide the behavior of a QueueConsumer, including notification of events via QueueConsumerListener. For this reason, when accessing the underlying DataReader, the listener should not be set, removed or modified; otherwise the behavior of the QueueConsumer will be incorrect and unpredictable.
Optionally, a QueueConsumer can be configured to create an availability channel (QueueConsumerParams::enable_availability). This channel is used to report the QueueConsumer's availability to the SharedReaderQueue from which the QueueConsumer receives samples. Queuing Service uses the reported availability to distribute samples to the QueueConsumer.
If the availability channel is enabled for the QueueConsumer, a dds::pub::DataWriter is created to send availability samples to the SharedReaderQueue.
You can configure the DataWriterQos used by the availability DataWriter by using QueueConsumerParams::qos_profile.
Whichever DataWriterQos is selected, the following QoS settings are always overriden:
T | The data type for the SharedReaderQueue topic |
|
inlineexplicit |
Creates a queue consumer.
[DEPRECATED] When using a listener, prefer the constructor that receives a shared_ptr<Listener>
instead of a regular Listener*
pointer.
|
inlineexplicit |
Creates a QueueConsumer with parameters.
params | All the parameters that configure this QueueConsumer |
is_enabled | Specifies if the QueueConsumer is created ready to receive notifications in its listener and receive data. If you choose to bind the QueueConsumer to a QueueConsumerListener using an rti::core::ListenerBinder you should create the QueueConsumer disabled to avoid missing listener notifications. You can later enable the disabled QueueConsumer using QueueConsumer::enable. |
consumer_listener | A shared_ptr to a QueueConsumerListener object to receive event notifications. |
consumer_guid | A GUID identifier for the consumer. When consumer_guid is set to rti::core::Guid::unknown() the Guid identifier is generated automatically. You can retrieve the generated QueueConsumer Guid using the QueueConsumer::guid method. To restart a QueueConsumer just create a new one with its same GUID. |
One | of the Standard Exceptions |
|
inline |
Sets the listener associated with this consumer.
[DEPRECATED] The use of set_listener()
is recommended. Unlike this function, set_listener
receives a shared_ptr
which simplifies the management of listener's lifecycle.
the_listener | The QueueConsumerListener to set |
event_mask | The dds::core::status::StatusMask associated with the listener |
|
inline |
Returns the listener currently associated with this Consumer.
[DEPRECATED] Prefer get_listener()
instead of this function.
If there is no listener it returns NULL.
|
inline |
Gets the listener.
|
inline |
Sets a listener to be notified of status updates.
the_listener | A shared pointer to the listener to receive updates or nullptr to reset the current listener and stop receiving updates. |
|
inline |
Enables the QueueConsumer to receive data and listener notifications.
If you create the QueueConsumer disabled you can enable it using this mehtod. To avoid missing listener notifciation when you use an rti::core::ListenerBinder to bind the QueueConsumer to a QueueConsumerListener use QueueConsumer::enable to enable the QueueConsumer after it is bound to the listener.
|
inline |
Explicitly acknowledges a single sample.
This operation calls dds::sub::DataReader::acknowledge_sample on the underlying DataReader.
The parameter is_positive_acknowledgment indicates whether or not the acknowledged sample was successfully processed by the application. If the sample was not processed successfully, Queuing Service will try to redeliver it to a different QueueConsumer according to the delivery policy of the SharedReaderQueue.
sample_info | Identifies the sample being acknowledged. The sampleInfo can be extracted from Sample::info. |
is_positive_acknowledgment | Indicates whether or not the sample was succesfully processed. |
One | of the Standard Exceptions |
|
inline |
Acknowledge all previously accessed samples.
This is equivalent to calling QueueConsumer::acknowledge_sample(const dds::sub::SampleInfo&, bool) for every single previously accessed sample.
is_positive_acknowledgment | Indicates whether or not the acknowledged samples were successfully processed by the application. If the samples were not processed successfully, Queuing Service will try to redeliver them to a different QueueConsumer according to the delivery policy of the SharedReaderQueue. |
One | of the Standard Exceptions |
|
inline |
Retrieves the underlying dds::sub::DataReader.
Accessing the DataReader may be useful for a number of advanced use cases, such as getting DataReader protocol or cache statuses.
|
inline |
Returns the GUID of this QueueConsumer.
The GUID of the QueueConsumer is determined based on the value of QueueConsumerParams::entity_name and QueueConsumerParams::queue_topic_name. Note that the QueueConsumer GUID may be equal or different than the GUID of the underlying DataReader virtual GUID.
The GUID identifies a QueueConsumer and the samples it consumes. The underlying DataReader's content filter on the related_reader_guid is set to the QueueConsumer's GUID.
|
inline |
Waits for multiple samples and provides a loaned container to access them.
Equivalent to using QueueConsumer::receive_samples(int, int, const dds::core::Duration&) with min_count=1
and max_count=dds::core::LENGTH_UNLIMITED
.
|
inline |
Waits for multiple samples and provides a loaned container to access them.
Equivalent to using QueueConsumer::wait_for_samples(int, const dds::core::Duration&) and QueueConsumer::take_samples(int).
|
inline |
Provides a loaned container to access the existing samples.
This operation is equivalent to using QueueConsumer::take_samples(int) with max_count=dds::core::LENGTH_UNLIMITED
.
|
inline |
Provides a loaned container to access the existing samples.
Takes all the existing samples up to max_count
and provides a loaned container to access them.
This operation does not make a copy of the data.
The loan is returned with dds::sub::LoanedSamples::return_loan.
This operation may be used after a call to QueueConsumer::wait_for_samples(int, const dds::core::Duration&).
max_count | The maximum number of samples that are taken at a time. The special value dds::core::LENGTH_UNLIMITED may be used. This value will read up to the limit specified by rti::core::policy::DataReaderResourceLimits::max_samples_per_read. |
One | of the Standard Exceptions |
|
inline |
Provides a loaned container to access the existing samples.
This operation is equivalent to QueueConsumer::take_samples(), except the samples remain in the QueueConsumer and can be read or taken again.
|
inline |
Provides a loaned container to access the existing samples.
This operation is equivalent to QueueConsumer::take_samples(int), except the samples remain in the QueueConsumer and can be read or taken again.
|
inline |
Waits for samples.
This operation is equivalent to QueueConsumer::wait_for_samples(int, const dds::core::Duration&) with min_count=1
.
|
inline |
Waits for samples.
This operation waits for min_count samples to be available. It will wait up to max_wait .
If this operation is called several times but the available samples are not taken (with QueueConsumer::take_samples(int)), this operation may return immediately and will not wait for new samples. New samples may replace existing ones if they are not taken, depending on the dds::core::policy::History used to configure this QueueConsumer.
min_count | Minimum number of samples that need to be available for this operation to unblock. |
max_wait | Maximum waiting time after which this operation unblocks regardless of how many samples are available. |
|
inline |
Sends the QueueConsumer availability to Queuing Service.
This method will use the availability DataWriter to send a sample to Queue Service indicating the availability status of the QueueConsumer application.
The method throws dds::core::PreconditionNotMetError if availability is not enabled for this QueueConsumer using QueueConsumerParams::enable_availability.
One | of the Standard Exceptions ; |
parameters | Availability status |
|
inline |
Checks whether this QueueConsumer has matched at least one SharedReaderQueue.
One | of the Standard Exceptions ; |