11 #ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_ 12 #define RTI_ROUTING_PROCESSOR_INPUT_HPP_ 14 #include <dds/core/Value.hpp> 15 #include <dds/core/SafeEnumeration.hpp> 16 #include <rti/core/NativeValueType.hpp> 17 #include <rti/core/Entity.hpp> 18 #include <dds/sub/DataReader.hpp> 19 #include <dds/core/xtypes/DynamicData.hpp> 21 #include "routingservice/routingservice_processor.h" 22 #include "routingservice/routingservice_adapter_new.h" 23 #include <rti/routing/StreamInfo.hpp> 24 #include <rti/routing/adapter/StreamReader.hpp> 25 #include <rti/routing/processor/LoanedSamples.hpp> 26 #include <rti/routing/processor/Query.hpp> 28 namespace rti {
namespace routing {
namespace processor {
32 template <
typename Data,
typename Info>
class TypedInput;
34 template <
typename Data,
typename Info = dds::sub::SampleInfo>
54 Input(RTI_RoutingServiceInput *native,
56 RTI_RoutingServiceRoute *native_route)
59 native_route_(native_route),
60 stream_info_(*RTI_RoutingServiceInput_get_stream_info(native)),
61 name_(RTI_RoutingServiceInput_get_name(native))
76 const std::string& name()
const 99 template <
typename Data,
typename Info>
114 template <
typename Data>
123 RTI_RoutingServiceInput* native()
131 template <
typename Data,
typename Info>
friend class TypedInput;
133 friend class std::allocator<Input>;
134 RTI_RoutingServiceInput *native_;
136 RTI_RoutingServiceRoute *native_route_;
139 DDS_Entity *native_topic_;
142 template <
typename Data,
typename Info = dds::sub::SampleInfo>
153 template <
typename Data,
typename Info>
174 const std::string& name()
const;
229 const dds::topic::Filter& filter);
290 using rti::core::detail::create_from_native_entity;
292 DDS_DataReader *native_reader =
293 RTI_RoutingServiceInput_get_dds_reader(input_->native_);
294 if (native_reader == NULL) {
295 throw dds::core::InvalidArgumentError(
296 "invalid argument: input does not hold a DDS StreamReader");
299 typedef dds::sub::DataReader<Data> data_reader_type;
300 return rti::core::detail::create_from_native_entity<data_reader_type>(
354 template <
typename Data,
typename Info>
367 : typed_input_(input), query_(dds::core::null)
376 : typed_input_(other.typed_input_),
377 state_(other.state_),
395 const dds::sub::status::DataState& the_state)
397 state_.
state(the_state);
434 const dds::core::InstanceHandle& the_handle)
470 const dds::core::InstanceHandle& the_handle)
490 const dds::topic::Filter& the_filter)
492 state_.
filter(the_filter);
515 state_.content(the_query.delegate().get()->query_data_);
531 return typed_input_.take(state_);
546 return typed_input_.read(state_);
556 template <
typename Data,
typename Info>
557 struct create_data_from_input {
565 template <
typename Info>
566 struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
568 static dds::core::xtypes::DynamicData
get(
571 if (input->stream_info().type_info().type_representation_kind()
572 != TypeRepresentationKind::DYNAMIC_TYPE) {
573 throw dds::core::PreconditionNotMetError(
574 "inconsistent data representation kind");
576 dds::core::xtypes::DynamicType *type_code =
577 static_cast<dds::core::xtypes::DynamicType *
> (
578 input->stream_info().type_info().type_representation());
579 return dds::core::xtypes::DynamicData(*type_code);
583 template <
typename Data,
typename Info>
589 template <
typename Data,
typename Info>
592 return input_->stream_info_;
595 template <
typename Data,
typename Info>
598 return input_->name_;
601 template <
typename Data,
typename Info>
604 RTI_RoutingServiceLoanedSamples native_samples;
605 if (!RTI_RoutingServiceInput_take(
608 throw dds::core::Error(
"error taking samples from native input");
613 template <
typename Data,
typename Info>
616 RTI_RoutingServiceLoanedSamples native_samples;
617 if (!RTI_RoutingServiceInput_read(
620 throw dds::core::Error(
"error reading samples from native input");
625 template <
typename Data,
typename Info>
632 template <
typename Data,
typename Info>
634 const dds::topic::Filter& filter)
640 template <
typename Data,
typename Info>
644 RTI_RoutingServiceLoanedSamples native_samples;
645 if (!RTI_RoutingServiceInput_take_w_selector(
648 &selector_state.native())) {
649 throw dds::core::Error(
"error taking samples with selector from native input");
654 template <
typename Data,
typename Info>
658 RTI_RoutingServiceLoanedSamples native_samples;
659 if (!RTI_RoutingServiceInput_read_w_selector(
662 &selector_state.native())) {
663 throw dds::core::Error(
"error reading samples with selector from native input");
668 template <
typename Data,
typename Info>
671 return RTI_RoutingServiceInput_is_active(input_->native_) ? true :
false;
674 template <
typename Data,
typename Info>
677 return create_data_from_input<Data, Info>::get(*
this);
// Shorthand alias for dds::sub::DataReader instantiated with
// dds::core::xtypes::DynamicData as the data type.
688 typedef dds::sub::DataReader<dds::core::xtypes::DynamicData> DynamicDataReader;
693 #endif // RTI_ROUTING_PROCESSOR_INPUT_HPP_
Definition of the stream information that RTI Routing Service needs to manage user data streams...
Definition: StreamInfo.hpp:106
Provides temporary access to a collection of samples (data and info) from a TypedInput.
Definition: LoanedSamples.hpp:81
Selector & instance(const dds::core::InstanceHandle &the_handle)
Select a specific instance to read/take.
Definition: Input.hpp:433
Selector & state(const dds::sub::status::DataState &the_state)
Select a specific dds::sub::status::DataState.
Definition: Input.hpp:394
Selector & filter(const dds::topic::Filter &the_filter)
Select samples based on content filter parameters.
Definition: Input.hpp:489
An element that allows reading data that meet a set of specified attributes.
Definition: Input.hpp:143
Representation of the Route object that owns a Processor.
Definition: Route.hpp:84
LoanedSamples< Data, Info > take()
Take samples based on the state associated with this Selector.
Definition: Input.hpp:529
Selector(const Selector &other)
Copy constructor.
Definition: Input.hpp:375
Encapsulates a content query to select data from a rti::routing::adapter::StreamReader.
Definition: Query.hpp:92
Selector & query(const rti::routing::processor::Query &the_query)
Select samples based on a rti::routing::processor::Query.
Definition: Input.hpp:512
Definition: AdapterPlugin.hpp:25
Defines a set of attributes that can be used to read a subset of data from a StreamReader.
Definition: StreamReader.hpp:547
Selector & next_instance(const dds::core::InstanceHandle &the_handle)
Select the instance after a specific instance.
Definition: Input.hpp:469
Selector(const rti::routing::processor::TypedInput< Data, Info > input)
Create a Selector for a TypedInput.
Definition: Input.hpp:366
LoanedSamples< Data, Info > read()
Read samples based on the state associated with this Selector.
Definition: Input.hpp:544
Selector & max_samples(const int32_t count)
Choose to only read/take up to a maximum number of samples.
Definition: Input.hpp:407