11#ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12#define RTI_ROUTING_PROCESSOR_INPUT_HPP_
14#include <dds/core/Value.hpp>
15#include <dds/core/SafeEnumeration.hpp>
16#include <rti/core/NativeValueType.hpp>
17#include <rti/core/Entity.hpp>
18#include <dds/sub/DataReader.hpp>
19#include <dds/core/xtypes/DynamicData.hpp>
21#include "routingservice/routingservice_processor.h"
22#include "routingservice/routingservice_adapter_new.h"
23#include <rti/routing/StreamInfo.hpp>
24#include <rti/routing/adapter/StreamReader.hpp>
25#include <rti/routing/processor/LoanedSamples.hpp>
26#include <rti/routing/processor/Query.hpp>
28namespace rti {
namespace routing {
namespace processor {
32template <
typename Data,
typename Info>
class TypedInput;
34template <
typename Data,
typename Info = dds::sub::SampleInfo>
54 Input(RTI_RoutingServiceInput *native,
56 RTI_RoutingServiceRoute *native_route)
59 native_route_(native_route),
61 name_(RTI_RoutingServiceInput_get_name(native))
63 const RTI_RoutingServiceStreamInfo *native_stream_info =
64 RTI_RoutingServiceInput_get_stream_info(native);
65 if (native_stream_info ==
nullptr) {
66 throw dds::core::InvalidArgumentError(
67 "invalid argument: invalid StreamInfo for input");
69 stream_info_ = *native_stream_info;
83 const std::string& name()
const
106 template <
typename Data,
typename Info>
121 template <
typename Data>
130 RTI_RoutingServiceInput* native()
138 template <
typename Data,
typename Info>
friend class TypedInput;
140 friend class std::allocator<Input>;
141 RTI_RoutingServiceInput *native_;
143 RTI_RoutingServiceRoute *native_route_;
148template <
typename Data,
typename Info = dds::sub::SampleInfo>
159template <
typename Data,
typename Info>
165 TypedInput(Input *input);
180 const std::string&
name()
const;
194 LoanedSamples<Data, Info>
take();
204 LoanedSamples<Data, Info>
read();
235 const dds::topic::Filter& filter);
272 TypedInput<Data, Info>* operator->()
296 using rti::core::detail::create_from_native_entity;
298 DDS_DataReader *native_reader =
299 RTI_RoutingServiceInput_get_dds_reader(input_->native_);
300 if (native_reader == NULL) {
301 throw dds::core::InvalidArgumentError(
302 "invalid argument: input does not hold a DDS StreamReader");
305 typedef dds::sub::DataReader<Data> data_reader_type;
306 return rti::core::detail::create_from_native_entity<data_reader_type>(
360template <
typename Data,
typename Info>
373 : typed_input_(input), query_(dds::core::null)
382 : typed_input_(other.typed_input_),
383 state_(other.state_),
401 const dds::sub::status::DataState& the_state)
403 state_.state(the_state);
416 state_.max_samples(count);
440 const dds::core::InstanceHandle& the_handle)
442 state_.instance(the_handle);
476 const dds::core::InstanceHandle& the_handle)
478 state_.next_instance(the_handle);
496 const dds::topic::Filter& the_filter)
498 state_.filter(the_filter);
521 state_.content(the_query.delegate().get()->query_data_);
537 return typed_input_.take(state_);
552 return typed_input_.read(state_);
562template <
typename Data,
typename Info>
563struct create_data_from_input {
565 static Data get(TypedInput<Data, Info>& )
571template <
typename Info>
572struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
574 static dds::core::xtypes::DynamicData get(
575 TypedInput<dds::core::xtypes::DynamicData, Info>& input)
577 if (input->stream_info().type_info().type_representation_kind()
578 != TypeRepresentationKind::DYNAMIC_TYPE) {
579 throw dds::core::PreconditionNotMetError(
580 "inconsistent data representation kind");
582 dds::core::xtypes::DynamicType *type_code =
583 static_cast<dds::core::xtypes::DynamicType *
> (
584 input->stream_info().type_info().type_representation());
585 return dds::core::xtypes::DynamicData(*type_code);
589template <
typename Data,
typename Info>
590TypedInput<Data, Info>::TypedInput(Input *input)
595template <
typename Data,
typename Info>
598 return input_->stream_info_;
601template <
typename Data,
typename Info>
604 return input_->name_;
607template <
typename Data,
typename Info>
610 RTI_RoutingServiceLoanedSamples native_samples =
611 RTI_RoutingServiceLoanedSamples_INITIALIZER;
612 if (!RTI_RoutingServiceInput_take(
615 throw dds::core::Error(
"error taking samples from native input");
620template <
typename Data,
typename Info>
623 RTI_RoutingServiceLoanedSamples native_samples =
624 RTI_RoutingServiceLoanedSamples_INITIALIZER;
625 if (!RTI_RoutingServiceInput_read(
628 throw dds::core::Error(
"error reading samples from native input");
633template <
typename Data,
typename Info>
640template <
typename Data,
typename Info>
642 const dds::topic::Filter& filter)
648template <
typename Data,
typename Info>
652 RTI_RoutingServiceLoanedSamples native_samples =
653 RTI_RoutingServiceLoanedSamples_INITIALIZER;
654 if (!RTI_RoutingServiceInput_take_w_selector(
657 &selector_state.native())) {
658 throw dds::core::Error(
"error taking samples with selector from native input");
660 return LoanedSamples<Data, Info>(input_->native_, native_samples);
663template <
typename Data,
typename Info>
667 RTI_RoutingServiceLoanedSamples native_samples =
668 RTI_RoutingServiceLoanedSamples_INITIALIZER;
669 if (!RTI_RoutingServiceInput_read_w_selector(
672 &selector_state.native())) {
673 throw dds::core::Error(
"error reading samples with selector from native input");
675 return LoanedSamples<Data, Info>(input_->native_, native_samples);
678template <
typename Data,
typename Info>
681 return RTI_RoutingServiceInput_is_active(input_->native_) ? true :
false;
684template <
typename Data,
typename Info>
687 return create_data_from_input<Data, Info>::get(*
this);
698typedef dds::sub::DataReader<dds::core::xtypes::DynamicData> DynamicDataReader;
Definition of the stream information that RTI Routing Service needs to manage user data streams.
Definition: StreamInfo.hpp:106
Defines a set of attributes that can be used to read a subset of data from StreamReader.
Definition: StreamReader.hpp:547
Provides temporary access to a collection of samples (data and info) from a TypedInput.
Definition: LoanedSamples.hpp:81
Encapsulates a content query to select data from a rti::routing::adapter::StreamReader.
Definition: Query.hpp:92
Representation of the Route object that owns a Processor.
Definition: Route.hpp:84
An element that allows reading data that meet a set of specified attributes.
Definition: Input.hpp:361
LoanedSamples< Data, Info > read()
Read samples based on the state associated with this Selector.
Definition: Input.hpp:550
Selector & max_samples(const int32_t count)
Choose to only read/take up to a maximum number of samples.
Definition: Input.hpp:413
Selector(const rti::routing::processor::TypedInput< Data, Info > input)
Create a Selector for a TypedInput.
Definition: Input.hpp:372
Selector & filter(const dds::topic::Filter &the_filter)
Select samples based on a content filter parameters.
Definition: Input.hpp:495
Selector & instance(const dds::core::InstanceHandle &the_handle)
Select a specific instance to read/take.
Definition: Input.hpp:439
Selector & next_instance(const dds::core::InstanceHandle &the_handle)
Select the instance after a specific instance.
Definition: Input.hpp:475
Selector(const Selector &other)
Copy constructor.
Definition: Input.hpp:381
LoanedSamples< Data, Info > take()
Take samples based on the state associated with this Selector.
Definition: Input.hpp:535
Selector & query(const rti::routing::processor::Query &the_query)
Select samples based on a rti::routing::processor::Query.
Definition: Input.hpp:518
Selector & state(const dds::sub::status::DataState &the_state)
Select a specific dds::sub::status::DataState.
Definition: Input.hpp:400