RTI Connext Modern C++ API Version 7.2.0
rti::queuing::QueueConsumer< T > Class Template Reference

Allows you to receive samples from a SharedReaderQueue. More...

#include <rti/queuing/QueueConsumer.hpp>

Inheritance diagram for rti::queuing::QueueConsumer< T >:
dds::core::Reference< detail::QueueConsumerImpl< T > >

Public Member Functions

 QueueConsumer (const QueueConsumerParams &params, bool is_enabled=true, Listener *consumer_listener=NULL, const rti::core::Guid &consumer_guid=rti::core::Guid::unknown())
 Creates a queue consumer. More...
 
 QueueConsumer (const QueueConsumerParams &params, bool is_enabled, std::shared_ptr< Listener > consumer_listener, const rti::core::Guid &consumer_guid=rti::core::Guid::unknown())
 Creates a QueueConsumer with parameters. More...
 
void listener (Listener *the_listener, const dds::core::status::StatusMask &event_mask=dds::core::status::StatusMask::none())
 Sets the listener associated with this consumer. More...
 
Listenerlistener () const
 Returns the listener currently associated with this Consumer. More...
 
std::shared_ptr< Listenerget_listener () const
 Gets the listener. More...
 
void set_listener (std::shared_ptr< Listener > the_listener)
 Sets a listener to be notified of status updates. More...
 
void enable ()
 Enables the QueueConsumer to receive data and listener notifications. More...
 
void acknowledge_sample (const dds::sub::SampleInfo &sample_info, bool is_positive_acknowledgment=true)
 Explicitly acknowledges a single sample. More...
 
void acknowledge_all (bool is_positive_acknowledgment=true)
 Acknowledge all previously accessed samples. More...
 
dds::sub::DataReader< T > reader () const
 Retrieves the underlying dds::sub::DataReader. More...
 
rti::core::Guid guid () const
 Returns the GUID of this QueueConsumer. More...
 
dds::sub::LoanedSamples< T > receive_samples (const dds::core::Duration &max_wait)
 Waits for multiple samples and provides a loaned container to access them. More...
 
dds::sub::LoanedSamples< T > receive_samples (int min_count, int max_count, const dds::core::Duration &max_wait)
 Waits for multiple samples and provides a loaned container to access them. More...
 
dds::sub::LoanedSamples< T > take_samples ()
 Provides a loaned container to access the existing samples. More...
 
dds::sub::LoanedSamples< T > take_samples (int max_count)
 Provides a loaned container to access the existing samples. More...
 
dds::sub::LoanedSamples< T > read_samples ()
 Provides a loaned container to access the existing samples. More...
 
dds::sub::LoanedSamples< T > read_samples (int max_count)
 Provides a loaned container to access the existing samples. More...
 
bool wait_for_samples (const dds::core::Duration &max_wait)
 Waits for samples. More...
 
bool wait_for_samples (int min_count, const dds::core::Duration &max_wait)
 Waits for samples. More...
 
void send_availability (ConsumerAvailabilityParams parameters)
 Sends the QueueConsumer availability to Queuing Service. More...
 
bool has_matching_reader_queue ()
 Checks whether this QueueConsumer has matched at least one SharedReaderQueue. More...
 

Detailed Description

template<typename T>
class rti::queuing::QueueConsumer< T >

Allows you to receive samples from a SharedReaderQueue.

A QueueConsumer is an entity that allows you to receive samples from a SharedReaderQueue hosted by Queuing Service. A QueueConsumer has one underlying dds::sub::DataReader to communicate with a SharedReaderQueue.

Valid types for the topic of the DataReader (T) are those generated by rtiddsgen, the DDS built-in types, and dds::core::xtypes::DynamicData.

To receive samples from a SharedReaderQueue, a QueueConsumer must set its topic name (see QueueConsumerParams::queue_topic_name) equal to the SharedReaderQueue topic name (set with the XML tag <topic_name> under <shared_reader_queue>).

In addition, the QueueConsumer must set its SharedSubscriber name (QueueConsumerParams::shared_subscriber_name) equal to the name of the SharedSubscriber hosting the SharedReaderQuueue.

The QueueConsumer and the SharedReaderQueue must also be in the same DDS domain (that is, they must have the same domain ID).

A QueueConsumer has an associated dds::domain::DomainParticipant, which can be shared with other QueueConsumers or RTI Connext routines. All the other DDS entities required for queuing interaction, including the dds::sub::DataReader to receive samples, are automatically created when the QueueConsumer is constructed.

Quality of Service (QoS) for the underlying DataReader is configurable (see QueueConsumerParams::qos_profile and QueueConsumerParams::datareader_qos).

If no QoS is specified in the QueueConsumerParams, the default DataReaderQos will be used.

A QueueConsumer must always be reliable in order to match and communicate with the SharedReaderQueue. In addition, it shall use application acknowledgement to notify Queuing Service that a sample has been consumed successfully. Hence, whichever DataReaderQos is selected, the following QoS settings are always overriden:

The underlying DataReader is created with an internal dds::sub::DataReaderListener, which is required to provide the behavior of a QueueConsumer, including notification of events via QueueConsumerListener. For this reason, when accessing the underlying DataReader, the listener should not be set, removed or modified; otherwise the behavior of the QueueConsumer will be incorrect and unpredictable.

Optionally, a QueueConsumer can be configured to create an availability channel (QueueConsumerParams::enable_availability). This channel is used to report the QueueConsumer's availability to the SharedReaderQueue from which the QueueConsumer receives samples. Queuing Service uses the reported availability to distribute samples to the QueueConsumer.

If the availability channel is enabled for the QueueConsumer, a dds::pub::DataWriter is created to send availability samples to the SharedReaderQueue.

You can configure the DataWriterQos used by the availability DataWriter by using QueueConsumerParams::qos_profile.

Whichever DataWriterQos is selected, the following QoS settings are always overriden:

Template Parameters
TThe data type for the SharedReaderQueue topic
See also
QueueConsumer
QueueConsumerListener

Constructor & Destructor Documentation

◆ QueueConsumer() [1/2]

template<typename T >
rti::queuing::QueueConsumer< T >::QueueConsumer ( const QueueConsumerParams params,
bool  is_enabled = true,
Listener consumer_listener = NULL,
const rti::core::Guid consumer_guid = rti::core::Guid::unknown() 
)
inlineexplicit

Creates a queue consumer.

[DEPRECATED] When using a listener, prefer the constructor that receives a shared_ptr<Listener> instead of a regular Listener* pointer.

◆ QueueConsumer() [2/2]

template<typename T >
rti::queuing::QueueConsumer< T >::QueueConsumer ( const QueueConsumerParams params,
bool  is_enabled,
std::shared_ptr< Listener consumer_listener,
const rti::core::Guid consumer_guid = rti::core::Guid::unknown() 
)
inlineexplicit

Creates a QueueConsumer with parameters.

Parameters
paramsAll the parameters that configure this QueueConsumer
is_enabledSpecifies if the QueueConsumer is created ready to receive notifications in its listener and receive data. If you choose to bind the QueueConsumer to a QueueConsumerListener using an rti::core::ListenerBinder you should create the QueueConsumer disabled to avoid missing listener notifications. You can later enable the disabled QueueConsumer using QueueConsumer::enable.
consumer_listenerA shared_ptr to a QueueConsumerListener object to receive event notifications.
consumer_guidA GUID identifier for the consumer. When consumer_guid is set to rti::core::Guid::unknown() the Guid identifier is generated automatically. You can retrieve the generated QueueConsumer Guid using the QueueConsumer::guid method. To restart a QueueConsumer just create a new one with its same GUID.
Exceptions
Oneof the Standard Exceptions
See also
QueueConsumerParams

Member Function Documentation

◆ listener() [1/2]

template<typename T >
void rti::queuing::QueueConsumer< T >::listener ( Listener the_listener,
const dds::core::status::StatusMask event_mask = dds::core::status::StatusMask::none() 
)
inline

Sets the listener associated with this consumer.

[DEPRECATED] The use of set_listener() is recommended. Unlike this function, set_listener receives a shared_ptr which simplifies the management of listener's lifecycle.

Parameters
the_listenerThe QueueConsumerListener to set
event_maskThe dds::core::status::StatusMask associated with the listener

◆ listener() [2/2]

template<typename T >
Listener * rti::queuing::QueueConsumer< T >::listener ( ) const
inline

Returns the listener currently associated with this Consumer.

[DEPRECATED] Prefer get_listener() instead of this function.

If there is no listener it returns NULL.

◆ get_listener()

template<typename T >
std::shared_ptr< Listener > rti::queuing::QueueConsumer< T >::get_listener ( ) const
inline

Gets the listener.

◆ set_listener()

template<typename T >
void rti::queuing::QueueConsumer< T >::set_listener ( std::shared_ptr< Listener the_listener)
inline

Sets a listener to be notified of status updates.

Warning
It's recommended that the listener implementation doesn't hold a permanent reference to this object. If it does, the application needs to manually reset the listener or manually close this object to ensure that there is no cycle that prevents the destruction of these two objects.
Parameters
the_listenerA shared pointer to the listener to receive updates or nullptr to reset the current listener and stop receiving updates.

◆ enable()

template<typename T >
void rti::queuing::QueueConsumer< T >::enable ( )
inline

Enables the QueueConsumer to receive data and listener notifications.

If you create the QueueConsumer disabled you can enable it using this mehtod. To avoid missing listener notifciation when you use an rti::core::ListenerBinder to bind the QueueConsumer to a QueueConsumerListener use QueueConsumer::enable to enable the QueueConsumer after it is bound to the listener.

◆ acknowledge_sample()

template<typename T >
void rti::queuing::QueueConsumer< T >::acknowledge_sample ( const dds::sub::SampleInfo sample_info,
bool  is_positive_acknowledgment = true 
)
inline

Explicitly acknowledges a single sample.

This operation calls dds::sub::DataReader::acknowledge_sample on the underlying DataReader.

The parameter is_positive_acknowledgment indicates whether or not the acknowledged sample was successfully processed by the application. If the sample was not processed successfully, Queuing Service will try to redeliver it to a different QueueConsumer according to the delivery policy of the SharedReaderQueue.

Parameters
sample_infoIdentifies the sample being acknowledged. The sampleInfo can be extracted from Sample::info.
is_positive_acknowledgmentIndicates whether or not the sample was succesfully processed.
Exceptions
Oneof the Standard Exceptions

◆ acknowledge_all()

template<typename T >
void rti::queuing::QueueConsumer< T >::acknowledge_all ( bool  is_positive_acknowledgment = true)
inline

Acknowledge all previously accessed samples.

This is equivalent to calling QueueConsumer::acknowledge_sample(const dds::sub::SampleInfo&, bool) for every single previously accessed sample.

Parameters
is_positive_acknowledgmentIndicates whether or not the acknowledged samples were successfully processed by the application. If the samples were not processed successfully, Queuing Service will try to redeliver them to a different QueueConsumer according to the delivery policy of the SharedReaderQueue.
Exceptions
Oneof the Standard Exceptions
See also
QueueConsumer::acknowledge_sample(const dds::sub::SampleInfo&, bool)

◆ reader()

template<typename T >
dds::sub::DataReader< T > rti::queuing::QueueConsumer< T >::reader ( ) const
inline

Retrieves the underlying dds::sub::DataReader.

Accessing the DataReader may be useful for a number of advanced use cases, such as getting DataReader protocol or cache statuses.

MT Safety:
SAFE
See also
dds::sub::DataReader
dds::sub::DataReader
dds::sub::DataReader::datareader_protocol_status

◆ guid()

template<typename T >
rti::core::Guid rti::queuing::QueueConsumer< T >::guid ( ) const
inline

Returns the GUID of this QueueConsumer.

The GUID of the QueueConsumer is determined based on the value of QueueConsumerParams::entity_name and QueueConsumerParams::queue_topic_name. Note that the QueueConsumer GUID may be equal or different than the GUID of the underlying DataReader virtual GUID.

The GUID identifies a QueueConsumer and the samples it consumes. The underlying DataReader's content filter on the related_reader_guid is set to the QueueConsumer's GUID.

See also
QueueConsumerParams::entity_name
QueueProducer::guid

◆ receive_samples() [1/2]

template<typename T >
dds::sub::LoanedSamples< T > rti::queuing::QueueConsumer< T >::receive_samples ( const dds::core::Duration max_wait)
inline

Waits for multiple samples and provides a loaned container to access them.

Equivalent to using QueueConsumer::receive_samples(int, int, const dds::core::Duration&) with min_count=1 and max_count=dds::core::LENGTH_UNLIMITED.

See also
dds::sub::LoanedSamples
QueueConsumer::receive_samples(int, int, const dds::core::Duration&)

◆ receive_samples() [2/2]

template<typename T >
dds::sub::LoanedSamples< T > rti::queuing::QueueConsumer< T >::receive_samples ( int  min_count,
int  max_count,
const dds::core::Duration max_wait 
)
inline

◆ take_samples() [1/2]

template<typename T >
dds::sub::LoanedSamples< T > rti::queuing::QueueConsumer< T >::take_samples ( )
inline

Provides a loaned container to access the existing samples.

This operation is equivalent to using QueueConsumer::take_samples(int) with max_count=dds::core::LENGTH_UNLIMITED.

See also
dds::sub::LoanedSamples
QueueConsumer::take_samples(int)

◆ take_samples() [2/2]

template<typename T >
dds::sub::LoanedSamples< T > rti::queuing::QueueConsumer< T >::take_samples ( int  max_count)
inline

Provides a loaned container to access the existing samples.

Takes all the existing samples up to max_count and provides a loaned container to access them.

This operation does not make a copy of the data.

The loan is returned with dds::sub::LoanedSamples::return_loan.

This operation may be used after a call to QueueConsumer::wait_for_samples(int, const dds::core::Duration&).

Parameters
max_countThe maximum number of samples that are taken at a time. The special value dds::core::LENGTH_UNLIMITED may be used. This value will read up to the limit specified by rti::core::policy::DataReaderResourceLimits::max_samples_per_read.
Returns
A container with up to max_count elements. May be empty if there were no replies to get.
Exceptions
Oneof the Standard Exceptions
MT Safety:
SAFE
See also
Sample
dds::sub::LoanedSamples
dds::sub::LoanedSamples::return_loan
QueueConsumer::wait_for_samples(int, const dds::core::Duration&)
dds::sub::DataReader::take (for a more detailed description on how QoS and other parameters affect the underlying DataReader)
QueueConsumer::take_samples()

◆ read_samples() [1/2]

template<typename T >
dds::sub::LoanedSamples< T > rti::queuing::QueueConsumer< T >::read_samples ( )
inline

Provides a loaned container to access the existing samples.

This operation is equivalent to QueueConsumer::take_samples(), except the samples remain in the QueueConsumer and can be read or taken again.

◆ read_samples() [2/2]

template<typename T >
dds::sub::LoanedSamples< T > rti::queuing::QueueConsumer< T >::read_samples ( int  max_count)
inline

Provides a loaned container to access the existing samples.

This operation is equivalent to QueueConsumer::take_samples(int), except the samples remain in the QueueConsumer and can be read or taken again.

◆ wait_for_samples() [1/2]

template<typename T >
bool rti::queuing::QueueConsumer< T >::wait_for_samples ( const dds::core::Duration max_wait)
inline

◆ wait_for_samples() [2/2]

template<typename T >
bool rti::queuing::QueueConsumer< T >::wait_for_samples ( int  min_count,
const dds::core::Duration max_wait 
)
inline

Waits for samples.

This operation waits for min_count samples to be available. It will wait up to max_wait .

If this operation is called several times but the available samples are not taken (with QueueConsumer::take_samples(int)), this operation may return immediately and will not wait for new samples. New samples may replace existing ones if they are not taken, depending on the dds::core::policy::History used to configure this QueueConsumer.

Parameters
min_countMinimum number of samples that need to be available for this operation to unblock.
max_waitMaximum waiting time after which this operation unblocks regardless of how many samples are available.
Returns
true if at least min_count samples were available before max_wait elapsed, or false otherwise.
MT Safety:
Concurrent calls to this operation on the same object are not allowed.
See also
QueueConsumer::take_samples(int)

◆ send_availability()

template<typename T >
void rti::queuing::QueueConsumer< T >::send_availability ( ConsumerAvailabilityParams  parameters)
inline

Sends the QueueConsumer availability to Queuing Service.

This method will use the availability DataWriter to send a sample to Queue Service indicating the availability status of the QueueConsumer application.

The method throws dds::core::PreconditionNotMetError if availability is not enabled for this QueueConsumer using QueueConsumerParams::enable_availability.

Exceptions
Oneof the Standard Exceptions ;
Parameters
parametersAvailability status
MT Safety:
SAFE
See also
QueueConsumerParams::enable_availability

◆ has_matching_reader_queue()

template<typename T >
bool rti::queuing::QueueConsumer< T >::has_matching_reader_queue ( )
inline

Checks whether this QueueConsumer has matched at least one SharedReaderQueue.

Exceptions
Oneof the Standard Exceptions ;
Returns
True if this QueueConsumer matches at least one SharedReaderQueue.
MT Safety:
SAFE