RTI Routing Service  Version 6.0.0
 All Data Structures Files Functions Typedefs Enumerations Enumerator Groups Pages
Input.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12 #define RTI_ROUTING_PROCESSOR_INPUT_HPP_
13 
14 #include <dds/core/Value.hpp>
15 #include <dds/core/SafeEnumeration.hpp>
16 #include <rti/core/NativeValueType.hpp>
17 
18 #include "routingservice/routingservice_processor.h"
19 #include "routingservice/routingservice_adapter_new.h"
20 #include <rti/routing/StreamInfo.hpp>
21 #include <rti/routing/adapter/StreamReader.hpp>
22 #include <rti/routing/processor/LoanedSamples.hpp>
23 #include <rti/routing/processor/Query.hpp>
24 
25 namespace rti { namespace routing { namespace processor {
26 
27 class Route;
28 
29 template <typename Data, typename Info> class TypedInput;
30 
31 inline
32 rti::routing::StreamInfo from_native_input(
33  RTI_RoutingServiceStreamReaderExt *native,
34  RTI_RoutingServiceRoute *native_route)
35 {
37  *RTI_RoutingServiceRoute_get_input_stream_info(native_route, native));
38 }
39 
40 inline
41 std::string name_from_native_input(
42  RTI_RoutingServiceStreamReaderExt *native,
43  RTI_RoutingServiceRoute *native_route)
44 {
45  return RTI_RoutingServiceRoute_get_input_name(
46  native_route,
47  native);
48 }
49 
50 template <typename Data, typename Info = dds::sub::SampleInfo>
51 class TypedInput;
52 
68 public:
69 
70  /*i
71  */
72  Input(RTI_RoutingServiceStreamReaderExt *native,
73  int32_t index,
74  RTI_RoutingServiceRoute *native_route,
75  RTI_RoutingServiceEnvironment *native_env)
76  : native_(native),
77  index_(index),
78  native_route_(native_route),
79  native_env_(native_env),
80  stream_info_(from_native_input(native, native_route)),
81  name_(name_from_native_input(native, native_route))
82  {
83  }
84 
85  /*i
86  *
87  */
88  const rti::routing::StreamInfo& stream_info() const
89  {
90  return stream_info_;
91  }
92 
93  /*i
94  *
95  */
96  const std::string& name() const
97  {
98  return name_;
99  }
100 
101  /*i
102  *
103  */
104  int32_t index()
105  {
106  return index_;
107  }
108 
119  template <typename Data, typename Info>
121  {
122  return this;
123  }
124 
134  template <typename Data>
136  {
137  return this;
138  }
139 
140 
141  /*
142  * --- StreamReader Implementation ----------------------------------------
143  */
144 private:
145  /*i
146  * @brief Returns the underlying rti::routing::adapter::StreamReader.
147  *
148  * @warning The returned object is a wrapper to an implementation of the
149  * StreamReader, which may be an object returned by any of the
150  * possible adapter API bindings.
151  * The use of this operation is not recommended unless your Processor
152  * implementation strictly needs to perform operations on the
153  * StreamReader.
154  */
155  rti::routing::adapter::StreamReader& stream_reader()
156  {
157  return *this;
158  }
159 
160  void take(
161  std::vector<SamplePtr>& sample_seq,
162  std::vector<InfoPtr>& info_seq) RTI_FINAL
163  {
164  using namespace rti::routing::adapter;
165 
166  detail::NativeSamples native_samples;
167  native_->take(
168  native_->stream_reader_data,
169  &native_samples.sample_array_,
170  &native_samples.info_array_,
171  &native_samples.length_,
172  native_env_);
173  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
174  get_samples_from_native(sample_seq, info_seq, native_samples);
175  }
176 
177  void read(
178  std::vector<SamplePtr>& sample_seq,
179  std::vector<InfoPtr>& info_seq) RTI_FINAL
180  {
181  using namespace rti::routing::adapter;
182 
183  detail::NativeSamples native_samples;
184  native_->take(
185  native_->stream_reader_data,
186  &native_samples.sample_array_,
187  &native_samples.info_array_,
188  &native_samples.length_,
189  native_env_);
190  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
191  get_samples_from_native(sample_seq, info_seq, native_samples);
192  }
193 
194  void take(
195  std::vector<SamplePtr>& sample_seq,
196  std::vector<InfoPtr>& info_seq,
197  const rti::routing::adapter::SelectorState& selector_state) RTI_FINAL
198  {
199  detail::NativeSamples native_samples;
200  native_->take_w_selector(
201  native_->stream_reader_data,
202  &native_samples.sample_array_,
203  &native_samples.info_array_,
204  &native_samples.length_,
205  &selector_state.native(),
206  native_env_);
207  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
208  get_samples_from_native(sample_seq, info_seq, native_samples);
209  }
210 
211  void read(
212  std::vector<SamplePtr>& sample_seq,
213  std::vector<InfoPtr>& info_seq,
214  const rti::routing::adapter::SelectorState& selector_state) RTI_FINAL
215  {
216  detail::NativeSamples native_samples;
217  native_->read_w_selector(
218  native_->stream_reader_data,
219  &native_samples.sample_array_,
220  &native_samples.info_array_,
221  &native_samples.length_,
222  &selector_state.native(),
223  native_env_);
224  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
225  get_samples_from_native(sample_seq, info_seq, native_samples);
226  }
227 
228  void return_loan(
229  std::vector<SamplePtr>& sample_seq,
230  std::vector<InfoPtr>& info_seq) RTI_FINAL
231  {
232  detail::NativeSamples native_samples;
233  native_samples.length_ = sample_seq.size();
234  native_samples.sample_array_ = static_cast<RTI_RoutingServiceSample*>
235  (sample_seq[native_samples.length_]);
236  if (info_seq.size() > 0) {
237  native_samples.info_array_ = static_cast<RTI_RoutingServiceSample*>
238  (info_seq[info_seq.size()]);
239  }
240 
241  native_->return_loan(
242  native_->stream_reader_data,
243  native_samples.sample_array_,
244  native_samples.info_array_,
245  native_samples.length_,
246  native_env_);
247  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
248  sample_seq.clear();
249  info_seq.clear();
250  }
251 
252 
253  void* create_content_query(
254  void* old_query_data,
255  const dds::topic::Filter& filter) RTI_FINAL
256  {
257  if (native_->create_content_query == NULL) {
258  throw dds::core::UnsupportedError("Unsupported operation: create_content_query");
259  }
260 
261  RTI_RoutingServiceSelectorContent content =
262  RTI_RoutingServiceSelectorContent_INITIALIZER;
263  content.expression = (char *) filter.expression().c_str();
264  rti::core::native_conversions::to_native(
265  content.expression_parameters,
266  filter->parameters());
267  void* query_data = native_->create_content_query(
268  native_->stream_reader_data,
269  old_query_data,
270  &content,
271  native_env_);
272  DDS_StringSeq_finalize(&content.expression_parameters);
273  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
274  return query_data;
275  }
276 
277  void delete_content_query(void* query_data) RTI_FINAL
278  {
279  if (native_->delete_content_query == NULL) {
280  throw dds::core::UnsupportedError("delete_content_query");
281  }
282 
283  native_->delete_content_query(
284  native_->stream_reader_data,
285  query_data,
286  native_env_);
287  RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
288  }
289 
290 
291  virtual void update(const std::map<std::string, std::string>&) RTI_FINAL
292  {
293  throw dds::core::PreconditionNotMetError(
294  "update not applicable within a Processor notification context");
295  }
296 
297  // Implementation details
298 private:
299  void get_samples_from_native(
300  std::vector<SamplePtr>& sample_seq,
301  std::vector<InfoPtr>& info_seq,
302  detail::NativeSamples& native_samples)
303  {
304  sample_seq.reserve(native_samples.length_ + 1);
305  RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
306  sample_seq,
307  native_samples.sample_array_,
308  native_samples.length_);
309  sample_seq[native_samples.length_] =
310  native_samples.sample_array_;
311  if (native_samples.info_array_ != NULL) {
312  info_seq.reserve(native_samples.length_ + 1);
313  RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
314  info_seq,
315  native_samples.info_array_,
316  native_samples.length_);
317  info_seq[info_seq.size()] =
318  native_samples.info_array_;
319  } else {
320  info_seq.clear();
321  }
322  }
323 
324  RTI_RoutingServiceStreamReaderExt* native()
325  {
326  return native_;
327  }
328 
329  template <typename Data, typename Info> friend class TypedInput;
330  friend class Route;
331  friend class std::allocator<Input>;
332  RTI_RoutingServiceStreamReaderExt *native_;
333  int32_t index_;
334  RTI_RoutingServiceRoute *native_route_;
335  RTI_RoutingServiceEnvironment *native_env_;
336  rti::routing::StreamInfo stream_info_;
337  std::string name_;
338 };
339 
340 template <typename Data, typename Info = dds::sub::SampleInfo>
341 class Selector;
342 
351 template <typename Data, typename Info>
353 public:
354 
356 
357  TypedInput(Input *input);
358 
365  const rti::routing::StreamInfo& stream_info() const;
366 
372  const std::string& name() const;
373 
387 
397 
424  Selector select();
425 
427  const dds::topic::Filter& filter);
428 
440  bool active();
441 
459  Data create_data();
460 
461  /*i
462  *
463  */
464  TypedInput<Data, Info>* operator->()
465  {
466  return this;
467  }
468 
469 private:
470  TypedInput();
471 
472  friend class rti::routing::processor::Selector<Data, Info>;
473 
475  const rti::routing::adapter::SelectorState& selector_state);
476 
478  const rti::routing::adapter::SelectorState& selector_state);
479 
480  friend class rti::routing::processor::Route;
481  Input *input_;
482 };
483 
484 
519 template <typename Data, typename Info>
520 class Selector {
521 public:
522 
532  : typed_input_(input), query_(dds::core::null)
533  {
534 
535  }
536 
540  Selector(const Selector& other)
541  : typed_input_(other.typed_input_),
542  state_(other.state_),
543  query_(other.query_)
544  {
545 
546  }
547 
559  Selector& state(
560  const dds::sub::status::DataState& the_state)
561  {
562  state_.state(the_state);
563  return *this;
564  }
565 
572  Selector& max_samples(
573  const int32_t count)
574  {
575  state_.max_samples(count);
576  return *this;
577  }
578 
598  Selector& instance(
599  const dds::core::InstanceHandle& the_handle)
600  {
601  state_.instance(the_handle);
602  return *this;
603  }
604 
634  Selector& next_instance(
635  const dds::core::InstanceHandle& the_handle)
636  {
637  state_.next_instance(the_handle);
638  return *this;
639  }
640 
654  Selector& filter(
655  const dds::topic::Filter& the_filter)
656  {
657  state_.filter(the_filter);
658  return *this;
659  }
660 
661 
677  Selector& query(const rti::routing::processor::Query& the_query)
678  {
679  query_ = the_query;
680  state_.content(the_query.delegate().get()->query_data_);
681  return *this;
682  }
683 
695  {
696  return typed_input_.take(state_);
697  }
698 
710  {
711  return typed_input_.read(state_);
712  }
713 
714 private:
718 };
719 
720 
721 template <typename Data, typename Info>
722 struct create_data_from_input {
723 
724  static Data get(TypedInput<Data, Info>& )
725  {
726  return Data();
727  }
728 };
729 
730 template <typename Info>
731 struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
732 
733  static dds::core::xtypes::DynamicData get(
734  TypedInput<dds::core::xtypes::DynamicData, Info>& input)
735  {
736  if (input->stream_info().type_info().type_representation_kind()
737  != TypeRepresentationKind::DYNAMIC_TYPE) {
738  throw dds::core::PreconditionNotMetError(
739  "inconsistent data representation kind");
740  }
741  dds::core::xtypes::DynamicType *type_code =
742  static_cast<dds::core::xtypes::DynamicType *> (
743  input->stream_info().type_info().type_representation());
744  return dds::core::xtypes::DynamicData(*type_code);
745  }
746 };
747 
748 template <typename Data, typename Info>
749 TypedInput<Data, Info>::TypedInput(Input *input)
750 : input_(input)
751 {
752 }
753 
754 template <typename Data, typename Info>
756 {
757  return input_->stream_info_;
758 }
759 
760 template <typename Data, typename Info>
761 const std::string& TypedInput<Data, Info>::name() const
762 {
763  return input_->name_;
764 }
765 
766 template <typename Data, typename Info>
768 {
769  detail::NativeSamples native_samples;
770  input_->native_->take(
771  input_->native_->stream_reader_data,
772  &native_samples.sample_array_,
773  &native_samples.info_array_,
774  &native_samples.length_,
775  input_->native_env_);
776  RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
778  input_->native_,
779  native_samples,
780  input_->native_env_);
781 }
782 
783 template <typename Data, typename Info>
785 {
786  detail::NativeSamples native_samples;
787  input_->native_->read(
788  input_->native_->stream_reader_data,
789  &native_samples.sample_array_,
790  &native_samples.info_array_,
791  &native_samples.length_,
792  input_->native_env_);
793  RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
795  input_->native_,
796  native_samples,
797  input_->native_env_);
798 }
799 
800 template <typename Data, typename Info>
803 {
805 }
806 
807 template <typename Data, typename Info>
809  const dds::topic::Filter& filter)
810 {
812  &input_->stream_reader(),
813  filter);
814 
815 }
816 
817 template <typename Data, typename Info>
818 LoanedSamples<Data, Info> TypedInput<Data, Info>::take(
819  const rti::routing::adapter::SelectorState& selector_state)
820 {
821  detail::NativeSamples native_samples;
822  input_->native_->take_w_selector(
823  input_->native_->stream_reader_data,
824  &native_samples.sample_array_,
825  &native_samples.info_array_,
826  &native_samples.length_,
827  &selector_state.native(),
828  input_->native_env_);
829  RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
830  return LoanedSamples<Data, Info>(
831  input_->native_,
832  native_samples,
833  input_->native_env_);
834 }
835 
836 template <typename Data, typename Info>
837 LoanedSamples<Data, Info> TypedInput<Data, Info>::read(
838  const rti::routing::adapter::SelectorState& selector_state)
839 {
840  detail::NativeSamples native_samples;
841  input_->native_->read_w_selector(
842  input_->native_->stream_reader_data,
843  &native_samples.sample_array_,
844  &native_samples.info_array_,
845  &native_samples.length_,
846  &selector_state.native(),
847  input_->native_env_);
848  RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
849  return LoanedSamples<Data, Info>(
850  input_->native_,
851  native_samples,
852  input_->native_env_);
853 }
854 
855 template <typename Data, typename Info>
857 {
858  return RTI_RoutingServiceRoute_is_input_active(
859  input_->native_route_,
860  input_->native_) ? true : false;
861 }
862 
863 template <typename Data, typename Info>
865 {
866  return create_data_from_input<Data, Info>::get(*this);
867 }
868 
877 
878 } } }
879 
880 
881 #endif // RTI_ROUTING_PROCESSOR_INPUT_HPP_

RTI Routing Service Version 6.0.0 Copyright © Sun Mar 3 2019 Real-Time Innovations, Inc