RTI Routing Service Version 7.1.0
StreamReader.hpp
1/*
2 * (c) Copyright, Real-Time Innovations, 2017.
3 * All rights reserved.
4 *
5 * No duplications, whole or partial, manual or electronic, may be made
6 * without express written permission. Any such copies, or
7 * revisions thereof, must display this notice unaltered.
8 * This code contains trade secrets of Real-Time Innovations, Inc.
9 */
10
11#ifndef RTI_ROUTING_ADAPTER_STREAM_READER_HPP_
12#define RTI_ROUTING_ADAPTER_STREAM_READER_HPP_
13
#include <cstring>
#include <new>
#include <string>
#include <vector>

#include <dds/core/SafeEnumeration.hpp>
#include <dds/sub/status/DataState.hpp>
#include <dds/core/InstanceHandle.hpp>
#include <dds/core/types.hpp>
#include <dds/topic/Filter.hpp>
#include <dds/core/xtypes/DynamicData.hpp>
#include <dds/sub/SampleInfo.hpp>
#include <rti/core/NativeValueType.hpp>
#include <rti/routing/UpdatableEntity.hpp>
#include <rti/routing/detail/ForwarderUtils.hpp>
26
27namespace rti { namespace routing { namespace adapter {
28
29class SelectorState;
30
46
47public:
48 typedef void* SamplePtr;
49 typedef void* InfoPtr;
50
89 virtual void take(
90 std::vector<SamplePtr>& sample_seq,
91 std::vector<InfoPtr>& info_seq) = 0;
92
100 virtual void read(
101 std::vector<SamplePtr>& sample_seq,
102 std::vector<InfoPtr>& info_seq) = 0;
103
120 virtual void take(
121 std::vector<SamplePtr>& sample_seq,
122 std::vector<InfoPtr>& info_seq,
123 const SelectorState& selector_state) = 0;
124
141 virtual void read(
142 std::vector<SamplePtr>& sample_seq,
143 std::vector<InfoPtr>& info_seq,
144 const SelectorState& selector_state) = 0;
145
161 virtual void return_loan(
162 std::vector<SamplePtr>& sample_seq,
163 std::vector<InfoPtr>& info_seq) = 0;
164
187 virtual void* create_content_query(
188 void *old_query_data,
189 const dds::topic::Filter& filter) = 0;
190
196 virtual void delete_content_query(void* query_data) = 0;
197
202 {
203 }
204
205};
206
207
219template <typename Data, typename Info>
221
222public:
223
227 typedef Data DataRep;
231 typedef Info InfoRep;
232
233 using StreamReader::take;
234 using StreamReader::read;
236
241 void take(
242 std::vector<SamplePtr>& sample_seq,
243 std::vector<InfoPtr>& info_seq) RTI_FINAL
244 {
245 take(sample_seq_, info_seq_);
246 convert_samples(sample_seq, info_seq);
247 }
248
253 void read(
254 std::vector<SamplePtr>& sample_seq,
255 std::vector<InfoPtr>& info_seq) RTI_FINAL
256 {
257 read(sample_seq_, info_seq_);
258 convert_samples(sample_seq, info_seq);
259 }
260
265 void take(
266 std::vector<SamplePtr>& sample_seq,
267 std::vector<InfoPtr>& info_seq,
268 const SelectorState& selector_state) RTI_FINAL
269 {
270 take(sample_seq_, info_seq_, selector_state);
271 convert_samples(sample_seq, info_seq);
272 }
273
278 void read(
279 std::vector<SamplePtr>& sample_seq,
280 std::vector<InfoPtr>& info_seq,
281 const SelectorState& selector_state) RTI_FINAL
282 {
283 read(sample_seq_, info_seq_, selector_state);
284 convert_samples(sample_seq, info_seq);
285 }
286
292 std::vector<SamplePtr>& sample_seq,
293 std::vector<InfoPtr>& info_seq) RTI_FINAL
294 {
295 RTI_ROUTING_SAMPLE_VECTOR_COPY_PTRS(sample_seq_, sample_seq);
296 RTI_ROUTING_SAMPLE_VECTOR_COPY_PTRS(info_seq_, info_seq);
297 return_loan(sample_seq_, info_seq_);
298 sample_seq_.clear();
299 info_seq_.clear();
300 }
301
307 virtual void take(
308 std::vector<Data*>& sample_seq,
309 std::vector<Info*>& info_seq) = 0;
310
316 virtual void read(
317 std::vector<Data*>& sample_seq,
318 std::vector<Info*>& info_seq) = 0;
319
325 virtual void take(
326 std::vector<Data*>& sample_seq,
327 std::vector<Info*>& info_seq,
328 const SelectorState& selector_state) = 0;
329
335 virtual void read(
336 std::vector<Data*>& sample_seq,
337 std::vector<Info*>& info_seq,
338 const SelectorState& selector_state) = 0;
339
345 virtual void return_loan(
346 std::vector<Data*>& sample_seq,
347 std::vector<Info*>& info_seq) = 0;
348
349 /*
350 * @brief Virtual destructor
351 */
352 virtual ~TStreamReader()
353 {
354
355 }
356
357private:
358 void convert_samples(
359 std::vector<SamplePtr>& sample_seq,
360 std::vector<InfoPtr>& info_seq)
361 {
362 RTI_ROUTING_SAMPLE_VECTOR_COPY_PTRS(sample_seq, sample_seq_);
363 RTI_ROUTING_SAMPLE_VECTOR_COPY_PTRS(info_seq, info_seq_);
364 }
365
366private:
367 std::vector<Data*> sample_seq_;
368 std::vector<Info*> info_seq_;
369
370};
371
380template <typename Data, typename Info>
381class NoOpStreamReader : public TStreamReader<Data, Info> {
382
383public:
384 virtual void take(
385 std::vector<Data*>&,
386 std::vector<Info*>&) RTI_OVERRIDE
387 {
388 }
389
390 virtual void read(
391 std::vector<Data*>&,
392 std::vector<Info*>&) RTI_OVERRIDE
393 {
394 }
395
396 virtual void take(
397 std::vector<Data*>&,
398 std::vector<Info*>&,
399 const SelectorState&) RTI_OVERRIDE
400 {
401 }
402
403 virtual void read(
404 std::vector<Data*>&,
405 std::vector<Info*>&,
406 const SelectorState&) RTI_OVERRIDE
407 {
408 }
409
410 virtual void return_loan(
411 std::vector<Data*>&,
412 std::vector<Info*>&) RTI_OVERRIDE
413 {
414 }
415
416 virtual void* create_content_query(
417 void *,
418 const dds::topic::Filter&) RTI_OVERRIDE
419 {
420 return NULL;
421 }
422
423
424 virtual void delete_content_query(void*) RTI_OVERRIDE
425 {
426
427 }
428
429
430
431 virtual ~NoOpStreamReader()
432 {
433 }
434
435};
436
444typedef NoOpStreamReader<dds::core::xtypes::DynamicData, dds::sub::SampleInfo>
446
447
448
449class SelectorStateAdapter {
450public:
451 typedef RTI_RoutingServiceSelectorState native_type;
452
453 static void initialize(native_type& native_value)
454 {
455 RTI_RoutingServiceSelectorState native_state =
456 RTI_RoutingServiceSelectorState_INITIALIZER;
457 memcpy(&native_value, &native_state, sizeof(native_state));
458 }
459
460 static void finalize(native_type& native_value)
461 {
462 if (native_value.content.expression != NULL) {
463 DDS_String_free(native_value.content.expression);
464 }
465 DDS_StringSeq_finalize(&native_value.content.expression_parameters);
466 }
467
468 static void copy(native_type& destination, const native_type& source)
469 {
470 destination.sample_state = source.sample_state;
471 destination.view_state = source.view_state;
472 destination.instance_state = source.instance_state;
473 destination.instance_handle = source.instance_handle;
474 destination.instance_selection = source.instance_selection;
475 destination.sample_count_max = source.sample_count_max;
476 destination.query_data = source.query_data;
477 char *result = DDS_String_replace(
478 &destination.content.expression,
479 source.content.expression);
480 if (source.content.expression != NULL && result == NULL) {
481 throw std::bad_alloc();
482 }
483 if (DDS_StringSeq_copy(
484 &destination.content.expression_parameters,
485 &source.content.expression_parameters) == NULL) {
486 throw std::bad_alloc();
487 }
488 }
489
490 static bool equals(const native_type& first, const native_type& second)
491 {
492 if (first.sample_state != second.sample_state) {
493 return false;
494 }
495 if (first.view_state != second.view_state) {
496 return false;
497 }
498 if (first.instance_state != second.instance_state) {
499 return false;
500 }
501 if (first.instance_selection != second.instance_selection) {
502 return false;
503 }
504 if (first.sample_count_max != second.sample_count_max) {
505 return false;
506 }
507 if (first.query_data != second.query_data) {
508 return false;
509 }
510 if (first.content.expression != NULL
511 && second.content.expression != NULL
512 && strcmp(first.content.expression, second.content.expression) != 0) {
513 return false;
514 }
515 return DDS_StringSeq_equals(
516 &first.content.expression_parameters,
517 &second.content.expression_parameters);
518 }
519
520};
521
522} }
523
524// native_type_traits needs to be defined in rti::core
525namespace core {
526
527template <>
528struct native_type_traits<rti::routing::adapter::SelectorState> {
529 typedef rti::routing::adapter::SelectorStateAdapter adapter_type;
530 typedef RTI_RoutingServiceSelectorState native_type;
531};
532
533}
534
535namespace routing { namespace adapter {
536
547class SelectorState : public rti::core::NativeValueType<SelectorState> {
548public:
549 RTI_NATIVE_VALUE_TYPE_DEFINE_DEFAULT_MOVE_OPERATIONS(SelectorState)
550
551 typedef rti::core::NativeValueType<SelectorState> Base;
552public:
554 {
555
556 }
557
558 SelectorState(const RTI_RoutingServiceSelectorState &native)
559 : Base(native)
560 {
561 }
562
563 dds::sub::status::DataState state() const
564 {
565 return dds::sub::status::DataState(
566 dds::sub::status::SampleState(native().sample_state),
567 dds::sub::status::ViewState(native().view_state),
568 dds::sub::status::InstanceState(native().instance_state));
569 }
570
571 SelectorState& state(const dds::sub::status::DataState& state)
572 {
573 native().sample_state = static_cast<DDS_SampleStateMask>(
574 state.sample_state().to_ulong());
575 native().view_state = static_cast<DDS_ViewStateMask>(
576 state.view_state().to_ulong());
577 native().instance_state = static_cast<DDS_InstanceStateMask>(
578 state.instance_state().to_ulong());
579 return *this;
580 }
581
582 int32_t max_samples() const
583 {
584 return native().sample_count_max;
585 }
586
587 SelectorState& max_samples(const int32_t count)
588 {
589 native().sample_count_max = count;
590 return *this;
591 }
592
593 dds::core::InstanceHandle instance() const
594 {
595 if (native().instance_selection
596 == RTI_ROUTING_SERVICE_THIS_INSTANCE_SELECTION) {
597 return rti::core::native_conversions::from_native_handle(
598 native().instance_handle);
599 }
600
601 return dds::core::InstanceHandle::nil();
602 }
603
604 SelectorState& instance(const dds::core::InstanceHandle& handle)
605 {
606 native().instance_handle =
607 static_cast<DDS_InstanceHandle_t>(handle->native());
608 native().instance_selection = RTI_ROUTING_SERVICE_THIS_INSTANCE_SELECTION;
609 return *this;
610 }
611
612 dds::core::InstanceHandle next_instance() const
613 {
614 if (native().instance_selection
615 == RTI_ROUTING_SERVICE_NEXT_INSTANCE_SELECTION) {
616 return rti::core::native_conversions::from_native_handle(
617 native().instance_handle);
618 }
619
620 return dds::core::InstanceHandle::nil();
621 }
622
623 SelectorState& next_instance(
624 const dds::core::InstanceHandle& handle)
625 {
626 native().instance_handle =
627 static_cast<DDS_InstanceHandle_t>(handle->native());
628 native().instance_selection = RTI_ROUTING_SERVICE_NEXT_INSTANCE_SELECTION;
629 return *this;
630 }
631
632 void* content()
633 {
634 return native().query_data;
635 }
636
637 SelectorState& content(void *query_data)
638 {
639 native().query_data = query_data;
640 return *this;
641 }
642
643 dds::topic::Filter filter() const
644 {
645 return dds::topic::Filter(
646 native().content.expression == NULL ? "" : native().content.expression,
647 rti::core::native_conversions::from_native<std::string>(
648 native().content.expression_parameters));
649 }
650
651 SelectorState& filter(const dds::topic::Filter& filter)
652 {
653 DDS_String_replace(
654 &native().content.expression,
655 filter.expression().c_str());
656 rti::core::native_conversions::to_native(
657 native().content.expression_parameters,
658 filter->parameters());
659 return *this;
660 }
661
662};
663
664}}}
665
666#endif // RTI_ROUTING_ADAPTER_STREAM_READER_HPP_
Defines a common interface for all the pluggable entities that can be updated at runtime.
Definition: UpdatableEntity.hpp:34
An empty implementation of the TStreamReader interface.
Definition: StreamReader.hpp:381
virtual void * create_content_query(void *, const dds::topic::Filter &) RTI_OVERRIDE
Creates a query object that selects the data with the specified filter.
Definition: StreamReader.hpp:416
virtual void read(std::vector< Data * > &, std::vector< Info * > &, const SelectorState &) RTI_OVERRIDE
Interface redefinition.
Definition: StreamReader.hpp:403
virtual void take(std::vector< Data * > &, std::vector< Info * > &) RTI_OVERRIDE
Interface redefinition.
Definition: StreamReader.hpp:384
virtual void take(std::vector< Data * > &, std::vector< Info * > &, const SelectorState &) RTI_OVERRIDE
Interface redefinition.
Definition: StreamReader.hpp:396
virtual void read(std::vector< Data * > &, std::vector< Info * > &) RTI_OVERRIDE
Interface redefinition.
Definition: StreamReader.hpp:390
virtual void return_loan(std::vector< Data * > &, std::vector< Info * > &) RTI_OVERRIDE
Interface redefinition.
Definition: StreamReader.hpp:410
virtual void delete_content_query(void *) RTI_OVERRIDE
Deletes a content query created from this StreamReader.
Definition: StreamReader.hpp:424
Defines a set of attributes that can be used to read a subset of data from StreamReader.
Definition: StreamReader.hpp:547
Provides a way to read samples of a specific type from a data domain. In the XML configuration file,...
Definition: StreamReader.hpp:45
virtual ~StreamReader()
Virtual destructor.
Definition: StreamReader.hpp:201
virtual void take(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq)=0
Takes a collection of all data samples and info samples available from an input stream.
virtual void * create_content_query(void *old_query_data, const dds::topic::Filter &filter)=0
Creates a query object that selects the data with the specified filter.
virtual void take(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq, const SelectorState &selector_state)=0
Variation of StreamReader::take where the returned samples shall represent the subset specified by th...
virtual void read(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq)=0
Variation of StreamReader::take where the returned samples will remain in the StreamReader's cache,...
virtual void delete_content_query(void *query_data)=0
Deletes a content query created from this StreamReader.
virtual void read(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq, const SelectorState &selector_state)=0
Variation of StreamReader::read where the returned samples shall represent the subset specified by th...
virtual void return_loan(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq)=0
Returns a loan on the read or taken data samples and info samples.
A wrapper implementation of a StreamReader that provides a strongly-typed interface through template ...
Definition: StreamReader.hpp:220
virtual void read(std::vector< Data * > &sample_seq, std::vector< Info * > &info_seq)=0
Interface redefinition.
virtual void take(std::vector< Data * > &sample_seq, std::vector< Info * > &info_seq)=0
Interface redefinition.
virtual void read(std::vector< Data * > &sample_seq, std::vector< Info * > &info_seq, const SelectorState &selector_state)=0
Interface redefinition.
virtual void return_loan(std::vector< Data * > &sample_seq, std::vector< Info * > &info_seq)=0
Interface redefinition.
Data DataRep
The data type.
Definition: StreamReader.hpp:227
virtual void take(std::vector< Data * > &sample_seq, std::vector< Info * > &info_seq, const SelectorState &selector_state)=0
Interface redefinition.
void read(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq, const SelectorState &selector_state) RTI_FINAL
Performs the conversion between the vector of data and info pointers to strongly-typed pointers.
Definition: StreamReader.hpp:278
void read(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq) RTI_FINAL
Performs the conversion between the vector of data and info pointers to strongly-typed pointers.
Definition: StreamReader.hpp:253
void take(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq) RTI_FINAL
Performs the conversion between the vector of data and info pointers to strongly-typed pointers.
Definition: StreamReader.hpp:241
Info InfoRep
The info type.
Definition: StreamReader.hpp:231
void return_loan(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq) RTI_FINAL
Performs the conversion between the vector of data and info pointers to strongly-typed pointers.
Definition: StreamReader.hpp:291
void take(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq, const SelectorState &selector_state) RTI_FINAL
Performs the conversion between the vector of data and info pointers to strongly-typed pointers.
Definition: StreamReader.hpp:265
NoOpStreamReader< dds::core::xtypes::DynamicData, dds::sub::SampleInfo > DynamicDataStreamReader
Convenient definition of typed StreamReader that requires dds::core::xtypes::DynamicData for data sam...
Definition: StreamReader.hpp:445