11 #ifndef RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
12 #define RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
15 #include <rti/core/Exception.hpp>
17 #include <rti/routing/adapter/StreamReader.hpp>
18 #include <rti/routing/StreamInfo.hpp>
19 #include <rti/routing/detail/UpdatableEntityForwarder.hpp>
20 #include <rti/routing/detail/ForwarderUtils.hpp>
21 #include <rti/routing/adapter/detail/DiscoveryStreamReaderForwarder.hpp>
24 namespace rti {
namespace routing {
namespace adapter {
namespace detail {
// Compile-time dispatch helper keyed on two flags. Only the declaration is
// given here; the <bool, bool> combinations are provided as specializations.
//   ReadOrTake  - false selects the take() path, true selects the read() path
//                 (per the no-selector specializations in this file).
//   HasSelector - true for the variants that receive a native selector state.
26 template <
bool ReadOrTake,
bool HasSelector>
struct read_or_take;
// Specialization <ReadOrTake=false, HasSelector=false>: take without selector.
// Forwards to stream_reader.take(sample_seq, info_seq), which fills both
// output sequences. The selector-state pointer parameter is unnamed and is
// not used in the visible lines.
// NOTE(review): the function signature line and the stream_reader parameter
// are not visible in this excerpt.
30 struct read_or_take<false, false> {
34 std::vector<StreamReader::SamplePtr>& sample_seq,
35 std::vector<StreamReader::InfoPtr>& info_seq,
36 const struct RTI_RoutingServiceSelectorState *)
38 stream_reader.
take(sample_seq, info_seq);
// Specialization <ReadOrTake=true, HasSelector=false>: read without selector.
// Forwards to stream_reader.read(sample_seq, info_seq), which fills both
// output sequences. The selector-state pointer parameter is unnamed and is
// not used in the visible lines.
// NOTE(review): the function signature line and the stream_reader parameter
// are not visible in this excerpt.
43 struct read_or_take<true, false> {
47 std::vector<StreamReader::SamplePtr>& sample_seq,
48 std::vector<StreamReader::InfoPtr>& info_seq,
49 const struct RTI_RoutingServiceSelectorState *)
51 stream_reader.
read(sample_seq, info_seq);
// Specialization <ReadOrTake=false, HasSelector=true>.
// The call target is not visible in this excerpt. What is visible: the native
// selector pointer is dereferenced (so it must be non-null here) and wrapped
// in a SelectorState that is passed as the final argument of the call.
// NOTE(review): presumably the take-with-selector counterpart of the
// <false, false> specialization - confirm against the full file.
56 struct read_or_take<false, true> {
60 std::vector<StreamReader::SamplePtr>& sample_seq,
61 std::vector<StreamReader::InfoPtr>& info_seq,
62 const struct RTI_RoutingServiceSelectorState *native_selector)
67 SelectorState(*native_selector));
// Specialization <ReadOrTake=true, HasSelector=true>.
// The call target is not visible in this excerpt. What is visible: the native
// selector pointer is dereferenced (so it must be non-null here) and wrapped
// in a SelectorState that is passed as the final argument of the call.
// NOTE(review): presumably the read-with-selector counterpart of the
// <true, false> specialization - confirm against the full file.
73 struct read_or_take<true, true> {
77 std::vector<StreamReader::SamplePtr>& sample_seq,
78 std::vector<StreamReader::InfoPtr>& info_seq,
79 const struct RTI_RoutingServiceSelectorState *native_selector)
84 SelectorState(*native_selector));
// Pairs the sample and info sequences produced by a single read/take call so
// that they can be handed to the native caller and later recycled together.
// NOTE(review): several lines of this struct are not visible in this excerpt.
89 struct SamplesHolder {
// Sample pointers filled by the adapter's read/take.
96 std::vector<StreamReader::SamplePtr> sample_seq_;
// Matching info pointers; may legitimately stay empty (proxy_read then
// reports a NULL info array).
97 std::vector<StreamReader::InfoPtr> info_seq_;
// Bridges the native RTI_RoutingServiceStreamReaderExt callback table to a
// C++ StreamReader: the constructor points the native callbacks at this
// class's static functions, and each static function recovers the forwarder
// from the opaque stream_reader_data pointer before calling stream_reader_.
100 class StreamReaderForwarder {
// Creates a forwarder plus the adapter's StreamReader and returns the native
// callback table that represents them.
//
// connection          - adapter connection used to create the StreamReader.
// native_stream_info  - native stream description; dereferenced, must be
//                       non-null.
// native_properties   - native properties, converted into a string map.
// native_listener     - native listener handed to the forwarder constructor.
// environment         - receives the error text if an exception is caught.
//
// Returns forwarder->native() on success. Exceptions are not propagated to
// the caller; they are reported through `environment`.
// NOTE(review): the value returned on the failure paths is not visible in
// this excerpt.
102 static RTI_RoutingServiceStreamReaderExt* create_native(
103 Connection *connection,
105 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
106 const struct RTI_RoutingServiceProperties *native_properties,
107 const struct RTI_RoutingServiceStreamReaderListenerExt *native_listener,
108 RTI_RoutingServiceEnvironment *environment)
112 using rti::routing::detail::ScopedForwarder;
// Wrap the native stream info in its C++ counterpart.
114 StreamInfo stream_info(*native_stream_info);
// Collect the native properties as key/value strings.
116 std::map<std::string, std::string> properties;
117 rti::routing::PropertyAdapter::add_properties_from_native(
// Raw allocation; the ScopedForwarder declared next presumably guards it
// until creation has succeeded (its arguments are not visible here).
121 StreamReaderForwarder *forwarder =
new StreamReaderForwarder(
123 ScopedForwarder<Connection, StreamReaderForwarder> scoped(
// Ask the adapter for the actual reader, passing the forwarder's listener.
127 forwarder->stream_reader_ = connection->create_stream_reader(
131 &forwarder->listener_);
// Presumably throws when the adapter returned a null reader - confirm the
// macro definition.
132 RTI_ROUTING_THROW_ON_NULL(forwarder->stream_reader_);
135 return forwarder->native();
137 }
catch(
const std::exception& ex) {
// Known exception type: report through the environment.
138 RTI_RoutingServiceEnvironment_set_error(
// Second set_error call uses a fixed message; presumably the catch-all
// handler (its catch line is not visible).
144 RTI_RoutingServiceEnvironment_set_error(
146 "unexpected exception");
// Destroys the adapter's StreamReader (if one was created) and then the
// forwarder that owns the native callback table.
//
// connection            - adapter connection asked to delete the reader.
// native_stream_reader  - native table whose stream_reader_data identifies
//                         the forwarder.
// environment           - receives the error text if an exception is caught.
153 static void delete_native(
154 Connection *connection,
155 RTI_RoutingServiceStreamReaderExt *native_stream_reader,
156 RTI_RoutingServiceEnvironment *environment)
158 StreamReaderForwarder *stream_reader_forwarder =
159 from_native(native_stream_reader);
// Skip adapter deletion when creation never produced a reader.
161 if (stream_reader_forwarder->stream_reader_ != NULL) {
162 connection->delete_stream_reader(
163 stream_reader_forwarder->stream_reader_);
// Clear the pointer so it is not left dangling inside the forwarder.
164 stream_reader_forwarder->stream_reader_ = NULL;
166 }
catch(
const std::exception& ex) {
167 RTI_RoutingServiceEnvironment_set_error(
// Fixed-message report; presumably the catch-all handler (its catch line is
// not visible).
172 RTI_RoutingServiceEnvironment_set_error(
174 "unexpected exception");
// The forwarder itself is released here; this statement follows the handlers,
// so it appears to run whether or not the adapter deletion threw.
177 delete stream_reader_forwarder;
// Returns the address of the embedded native callback table. The pointer
// refers to a member of this object, so it is only valid while the forwarder
// is alive.
181 RTI_RoutingServiceStreamReaderExt* native()
183 return &this->native_;
// Builds a forwarder with no StreamReader attached yet (stream_reader_ is
// NULL), wraps the native listener, and fills in the native callback table so
// that every entry points at the corresponding static function of this class.
//
// native_listener - native listener used to construct listener_.
189 StreamReaderForwarder(
190 const RTI_RoutingServiceStreamReaderListenerExt *native_listener) :
191 stream_reader_(NULL),
192 listener_(native_listener)
// Start from an all-zero table so any entry not set below is null.
194 RTIOsapiMemory_zero(&native_,
sizeof(native_));
// Opaque back-pointer: the static callbacks cast this back to the forwarder.
195 native_.stream_reader_data =
196 static_cast<void *
>(
this);
// NOTE(review): the left-hand sides of the next two assignments are not
// visible in this excerpt; presumably native_.take and native_.read.
198 StreamReaderForwarder::take;
200 StreamReaderForwarder::read;
201 native_.take_w_selector =
202 StreamReaderForwarder::take_with_selector;
203 native_.read_w_selector =
204 StreamReaderForwarder::read_with_selector;
205 native_.return_loan =
206 StreamReaderForwarder::return_loan;
207 native_.create_content_query =
208 StreamReaderForwarder::create_content_query;
209 native_.delete_content_query =
210 StreamReaderForwarder::delete_content_query;
// NOTE(review): left-hand side not visible; presumably the update entry.
212 StreamReaderForwarder::update;
// Walks every SamplesHolder currently parked in holder_pool_.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// deletes each pooled holder (get_holder allocates them with new) - confirm.
215 ~StreamReaderForwarder()
218 for (std::list<SamplesHolder*>::iterator it = holder_pool_.begin();
219 it != holder_pool_.end();
// Native callback without selector that delegates to
// proxy_read<false, false>, i.e. the take() path of read_or_take.
// NOTE(review): the signature line is not visible in this excerpt;
// presumably `static void take(` given the constructor's reference to
// StreamReaderForwarder::take.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// sample_array              - out: start of the sample pointer array.
// sample_info_array         - out: start of the info pointer array, or NULL.
// environment               - receives error text on failure.
227 void *native_stream_reader_data,
228 RTI_RoutingServiceSample **sample_array,
229 RTI_RoutingServiceSampleInfo **sample_info_array,
231 RTI_RoutingServiceEnvironment *environment)
233 proxy_read<false, false>(
234 native_stream_reader_data,
// Native callback without selector that delegates to
// proxy_read<true, false>, i.e. the read() path of read_or_take.
// NOTE(review): the signature line is not visible in this excerpt;
// presumably `static void read(` given the constructor's reference to
// StreamReaderForwarder::read.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// sample_array              - out: start of the sample pointer array.
// sample_info_array         - out: start of the info pointer array, or NULL.
// environment               - receives error text on failure.
243 void *native_stream_reader_data,
244 RTI_RoutingServiceSample **sample_array,
245 RTI_RoutingServiceSampleInfo **sample_info_array,
247 RTI_RoutingServiceEnvironment *environment)
249 proxy_read<true, false>(
250 native_stream_reader_data,
// Native take-with-selector callback; delegates to proxy_read<false, true>.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// sample_array              - out: start of the sample pointer array.
// sample_info_array         - out: start of the info pointer array, or NULL.
// native_selector           - native selector state passed to proxy_read.
// environment               - receives error text on failure.
258 static void take_with_selector(
259 void *native_stream_reader_data,
260 RTI_RoutingServiceSample **sample_array,
261 RTI_RoutingServiceSampleInfo **sample_info_array,
263 const struct RTI_RoutingServiceSelectorState *native_selector,
264 RTI_RoutingServiceEnvironment *environment)
266 proxy_read<false, true>(
267 native_stream_reader_data,
// Native read-with-selector callback; delegates to proxy_read<true, true>.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// sample_array              - out: start of the sample pointer array.
// sample_info_array         - out: start of the info pointer array, or NULL.
// native_selector           - native selector state passed to proxy_read.
// environment               - receives error text on failure.
275 static void read_with_selector(
276 void *native_stream_reader_data,
277 RTI_RoutingServiceSample **sample_array,
278 RTI_RoutingServiceSampleInfo **sample_info_array,
280 const struct RTI_RoutingServiceSelectorState *native_selector,
281 RTI_RoutingServiceEnvironment *environment)
283 proxy_read<true, true>(
284 native_stream_reader_data,
// Native return-loan callback: gives the loaned samples back to the adapter's
// StreamReader and recycles the SamplesHolder that backed the loan.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// native_samples            - sample array previously handed out by a
//                             read/take call of this forwarder.
// (unnamed info array)      - not used.
// environment               - receives the error text if an exception is
//                             caught.
// NOTE(review): `count` is used below but its declaration is on a line not
// visible in this excerpt; presumably the sample-count parameter.
292 static void return_loan(
293 void *native_stream_reader_data,
294 RTI_RoutingServiceSample *native_samples,
295 RTI_RoutingServiceSampleInfo *,
297 RTI_RoutingServiceEnvironment *environment)
299 StreamReaderForwarder *forwarder =
300 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
301 RTIBOOST_ASSERT(forwarder != NULL);
// The holder pointer is read from the slot at index `count`, i.e. one past
// the last sample. proxy_read stores the holder at sample_seq_[size()], so
// this only works for arrays produced by proxy_read.
303 SamplesHolder *holder =
static_cast<SamplesHolder*
>(
304 native_samples[count]);
305 RTIBOOST_ASSERT(holder != NULL);
// Let the adapter release whatever it loaned (arguments not visible here).
309 forwarder->stream_reader_->return_loan(
312 }
catch (
const std::exception& ex) {
313 RTI_RoutingServiceEnvironment_set_error(
// Fixed-message report; presumably the catch-all handler (its catch line is
// not visible).
318 RTI_RoutingServiceEnvironment_set_error(
320 "unexpected exception");
// Follows the handlers, so the holder appears to be recycled whether or not
// the adapter's return_loan threw - confirm against the full file.
323 forwarder->return_holder(holder);
// Native update callback: hands the forwarder's StreamReader to
// rti::routing::detail::UpdatableEntityForwarder::update.
// NOTE(review): the signature line and the remaining call arguments are not
// visible in this excerpt; presumably `static void update(` given the
// constructor's reference to StreamReaderForwarder::update.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// native_properties         - native properties for the update.
// environment               - native environment for error reporting.
327 void *native_stream_reader_data,
328 const struct RTI_RoutingServiceProperties * native_properties,
329 RTI_RoutingServiceEnvironment * environment)
332 StreamReaderForwarder *stream_reader_forwarder =
333 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
335 rti::routing::detail::UpdatableEntityForwarder::update(
336 stream_reader_forwarder->stream_reader_,
// Native create-content-query callback: converts the native selector content
// into a dds::topic::Filter and asks the adapter's StreamReader to create a
// content query from it.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// old_query_data            - previous query data supplied by the caller.
// content                   - native expression and parameters; dereferenced,
//                             must be non-null.
// environment               - receives the error text if an exception is
//                             caught.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably query_data is returned, which stays NULL if creation throws.
341 static void* create_content_query(
342 void *native_stream_reader_data,
343 RTI_RoutingServiceSelectorStateQueryData old_query_data,
344 const struct RTI_RoutingServiceSelectorContent *content,
345 RTI_RoutingServiceEnvironment *environment)
347 StreamReaderForwarder *forwarder =
348 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
350 void *query_data = NULL;
// A NULL native expression is mapped to the empty string; the parameters are
// converted from their native representation.
352 dds::topic::Filter filter(
353 content->expression == NULL ?
"" : content->expression,
354 rti::core::native_conversions::from_native<std::string>(
355 content->expression_parameters));
356 query_data = forwarder->stream_reader_->create_content_query(
359 }
catch (
const std::exception& ex) {
360 RTI_RoutingServiceEnvironment_set_error(
// Fixed-message report; presumably the catch-all handler (its catch line is
// not visible).
365 RTI_RoutingServiceEnvironment_set_error(
367 "unexpected exception");
// Native delete-content-query callback: forwards query_data to the adapter's
// StreamReader::delete_content_query. Exceptions are reported through
// `environment` instead of being propagated.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// query_data                - query handle to delete.
// environment               - receives the error text if an exception is
//                             caught.
373 static void delete_content_query(
374 void *native_stream_reader_data,
375 RTI_RoutingServiceSelectorStateQueryData query_data,
376 RTI_RoutingServiceEnvironment *environment)
378 StreamReaderForwarder *forwarder =
379 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
381 forwarder->stream_reader_->delete_content_query(query_data);
382 }
catch (
const std::exception& ex) {
383 RTI_RoutingServiceEnvironment_set_error(
// Fixed-message report; presumably the catch-all handler (its catch line is
// not visible).
388 RTI_RoutingServiceEnvironment_set_error(
390 "unexpected exception");
// Recovers the forwarder from a native callback table by casting its opaque
// stream_reader_data field (set to `this` by the constructor).
// `native` is dereferenced and must be non-null.
394 static StreamReaderForwarder* from_native(
395 RTI_RoutingServiceStreamReaderExt *native)
397 return static_cast<StreamReaderForwarder*
>(native->stream_reader_data);
// Shared implementation behind the four native read/take callbacks.
//
// Template flags are passed straight to read_or_take<ReadOrTake, HasSelector>
// to pick the adapter call. The results are stored in a pooled SamplesHolder
// and exposed to the native caller as raw arrays.
//
// native_stream_reader_data - opaque pointer to the forwarder.
// sample_array              - out: address of the first sample pointer.
// sample_info_array         - out: address of the first info pointer, or NULL
//                             when the adapter produced no infos.
// native_selector           - native selector state, forwarded to
//                             read_or_take.
// environment               - receives the error text if an exception is
//                             caught.
// NOTE(review): `array_length` is written below but its declaration is on a
// line not visible in this excerpt; presumably an out-parameter.
400 template <
bool ReadOrTake,
bool HasSelector>
401 static void proxy_read(
402 void *native_stream_reader_data,
403 RTI_RoutingServiceSample **sample_array,
404 RTI_RoutingServiceSampleInfo **sample_info_array,
406 const struct RTI_RoutingServiceSelectorState *native_selector,
407 RTI_RoutingServiceEnvironment *environment)
409 StreamReaderForwarder *forwarder =
410 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
411 RTIBOOST_ASSERT(forwarder != NULL);
// Reuse a pooled holder (or allocate one) to back this loan.
413 SamplesHolder *holder = forwarder->get_holder();
417 read_or_take<ReadOrTake, HasSelector>::read(
418 *forwarder->stream_reader_,
// Infos are optional, but when present there must be one per sample.
422 if (!holder->info_seq_.empty()
423 && (holder->info_seq_.size() != holder->sample_seq_.size())) {
424 throw dds::core::PreconditionNotMetError(
425 "sample and info sequences length mismatch");
// Stash the holder pointer in the slot one past the last sample so that
// return_loan can recover it from native_samples[count]. reserve() ensures
// the storage for that slot exists, but size() is not changed.
// NOTE(review): indexing operator[] at size() is outside std::vector's
// contract (checked/debug STL builds may assert). return_loan depends on this
// exact layout, so any change must be made in both places together.
427 holder->sample_seq_.reserve(holder->sample_seq_.size() + 1);
428 holder->sample_seq_[holder->sample_seq_.size()] = holder;
429 }
catch (
const std::exception& ex) {
430 RTI_RoutingServiceEnvironment_set_error(
// On failure the holder goes back to the pool (which clears it).
// NOTE(review): presumably followed by an early return on a line not visible
// here; otherwise the output assignments below would use a recycled holder.
434 forwarder->return_holder(holder);
// Fixed-message report; presumably the catch-all handler (its catch line is
// not visible).
437 RTI_RoutingServiceEnvironment_set_error(
439 "unexpected exception");
440 forwarder->return_holder(holder);
// Publish the results. The length excludes the hidden holder slot because
// size() was not changed by the reserve above.
445 *array_length = holder->sample_seq_.size();
446 *sample_array = &holder->sample_seq_[0];
447 if (holder->info_seq_.empty()) {
448 *sample_info_array = NULL;
450 *sample_info_array = &holder->info_seq_[0];
// Obtains a SamplesHolder for a new loan: allocates a fresh one when the pool
// is empty, otherwise removes and uses the holder at the front of the pool.
// NOTE(review): the final return statement is not visible in this excerpt;
// presumably it returns `holder`.
454 SamplesHolder* get_holder()
456 if (holder_pool_.size() == 0) {
457 return new SamplesHolder();
460 SamplesHolder *holder = holder_pool_.front();
461 holder_pool_.pop_front();
// Recycles a holder: empties both of its sequences and places it at the front
// of the pool, so it is the next one handed out by get_holder. clear() keeps
// the vectors' capacity, so a reused holder avoids reallocating.
466 void return_holder(SamplesHolder *holder)
468 holder->sample_seq_.clear();
469 holder->info_seq_.clear();
470 holder_pool_.push_front(holder);
// Native callback table exposed to the caller via native(); its
// stream_reader_data field points back at this object.
475 RTI_RoutingServiceStreamReaderExt native_;
// Adapter-provided reader; NULL until create_native succeeds and again after
// delete_native has deleted it.
476 StreamReader *stream_reader_;
// Listener constructed from the native listener; its address is passed to
// Connection::create_stream_reader.
477 StreamReaderListener listener_;
// Idle SamplesHolder objects available for reuse by get_holder.
478 std::list<SamplesHolder*> holder_pool_;
483 #endif // RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_