11#ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12#define RTI_ROUTING_PROCESSOR_INPUT_HPP_
14#include <dds/core/Value.hpp>
15#include <dds/core/SafeEnumeration.hpp>
16#include <rti/core/NativeValueType.hpp>
17#include <rti/core/Entity.hpp>
18#include <dds/sub/DataReader.hpp>
19#include <dds/core/xtypes/DynamicData.hpp>
21#include "routingservice/routingservice_processor.h"
22#include "routingservice/routingservice_adapter_new.h"
23#include <rti/routing/StreamInfo.hpp>
24#include <rti/routing/adapter/StreamReader.hpp>
25#include <rti/routing/processor/LoanedSamples.hpp>
26#include <rti/routing/processor/Query.hpp>
28namespace rti {
namespace routing {
namespace processor {
// Forward declaration of the TypedInput class template, so that it can be
// named (for example in a friend declaration) before its definition.
32template <
typename Data,
typename Info>
class TypedInput;
34template <
typename Data,
typename Info = dds::sub::SampleInfo>
// Constructor (listing fragment; some initializer lines and the body are not
// present here). Takes the native Routing Service input and the native route
// handle passed alongside it.
54 Input(RTI_RoutingServiceInput *native,
56 RTI_RoutingServiceRoute *native_route)
// Keep the route handle exactly as given.
59 native_route_(native_route),
// Stream info and name are obtained from the native input at construction
// time, through the C accessor functions.
60 stream_info_(*RTI_RoutingServiceInput_get_stream_info(native)),
61 name_(RTI_RoutingServiceInput_get_name(native))
76 const std::string& name()
const
99 template <
typename Data,
typename Info>
114 template <
typename Data>
123 RTI_RoutingServiceInput* native()
// Every TypedInput instantiation is a friend: TypedInput code in this file
// reads native_, stream_info_ and name_ directly through its Input pointer.
131 template <
typename Data,
typename Info>
friend class TypedInput;
// NOTE(review): presumably lets standard containers construct Input objects
// through a non-public constructor -- confirm against the class definition.
133 friend class std::allocator<Input>;
// Native (C API) handle of the wrapped Routing Service input.
134 RTI_RoutingServiceInput *native_;
// Native handle of the route given to the constructor.
136 RTI_RoutingServiceRoute *native_route_;
// Native DDS entity pointer; it is not set in the constructor lines visible
// in this listing.
139 DDS_Entity *native_topic_;
142template <
typename Data,
typename Info = dds::sub::SampleInfo>
// Member declarations of the TypedInput<Data, Info> class template (listing
// fragment: the class header and several declarations are not present).
153template <
typename Data,
typename Info>
// Builds a typed view from a pointer to an Input.
159 TypedInput(Input *input);
// Name of the input, returned by const reference.
174 const std::string&
name()
const;
// Takes samples from the input and returns them as a LoanedSamples collection.
188 LoanedSamples<Data, Info>
take();
// Reads samples from the input and returns them as a LoanedSamples collection.
198 LoanedSamples<Data, Info>
read();
// Trailing parameter of a declaration whose first line is not present here.
229 const dds::topic::Filter& filter);
// Pointer-style access to this TypedInput (body not present in this listing).
266 TypedInput<Data, Info>* operator->()
// Listing fragment (signature line not present): obtains the native DDS
// DataReader behind this input and wraps it as a dds::sub::DataReader<Data>.
290 using rti::core::detail::create_from_native_entity;
292 DDS_DataReader *native_reader =
293 RTI_RoutingServiceInput_get_dds_reader(input_->native_);
// A null native reader is reported to the caller as InvalidArgumentError.
294 if (native_reader == NULL) {
295 throw dds::core::InvalidArgumentError(
296 "invalid argument: input does not hold a DDS StreamReader");
// Build the typed C++ reader from the native entity.
299 typedef dds::sub::DataReader<Data> data_reader_type;
300 return rti::core::detail::create_from_native_entity<data_reader_type>(
// Selector: an element that allows reading data that meet a set of specified
// attributes. Each setter below records one criterion in state_; take() and
// read() then hand state_ to the TypedInput this Selector was created for.
// (Listing fragment: the class header and most signature lines are missing.)
354template <
typename Data,
typename Info>
// Selector(input): binds the selector to a TypedInput and starts with a null
// query_.
367 : typed_input_(input), query_(dds::core::null)
// Copy constructor: copies the bound input and the accumulated state_
// (any further initializers are not present in this listing).
376 : typed_input_(other.typed_input_),
377 state_(other.state_),
// state(): select a specific dds::sub::status::DataState.
395 const dds::sub::status::DataState& the_state)
397 state_.state(the_state);
// max_samples(): only read/take up to `count` samples.
410 state_.max_samples(count);
// instance(): select a specific instance to read/take.
434 const dds::core::InstanceHandle& the_handle)
436 state_.instance(the_handle);
// next_instance(): select the instance after the given one.
470 const dds::core::InstanceHandle& the_handle)
472 state_.next_instance(the_handle);
// filter(): select samples using the given content filter.
490 const dds::topic::Filter& the_filter)
492 state_.filter(the_filter);
// query(): stores the query_data_ held by the Query's delegate as the content
// selection in state_.
515 state_.content(the_query.delegate().get()->query_data_);
// take(): take samples from the bound input using the accumulated state_.
531 return typed_input_.take(state_);
// read(): read samples from the bound input using the accumulated state_.
546 return typed_input_.read(state_);
// Primary template of the helper that produces a Data object for a
// TypedInput; it is invoked as create_data_from_input<Data, Info>::get(*this)
// elsewhere in this file. The input parameter is unnamed here, and the body
// of get() is not present in this listing.
556template <
typename Data,
typename Info>
557struct create_data_from_input {
559 static Data get(TypedInput<Data, Info>& )
// Partial specialization for dds::core::xtypes::DynamicData: the sample is
// constructed from the type found in the input's stream info.
565template <
typename Info>
566struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
// Returns a DynamicData created from the input's registered type.
// Throws dds::core::PreconditionNotMetError when the stream's type
// representation kind is not DYNAMIC_TYPE.
568 static dds::core::xtypes::DynamicData get(
569 TypedInput<dds::core::xtypes::DynamicData, Info>& input)
// Reject streams whose type is not described as a dynamic type.
571 if (input->stream_info().type_info().type_representation_kind()
572 != TypeRepresentationKind::DYNAMIC_TYPE) {
573 throw dds::core::PreconditionNotMetError(
574 "inconsistent data representation kind");
// NOTE(review): type_representation() presumably returns an untyped pointer;
// the cast relies on the kind check above -- confirm.
576 dds::core::xtypes::DynamicType *type_code =
577 static_cast<dds::core::xtypes::DynamicType *
> (
578 input->stream_info().type_info().type_representation());
579 return dds::core::xtypes::DynamicData(*type_code);
583template <
typename Data,
typename Info>
584TypedInput<Data, Info>::TypedInput(Input *input)
// Fragment of a TypedInput member definition (signature line not present in
// this listing; presumably stream_info()): returns the stream_info_ member of
// the wrapped Input.
589template <
typename Data,
typename Info>
592 return input_->stream_info_;
// Fragment of a TypedInput member definition (signature line not present in
// this listing; presumably name()): returns the name_ member of the wrapped
// Input.
595template <
typename Data,
typename Info>
598 return input_->name_;
// Fragment of a TypedInput member definition (signature, call arguments and
// return statement are not present in this listing): takes samples through
// the native C API.
601template <
typename Data,
typename Info>
// Start from the native initializer before handing the struct to the C call.
604 RTI_RoutingServiceLoanedSamples native_samples =
605 RTI_RoutingServiceLoanedSamples_INITIALIZER;
// A false result from the native call is surfaced as dds::core::Error.
606 if (!RTI_RoutingServiceInput_take(
609 throw dds::core::Error(
"error taking samples from native input");
// Fragment of a TypedInput member definition (signature, call arguments and
// return statement are not present in this listing): reads samples through
// the native C API.
614template <
typename Data,
typename Info>
// Start from the native initializer before handing the struct to the C call.
617 RTI_RoutingServiceLoanedSamples native_samples =
618 RTI_RoutingServiceLoanedSamples_INITIALIZER;
// A false result from the native call is surfaced as dds::core::Error.
619 if (!RTI_RoutingServiceInput_read(
622 throw dds::core::Error(
"error reading samples from native input");
627template <
typename Data,
typename Info>
634template <
typename Data,
typename Info>
636 const dds::topic::Filter& filter)
// Fragment of a TypedInput member definition (signature and some call
// arguments are not present in this listing): takes samples that match a
// selector state, as used by Selector via typed_input_.take(state_).
642template <
typename Data,
typename Info>
// Start from the native initializer before handing the struct to the C call.
646 RTI_RoutingServiceLoanedSamples native_samples =
647 RTI_RoutingServiceLoanedSamples_INITIALIZER;
// The selector's native representation is passed by address; a false result
// is surfaced as dds::core::Error.
648 if (!RTI_RoutingServiceInput_take_w_selector(
651 &selector_state.native())) {
652 throw dds::core::Error(
"error taking samples with selector from native input");
// Wrap the native samples together with the native input they came from.
654 return LoanedSamples<Data, Info>(input_->native_, native_samples);
// Fragment of a TypedInput member definition (signature and some call
// arguments are not present in this listing): reads samples that match a
// selector state, as used by Selector via typed_input_.read(state_).
657template <
typename Data,
typename Info>
// Start from the native initializer before handing the struct to the C call.
661 RTI_RoutingServiceLoanedSamples native_samples =
662 RTI_RoutingServiceLoanedSamples_INITIALIZER;
// The selector's native representation is passed by address; a false result
// is surfaced as dds::core::Error.
663 if (!RTI_RoutingServiceInput_read_w_selector(
666 &selector_state.native())) {
667 throw dds::core::Error(
"error reading samples with selector from native input");
// Wrap the native samples together with the native input they came from.
669 return LoanedSamples<Data, Info>(input_->native_, native_samples);
// Fragment of a TypedInput member definition (signature line not present in
// this listing): reports the result of RTI_RoutingServiceInput_is_active for
// the native input, mapped explicitly to a C++ true/false value.
672template <
typename Data,
typename Info>
675 return RTI_RoutingServiceInput_is_active(input_->native_) ? true :
false;
// Fragment of a TypedInput member definition (signature line not present in
// this listing): delegates creation of a Data object to the
// create_data_from_input helper selected for <Data, Info>, passing this
// TypedInput.
678template <
typename Data,
typename Info>
681 return create_data_from_input<Data, Info>::get(*
this);
// Shorthand for a DataReader whose sample type is DynamicData.
692typedef dds::sub::DataReader<dds::core::xtypes::DynamicData> DynamicDataReader;
Definition of the stream information that RTI Routing Service needs to manage user data streams.
Definition: StreamInfo.hpp:106
Defines a set of attributes that can be used to read a subset of data from a StreamReader.
Definition: StreamReader.hpp:547
Provides temporary access to a collection of samples (data and info) from a TypedInput.
Definition: LoanedSamples.hpp:81
Encapsulates a content query to select data from a rti::routing::adapter::StreamReader.
Definition: Query.hpp:92
Representation of the Route object that owns a Processor.
Definition: Route.hpp:84
An element that allows reading data that meet a set of specified attributes.
Definition: Input.hpp:355
LoanedSamples< Data, Info > read()
Read samples based on the state associated with this Selector.
Definition: Input.hpp:544
Selector & max_samples(const int32_t count)
Choose to only read/take up to a maximum number of samples.
Definition: Input.hpp:407
Selector(const rti::routing::processor::TypedInput< Data, Info > input)
Create a Selector for a TypedInput.
Definition: Input.hpp:366
Selector & filter(const dds::topic::Filter &the_filter)
Select samples based on content filter parameters.
Definition: Input.hpp:489
Selector & instance(const dds::core::InstanceHandle &the_handle)
Select a specific instance to read/take.
Definition: Input.hpp:433
Selector & next_instance(const dds::core::InstanceHandle &the_handle)
Select the instance after a specific instance.
Definition: Input.hpp:469
Selector(const Selector &other)
Copy constructor.
Definition: Input.hpp:375
LoanedSamples< Data, Info > take()
Take samples based on the state associated with this Selector.
Definition: Input.hpp:529
Selector & query(const rti::routing::processor::Query &the_query)
Select samples based on a rti::routing::processor::Query.
Definition: Input.hpp:512
Selector & state(const dds::sub::status::DataState &the_state)
Select a specific dds::sub::status::DataState.
Definition: Input.hpp:394