RTI Routing Service  Version 6.1.1
Input.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12 #define RTI_ROUTING_PROCESSOR_INPUT_HPP_
13 
14 #include <dds/core/Value.hpp>
15 #include <dds/core/SafeEnumeration.hpp>
16 #include <rti/core/NativeValueType.hpp>
17 #include <rti/core/Entity.hpp>
18 #include <dds/sub/DataReader.hpp>
19 #include <dds/core/xtypes/DynamicData.hpp>
20 
21 #include "routingservice/routingservice_processor.h"
22 #include "routingservice/routingservice_adapter_new.h"
23 #include <rti/routing/StreamInfo.hpp>
24 #include <rti/routing/adapter/StreamReader.hpp>
25 #include <rti/routing/processor/LoanedSamples.hpp>
26 #include <rti/routing/processor/Query.hpp>
27 
28 namespace rti { namespace routing { namespace processor {
29 
30 class Route;
31 
32 template <typename Data, typename Info> class TypedInput;
33 
34 template <typename Data, typename Info = dds::sub::SampleInfo>
35 class TypedInput;
36 
49 class Input {
50 public:
51 
52  /*i Stores the native input, its index and the native route; initializes stream_info_ and name_ from the native input.
53  * native_topic_ is set to NULL so it never holds an indeterminate pointer. Assumes native and the values returned by the native getters are non-NULL - TODO confirm. */
54  Input(RTI_RoutingServiceInput *native,
55  int32_t index,
56  RTI_RoutingServiceRoute *native_route)
57  : native_(native),
58  index_(index),
59  native_route_(native_route),
60  stream_info_(*RTI_RoutingServiceInput_get_stream_info(native)),
61  name_(RTI_RoutingServiceInput_get_name(native)),
62  native_topic_(NULL)
63  { }
64 
65  /*i
66  * Returns a const reference to the StreamInfo member that the constructor initialized from the native input's stream info.
67  */
68  const rti::routing::StreamInfo& stream_info() const
69  {
70  return stream_info_;
71  }
72 
73  /*i
74  * Returns a const reference to the name of this input, as stored in name_ by the constructor.
75  */
76  const std::string& name() const
77  {
78  return name_;
79  }
80 
81  /*i
82  * Returns the index value that was passed to the constructor.
83  * NOTE(review): not const-qualified, unlike stream_info() and name(), although it only reads index_.
84  int32_t index() is declared on the next line. */
84  int32_t index()
85  {
86  return index_;
87  }
88 
99  template <typename Data, typename Info>
101  {
102  return this;
103  }
104 
114  template <typename Data>
116  {
117  return this;
118  }
119 
120  /*i
121  * Returns the native RTI_RoutingServiceInput pointer this object was constructed with.
122  */
123  RTI_RoutingServiceInput* native()
124  {
125  return native_;
126  }
127 
128  // Implementation details: friends and the state captured by the constructor.
129 private:
130 
131  template <typename Data, typename Info> friend class TypedInput; // TypedInput reads native_, name_ and stream_info_ directly
132  friend class Route;
133  friend class std::allocator<Input>;
134  RTI_RoutingServiceInput *native_; // native input handle passed to the constructor
135  int32_t index_; // index passed to the constructor
136  RTI_RoutingServiceRoute *native_route_; // native route handle passed to the constructor; not read in this listing
137  rti::routing::StreamInfo stream_info_; // built from the native input's stream info at construction
138  std::string name_; // built from the native input's name at construction
139  DDS_Entity *native_topic_; // NOTE(review): not read anywhere in this listing - presumably used by a friend class; confirm
140 };
141 
142 template <typename Data, typename Info = dds::sub::SampleInfo>
143 class Selector;
144 
153 template <typename Data, typename Info>
154 class TypedInput{
155 public:
156 
158 
159  TypedInput(Input *input);
160 
167  const rti::routing::StreamInfo& stream_info() const;
168 
174  const std::string& name() const;
175 
189 
199 
226  Selector select();
227 
229  const dds::topic::Filter& filter);
230 
242  bool active();
243 
261  Data create_data();
262 
263  /*i
264  * Returns this object's own address, so a TypedInput value can be used with pointer-style '->' syntax.
265  */
266  TypedInput<Data, Info>* operator->()
267  {
268  return this;
269  }
270 
271 
288  dds::sub::DataReader<Data> dds_data_reader()
289  { // Returns a typed dds::sub::DataReader<Data> created from the native DDS reader of the wrapped input.
290  using rti::core::detail::create_from_native_entity;
291  // Ask the native input for its DDS reader; a NULL result means the input is not backed by a DDS StreamReader.
292  DDS_DataReader *native_reader =
293  RTI_RoutingServiceInput_get_dds_reader(input_->native_);
294  if (native_reader == NULL) {
295  throw dds::core::InvalidArgumentError( // thrown instead of building a reader from a NULL handle
296  "invalid argument: input does not hold a DDS StreamReader");
297  }
298  // NOTE(review): the call below is fully qualified, so the using-declaration on line 290 looks redundant.
299  typedef dds::sub::DataReader<Data> data_reader_type;
300  return rti::core::detail::create_from_native_entity<data_reader_type>(
301  native_reader);
302  }
303 
304 private:
305  TypedInput();
306 
307  friend class rti::routing::processor::Selector<Data, Info>;
308 
310  const rti::routing::adapter::SelectorState& selector_state);
311 
313  const rti::routing::adapter::SelectorState& selector_state);
314 
315  friend class rti::routing::processor::Route;
316  Input *input_;
317 };
318 
319 
354 template <typename Data, typename Info>
355 class Selector {
356 public:
357 
367  : typed_input_(input), query_(dds::core::null)
368  {
369 
370  }
371 
375  Selector(const Selector& other) // Copy constructor: member-wise copy of the three members below.
376  : typed_input_(other.typed_input_), // the TypedInput that read()/take() are forwarded to
377  state_(other.state_), // the selection attributes accumulated so far
378  query_(other.query_) // the query last set through query(), if any
379  {
380 
381  }
382 
395  const dds::sub::status::DataState& the_state)
396  {
397  state_.state(the_state);
398  return *this;
399  }
400 
408  const int32_t count)
409  {
410  state_.max_samples(count);
411  return *this;
412  }
413 
434  const dds::core::InstanceHandle& the_handle)
435  {
436  state_.instance(the_handle);
437  return *this;
438  }
439 
470  const dds::core::InstanceHandle& the_handle)
471  {
472  state_.next_instance(the_handle);
473  return *this;
474  }
475 
490  const dds::topic::Filter& the_filter)
491  {
492  state_.filter(the_filter);
493  return *this;
494  }
495 
496 
513  {
514  query_ = the_query;
515  state_.content(the_query.delegate().get()->query_data_);
516  return *this;
517  }
518 
530  {
531  return typed_input_.take(state_);
532  }
533 
545  {
546  return typed_input_.read(state_);
547  }
548 
549 private:
553 };
554 
555 
556 template <typename Data, typename Info>
557 struct create_data_from_input { // Generic factory used by TypedInput::create_data(); a DynamicData specialization follows.
558 
559  static Data get(TypedInput<Data, Info>& ) // the input argument is unused in the generic case
560  {
561  return Data(); // value-initialized sample
562  }
563 };
564 
565 template <typename Info>
566 struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
567 
568  static dds::core::xtypes::DynamicData get(
570  {
571  if (input->stream_info().type_info().type_representation_kind()
572  != TypeRepresentationKind::DYNAMIC_TYPE) {
573  throw dds::core::PreconditionNotMetError(
574  "inconsistent data representation kind");
575  }
576  dds::core::xtypes::DynamicType *type_code =
577  static_cast<dds::core::xtypes::DynamicType *> (
578  input->stream_info().type_info().type_representation());
579  return dds::core::xtypes::DynamicData(*type_code);
580  }
581 };
582 
583 template <typename Data, typename Info>
585 : input_(input)
586 {
587 }
588 
589 template <typename Data, typename Info>
591 {
592  return input_->stream_info_;
593 }
594 
595 template <typename Data, typename Info>
596 const std::string& TypedInput<Data, Info>::name() const
597 { // Returns the name of the wrapped Input.
598  return input_->name_; // private member of Input; accessible because TypedInput is declared a friend of Input
599 }
600 
601 template <typename Data, typename Info>
603 {
604  RTI_RoutingServiceLoanedSamples native_samples;
605  if (!RTI_RoutingServiceInput_take(
606  input_->native_,
607  &native_samples)) {
608  throw dds::core::Error("error taking samples from native input");
609  }
610  return LoanedSamples<Data, Info>(input_->native_, native_samples);
611 }
612 
613 template <typename Data, typename Info>
615 {
616  RTI_RoutingServiceLoanedSamples native_samples;
617  if (!RTI_RoutingServiceInput_read(
618  input_->native_,
619  &native_samples)) {
620  throw dds::core::Error("error reading samples from native input");
621  }
622  return LoanedSamples<Data, Info>(input_->native_, native_samples);
623 }
624 
625 template <typename Data, typename Info>
628 {
630 }
631 
632 template <typename Data, typename Info>
634  const dds::topic::Filter& filter)
635 {
636  return rti::routing::processor::Query(input_->native_, filter);
637 
638 }
639 
640 template <typename Data, typename Info>
642  const rti::routing::adapter::SelectorState& selector_state)
643 {
644  RTI_RoutingServiceLoanedSamples native_samples;
645  if (!RTI_RoutingServiceInput_take_w_selector(
646  input_->native_,
647  &native_samples,
648  &selector_state.native())) {
649  throw dds::core::Error("error taking samples with selector from native input");
650  }
651  return LoanedSamples<Data, Info>(input_->native_, native_samples);
652 }
653 
654 template <typename Data, typename Info>
656  const rti::routing::adapter::SelectorState& selector_state)
657 {
658  RTI_RoutingServiceLoanedSamples native_samples;
659  if (!RTI_RoutingServiceInput_read_w_selector(
660  input_->native_,
661  &native_samples,
662  &selector_state.native())) {
663  throw dds::core::Error("error reading samples with selector from native input");
664  }
665  return LoanedSamples<Data, Info>(input_->native_, native_samples);
666 }
667 
668 template <typename Data, typename Info>
670 {
671  return RTI_RoutingServiceInput_is_active(input_->native_) ? true : false;
672 }
673 
674 template <typename Data, typename Info>
676 {
677  return create_data_from_input<Data, Info>::get(*this);
678 }
679 
688 typedef dds::sub::DataReader<dds::core::xtypes::DynamicData> DynamicDataReader; // shorthand for a DataReader of DynamicData samples
689 
690 } } }
691 
692 
693 #endif // RTI_ROUTING_PROCESSOR_INPUT_HPP_
Definition of the stream information that RTI Routing Service needs to manage user data streams...
Definition: StreamInfo.hpp:106
Provides temporary access to a collection of samples (data and info) from a TypedInput.
Definition: LoanedSamples.hpp:81
Selector & instance(const dds::core::InstanceHandle &the_handle)
Select a specific instance to read/take.
Definition: Input.hpp:433
Selector & state(const dds::sub::status::DataState &the_state)
Select a specific dds::sub::status::DataState.
Definition: Input.hpp:394
dds::sub::DataReader< Data > dds_data_reader()
Returns the underlying DDS DataReader that is part of this StreamReader implementation, assuming this input holds a DDS StreamReader.
Definition: Input.hpp:288
Generic Representation of a Route's input.
Definition: Input.hpp:49
Selector & filter(const dds::topic::Filter &the_filter)
Select samples based on a content filter parameters.
Definition: Input.hpp:489
An element that allows reading data that meet a set of specified attributes.
Definition: Input.hpp:143
Data create_data()
Creates a new data sample from this input.
Definition: Input.hpp:675
Representation of the Route object that owns a Processor.
Definition: Route.hpp:84
LoanedSamples< Data, Info > take()
Take samples based on the state associated with this Selector.
Definition: Input.hpp:529
Selector(const Selector &other)
Copy constructor.
Definition: Input.hpp:375
const rti::routing::StreamInfo & stream_info() const
Returns the StreamInfo associated with this object.
Definition: Input.hpp:590
LoanedSamples< Data, Info > read()
Same as take() but this calls rti::routing::adapter::StreamReader::read instead.
Definition: Input.hpp:614
LoanedSamples< Data, Info > take()
Returns all the available samples in this object.
Definition: Input.hpp:602
Selector select()
Gets a Selector to perform complex data selections, such as per-instance selection, content and status filtering.
Definition: Input.hpp:627
const std::string & name() const
Returns the name of this input.
Definition: Input.hpp:596
Encapsulates a content query to select data from a rti::routing::adapter::StreamReader.
Definition: Query.hpp:92
Selector & query(const rti::routing::processor::Query &the_query)
Select samples based on a rti::routing::processor::Query.
Definition: Input.hpp:512
Definition: AdapterPlugin.hpp:25
bool active()
Indicates whether this input has received new data since the last time any variant of the read operat...
Definition: Input.hpp:669
Defines a set of attributes that can be used to read a subset of data from StreamReader.
Definition: StreamReader.hpp:547
Representation of an Input whose data representation is DataRep, whose info representation is InfoRep...
Definition: Input.hpp:32
Selector & next_instance(const dds::core::InstanceHandle &the_handle)
Select the instance after a specific instance.
Definition: Input.hpp:469
Selector(const rti::routing::processor::TypedInput< Data, Info > input)
Create a Selector for a TypedInput.
Definition: Input.hpp:366
LoanedSamples< Data, Info > read()
Read samples based on the state associated with this Selector.
Definition: Input.hpp:544
Selector & max_samples(const int32_t count)
Choose to only read/take up to a maximum number of samples.
Definition: Input.hpp:407