RTI Routing Service  Version 6.0.1
 All Data Structures Files Functions Typedefs Enumerations Enumerator Groups Pages
Input.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12 #define RTI_ROUTING_PROCESSOR_INPUT_HPP_
13 
14 #include <dds/core/Value.hpp>
15 #include <dds/core/SafeEnumeration.hpp>
16 #include <rti/core/NativeValueType.hpp>
17 #include <rti/core/Entity.hpp>
18 #include <dds/sub/DataReader.hpp>
19 #include <dds/core/xtypes/DynamicData.hpp>
20 
21 #include "routingservice/routingservice_processor.h"
22 #include "routingservice/routingservice_adapter_new.h"
23 #include <rti/routing/StreamInfo.hpp>
24 #include <rti/routing/adapter/StreamReader.hpp>
25 #include <rti/routing/processor/LoanedSamples.hpp>
26 #include <rti/routing/processor/Query.hpp>
27 
28 namespace rti { namespace routing { namespace processor {
29 
30 class Route;
31 
32 template <typename Data, typename Info> class TypedInput;
33 
/*
 * Builds an rti::routing::StreamInfo from the native stream info that
 * RTI_RoutingServiceRoute_get_input_stream_info() reports for the native
 * input 'native' of the route 'native_route'.
 *
 * The pointer returned by the native call is dereferenced without a null
 * check.
 *
 * NOTE(review): listing line 39 was absent from this extract, which left the
 * body without a return statement and with an unbalanced ')'. It is restored
 * below from the declared return type -- confirm against the shipped header.
 */
34 inline
35 rti::routing::StreamInfo from_native_input(
36  RTI_RoutingServiceStreamReaderExt *native,
37  RTI_RoutingServiceRoute *native_route)
38 {
39  return rti::routing::StreamInfo(
40  *RTI_RoutingServiceRoute_get_input_stream_info(native_route, native));
41 }
42 
/*
 * Returns the name that RTI_RoutingServiceRoute_get_input_name() reports for
 * the native input 'native' of the route 'native_route'. The native result is
 * converted to std::string by the return statement.
 */
43 inline
44 std::string name_from_native_input(
45  RTI_RoutingServiceStreamReaderExt *native,
46  RTI_RoutingServiceRoute *native_route)
47 {
48  return RTI_RoutingServiceRoute_get_input_name(
49  native_route,
50  native);
51 }
52 
53 template <typename Data, typename Info = dds::sub::SampleInfo>
54 class TypedInput;
55 
71 public:
72 
73  /*i
74  */
/*
 * Wraps a native input.
 *
 * Stores the native reader, its index, the native route and the native
 * environment as given, and caches the stream info and the name obtained
 * through from_native_input() and name_from_native_input().
 *
 * NOTE(review): the native_topic_ member is not part of this initializer
 * list, so it is left uninitialized by this constructor.
 */
75  Input(RTI_RoutingServiceStreamReaderExt *native,
76  int32_t index,
77  RTI_RoutingServiceRoute *native_route,
78  RTI_RoutingServiceEnvironment *native_env)
79  : native_(native),
80  index_(index),
81  native_route_(native_route),
82  native_env_(native_env),
83  stream_info_(from_native_input(native, native_route)),
84  name_(name_from_native_input(native, native_route))
85  {
86  }
87 
88  /*i
89  *
90  */
/* Returns the StreamInfo that was cached when this Input was constructed. */
91  const rti::routing::StreamInfo& stream_info() const
92  {
93  return stream_info_;
94  }
95 
96  /*i
97  *
98  */
/* Returns the input name that was cached when this Input was constructed. */
99  const std::string& name() const
100  {
101  return name_;
102  }
103 
104  /*i
105  *
106  */
/*
 * Returns the index value that was passed to the constructor.
 *
 * NOTE(review): unlike stream_info() and name(), this accessor is not
 * const-qualified.
 */
107  int32_t index()
108  {
109  return index_;
110  }
111 
122  template <typename Data, typename Info>
124  {
125  return this;
126  }
127 
137  template <typename Data>
139  {
140  return this;
141  }
142 
143 
144  /*
145  * --- StreamReader Implementation ----------------------------------------
146  */
147 private:
148  /*i
149  * @brief Returns the underlying rti::routing::adapter::StreamReader.
150  *
151  * @warning The returned object is a wrapper to an implementation of the
152  * StreamReader, which may be an object returned by any of the
153  * possible adapter API bindings.
154  * The use of this operation is not recommended unless your Processor
155  * implementation strictly needs to perform operations on the
156  * StreamReader.
157  */
/*
 * Returns this object viewed as an adapter StreamReader. The conversion of
 * *this presumably relies on Input deriving from
 * rti::routing::adapter::StreamReader -- the class header is not part of this
 * extract, confirm there.
 */
158  rti::routing::adapter::StreamReader& stream_reader()
159  {
160  return *this;
161  }
162 
/*
 * Takes samples from the native reader.
 *
 * Invokes the native take callback with the reader's stream_reader_data,
 * receiving the sample array, info array and length into a local
 * detail::NativeSamples. RTI_ROUTING_THROW_ON_ENV_ERROR is then applied to
 * the environment (presumably throws when the native call reported an error;
 * the macro is defined elsewhere), and the result is transferred into
 * sample_seq and info_seq by get_samples_from_native().
 */
163  void take(
164  std::vector<SamplePtr>& sample_seq,
165  std::vector<InfoPtr>& info_seq) RTI_FINAL
166  {
167  using namespace rti::routing::adapter;
168 
169  detail::NativeSamples native_samples;
170  native_->take(
171  native_->stream_reader_data,
172  &native_samples.sample_array_,
173  &native_samples.info_array_,
174  &native_samples.length_,
175  native_env_);
176  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
177  get_samples_from_native(sample_seq, info_seq, native_samples);
178  }
179 
/*
 * Reads samples from the native reader.
 *
 * Invokes the native read callback with the reader's stream_reader_data,
 * receiving the sample array, info array and length into a local
 * detail::NativeSamples. RTI_ROUTING_THROW_ON_ENV_ERROR is then applied to
 * the environment (presumably throws when the native call reported an error;
 * the macro is defined elsewhere), and the result is transferred into
 * sample_seq and info_seq by get_samples_from_native().
 *
 * This must dispatch to the native 'read' callback, not 'take': the
 * selector-based read() uses read_w_selector and TypedInput::read() uses
 * the native read, and calling take here would make a read consume samples.
 */
180  void read(
181  std::vector<SamplePtr>& sample_seq,
182  std::vector<InfoPtr>& info_seq) RTI_FINAL
183  {
184  using namespace rti::routing::adapter;
185 
186  detail::NativeSamples native_samples;
187  native_->read(
188  native_->stream_reader_data,
189  &native_samples.sample_array_,
190  &native_samples.info_array_,
191  &native_samples.length_,
192  native_env_);
193  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
194  get_samples_from_native(sample_seq, info_seq, native_samples);
195  }
196 
/*
 * Takes samples from the native reader using a selector.
 *
 * Same sequence as the selector-less take(), except that the native
 * take_w_selector callback is used and additionally receives the address of
 * selector_state.native().
 */
197  void take(
198  std::vector<SamplePtr>& sample_seq,
199  std::vector<InfoPtr>& info_seq,
200  const rti::routing::adapter::SelectorState& selector_state) RTI_FINAL
201  {
202  detail::NativeSamples native_samples;
203  native_->take_w_selector(
204  native_->stream_reader_data,
205  &native_samples.sample_array_,
206  &native_samples.info_array_,
207  &native_samples.length_,
208  &selector_state.native(),
209  native_env_);
210  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
211  get_samples_from_native(sample_seq, info_seq, native_samples);
212  }
213 
/*
 * Reads samples from the native reader using a selector.
 *
 * Calls the native read_w_selector callback with the reader's
 * stream_reader_data, the output slots of a local detail::NativeSamples and
 * the address of selector_state.native(); applies
 * RTI_ROUTING_THROW_ON_ENV_ERROR to the environment; then transfers the
 * result into sample_seq and info_seq via get_samples_from_native().
 */
214  void read(
215  std::vector<SamplePtr>& sample_seq,
216  std::vector<InfoPtr>& info_seq,
217  const rti::routing::adapter::SelectorState& selector_state) RTI_FINAL
218  {
219  detail::NativeSamples native_samples;
220  native_->read_w_selector(
221  native_->stream_reader_data,
222  &native_samples.sample_array_,
223  &native_samples.info_array_,
224  &native_samples.length_,
225  &selector_state.native(),
226  native_env_);
227  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
228  get_samples_from_native(sample_seq, info_seq, native_samples);
229  }
230 
/*
 * Hands previously obtained samples back to the native reader and empties
 * both vectors.
 *
 * The native arrays are recovered from the vectors themselves: the sample
 * array pointer is read from sample_seq at index size(), and the info array
 * pointer from info_seq at index size() when info_seq is not empty. These are
 * the slots that get_samples_from_native() writes after reserving
 * length + 1 entries.
 *
 * NOTE(review): indexing a std::vector at size() is outside its valid range;
 * this only has a chance of working for vectors that were filled by
 * get_samples_from_native(). Confirm callers never pass other vectors.
 * NOTE(review): the info array is cast to RTI_RoutingServiceSample*, the same
 * type as the sample array -- confirm this matches the declared type of
 * NativeSamples::info_array_.
 * NOTE(review): when info_seq is empty, info_array_ keeps whatever value
 * NativeSamples gives it by default (not visible here).
 */
231  void return_loan(
232  std::vector<SamplePtr>& sample_seq,
233  std::vector<InfoPtr>& info_seq) RTI_FINAL
234  {
235  detail::NativeSamples native_samples;
236  native_samples.length_ = sample_seq.size();
237  native_samples.sample_array_ = static_cast<RTI_RoutingServiceSample*>
238  (sample_seq[native_samples.length_]);
239  if (info_seq.size() > 0) {
240  native_samples.info_array_ = static_cast<RTI_RoutingServiceSample*>
241  (info_seq[info_seq.size()]);
242  }
243 
244  native_->return_loan(
245  native_->stream_reader_data,
246  native_samples.sample_array_,
247  native_samples.info_array_,
248  native_samples.length_,
249  native_env_);
250  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
     /* Only reached when the error check above did not leave the function. */
251  sample_seq.clear();
252  info_seq.clear();
253  }
254 
255 
/*
 * Creates (or updates, given old_query_data) a native content query from a
 * DDS filter and returns the opaque query data produced by the native reader.
 *
 * Throws dds::core::UnsupportedError when the native reader does not provide
 * a create_content_query callback.
 */
256  void* create_content_query(
257  void* old_query_data,
258  const dds::topic::Filter& filter) RTI_FINAL
259  {
260  if (native_->create_content_query == NULL) {
261  throw dds::core::UnsupportedError("Unsupported operation: create_content_query");
262  }
263 
264  RTI_RoutingServiceSelectorContent content =
265  RTI_RoutingServiceSelectorContent_INITIALIZER;
     /* The expression is not copied: the native struct points into the
      * filter's own string, with constness cast away for the C API. */
266  content.expression = (char *) filter.expression().c_str();
267  rti::core::native_conversions::to_native(
268  content.expression_parameters,
269  filter->parameters());
270  void* query_data = native_->create_content_query(
271  native_->stream_reader_data,
272  old_query_data,
273  &content,
274  native_env_);
     /* The parameter sequence is finalized before the error check so that it
      * is released on both the success and the error path. */
275  DDS_StringSeq_finalize(&content.expression_parameters);
276  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
277  return query_data;
278  }
279 
/*
 * Deletes native content query data through the native reader's
 * delete_content_query callback, then applies RTI_ROUTING_THROW_ON_ENV_ERROR
 * to the environment.
 *
 * Throws dds::core::UnsupportedError when the native reader does not provide
 * a delete_content_query callback.
 */
280  void delete_content_query(void* query_data) RTI_FINAL
281  {
282  if (native_->delete_content_query == NULL) {
283  throw dds::core::UnsupportedError("delete_content_query");
284  }
285 
286  native_->delete_content_query(
287  native_->stream_reader_data,
288  query_data,
289  native_env_);
290  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
291  }
292 
293 
/*
 * Not supported on an Input: always throws
 * dds::core::PreconditionNotMetError. The properties argument is ignored.
 */
294  virtual void update(const std::map<std::string, std::string>&) RTI_FINAL
295  {
296  throw dds::core::PreconditionNotMetError(
297  "update not applicable within a Processor notification context");
298  }
299 
300  // Implementation details
301 private:
/*
 * Transfers a NativeSamples result into sample_seq and info_seq.
 *
 * For each vector, capacity for length_ + 1 entries is reserved before
 * RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE runs, and the native array
 * pointer itself is then stored in the slot just past the copied elements.
 * return_loan() reads that slot back to pass the array to the native
 * return_loan callback.
 *
 * When the native call produced no info array, info_seq is cleared instead.
 *
 * NOTE(review): the extra slot is written through operator[] at
 * native_samples.length_ (samples) and info_seq.size() (infos). These agree
 * only if the copy macro, defined elsewhere, leaves size() == length_ --
 * confirm. Either way the index is at size(), which is outside the vector's
 * valid range even though the capacity was reserved.
 */
302  void get_samples_from_native(
303  std::vector<SamplePtr>& sample_seq,
304  std::vector<InfoPtr>& info_seq,
305  detail::NativeSamples& native_samples)
306  {
307  sample_seq.reserve(native_samples.length_ + 1);
308  RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
309  sample_seq,
310  native_samples.sample_array_,
311  native_samples.length_);
312  sample_seq[native_samples.length_] =
313  native_samples.sample_array_;
314  if (native_samples.info_array_ != NULL) {
315  info_seq.reserve(native_samples.length_ + 1);
316  RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
317  info_seq,
318  native_samples.info_array_,
319  native_samples.length_);
320  info_seq[info_seq.size()] =
321  native_samples.info_array_;
322  } else {
323  info_seq.clear();
324  }
325  }
326 
/* Returns the native reader pointer that was passed to the constructor. */
327  RTI_RoutingServiceStreamReaderExt* native()
328  {
329  return native_;
330  }
331 
332  template <typename Data, typename Info> friend class TypedInput;
333  friend class Route;
334  friend class std::allocator<Input>;
335  RTI_RoutingServiceStreamReaderExt *native_;
336  int32_t index_;
337  RTI_RoutingServiceRoute *native_route_;
338  RTI_RoutingServiceEnvironment *native_env_;
339  rti::routing::StreamInfo stream_info_;
340  std::string name_;
341  DDS_Entity *native_topic_;
342 };
343 
344 template <typename Data, typename Info = dds::sub::SampleInfo>
345 class Selector;
346 
355 template <typename Data, typename Info>
357 public:
358 
360 
361  TypedInput(Input *input);
362 
369  const rti::routing::StreamInfo& stream_info() const;
370 
376  const std::string& name() const;
377 
391 
401 
428  Selector select();
429 
431  const dds::topic::Filter& filter);
432 
444  bool active();
445 
463  Data create_data();
464 
465  /*i
466  *
467  */
/*
 * Returns a pointer to this object, so that members of a TypedInput value can
 * also be reached with '->' (as create_data_from_input does with
 * input->stream_info()).
 */
468  TypedInput<Data, Info>* operator->()
469  {
470  return this;
471  }
472 
473 
/*
 * Returns the DDS DataReader behind this input as a
 * dds::sub::DataReader<Data>.
 *
 * The native reader is looked up with
 * RTI_RoutingServiceRoute_get_dds_reader() for this input's route and native
 * reader; when that lookup yields NULL, dds::core::InvalidArgumentError is
 * thrown. Otherwise the C++ reader is produced by
 * rti::core::detail::create_from_native_entity.
 */
490  dds::sub::DataReader<Data> dds_data_reader()
491  {
     /* Redundant with the fully qualified call below. */
492  using rti::core::detail::create_from_native_entity;
493 
494  DDS_DataReader *native_reader = RTI_RoutingServiceRoute_get_dds_reader(
495  input_->native_route_,
496  input_->native_);
497  if (native_reader == NULL) {
498  throw dds::core::InvalidArgumentError(
499  "invalid argument: input does not hold a DDS StreamReader");
500  }
501 
502  typedef dds::sub::DataReader<Data> data_reader_type;
503  return rti::core::detail::create_from_native_entity<data_reader_type>(
504  native_reader);
505  }
506 
507 private:
508  TypedInput();
509 
510  friend class rti::routing::processor::Selector<Data, Info>;
511 
513  const rti::routing::adapter::SelectorState& selector_state);
514 
516  const rti::routing::adapter::SelectorState& selector_state);
517 
518  friend class rti::routing::processor::Route;
519  Input *input_;
520 };
521 
522 
/*
 * Chainable builder for a selective take/read on a TypedInput.
 *
 * Each setter forwards its argument to the selector state held in state_ and
 * returns *this so that calls can be chained; the final take or read passes
 * the accumulated state to the typed input.
 *
 * NOTE(review): several listing lines of this class are absent from this
 * extract (the first constructor's signature, the signatures of the two
 * trailing take/read members, and the data member declarations). The code
 * below is left exactly as extracted.
 */
557 template <typename Data, typename Info>
558 class Selector {
559 public:
560 
     /* Constructor from an input (signature line missing from this extract):
      * stores the input and starts with a null query. */
570  : typed_input_(input), query_(dds::core::null)
571  {
572 
573  }
574 
     /* Copy constructor: copies the input handle, the selector state and the
      * query. */
578  Selector(const Selector& other)
579  : typed_input_(other.typed_input_),
580  state_(other.state_),
581  query_(other.query_)
582  {
583 
584  }
585 
     /* Forwards the data state to the selector state. */
597  Selector& state(
598  const dds::sub::status::DataState& the_state)
599  {
600  state_.state(the_state);
601  return *this;
602  }
603 
     /* Forwards the sample count limit to the selector state. */
610  Selector& max_samples(
611  const int32_t count)
612  {
613  state_.max_samples(count);
614  return *this;
615  }
616 
     /* Forwards the instance handle to the selector state. */
636  Selector& instance(
637  const dds::core::InstanceHandle& the_handle)
638  {
639  state_.instance(the_handle);
640  return *this;
641  }
642 
     /* Forwards the handle to the selector state's next_instance setting. */
672  Selector& next_instance(
673  const dds::core::InstanceHandle& the_handle)
674  {
675  state_.next_instance(the_handle);
676  return *this;
677  }
678 
     /* Forwards the content filter to the selector state. */
692  Selector& filter(
693  const dds::topic::Filter& the_filter)
694  {
695  state_.filter(the_filter);
696  return *this;
697  }
698 
699 
     /* Keeps a copy of the query in query_ and hands the query's native
      * query_data_ to the selector state as its content. */
715  Selector& query(const rti::routing::processor::Query& the_query)
716  {
717  query_ = the_query;
718  state_.content(the_query.delegate().get()->query_data_);
719  return *this;
720  }
721 
     /* Signature line missing from this extract; the body performs a take on
      * the typed input with the accumulated state. */
733  {
734  return typed_input_.take(state_);
735  }
736 
     /* Signature line missing from this extract; the body performs a read on
      * the typed input with the accumulated state. */
748  {
749  return typed_input_.read(state_);
750  }
751 
752 private:
     /* Member declarations (typed_input_, state_, query_) are missing from
      * this extract. */
756 };
757 
758 
/*
 * Generic data factory used by TypedInput::create_data(): returns a
 * default-constructed Data and ignores the input.
 */
759 template <typename Data, typename Info>
760 struct create_data_from_input {
761 
762  static Data get(TypedInput<Data, Info>& )
763  {
764  return Data();
765  }
766 };
767 
/*
 * Data factory specialization for dds::core::xtypes::DynamicData.
 *
 * A DynamicData has to be created from a type, so the type is taken from the
 * input's stream info. Throws dds::core::PreconditionNotMetError when the
 * stream's type representation kind is not DYNAMIC_TYPE.
 *
 * NOTE(review): the pointer returned by type_representation() is dereferenced
 * without a null check.
 */
768 template <typename Info>
769 struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
770 
771  static dds::core::xtypes::DynamicData get(
772  TypedInput<dds::core::xtypes::DynamicData, Info>& input)
773  {
774  if (input->stream_info().type_info().type_representation_kind()
775  != TypeRepresentationKind::DYNAMIC_TYPE) {
776  throw dds::core::PreconditionNotMetError(
777  "inconsistent data representation kind");
778  }
779  dds::core::xtypes::DynamicType *type_code =
780  static_cast<dds::core::xtypes::DynamicType *> (
781  input->stream_info().type_info().type_representation());
782  return dds::core::xtypes::DynamicData(*type_code);
783  }
784 };
785 
/*
 * Creates a typed view over an Input. Only the pointer is stored; it is not
 * checked for null here, and the other members dereference it.
 */
786 template <typename Data, typename Info>
787 TypedInput<Data, Info>::TypedInput(Input *input)
788 : input_(input)
789 {
790 }
791 
/*
 * Returns the StreamInfo cached by the wrapped Input.
 *
 * NOTE(review): the signature line (listing line 793) was absent from this
 * extract; it is restored from the in-class declaration of stream_info() and
 * the form of the neighbouring name() definition -- confirm against the
 * shipped header.
 */
792 template <typename Data, typename Info>
793 const rti::routing::StreamInfo& TypedInput<Data, Info>::stream_info() const
794 {
795  return input_->stream_info_;
796 }
797 
/* Returns the name cached by the wrapped Input. */
798 template <typename Data, typename Info>
799 const std::string& TypedInput<Data, Info>::name() const
800 {
801  return input_->name_;
802 }
803 
804 template <typename Data, typename Info>
806 {
807  detail::NativeSamples native_samples;
808  input_->native_->take(
809  input_->native_->stream_reader_data,
810  &native_samples.sample_array_,
811  &native_samples.info_array_,
812  &native_samples.length_,
813  input_->native_env_);
814  RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
816  input_->native_,
817  native_samples,
818  input_->native_env_);
819 }
820 
821 template <typename Data, typename Info>
823 {
824  detail::NativeSamples native_samples;
825  input_->native_->read(
826  input_->native_->stream_reader_data,
827  &native_samples.sample_array_,
828  &native_samples.info_array_,
829  &native_samples.length_,
830  input_->native_env_);
831  RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
833  input_->native_,
834  native_samples,
835  input_->native_env_);
836 }
837 
838 template <typename Data, typename Info>
841 {
843 }
844 
845 template <typename Data, typename Info>
847  const dds::topic::Filter& filter)
848 {
850  &input_->stream_reader(),
851  filter);
852 
853 }
854 
/*
 * Takes samples from the wrapped input's native reader using a selector.
 *
 * Calls the native take_w_selector callback with the reader's
 * stream_reader_data, the output slots of a local detail::NativeSamples and
 * the address of selector_state.native(); applies
 * RTI_ROUTING_THROW_ON_ENV_ERROR to the input's environment; then returns a
 * LoanedSamples built from the native reader, the native samples and the
 * environment.
 */
855 template <typename Data, typename Info>
856 LoanedSamples<Data, Info> TypedInput<Data, Info>::take(
857  const rti::routing::adapter::SelectorState& selector_state)
858 {
859  detail::NativeSamples native_samples;
860  input_->native_->take_w_selector(
861  input_->native_->stream_reader_data,
862  &native_samples.sample_array_,
863  &native_samples.info_array_,
864  &native_samples.length_,
865  &selector_state.native(),
866  input_->native_env_);
867  RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
868  return LoanedSamples<Data, Info>(
869  input_->native_,
870  native_samples,
871  input_->native_env_);
872 }
873 
/*
 * Reads samples from the wrapped input's native reader using a selector.
 *
 * Calls the native read_w_selector callback with the reader's
 * stream_reader_data, the output slots of a local detail::NativeSamples and
 * the address of selector_state.native(); applies
 * RTI_ROUTING_THROW_ON_ENV_ERROR to the input's environment; then returns a
 * LoanedSamples built from the native reader, the native samples and the
 * environment.
 */
874 template <typename Data, typename Info>
875 LoanedSamples<Data, Info> TypedInput<Data, Info>::read(
876  const rti::routing::adapter::SelectorState& selector_state)
877 {
878  detail::NativeSamples native_samples;
879  input_->native_->read_w_selector(
880  input_->native_->stream_reader_data,
881  &native_samples.sample_array_,
882  &native_samples.info_array_,
883  &native_samples.length_,
884  &selector_state.native(),
885  input_->native_env_);
886  RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
887  return LoanedSamples<Data, Info>(
888  input_->native_,
889  native_samples,
890  input_->native_env_);
891 }
892 
/*
 * Reports whether the wrapped input is active, as answered by
 * RTI_RoutingServiceRoute_is_input_active() for the input's route and native
 * reader. The native result is normalized to a C++ bool.
 *
 * NOTE(review): the signature line (listing line 894) was absent from this
 * extract; it is restored from the in-class declaration 'bool active();' --
 * confirm against the shipped header.
 */
893 template <typename Data, typename Info>
894 bool TypedInput<Data, Info>::active()
895 {
896  return RTI_RoutingServiceRoute_is_input_active(
897  input_->native_route_,
898  input_->native_) ? true : false;
899 }
900 
/*
 * Creates a Data object suitable for this input by delegating to
 * create_data_from_input<Data, Info>::get(), which is specialized for
 * DynamicData.
 *
 * NOTE(review): the signature line (listing line 902) was absent from this
 * extract; it is restored from the in-class declaration
 * 'Data create_data();' -- confirm against the shipped header.
 */
901 template <typename Data, typename Info>
902 Data TypedInput<Data, Info>::create_data()
903 {
904  return create_data_from_input<Data, Info>::get(*this);
905 }
906 
915 typedef dds::sub::DataReader<dds::core::xtypes::DynamicData> DynamicDataReader;
916 
917 } } }
918 
919 
920 #endif // RTI_ROUTING_PROCESSOR_INPUT_HPP_

RTI Routing Service Version 6.0.1 Copyright © Sun Nov 17 2019 Real-Time Innovations, Inc