RTI Routing Service Version 7.2.0
Input.hpp
1/*
2 * (c) Copyright, Real-Time Innovations, 2017.
3 * All rights reserved.
4 *
5 * No duplications, whole or partial, manual or electronic, may be made
6 * without express written permission. Any such copies, or
7 * revisions thereof, must display this notice unaltered.
8 * This code contains trade secrets of Real-Time Innovations, Inc.
9 */
10
11#ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12#define RTI_ROUTING_PROCESSOR_INPUT_HPP_
13
14#include <dds/core/Value.hpp>
15#include <dds/core/SafeEnumeration.hpp>
16#include <rti/core/NativeValueType.hpp>
17#include <rti/core/Entity.hpp>
18#include <dds/sub/DataReader.hpp>
19#include <dds/core/xtypes/DynamicData.hpp>
20
21#include "routingservice/routingservice_processor.h"
22#include "routingservice/routingservice_adapter_new.h"
23#include <rti/routing/StreamInfo.hpp>
24#include <rti/routing/adapter/StreamReader.hpp>
25#include <rti/routing/processor/LoanedSamples.hpp>
26#include <rti/routing/processor/Query.hpp>
27
28namespace rti { namespace routing { namespace processor {
29
30class Route;
31
32template <typename Data, typename Info> class TypedInput;
33
34template <typename Data, typename Info = dds::sub::SampleInfo>
35class TypedInput;
36
49class Input {
50public:
51
52 /*i
53 */
54 Input(RTI_RoutingServiceInput *native,
55 int32_t index,
56 RTI_RoutingServiceRoute *native_route)
57 : native_(native),
58 index_(index),
59 native_route_(native_route),
60 stream_info_(*RTI_RoutingServiceInput_get_stream_info(native)),
61 name_(RTI_RoutingServiceInput_get_name(native))
62 {
63 }
64
65 /*i
66 *
67 */
68 const rti::routing::StreamInfo& stream_info() const
69 {
70 return stream_info_;
71 }
72
73 /*i
74 *
75 */
76 const std::string& name() const
77 {
78 return name_;
79 }
80
81 /*i
82 *
83 */
84 int32_t index()
85 {
86 return index_;
87 }
88
99 template <typename Data, typename Info>
101 {
102 return this;
103 }
104
114 template <typename Data>
116 {
117 return this;
118 }
119
120 /*i
121 *
122 */
123 RTI_RoutingServiceInput* native()
124 {
125 return native_;
126 }
127
128 // Implementation details
129private:
130
131 template <typename Data, typename Info> friend class TypedInput;
132 friend class Route;
133 friend class std::allocator<Input>;
134 RTI_RoutingServiceInput *native_;
135 int32_t index_;
136 RTI_RoutingServiceRoute *native_route_;
137 rti::routing::StreamInfo stream_info_;
138 std::string name_;
139 DDS_Entity *native_topic_;
140};
141
142template <typename Data, typename Info = dds::sub::SampleInfo>
143class Selector;
144
153template <typename Data, typename Info>
154class TypedInput{
155public:
156
158
159 TypedInput(Input *input);
160
168
174 const std::string& name() const;
175
188 LoanedSamples<Data, Info> take();
189
198 LoanedSamples<Data, Info> read();
199
226 Selector select();
227
229 const dds::topic::Filter& filter);
230
242 bool active();
243
261 Data create_data();
262
263 /*i
264 *
265 */
266 TypedInput<Data, Info>* operator->()
267 {
268 return this;
269 }
270
271
288 dds::sub::DataReader<Data> dds_data_reader()
289 {
290 using rti::core::detail::create_from_native_entity;
291
292 DDS_DataReader *native_reader =
293 RTI_RoutingServiceInput_get_dds_reader(input_->native_);
294 if (native_reader == NULL) {
295 throw dds::core::InvalidArgumentError(
296 "invalid argument: input does not hold a DDS StreamReader");
297 }
298
299 typedef dds::sub::DataReader<Data> data_reader_type;
300 return rti::core::detail::create_from_native_entity<data_reader_type>(
301 native_reader);
302 }
303
304private:
305 TypedInput();
306
307 friend class rti::routing::processor::Selector<Data, Info>;
308
310 const rti::routing::adapter::SelectorState& selector_state);
311
313 const rti::routing::adapter::SelectorState& selector_state);
314
316 Input *input_;
317};
318
319
354template <typename Data, typename Info>
355class Selector {
356public:
357
367 : typed_input_(input), query_(dds::core::null)
368 {
369
370 }
371
375 Selector(const Selector& other)
376 : typed_input_(other.typed_input_),
377 state_(other.state_),
378 query_(other.query_)
379 {
380
381 }
382
395 const dds::sub::status::DataState& the_state)
396 {
397 state_.state(the_state);
398 return *this;
399 }
400
408 const int32_t count)
409 {
410 state_.max_samples(count);
411 return *this;
412 }
413
434 const dds::core::InstanceHandle& the_handle)
435 {
436 state_.instance(the_handle);
437 return *this;
438 }
439
470 const dds::core::InstanceHandle& the_handle)
471 {
472 state_.next_instance(the_handle);
473 return *this;
474 }
475
490 const dds::topic::Filter& the_filter)
491 {
492 state_.filter(the_filter);
493 return *this;
494 }
495
496
513 {
514 query_ = the_query;
515 state_.content(the_query.delegate().get()->query_data_);
516 return *this;
517 }
518
530 {
531 return typed_input_.take(state_);
532 }
533
545 {
546 return typed_input_.read(state_);
547 }
548
549private:
553};
554
555
556template <typename Data, typename Info>
557struct create_data_from_input {
558
559 static Data get(TypedInput<Data, Info>& )
560 {
561 return Data();
562 }
563};
564
565template <typename Info>
566struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
567
568 static dds::core::xtypes::DynamicData get(
569 TypedInput<dds::core::xtypes::DynamicData, Info>& input)
570 {
571 if (input->stream_info().type_info().type_representation_kind()
572 != TypeRepresentationKind::DYNAMIC_TYPE) {
573 throw dds::core::PreconditionNotMetError(
574 "inconsistent data representation kind");
575 }
576 dds::core::xtypes::DynamicType *type_code =
577 static_cast<dds::core::xtypes::DynamicType *> (
578 input->stream_info().type_info().type_representation());
579 return dds::core::xtypes::DynamicData(*type_code);
580 }
581};
582
583template <typename Data, typename Info>
584TypedInput<Data, Info>::TypedInput(Input *input)
585: input_(input)
586{
587}
588
589template <typename Data, typename Info>
591{
592 return input_->stream_info_;
593}
594
595template <typename Data, typename Info>
596const std::string& TypedInput<Data, Info>::name() const
597{
598 return input_->name_;
599}
600
601template <typename Data, typename Info>
603{
604 RTI_RoutingServiceLoanedSamples native_samples =
605 RTI_RoutingServiceLoanedSamples_INITIALIZER;
606 if (!RTI_RoutingServiceInput_take(
607 input_->native_,
608 &native_samples)) {
609 throw dds::core::Error("error taking samples from native input");
610 }
611 return LoanedSamples<Data, Info>(input_->native_, native_samples);
612}
613
614template <typename Data, typename Info>
616{
617 RTI_RoutingServiceLoanedSamples native_samples =
618 RTI_RoutingServiceLoanedSamples_INITIALIZER;
619 if (!RTI_RoutingServiceInput_read(
620 input_->native_,
621 &native_samples)) {
622 throw dds::core::Error("error reading samples from native input");
623 }
624 return LoanedSamples<Data, Info>(input_->native_, native_samples);
625}
626
627template <typename Data, typename Info>
630{
632}
633
634template <typename Data, typename Info>
636 const dds::topic::Filter& filter)
637{
638 return rti::routing::processor::Query(input_->native_, filter);
639
640}
641
642template <typename Data, typename Info>
643LoanedSamples<Data, Info> TypedInput<Data, Info>::take(
644 const rti::routing::adapter::SelectorState& selector_state)
645{
646 RTI_RoutingServiceLoanedSamples native_samples =
647 RTI_RoutingServiceLoanedSamples_INITIALIZER;
648 if (!RTI_RoutingServiceInput_take_w_selector(
649 input_->native_,
650 &native_samples,
651 &selector_state.native())) {
652 throw dds::core::Error("error taking samples with selector from native input");
653 }
654 return LoanedSamples<Data, Info>(input_->native_, native_samples);
655}
656
657template <typename Data, typename Info>
658LoanedSamples<Data, Info> TypedInput<Data, Info>::read(
659 const rti::routing::adapter::SelectorState& selector_state)
660{
661 RTI_RoutingServiceLoanedSamples native_samples =
662 RTI_RoutingServiceLoanedSamples_INITIALIZER;
663 if (!RTI_RoutingServiceInput_read_w_selector(
664 input_->native_,
665 &native_samples,
666 &selector_state.native())) {
667 throw dds::core::Error("error reading samples with selector from native input");
668 }
669 return LoanedSamples<Data, Info>(input_->native_, native_samples);
670}
671
672template <typename Data, typename Info>
674{
675 return RTI_RoutingServiceInput_is_active(input_->native_) ? true : false;
676}
677
678template <typename Data, typename Info>
680{
681 return create_data_from_input<Data, Info>::get(*this);
682}
683
692typedef dds::sub::DataReader<dds::core::xtypes::DynamicData> DynamicDataReader;
693
694} } }
695
696
697#endif // RTI_ROUTING_PROCESSOR_INPUT_HPP_
Definition of the stream information that RTI Routing Service needs to manage user data streams.
Definition: StreamInfo.hpp:106
Defines a set of attributes that can be used to read a subset of data from StreamReader.
Definition: StreamReader.hpp:547
Generic Representation of a Route's input.
Definition: Input.hpp:49
TypedInput< Data, dds::sub::SampleInfo > get()
Returns a typed version of this Input.
Definition: Input.hpp:115
TypedInput< Data, Info > get()
Returns a typed version of this Input.
Definition: Input.hpp:100
Provides temporary access to a collection of samples (data and info) from a TypedInput.
Definition: LoanedSamples.hpp:81
Encapsulates a content query to select data from a rti::routing::adapter::StreamReader.
Definition: Query.hpp:92
Representation of the Route object that owns a Processor.
Definition: Route.hpp:84
An element that allows reading data that meet a set of specified attributes.
Definition: Input.hpp:355
LoanedSamples< Data, Info > read()
Read samples based on the state associated with this Selector.
Definition: Input.hpp:544
Selector & max_samples(const int32_t count)
Choose to only read/take up to a maximum number of samples.
Definition: Input.hpp:407
Selector(const rti::routing::processor::TypedInput< Data, Info > input)
Create a Selector for a TypedInput.
Definition: Input.hpp:366
Selector & filter(const dds::topic::Filter &the_filter)
Select samples based on a content filter parameters.
Definition: Input.hpp:489
Selector & instance(const dds::core::InstanceHandle &the_handle)
Select a specific instance to read/take.
Definition: Input.hpp:433
Selector & next_instance(const dds::core::InstanceHandle &the_handle)
Select the instance after a specific instance.
Definition: Input.hpp:469
Selector(const Selector &other)
Copy constructor.
Definition: Input.hpp:375
LoanedSamples< Data, Info > take()
Take samples based on the state associated with this Selector.
Definition: Input.hpp:529
Selector & query(const rti::routing::processor::Query &the_query)
Select samples based on a rti::routing::processor::Query.
Definition: Input.hpp:512
Selector & state(const dds::sub::status::DataState &the_state)
Select a specific dds::sub::status::DataState.
Definition: Input.hpp:394
Representation of an Input whose data representation is DataRep, whose info representation is InfoRep...
Definition: Input.hpp:143
bool active()
Indicates whether this input has received new data since the last time any variant of the read operat...
Definition: Input.hpp:673
LoanedSamples< Data, Info > take()
Returns all the available samples in this object.
Definition: Input.hpp:602
dds::sub::DataReader< Data > dds_data_reader()
Returns the underlying DDS DataReader that is part of this StreamReader implementation,...
Definition: Input.hpp:288
Data create_data()
Creates a new data sample from this input.
Definition: Input.hpp:679
const rti::routing::StreamInfo & stream_info() const
Returns the StreamInfo associated with this object.
Definition: Input.hpp:590
const std::string & name() const
Returns the name of this output.
Definition: Input.hpp:596
LoanedSamples< Data, Info > read()
Same as take() but this calls rti::routing::adapter::StreamReader::read instead.
Definition: Input.hpp:615
Selector select()
Gets a Selector to perform complex data selections, such as per-instance selection,...
Definition: Input.hpp:629