11 #ifndef RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
12 #define RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
15 #include <rti/core/Exception.hpp>
17 #include <rti/routing/adapter/StreamReader.hpp>
18 #include <rti/routing/StreamInfo.hpp>
19 #include <rti/routing/detail/UpdatableEntityForwarder.hpp>
20 #include <rti/routing/detail/ForwarderUtils.hpp>
21 #include <rti/routing/adapter/detail/DiscoveryStreamReaderForwarder.hpp>
24 namespace rti {
namespace routing {
namespace adapter {
namespace detail {
// Compile-time dispatcher used by StreamReaderForwarder::proxy_read.
// The primary template is only declared here; the behavior comes from the
// explicit specializations on <ReadOrTake, HasSelector> that follow.
//   ReadOrTake  - false selects take, true selects read.
//   HasSelector - true selects the variants that receive a selector state.
26 template <
bool ReadOrTake,
bool HasSelector>
struct read_or_take;
// Specialization for ReadOrTake=false, HasSelector=false: forwards to
// stream_reader.take(sample_seq, info_seq).
// The selector-state pointer parameter is unnamed and is not used.
30 struct read_or_take<false, false> {
34 std::vector<StreamReader::SamplePtr>& sample_seq,
35 std::vector<StreamReader::InfoPtr>& info_seq,
36 const struct RTI_RoutingServiceSelectorState *)
38 stream_reader.
take(sample_seq, info_seq);
// Specialization for ReadOrTake=true, HasSelector=false: forwards to
// stream_reader.read(sample_seq, info_seq).
// The selector-state pointer parameter is unnamed and is not used.
43 struct read_or_take<true, false> {
47 std::vector<StreamReader::SamplePtr>& sample_seq,
48 std::vector<StreamReader::InfoPtr>& info_seq,
49 const struct RTI_RoutingServiceSelectorState *)
51 stream_reader.
read(sample_seq, info_seq);
// Specialization for ReadOrTake=false, HasSelector=true.
// native_selector is dereferenced without a null check and wrapped in a
// SelectorState that is passed as the last argument of the forwarded call.
// NOTE(review): presumably the selector overload of StreamReader::take,
// by analogy with the <false, false> specialization - confirm.
56 struct read_or_take<false, true> {
60 std::vector<StreamReader::SamplePtr>& sample_seq,
61 std::vector<StreamReader::InfoPtr>& info_seq,
62 const struct RTI_RoutingServiceSelectorState *native_selector)
67 SelectorState(*native_selector));
// Specialization for ReadOrTake=true, HasSelector=true.
// native_selector is dereferenced without a null check and wrapped in a
// SelectorState that is passed as the last argument of the forwarded call.
// NOTE(review): presumably the selector overload of StreamReader::read,
// by analogy with the <true, false> specialization - confirm.
73 struct read_or_take<true, true> {
77 std::vector<StreamReader::SamplePtr>& sample_seq,
78 std::vector<StreamReader::InfoPtr>& info_seq,
79 const struct RTI_RoutingServiceSelectorState *native_selector)
84 SelectorState(*native_selector));
// Pairs the two pointer vectors that back one read/take result handed to the
// native layer. Instances are recycled through
// StreamReaderForwarder::get_holder() / return_holder().
89 struct SamplesHolder {
// Sample pointers filled in by the StreamReader read/take call.
96 std::vector<StreamReader::SamplePtr> sample_seq_;
// Info pointers filled in by the same call; may be left empty.
97 std::vector<StreamReader::InfoPtr> info_seq_;
// Adapts a C++ StreamReader to the native RTI_RoutingServiceStreamReaderExt
// interface. The constructor installs this class's static member functions in
// the embedded native struct; each of them recovers the forwarder from the
// opaque stream_reader_data pointer and routes the call to the wrapped
// StreamReader, reporting C++ exceptions through
// RTI_RoutingServiceEnvironment_set_error.
100 class StreamReaderForwarder {
// Creates a forwarder plus the user StreamReader it wraps and returns the
// native handle for it.
//
// connection          - Connection that creates the StreamReader.
// native_stream_info  - dereferenced to build a StreamInfo (no null check).
// native_properties   - native properties converted into a string map.
// native_listener     - native listener handed to the forwarder constructor.
// environment         - receives the error report when an exception is caught.
//
// Returns forwarder->native() on success.
// NOTE(review): the value returned after an exception is reported is not
// visible in this block - confirm.
102 static RTI_RoutingServiceStreamReaderExt* create_native(
103 Connection *connection,
105 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
106 const struct RTI_RoutingServiceProperties *native_properties,
107 const struct RTI_RoutingServiceStreamReaderListenerExt *native_listener,
108 RTI_RoutingServiceEnvironment *environment)
112 StreamInfo stream_info(*native_stream_info);
// Convert the native property list into a std::map for the C++ API.
114 std::map<std::string, std::string> properties;
115 rti::routing::PropertyAdapter::add_properties_from_native(
119 StreamReaderForwarder *forwarder =
new StreamReaderForwarder(
// NOTE(review): ScopedForwarder presumably cleans up the forwarder if the
// steps below throw - confirm against ForwarderUtils.hpp.
121 ScopedForwarder<Connection, StreamReaderForwarder> scoped(
// The forwarder's own listener_ member is what the StreamReader receives.
125 forwarder->stream_reader_ = connection->create_stream_reader(
129 &forwarder->listener_);
// A null StreamReader from the connection is treated as an error.
130 RTI_ROUTING_THROW_ON_NULL(forwarder->stream_reader_);
133 return forwarder->native();
// std::exception failures are reported to the environment; a second report
// with the fixed text "unexpected exception" follows, presumably for a
// catch-all handler.
135 }
catch(
const std::exception& ex) {
136 RTI_RoutingServiceEnvironment_set_error(
142 RTI_RoutingServiceEnvironment_set_error(
144 "unexpected exception");
// Destroys the StreamReader wrapped by a native handle and then the forwarder.
//
// connection            - Connection asked to delete the StreamReader.
// native_stream_reader  - handle previously produced by create_native().
// environment           - receives the error report when an exception is caught.
151 static void delete_native(
152 Connection *connection,
153 RTI_RoutingServiceStreamReaderExt *native_stream_reader,
154 RTI_RoutingServiceEnvironment *environment)
156 StreamReaderForwarder *stream_reader_forwarder =
157 from_native(native_stream_reader);
// Only delete a StreamReader that was actually created; the pointer is
// reset afterwards so it is not deleted twice.
159 if (stream_reader_forwarder->stream_reader_ != NULL) {
160 connection->delete_stream_reader(
161 stream_reader_forwarder->stream_reader_);
162 stream_reader_forwarder->stream_reader_ = NULL;
// Failures in the user's delete_stream_reader are reported, not propagated.
164 }
catch(
const std::exception& ex) {
165 RTI_RoutingServiceEnvironment_set_error(
170 RTI_RoutingServiceEnvironment_set_error(
172 "unexpected exception");
// NOTE(review): this delete follows the handlers, so it appears to run
// whether or not an error was reported - confirm.
175 delete stream_reader_forwarder;
// Returns a pointer to the native struct embedded in this forwarder; the
// pointer refers to a member, so it is only valid while the forwarder exists.
179 RTI_RoutingServiceStreamReaderExt* native()
181 return &this->native_;
// Builds a forwarder with no StreamReader attached yet (stream_reader_ is
// NULL) and fills in the native function table.
//
// native_listener - native listener used to initialize listener_.
187 StreamReaderForwarder(
188 const RTI_RoutingServiceStreamReaderListenerExt *native_listener) :
189 stream_reader_(NULL),
190 listener_(native_listener)
// Start from an all-zero native struct so unset entries are null.
192 RTIOsapiMemory_zero(&native_,
sizeof(native_));
// The native layer passes this pointer back to every static callback.
193 native_.stream_reader_data =
194 static_cast<void *
>(
this);
// Install the static member functions as the native entry points.
// NOTE(review): the assignment targets for take, read and update are not
// visible here; presumably native_.take, native_.read and native_.update.
196 StreamReaderForwarder::take;
198 StreamReaderForwarder::read;
199 native_.take_w_selector =
200 StreamReaderForwarder::take_with_selector;
201 native_.read_w_selector =
202 StreamReaderForwarder::read_with_selector;
203 native_.return_loan =
204 StreamReaderForwarder::return_loan;
205 native_.create_content_query =
206 StreamReaderForwarder::create_content_query;
207 native_.delete_content_query =
208 StreamReaderForwarder::delete_content_query;
210 StreamReaderForwarder::update;
// Walks every SamplesHolder still parked in holder_pool_.
// NOTE(review): the loop body is not visible here; presumably each pooled
// holder is deleted, since get_holder() allocates them with new - confirm.
213 ~StreamReaderForwarder()
216 for (std::list<SamplesHolder*>::iterator it = holder_pool_.begin();
217 it != holder_pool_.end();
// Native entry point without a selector: delegates to
// proxy_read<false, false>, i.e. the take variant of read_or_take.
// NOTE(review): the signature line is not visible; this is presumably
// StreamReaderForwarder::take as installed by the constructor.
225 void *native_stream_reader_data,
226 RTI_RoutingServiceSample **sample_array,
227 RTI_RoutingServiceSampleInfo **sample_info_array,
229 RTI_RoutingServiceEnvironment *environment)
231 proxy_read<false, false>(
232 native_stream_reader_data,
// Native entry point without a selector: delegates to
// proxy_read<true, false>, i.e. the read variant of read_or_take.
// NOTE(review): the signature line is not visible; this is presumably
// StreamReaderForwarder::read as installed by the constructor.
241 void *native_stream_reader_data,
242 RTI_RoutingServiceSample **sample_array,
243 RTI_RoutingServiceSampleInfo **sample_info_array,
245 RTI_RoutingServiceEnvironment *environment)
247 proxy_read<true, false>(
248 native_stream_reader_data,
// Native take_w_selector entry point: delegates to proxy_read<false, true>,
// which dispatches to the selector-aware take specialization.
//
// native_stream_reader_data - the forwarder, as stored in stream_reader_data.
// sample_array / sample_info_array - out parameters filled by proxy_read.
// native_selector - selector state forwarded to the StreamReader.
// environment - receives the error report when an exception is caught.
256 static void take_with_selector(
257 void *native_stream_reader_data,
258 RTI_RoutingServiceSample **sample_array,
259 RTI_RoutingServiceSampleInfo **sample_info_array,
261 const struct RTI_RoutingServiceSelectorState *native_selector,
262 RTI_RoutingServiceEnvironment *environment)
264 proxy_read<false, true>(
265 native_stream_reader_data,
// Native read_w_selector entry point: delegates to proxy_read<true, true>,
// which dispatches to the selector-aware read specialization.
//
// native_stream_reader_data - the forwarder, as stored in stream_reader_data.
// sample_array / sample_info_array - out parameters filled by proxy_read.
// native_selector - selector state forwarded to the StreamReader.
// environment - receives the error report when an exception is caught.
273 static void read_with_selector(
274 void *native_stream_reader_data,
275 RTI_RoutingServiceSample **sample_array,
276 RTI_RoutingServiceSampleInfo **sample_info_array,
278 const struct RTI_RoutingServiceSelectorState *native_selector,
279 RTI_RoutingServiceEnvironment *environment)
281 proxy_read<true, true>(
282 native_stream_reader_data,
// Native return_loan entry point: gives a previously loaned sample set back
// to the StreamReader and recycles the SamplesHolder that backed it.
//
// native_stream_reader_data - the forwarder, as stored in stream_reader_data.
// native_samples - array previously handed out by proxy_read.
// (unnamed)      - the native info array; not used here.
// environment    - receives the error report when an exception is caught.
290 static void return_loan(
291 void *native_stream_reader_data,
292 RTI_RoutingServiceSample *native_samples,
293 RTI_RoutingServiceSampleInfo *,
295 RTI_RoutingServiceEnvironment *environment)
297 StreamReaderForwarder *forwarder =
298 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
299 RTIBOOST_ASSERT(forwarder != NULL);
// proxy_read stored the owning holder in the slot just past the loaned
// samples, so index `count` recovers it.
301 SamplesHolder *holder =
static_cast<SamplesHolder*
>(
302 native_samples[count]);
303 RTIBOOST_ASSERT(holder != NULL);
307 forwarder->stream_reader_->return_loan(
// Failures in the user's return_loan are reported, not propagated.
310 }
catch (
const std::exception& ex) {
311 RTI_RoutingServiceEnvironment_set_error(
316 RTI_RoutingServiceEnvironment_set_error(
318 "unexpected exception");
// Put the holder back in the pool for reuse by a later read/take.
321 forwarder->return_holder(holder);
// Native update entry point: recovers the forwarder and delegates to
// rti::routing::detail::UpdatableEntityForwarder::update with the wrapped
// StreamReader as the entity to update.
// NOTE(review): the signature line is not visible; this is presumably
// StreamReaderForwarder::update as installed by the constructor.
325 void *native_stream_reader_data,
326 const struct RTI_RoutingServiceProperties * native_properties,
327 RTI_RoutingServiceEnvironment * environment)
330 StreamReaderForwarder *stream_reader_forwarder =
331 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
333 rti::routing::detail::UpdatableEntityForwarder::update(
334 stream_reader_forwarder->stream_reader_,
// Native create_content_query entry point: builds a dds::topic::Filter from
// the native selector content and asks the StreamReader for a query object.
//
// native_stream_reader_data - the forwarder, as stored in stream_reader_data.
// old_query_data - query data from a previous call.
// content        - native expression and parameters (dereferenced unchecked).
// environment    - receives the error report when an exception is caught.
//
// NOTE(review): the return statement is not visible; presumably query_data,
// which stays NULL when the StreamReader call throws - confirm.
339 static void* create_content_query(
340 void *native_stream_reader_data,
341 RTI_RoutingServiceSelectorStateQueryData old_query_data,
342 const struct RTI_RoutingServiceSelectorContent *content,
343 RTI_RoutingServiceEnvironment *environment)
345 StreamReaderForwarder *forwarder =
346 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
348 void *query_data = NULL;
// A NULL native expression is mapped to an empty filter expression.
350 dds::topic::Filter filter(
351 content->expression == NULL ?
"" : content->expression,
352 rti::core::native_conversions::from_native<std::string>(
353 content->expression_parameters));
354 query_data = forwarder->stream_reader_->create_content_query(
// Failures in the user's create_content_query are reported, not propagated.
357 }
catch (
const std::exception& ex) {
358 RTI_RoutingServiceEnvironment_set_error(
363 RTI_RoutingServiceEnvironment_set_error(
365 "unexpected exception");
// Native delete_content_query entry point: forwards query_data to the wrapped
// StreamReader's delete_content_query.
//
// native_stream_reader_data - the forwarder, as stored in stream_reader_data.
// query_data  - query object previously produced by create_content_query.
// environment - receives the error report when an exception is caught.
371 static void delete_content_query(
372 void *native_stream_reader_data,
373 RTI_RoutingServiceSelectorStateQueryData query_data,
374 RTI_RoutingServiceEnvironment *environment)
376 StreamReaderForwarder *forwarder =
377 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
379 forwarder->stream_reader_->delete_content_query(query_data);
// Failures in the user's delete_content_query are reported, not propagated.
380 }
catch (
const std::exception& ex) {
381 RTI_RoutingServiceEnvironment_set_error(
386 RTI_RoutingServiceEnvironment_set_error(
388 "unexpected exception");
// Recovers the forwarder from a native handle by casting its
// stream_reader_data field, which the constructor set to `this`.
// `native` is dereferenced without a null check.
392 static StreamReaderForwarder* from_native(
393 RTI_RoutingServiceStreamReaderExt *native)
395 return static_cast<StreamReaderForwarder*
>(native->stream_reader_data);
// Shared implementation behind the four native read/take entry points.
// Obtains a SamplesHolder, lets read_or_take<ReadOrTake, HasSelector> fill it
// from the wrapped StreamReader, and exposes the holder's vectors to the
// native caller through the out parameters.
//
// native_stream_reader_data - the forwarder, as stored in stream_reader_data.
// sample_array      - out: address of the first sample pointer.
// sample_info_array - out: address of the first info pointer, or NULL when
//                     the StreamReader produced no infos.
// native_selector   - selector state forwarded to read_or_take.
// environment       - receives the error report when an exception is caught.
398 template <
bool ReadOrTake,
bool HasSelector>
399 static void proxy_read(
400 void *native_stream_reader_data,
401 RTI_RoutingServiceSample **sample_array,
402 RTI_RoutingServiceSampleInfo **sample_info_array,
404 const struct RTI_RoutingServiceSelectorState *native_selector,
405 RTI_RoutingServiceEnvironment *environment)
407 StreamReaderForwarder *forwarder =
408 static_cast<StreamReaderForwarder*
>(native_stream_reader_data);
409 RTIBOOST_ASSERT(forwarder != NULL);
411 SamplesHolder *holder = forwarder->get_holder();
415 read_or_take<ReadOrTake, HasSelector>::read(
416 *forwarder->stream_reader_,
// Infos are optional, but when present there must be one per sample.
420 if (!holder->info_seq_.empty()
421 && (holder->info_seq_.size() != holder->sample_seq_.size())) {
422 throw dds::core::PreconditionNotMetError(
423 "sample and info sequences length mismatch");
// Stash the holder pointer in the slot just past the last sample so that
// return_loan can recover it from native_samples[count].
// NOTE(review): the index equals size(), i.e. one past the end; reserve()
// only guarantees capacity, so this operator[] access is formally out of
// range and will trip checked/debug STL builds - confirm this is intended.
425 holder->sample_seq_.reserve(holder->sample_seq_.size() + 1);
426 holder->sample_seq_[holder->sample_seq_.size()] = holder;
// On failure the error is reported and the holder goes back to the pool.
427 }
catch (
const std::exception& ex) {
428 RTI_RoutingServiceEnvironment_set_error(
432 forwarder->return_holder(holder);
435 RTI_RoutingServiceEnvironment_set_error(
437 "unexpected exception");
438 forwarder->return_holder(holder);
// Publish the result: length excludes the stashed holder slot.
443 *array_length = holder->sample_seq_.size();
444 *sample_array = &holder->sample_seq_[0];
445 if (holder->info_seq_.empty()) {
446 *sample_info_array = NULL;
448 *sample_info_array = &holder->info_seq_[0];
// Supplies a SamplesHolder for a read/take: allocates a new one when the pool
// is empty, otherwise removes the holder at the front of holder_pool_.
// NOTE(review): the return of the pooled holder is not visible here -
// presumably `return holder;` follows the pop.
452 SamplesHolder* get_holder()
454 if (holder_pool_.size() == 0) {
455 return new SamplesHolder();
458 SamplesHolder *holder = holder_pool_.front();
459 holder_pool_.pop_front();
// Recycles a holder: empties both of its vectors and pushes it onto the front
// of holder_pool_, so the most recently returned holder is reused first.
464 void return_holder(SamplesHolder *holder)
466 holder->sample_seq_.clear();
467 holder->info_seq_.clear();
468 holder_pool_.push_front(holder);
// Native function table handed to the native layer (see native()).
473 RTI_RoutingServiceStreamReaderExt native_;
// Wrapped user StreamReader; NULL until create_native() attaches one and
// again after delete_native() has destroyed it.
474 StreamReader *stream_reader_;
// Listener built from the native listener; its address is passed to
// Connection::create_stream_reader.
475 StreamReaderListener listener_;
// Idle SamplesHolder objects available for reuse by get_holder().
476 std::list<SamplesHolder*> holder_pool_;
481 #endif // RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_