11#ifndef RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
12#define RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
16#include <rti/core/Exception.hpp>
18#include <rti/routing/adapter/StreamReader.hpp>
19#include <rti/routing/StreamInfo.hpp>
20#include <rti/routing/detail/UpdatableEntityForwarder.hpp>
21#include <rti/routing/detail/ForwarderUtils.hpp>
22#include <rti/routing/adapter/detail/DiscoveryStreamReaderForwarder.hpp>
25namespace rti {
namespace routing {
namespace adapter {
namespace detail {
27template <
bool ReadOrTake,
bool HasSelector>
struct read_or_take;
31struct read_or_take<false, false> {
35 std::vector<StreamReader::SamplePtr>& sample_seq,
36 std::vector<StreamReader::InfoPtr>& info_seq,
37 const struct RTI_RoutingServiceSelectorState *)
39 stream_reader.
take(sample_seq, info_seq);
44struct read_or_take<true, false> {
48 std::vector<StreamReader::SamplePtr>& sample_seq,
49 std::vector<StreamReader::InfoPtr>& info_seq,
50 const struct RTI_RoutingServiceSelectorState *)
52 stream_reader.
read(sample_seq, info_seq);
57struct read_or_take<false, true> {
61 std::vector<StreamReader::SamplePtr>& sample_seq,
62 std::vector<StreamReader::InfoPtr>& info_seq,
63 const struct RTI_RoutingServiceSelectorState *native_selector)
68 SelectorState(*native_selector));
74struct read_or_take<true, true> {
78 std::vector<StreamReader::SamplePtr>& sample_seq,
79 std::vector<StreamReader::InfoPtr>& info_seq,
80 const struct RTI_RoutingServiceSelectorState *native_selector)
85 SelectorState(*native_selector));
97 std::vector<StreamReader::SamplePtr> sample_seq_;
98 std::vector<StreamReader::InfoPtr> info_seq_;
101class StreamReaderForwarder {
103 static RTI_RoutingServiceStreamReaderExt* create_native(
104 Connection *connection,
106 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
107 const struct RTI_RoutingServiceProperties *native_properties,
108 const struct RTI_RoutingServiceStreamReaderListenerExt *native_listener,
109 RTI_RoutingServiceEnvironment *environment)
113 using rti::routing::detail::ScopedForwarder;
115 StreamInfo stream_info(*native_stream_info);
117 std::map<std::string, std::string> properties;
118 rti::routing::PropertyAdapter::add_properties_from_native(
122 StreamReaderForwarder *forwarder =
new StreamReaderForwarder(
124 ScopedForwarder<Connection, StreamReaderForwarder> scoped(
128 forwarder->stream_reader_ = connection->create_stream_reader(
132 &forwarder->listener_);
133 RTI_ROUTING_THROW_ON_NULL(forwarder->stream_reader_);
142 return forwarder->native();
143 }
catch(
const std::exception& ex) {
144 RTI_RoutingServiceEnvironment_set_error(
150 RTI_RoutingServiceEnvironment_set_error(
152 "unexpected exception");
159 static void delete_native(
160 Connection *connection,
161 RTI_RoutingServiceStreamReaderExt *native_stream_reader,
162 RTI_RoutingServiceEnvironment *environment)
164 StreamReaderForwarder *stream_reader_forwarder =
165 from_native(native_stream_reader);
167 if (stream_reader_forwarder->stream_reader_ != NULL) {
168 connection->delete_stream_reader(
169 stream_reader_forwarder->stream_reader_);
170 stream_reader_forwarder->stream_reader_ = NULL;
172 }
catch(
const std::exception& ex) {
173 RTI_RoutingServiceEnvironment_set_error(
178 RTI_RoutingServiceEnvironment_set_error(
180 "unexpected exception");
183 delete stream_reader_forwarder;
187 RTI_RoutingServiceStreamReaderExt* native()
189 return &this->native_;
195 StreamReaderForwarder(
196 const RTI_RoutingServiceStreamReaderListenerExt *native_listener) :
197 stream_reader_(NULL),
198 listener_(native_listener)
200 RTIOsapiMemory_zero(&native_,
sizeof(native_));
201 native_.stream_reader_data =
202 static_cast<void *
>(
this);
204 StreamReaderForwarder::take;
206 StreamReaderForwarder::read;
207 native_.take_w_selector =
208 StreamReaderForwarder::take_with_selector;
209 native_.read_w_selector =
210 StreamReaderForwarder::read_with_selector;
211 native_.return_loan =
212 StreamReaderForwarder::return_loan;
213 native_.create_content_query =
214 StreamReaderForwarder::create_content_query;
215 native_.delete_content_query =
216 StreamReaderForwarder::delete_content_query;
218 StreamReaderForwarder::update;
221 ~StreamReaderForwarder()
224 for (std::list<SamplesHolder*>::iterator it = holder_pool_.begin();
225 it != holder_pool_.end();
233 void *native_stream_reader_data,
234 RTI_RoutingServiceSample **sample_array,
235 RTI_RoutingServiceSampleInfo **sample_info_array,
237 RTI_RoutingServiceEnvironment *environment)
239 proxy_read<false, false>(
240 native_stream_reader_data,
249 void *native_stream_reader_data,
250 RTI_RoutingServiceSample **sample_array,
251 RTI_RoutingServiceSampleInfo **sample_info_array,
253 RTI_RoutingServiceEnvironment *environment)
255 proxy_read<true, false>(
256 native_stream_reader_data,
264 static void take_with_selector(
265 void *native_stream_reader_data,
266 RTI_RoutingServiceSample **sample_array,
267 RTI_RoutingServiceSampleInfo **sample_info_array,
269 const struct RTI_RoutingServiceSelectorState *native_selector,
270 RTI_RoutingServiceEnvironment *environment)
272 proxy_read<false, true>(
273 native_stream_reader_data,
281 static void read_with_selector(
282 void *native_stream_reader_data,
283 RTI_RoutingServiceSample **sample_array,
284 RTI_RoutingServiceSampleInfo **sample_info_array,
286 const struct RTI_RoutingServiceSelectorState *native_selector,
287 RTI_RoutingServiceEnvironment *environment)
289 proxy_read<true, true>(
290 native_stream_reader_data,
298 static void return_loan(
299 void *native_stream_reader_data,
300 RTI_RoutingServiceSample *native_samples,
301 RTI_RoutingServiceSampleInfo *,
303 RTI_RoutingServiceEnvironment *environment)
305 StreamReaderForwarder *forwarder =
306 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
307 assert(forwarder != NULL);
309 SamplesHolder *holder =
static_cast<SamplesHolder*
>(
310 native_samples[count]);
311 assert(holder != NULL);
315 forwarder->stream_reader_->return_loan(
318 }
catch (
const std::exception& ex) {
319 RTI_RoutingServiceEnvironment_set_error(
324 RTI_RoutingServiceEnvironment_set_error(
326 "unexpected exception");
329 forwarder->return_holder(holder);
333 void *native_stream_reader_data,
334 const struct RTI_RoutingServiceProperties * native_properties,
335 RTI_RoutingServiceEnvironment * environment)
338 StreamReaderForwarder *stream_reader_forwarder =
339 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
341 rti::routing::detail::UpdatableEntityForwarder::update(
342 stream_reader_forwarder->stream_reader_,
347 static void* create_content_query(
348 void *native_stream_reader_data,
349 RTI_RoutingServiceSelectorStateQueryData old_query_data,
350 const struct RTI_RoutingServiceSelectorContent *content,
351 RTI_RoutingServiceEnvironment *environment)
353 StreamReaderForwarder *forwarder =
354 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
356 void *query_data = NULL;
358 dds::topic::Filter filter(
359 content->expression == NULL ?
"" : content->expression,
360 rti::core::native_conversions::from_native<std::string>(
361 content->expression_parameters));
362 query_data = forwarder->stream_reader_->create_content_query(
365 }
catch (
const std::exception& ex) {
366 RTI_RoutingServiceEnvironment_set_error(
371 RTI_RoutingServiceEnvironment_set_error(
373 "unexpected exception");
379 static void delete_content_query(
380 void *native_stream_reader_data,
381 RTI_RoutingServiceSelectorStateQueryData query_data,
382 RTI_RoutingServiceEnvironment *environment)
384 StreamReaderForwarder *forwarder =
385 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
387 forwarder->stream_reader_->delete_content_query(query_data);
388 }
catch (
const std::exception& ex) {
389 RTI_RoutingServiceEnvironment_set_error(
394 RTI_RoutingServiceEnvironment_set_error(
396 "unexpected exception");
400 static StreamReaderForwarder* from_native(
401 RTI_RoutingServiceStreamReaderExt *native)
403 return static_cast<StreamReaderForwarder*
>(native->stream_reader_data);
406 template <
bool ReadOrTake,
bool HasSelector>
407 static void proxy_read(
408 void *native_stream_reader_data,
409 RTI_RoutingServiceSample **sample_array,
410 RTI_RoutingServiceSampleInfo **sample_info_array,
412 const struct RTI_RoutingServiceSelectorState *native_selector,
413 RTI_RoutingServiceEnvironment *environment)
415 StreamReaderForwarder *forwarder =
416 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
417 assert(forwarder != NULL);
419 SamplesHolder *holder = forwarder->get_holder();
423 read_or_take<ReadOrTake, HasSelector>::read(
424 *forwarder->stream_reader_,
428 if (!holder->info_seq_.empty()
429 && (holder->info_seq_.size() != holder->sample_seq_.size())) {
430 throw dds::core::PreconditionNotMetError(
431 "sample and info sequences length mismatch");
439 holder->sample_seq_.reserve(holder->sample_seq_.size() + 1);
440 *(&holder->sample_seq_[0] + holder->sample_seq_.size()) = holder;
441 }
catch (
const std::exception& ex) {
442 RTI_RoutingServiceEnvironment_set_error(
446 forwarder->return_holder(holder);
449 RTI_RoutingServiceEnvironment_set_error(
451 "unexpected exception");
452 forwarder->return_holder(holder);
457 *array_length =
static_cast<int>(holder->sample_seq_.size());
458 *sample_array = &holder->sample_seq_[0];
459 if (holder->info_seq_.empty()) {
460 *sample_info_array = NULL;
462 *sample_info_array = &holder->info_seq_[0];
466 SamplesHolder* get_holder()
468 if (holder_pool_.size() == 0) {
469 return new SamplesHolder();
472 SamplesHolder *holder = holder_pool_.front();
473 holder_pool_.pop_front();
478 void return_holder(SamplesHolder *holder)
480 holder->sample_seq_.clear();
481 holder->info_seq_.clear();
482 holder_pool_.push_front(holder);
487 RTI_RoutingServiceStreamReaderExt native_;
488 StreamReader *stream_reader_;
490 std::list<SamplesHolder*> holder_pool_;
Listener representation used by StreamReader to notify RTI Routing Service when new data is available...
Provides a way to read samples of a specific type from a data domain. In the XML configuration file,...
Definition: StreamReader.hpp:45
virtual void take(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq)=0
Takes a collection of all data samples and info samples available from an input stream.
virtual void read(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq)=0
Variation of StreamReader::take where the returned samples will remain in the StreamReader's cache,...