RTI Routing Service Version 7.3.0
Input.hpp
1/*
2 * (c) Copyright, Real-Time Innovations, 2017.
3 * All rights reserved.
4 *
5 * No duplications, whole or partial, manual or electronic, may be made
6 * without express written permission. Any such copies, or
7 * revisions thereof, must display this notice unaltered.
8 * This code contains trade secrets of Real-Time Innovations, Inc.
9 */
10
11#ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12#define RTI_ROUTING_PROCESSOR_INPUT_HPP_
13
14#include <dds/core/Value.hpp>
15#include <dds/core/SafeEnumeration.hpp>
16#include <rti/core/NativeValueType.hpp>
17#include <rti/core/Entity.hpp>
18#include <dds/sub/DataReader.hpp>
19#include <dds/core/xtypes/DynamicData.hpp>
20
21#include "routingservice/routingservice_processor.h"
22#include "routingservice/routingservice_adapter_new.h"
23#include <rti/routing/StreamInfo.hpp>
24#include <rti/routing/adapter/StreamReader.hpp>
25#include <rti/routing/processor/LoanedSamples.hpp>
26#include <rti/routing/processor/Query.hpp>
27
28namespace rti { namespace routing { namespace processor {
29
30class Route;
31
32template <typename Data, typename Info> class TypedInput;
33
34template <typename Data, typename Info = dds::sub::SampleInfo>
35class TypedInput;
36
49class Input {
50public:
51
52 /*i
53 */
54 Input(RTI_RoutingServiceInput *native,
55 int32_t index,
56 RTI_RoutingServiceRoute *native_route)
57 : native_(native),
58 index_(index),
59 native_route_(native_route),
60 stream_info_("", ""),
61 name_(RTI_RoutingServiceInput_get_name(native))
62 {
63 const RTI_RoutingServiceStreamInfo *native_stream_info =
64 RTI_RoutingServiceInput_get_stream_info(native);
65 if (native_stream_info == nullptr) {
66 throw dds::core::InvalidArgumentError(
67 "invalid argument: invalid StreamInfo for input");
68 }
69 stream_info_ = *native_stream_info;
70 }
71
72 /*i
73 *
74 */
75 const rti::routing::StreamInfo& stream_info() const
76 {
77 return stream_info_;
78 }
79
80 /*i
81 *
82 */
83 const std::string& name() const
84 {
85 return name_;
86 }
87
88 /*i
89 *
90 */
91 int32_t index()
92 {
93 return index_;
94 }
95
106 template <typename Data, typename Info>
108 {
109 return this;
110 }
111
121 template <typename Data>
123 {
124 return this;
125 }
126
127 /*i
128 *
129 */
130 RTI_RoutingServiceInput* native()
131 {
132 return native_;
133 }
134
135 // Implementation details
136private:
137
138 template <typename Data, typename Info> friend class TypedInput;
139 friend class Route;
140 friend class std::allocator<Input>;
141 RTI_RoutingServiceInput *native_;
142 int32_t index_;
143 RTI_RoutingServiceRoute *native_route_;
144 rti::routing::StreamInfo stream_info_;
145 std::string name_;
146};
147
148template <typename Data, typename Info = dds::sub::SampleInfo>
149class Selector;
150
159template <typename Data, typename Info>
160class TypedInput{
161public:
162
164
165 TypedInput(Input *input);
166
174
180 const std::string& name() const;
181
194 LoanedSamples<Data, Info> take();
195
204 LoanedSamples<Data, Info> read();
205
232 Selector select();
233
235 const dds::topic::Filter& filter);
236
248 bool active();
249
267 Data create_data();
268
269 /*i
270 *
271 */
272 TypedInput<Data, Info>* operator->()
273 {
274 return this;
275 }
276
277
294 dds::sub::DataReader<Data> dds_data_reader()
295 {
296 using rti::core::detail::create_from_native_entity;
297
298 DDS_DataReader *native_reader =
299 RTI_RoutingServiceInput_get_dds_reader(input_->native_);
300 if (native_reader == NULL) {
301 throw dds::core::InvalidArgumentError(
302 "invalid argument: input does not hold a DDS StreamReader");
303 }
304
305 typedef dds::sub::DataReader<Data> data_reader_type;
306 return rti::core::detail::create_from_native_entity<data_reader_type>(
307 native_reader);
308 }
309
310private:
311 TypedInput();
312
313 friend class rti::routing::processor::Selector<Data, Info>;
314
316 const rti::routing::adapter::SelectorState& selector_state);
317
319 const rti::routing::adapter::SelectorState& selector_state);
320
322 Input *input_;
323};
324
325
360template <typename Data, typename Info>
361class Selector {
362public:
363
373 : typed_input_(input), query_(dds::core::null)
374 {
375
376 }
377
381 Selector(const Selector& other)
382 : typed_input_(other.typed_input_),
383 state_(other.state_),
384 query_(other.query_)
385 {
386
387 }
388
401 const dds::sub::status::DataState& the_state)
402 {
403 state_.state(the_state);
404 return *this;
405 }
406
414 const int32_t count)
415 {
416 state_.max_samples(count);
417 return *this;
418 }
419
440 const dds::core::InstanceHandle& the_handle)
441 {
442 state_.instance(the_handle);
443 return *this;
444 }
445
476 const dds::core::InstanceHandle& the_handle)
477 {
478 state_.next_instance(the_handle);
479 return *this;
480 }
481
496 const dds::topic::Filter& the_filter)
497 {
498 state_.filter(the_filter);
499 return *this;
500 }
501
502
519 {
520 query_ = the_query;
521 state_.content(the_query.delegate().get()->query_data_);
522 return *this;
523 }
524
536 {
537 return typed_input_.take(state_);
538 }
539
551 {
552 return typed_input_.read(state_);
553 }
554
555private:
559};
560
561
562template <typename Data, typename Info>
563struct create_data_from_input {
564
565 static Data get(TypedInput<Data, Info>& )
566 {
567 return Data();
568 }
569};
570
571template <typename Info>
572struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
573
574 static dds::core::xtypes::DynamicData get(
575 TypedInput<dds::core::xtypes::DynamicData, Info>& input)
576 {
577 if (input->stream_info().type_info().type_representation_kind()
578 != TypeRepresentationKind::DYNAMIC_TYPE) {
579 throw dds::core::PreconditionNotMetError(
580 "inconsistent data representation kind");
581 }
582 dds::core::xtypes::DynamicType *type_code =
583 static_cast<dds::core::xtypes::DynamicType *> (
584 input->stream_info().type_info().type_representation());
585 return dds::core::xtypes::DynamicData(*type_code);
586 }
587};
588
589template <typename Data, typename Info>
590TypedInput<Data, Info>::TypedInput(Input *input)
591: input_(input)
592{
593}
594
595template <typename Data, typename Info>
597{
598 return input_->stream_info_;
599}
600
601template <typename Data, typename Info>
602const std::string& TypedInput<Data, Info>::name() const
603{
604 return input_->name_;
605}
606
607template <typename Data, typename Info>
609{
610 RTI_RoutingServiceLoanedSamples native_samples =
611 RTI_RoutingServiceLoanedSamples_INITIALIZER;
612 if (!RTI_RoutingServiceInput_take(
613 input_->native_,
614 &native_samples)) {
615 throw dds::core::Error("error taking samples from native input");
616 }
617 return LoanedSamples<Data, Info>(input_->native_, native_samples);
618}
619
620template <typename Data, typename Info>
622{
623 RTI_RoutingServiceLoanedSamples native_samples =
624 RTI_RoutingServiceLoanedSamples_INITIALIZER;
625 if (!RTI_RoutingServiceInput_read(
626 input_->native_,
627 &native_samples)) {
628 throw dds::core::Error("error reading samples from native input");
629 }
630 return LoanedSamples<Data, Info>(input_->native_, native_samples);
631}
632
633template <typename Data, typename Info>
636{
638}
639
640template <typename Data, typename Info>
642 const dds::topic::Filter& filter)
643{
644 return rti::routing::processor::Query(input_->native_, filter);
645
646}
647
648template <typename Data, typename Info>
649LoanedSamples<Data, Info> TypedInput<Data, Info>::take(
650 const rti::routing::adapter::SelectorState& selector_state)
651{
652 RTI_RoutingServiceLoanedSamples native_samples =
653 RTI_RoutingServiceLoanedSamples_INITIALIZER;
654 if (!RTI_RoutingServiceInput_take_w_selector(
655 input_->native_,
656 &native_samples,
657 &selector_state.native())) {
658 throw dds::core::Error("error taking samples with selector from native input");
659 }
660 return LoanedSamples<Data, Info>(input_->native_, native_samples);
661}
662
663template <typename Data, typename Info>
664LoanedSamples<Data, Info> TypedInput<Data, Info>::read(
665 const rti::routing::adapter::SelectorState& selector_state)
666{
667 RTI_RoutingServiceLoanedSamples native_samples =
668 RTI_RoutingServiceLoanedSamples_INITIALIZER;
669 if (!RTI_RoutingServiceInput_read_w_selector(
670 input_->native_,
671 &native_samples,
672 &selector_state.native())) {
673 throw dds::core::Error("error reading samples with selector from native input");
674 }
675 return LoanedSamples<Data, Info>(input_->native_, native_samples);
676}
677
678template <typename Data, typename Info>
680{
681 return RTI_RoutingServiceInput_is_active(input_->native_) ? true : false;
682}
683
684template <typename Data, typename Info>
686{
687 return create_data_from_input<Data, Info>::get(*this);
688}
689
698typedef dds::sub::DataReader<dds::core::xtypes::DynamicData> DynamicDataReader;
699
700} } }
701
702
703#endif // RTI_ROUTING_PROCESSOR_INPUT_HPP_
Definition of the stream information that RTI Routing Service needs to manage user data streams.
Definition: StreamInfo.hpp:106
Defines a set of attributes that can be used to read a subset of data from StreamReader.
Definition: StreamReader.hpp:547
Generic Representation of a Route's input.
Definition: Input.hpp:49
TypedInput< Data, dds::sub::SampleInfo > get()
Returns a typed version of this Input.
Definition: Input.hpp:122
TypedInput< Data, Info > get()
Returns a typed version of this Input.
Definition: Input.hpp:107
Provides temporary access to a collection of samples (data and info) from a TypedInput.
Definition: LoanedSamples.hpp:81
Encapsulates a content query to select data from a rti::routing::adapter::StreamReader.
Definition: Query.hpp:92
Representation of the Route object that owns a Processor.
Definition: Route.hpp:84
An element that allows reading data that meet a set of specified attributes.
Definition: Input.hpp:361
LoanedSamples< Data, Info > read()
Read samples based on the state associated with this Selector.
Definition: Input.hpp:550
Selector & max_samples(const int32_t count)
Choose to only read/take up to a maximum number of samples.
Definition: Input.hpp:413
Selector(const rti::routing::processor::TypedInput< Data, Info > input)
Create a Selector for a TypedInput.
Definition: Input.hpp:372
Selector & filter(const dds::topic::Filter &the_filter)
Select samples based on a content filter parameters.
Definition: Input.hpp:495
Selector & instance(const dds::core::InstanceHandle &the_handle)
Select a specific instance to read/take.
Definition: Input.hpp:439
Selector & next_instance(const dds::core::InstanceHandle &the_handle)
Select the instance after a specific instance.
Definition: Input.hpp:475
Selector(const Selector &other)
Copy constructor.
Definition: Input.hpp:381
LoanedSamples< Data, Info > take()
Take samples based on the state associated with this Selector.
Definition: Input.hpp:535
Selector & query(const rti::routing::processor::Query &the_query)
Select samples based on a rti::routing::processor::Query.
Definition: Input.hpp:518
Selector & state(const dds::sub::status::DataState &the_state)
Select a specific dds::sub::status::DataState.
Definition: Input.hpp:400
Representation of an Input whose data representation is DataRep, whose info representation is InfoRep...
Definition: Input.hpp:149
bool active()
Indicates whether this input has received new data since the last time any variant of the read operat...
Definition: Input.hpp:679
LoanedSamples< Data, Info > take()
Returns all the available samples in this object.
Definition: Input.hpp:608
dds::sub::DataReader< Data > dds_data_reader()
Returns the underlying DDS DataReader that is part of this StreamReader implementation,...
Definition: Input.hpp:294
Data create_data()
Creates a new data sample from this input.
Definition: Input.hpp:685
const rti::routing::StreamInfo & stream_info() const
Returns the StreamInfo associated with this object.
Definition: Input.hpp:596
const std::string & name() const
Returns the name of this output.
Definition: Input.hpp:602
LoanedSamples< Data, Info > read()
Same as take() but this calls rti::routing::adapter::StreamReader::read instead.
Definition: Input.hpp:621
Selector select()
Gets a Selector to perform complex data selections, such as per-instance selection,...
Definition: Input.hpp:635