RTI Routing Service  Version 6.0.0
 All Data Structures Files Functions Typedefs Enumerations Enumerator Groups Pages
StreamReader.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_ROUTING_ADAPTER_STREAM_READER_HPP_
12 #define RTI_ROUTING_ADAPTER_STREAM_READER_HPP_
13 
#include <cstring>
#include <new>
#include <string>
#include <vector>

#include <dds/core/SafeEnumeration.hpp>
#include <dds/sub/status/DataState.hpp>
#include <dds/core/InstanceHandle.hpp>
#include <dds/core/types.hpp>
#include <dds/topic/Filter.hpp>
#include <dds/core/xtypes/DynamicData.hpp>
#include <dds/sub/SampleInfo.hpp>
#include <rti/core/NativeValueType.hpp>
#include <rti/routing/UpdatableEntity.hpp>
#include <rti/routing/detail/ForwarderUtils.hpp>
26 
27 namespace rti { namespace routing { namespace adapter {
28 
29 class SelectorState;
30 
46 
47 public:
/** Untyped pointer used for the elements of a sample sequence. */
typedef void* SamplePtr;

/** Untyped pointer used for the elements of an info sequence. */
typedef void* InfoPtr;
50 
89  virtual void take(
90  std::vector<SamplePtr>& sample_seq,
91  std::vector<InfoPtr>& info_seq) = 0;
92 
100  virtual void read(
101  std::vector<SamplePtr>& sample_seq,
102  std::vector<InfoPtr>& info_seq) = 0;
103 
120  virtual void take(
121  std::vector<SamplePtr>& sample_seq,
122  std::vector<InfoPtr>& info_seq,
123  const SelectorState& selector_state) = 0;
124 
141  virtual void read(
142  std::vector<SamplePtr>& sample_seq,
143  std::vector<InfoPtr>& info_seq,
144  const SelectorState& selector_state) = 0;
145 
161  virtual void return_loan(
162  std::vector<SamplePtr>& sample_seq,
163  std::vector<InfoPtr>& info_seq) = 0;
164 
187  virtual void* create_content_query(
188  void *old_query_data,
189  const dds::topic::Filter& filter) = 0;
190 
196  virtual void delete_content_query(void* query_data) = 0;
197 
201  virtual ~StreamReader()
202  {
203  }
204 
205 };
206 
207 
219 template <typename Data, typename Info>
220 class TStreamReader : public StreamReader {
221 
222 public:
223 
227  typedef Data DataRep;
231  typedef Info InfoRep;
232 
233  using StreamReader::take;
234  using StreamReader::read;
236 
241  void take(
242  std::vector<SamplePtr>& sample_seq,
243  std::vector<InfoPtr>& info_seq) RTI_FINAL
244  {
245  take(sample_seq_, info_seq_);
246  convert_samples(sample_seq, info_seq);
247  }
248 
253  void read(
254  std::vector<SamplePtr>& sample_seq,
255  std::vector<InfoPtr>& info_seq) RTI_FINAL
256  {
257  read(sample_seq_, info_seq_);
258  convert_samples(sample_seq, info_seq);
259  }
260 
265  void take(
266  std::vector<SamplePtr>& sample_seq,
267  std::vector<InfoPtr>& info_seq,
268  const SelectorState& selector_state) RTI_FINAL
269  {
270  take(sample_seq_, info_seq_, selector_state);
271  convert_samples(sample_seq, info_seq);
272  }
273 
278  void read(
279  std::vector<SamplePtr>& sample_seq,
280  std::vector<InfoPtr>& info_seq,
281  const SelectorState& selector_state) RTI_FINAL
282  {
283  read(sample_seq_, info_seq_, selector_state);
284  convert_samples(sample_seq, info_seq);
285  }
286 
292  std::vector<SamplePtr>& sample_seq,
293  std::vector<InfoPtr>& info_seq) RTI_FINAL
294  {
295  RTI_ROUTING_SAMPLE_VECTOR_COPY_PTRS(sample_seq_, sample_seq);
296  RTI_ROUTING_SAMPLE_VECTOR_COPY_PTRS(info_seq_, info_seq);
297  return_loan(sample_seq_, info_seq_);
298  sample_seq_.clear();
299  info_seq_.clear();
300  }
301 
307  virtual void take(
308  std::vector<Data*>& sample_seq,
309  std::vector<Info*>& info_seq) = 0;
310 
316  virtual void read(
317  std::vector<Data*>& sample_seq,
318  std::vector<Info*>& info_seq) = 0;
319 
325  virtual void take(
326  std::vector<Data*>& sample_seq,
327  std::vector<Info*>& info_seq,
328  const SelectorState& selector_state) = 0;
329 
335  virtual void read(
336  std::vector<Data*>& sample_seq,
337  std::vector<Info*>& info_seq,
338  const SelectorState& selector_state) = 0;
339 
345  virtual void return_loan(
346  std::vector<Data*>& sample_seq,
347  std::vector<Info*>& info_seq) = 0;
348 
349  /*
350  * @brief Virtual destructor
351  */
352  virtual ~TStreamReader()
353  {
354 
355  }
356 
357 private:
358  void convert_samples(
359  std::vector<SamplePtr>& sample_seq,
360  std::vector<InfoPtr>& info_seq)
361  {
362  RTI_ROUTING_SAMPLE_VECTOR_COPY_PTRS(sample_seq, sample_seq_);
363  RTI_ROUTING_SAMPLE_VECTOR_COPY_PTRS(info_seq, info_seq_);
364  }
365 
366 private:
367  std::vector<Data*> sample_seq_;
368  std::vector<Info*> info_seq_;
369 
370 };
371 
380 template <typename Data, typename Info>
381 class NoOpStreamReader : public TStreamReader<Data, Info> {
382 
383 public:
384  virtual void take(
385  std::vector<Data*>&,
386  std::vector<Info*>&) RTI_OVERRIDE
387  {
388  }
389 
390  virtual void read(
391  std::vector<Data*>&,
392  std::vector<Info*>&) RTI_OVERRIDE
393  {
394  }
395 
396  virtual void take(
397  std::vector<Data*>&,
398  std::vector<Info*>&,
399  const SelectorState&) RTI_OVERRIDE
400  {
401  }
402 
403  virtual void read(
404  std::vector<Data*>&,
405  std::vector<Info*>&,
406  const SelectorState&) RTI_OVERRIDE
407  {
408  }
409 
410  virtual void return_loan(
411  std::vector<Data*>&,
412  std::vector<Info*>&) RTI_OVERRIDE
413  {
414  }
415 
416  virtual void* create_content_query(
417  void *,
418  const dds::topic::Filter&) RTI_OVERRIDE
419  {
420  return NULL;
421  }
422 
423 
424  virtual void delete_content_query(void*) RTI_OVERRIDE
425  {
426 
427  }
428 
429 
430 
431  virtual ~NoOpStreamReader()
432  {
433  }
434 
435 };
436 
444 typedef NoOpStreamReader<dds::core::xtypes::DynamicData, dds::sub::SampleInfo>
446 
447 
448 
449 class SelectorStateAdapter {
450 public:
451  typedef RTI_RoutingServiceSelectorState native_type;
452 
453  static void initialize(native_type& native_value)
454  {
455  RTI_RoutingServiceSelectorState native_state =
456  RTI_RoutingServiceSelectorState_INITIALIZER;
457  memcpy(&native_value, &native_state, sizeof(native_state));
458  }
459 
460  static void finalize(native_type& native_value)
461  {
462  if (native_value.content.expression != NULL) {
463  DDS_String_free(native_value.content.expression);
464  }
465  DDS_StringSeq_finalize(&native_value.content.expression_parameters);
466  }
467 
468  static void copy(native_type& destination, const native_type& source)
469  {
470  destination.sample_state = source.sample_state;
471  destination.view_state = source.view_state;
472  destination.instance_state = source.instance_state;
473  destination.instance_handle = source.instance_handle;
474  destination.instance_selection = source.instance_selection;
475  destination.sample_count_max = source.sample_count_max;
476  destination.query_data = source.query_data;
477  char *result = DDS_String_replace(
478  &destination.content.expression,
479  source.content.expression);
480  if (source.content.expression != NULL && result == NULL) {
481  throw std::bad_alloc();
482  }
483  if (DDS_StringSeq_copy(
484  &destination.content.expression_parameters,
485  &source.content.expression_parameters) == NULL) {
486  throw std::bad_alloc();
487  }
488  }
489 
490  static bool equals(const native_type& first, const native_type& second)
491  {
492  if (first.sample_state != second.sample_state) {
493  return false;
494  }
495  if (first.view_state != second.view_state) {
496  return false;
497  }
498  if (first.instance_state != second.instance_state) {
499  return false;
500  }
501  if (first.instance_selection != second.instance_selection) {
502  return false;
503  }
504  if (first.sample_count_max != second.sample_count_max) {
505  return false;
506  }
507  if (first.query_data != second.query_data) {
508  return false;
509  }
510  if (first.content.expression != NULL
511  && second.content.expression != NULL
512  && strcmp(first.content.expression, second.content.expression) != 0) {
513  return false;
514  }
515  return DDS_StringSeq_equals(
516  &first.content.expression_parameters,
517  &second.content.expression_parameters);
518  }
519 
520 };
521 
522 } }
523 
524 // native_type_traits needs to be defined in rti::core
525 namespace core {
526 
527 template <>
528 struct native_type_traits<rti::routing::adapter::SelectorState> {
529  typedef rti::routing::adapter::SelectorStateAdapter adapter_type;
530  typedef RTI_RoutingServiceSelectorState native_type;
531 };
532 
533 }
534 
535 namespace routing { namespace adapter {
536 
547 class SelectorState : public rti::core::NativeValueType<SelectorState> {
548 public:
549  RTI_NATIVE_VALUE_TYPE_DEFINE_DEFAULT_MOVE_OPERATIONS(SelectorState)
550 
551  typedef rti::core::NativeValueType<SelectorState> Base;
552 public:
553  SelectorState()
554  {
555 
556  }
557 
558  SelectorState(const RTI_RoutingServiceSelectorState &native)
559  : Base(native)
560  {
561  }
562 
563  dds::sub::status::DataState state() const
564  {
565  return dds::sub::status::DataState(
566  dds::sub::status::SampleState(native().sample_state),
567  dds::sub::status::ViewState(native().view_state),
568  dds::sub::status::InstanceState(native().instance_state));
569  }
570 
571  SelectorState& state(const dds::sub::status::DataState& state)
572  {
573  native().sample_state = state.sample_state().to_ulong();
574  native().view_state = state.view_state().to_ulong();
575  native().instance_state = state.instance_state().to_ulong();
576  return *this;
577  }
578 
579  int32_t max_samples() const
580  {
581  return native().sample_count_max;
582  }
583 
584  SelectorState& max_samples(const int32_t count)
585  {
586  native().sample_count_max = count;
587  return *this;
588  }
589 
590  dds::core::InstanceHandle instance() const
591  {
592  if (native().instance_selection
593  == RTI_ROUTING_SERVICE_THIS_INSTANCE_SELECTION) {
594  return rti::core::native_conversions::from_native_handle(
595  native().instance_handle);
596  }
597 
598  return dds::core::InstanceHandle::nil();
599  }
600 
601  SelectorState& instance(const dds::core::InstanceHandle& handle)
602  {
603  native().instance_handle =
604  static_cast<DDS_InstanceHandle_t>(handle->native());
605  native().instance_selection = RTI_ROUTING_SERVICE_THIS_INSTANCE_SELECTION;
606  return *this;
607  }
608 
609  dds::core::InstanceHandle next_instance() const
610  {
611  if (native().instance_selection
612  == RTI_ROUTING_SERVICE_NEXT_INSTANCE_SELECTION) {
613  return rti::core::native_conversions::from_native_handle(
614  native().instance_handle);
615  }
616 
617  return dds::core::InstanceHandle::nil();
618  }
619 
620  SelectorState& next_instance(
621  const dds::core::InstanceHandle& handle)
622  {
623  native().instance_handle =
624  static_cast<DDS_InstanceHandle_t>(handle->native());
625  native().instance_selection = RTI_ROUTING_SERVICE_NEXT_INSTANCE_SELECTION;
626  return *this;
627  }
628 
629  void* content()
630  {
631  return native().query_data;
632  }
633 
634  SelectorState& content(void *query_data)
635  {
636  native().query_data = query_data;
637  return *this;
638  }
639 
640  dds::topic::Filter filter() const
641  {
642  return dds::topic::Filter(
643  native().content.expression == NULL ? "" : native().content.expression,
644  rti::core::native_conversions::from_native<std::string>(
645  native().content.expression_parameters));
646  }
647 
648  SelectorState& filter(const dds::topic::Filter& filter)
649  {
650  DDS_String_replace(
651  &native().content.expression,
652  filter.expression().c_str());
653  rti::core::native_conversions::to_native(
654  native().content.expression_parameters,
655  filter->parameters());
656  return *this;
657  }
658 
659 };
660 
661 }}}
662 
663 #endif // RTI_ROUTING_ADAPTER_STREAM_READER_HPP_

RTI Routing Service Version 6.0.0 Copyright © Sun Mar 3 2019 Real-Time Innovations, Inc