RTI Routing Service Version 7.2.0
StreamReaderForwarder.hpp
1/*
2 * (c) Copyright, Real-Time Innovations, 2017.
3 * All rights reserved.
4 *
5 * No duplications, whole or partial, manual or electronic, may be made
6 * without express written permission. Any such copies, or
7 * revisions thereof, must display this notice unaltered.
8 * This code contains trade secrets of Real-Time Innovations, Inc.
9 */
10
11#ifndef RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
12#define RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
13
14#include <assert.h>
15#include <list>
16#include <rti/core/Exception.hpp>
17
18#include <rti/routing/adapter/StreamReader.hpp>
19#include <rti/routing/StreamInfo.hpp>
20#include <rti/routing/detail/UpdatableEntityForwarder.hpp>
21#include <rti/routing/detail/ForwarderUtils.hpp>
22#include <rti/routing/adapter/detail/DiscoveryStreamReaderForwarder.hpp>
23
24
25namespace rti { namespace routing { namespace adapter { namespace detail {
26
27template <bool ReadOrTake, bool HasSelector> struct read_or_take;
28
29
30template<>
31struct read_or_take<false, false> {
32
33 static void read(
35 std::vector<StreamReader::SamplePtr>& sample_seq,
36 std::vector<StreamReader::InfoPtr>& info_seq,
37 const struct RTI_RoutingServiceSelectorState *)
38 {
39 stream_reader.take(sample_seq, info_seq);
40 }
41};
42
43template<>
44struct read_or_take<true, false> {
45
46 static void read(
48 std::vector<StreamReader::SamplePtr>& sample_seq,
49 std::vector<StreamReader::InfoPtr>& info_seq,
50 const struct RTI_RoutingServiceSelectorState *)
51 {
52 stream_reader.read(sample_seq, info_seq);
53 }
54};
55
56template<>
57struct read_or_take<false, true> {
58
59 static void read(
61 std::vector<StreamReader::SamplePtr>& sample_seq,
62 std::vector<StreamReader::InfoPtr>& info_seq,
63 const struct RTI_RoutingServiceSelectorState *native_selector)
64 {
65 stream_reader.take(
66 sample_seq,
67 info_seq,
68 SelectorState(*native_selector));
69 }
70};
71
72
73template<>
74struct read_or_take<true, true> {
75
76 static void read(
78 std::vector<StreamReader::SamplePtr>& sample_seq,
79 std::vector<StreamReader::InfoPtr>& info_seq,
80 const struct RTI_RoutingServiceSelectorState *native_selector)
81 {
82 stream_reader.read(
83 sample_seq,
84 info_seq,
85 SelectorState(*native_selector));
86 }
87};
88
89
90struct SamplesHolder {
91public:
92
93 SamplesHolder()
94 {
95 }
96
97 std::vector<StreamReader::SamplePtr> sample_seq_;
98 std::vector<StreamReader::InfoPtr> info_seq_;
99};
100
101class StreamReaderForwarder {
102public:
103 static RTI_RoutingServiceStreamReaderExt* create_native(
104 Connection *connection,
105 Session *session,
106 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
107 const struct RTI_RoutingServiceProperties *native_properties,
108 const struct RTI_RoutingServiceStreamReaderListenerExt *native_listener,
109 RTI_RoutingServiceEnvironment *environment)
110 {
111
112 try {
113 using rti::routing::detail::ScopedForwarder;
114
115 StreamInfo stream_info(*native_stream_info);
116
117 std::map<std::string, std::string> properties;
118 rti::routing::PropertyAdapter::add_properties_from_native(
119 properties,
120 native_properties);
121
122 StreamReaderForwarder *forwarder = new StreamReaderForwarder(
123 native_listener);
124 ScopedForwarder<Connection, StreamReaderForwarder> scoped(
125 connection,
126 forwarder,
127 environment);
128 forwarder->stream_reader_ = connection->create_stream_reader(
129 session,
130 stream_info,
131 properties,
132 &forwarder->listener_);
133 RTI_ROUTING_THROW_ON_NULL(forwarder->stream_reader_);
134
135 scoped.release();
136
137 // TRUST-134: the forwarder is not leaked here. It might be leaked
138 // if delete_native() were not to be called, but Routing Service
139 // will make sure that is not the case
140
141 /* coverity[leaked_storage : FALSE] */
142 return forwarder->native();
143 } catch(const std::exception& ex) {
144 RTI_RoutingServiceEnvironment_set_error(
145 environment,
146 "%s",
147 ex.what());
148 return NULL;
149 } catch (...) {
150 RTI_RoutingServiceEnvironment_set_error(
151 environment,
152 "unexpected exception");
153 return NULL;
154 }
155
156
157 }
158
159 static void delete_native(
160 Connection *connection,
161 RTI_RoutingServiceStreamReaderExt *native_stream_reader,
162 RTI_RoutingServiceEnvironment *environment)
163 {
164 StreamReaderForwarder *stream_reader_forwarder =
165 from_native(native_stream_reader);
166 try {
167 if (stream_reader_forwarder->stream_reader_ != NULL) {
168 connection->delete_stream_reader(
169 stream_reader_forwarder->stream_reader_);
170 stream_reader_forwarder->stream_reader_ = NULL;
171 }
172 } catch(const std::exception& ex) {
173 RTI_RoutingServiceEnvironment_set_error(
174 environment,
175 "%s",
176 ex.what());
177 } catch (...) {
178 RTI_RoutingServiceEnvironment_set_error(
179 environment,
180 "unexpected exception");
181 }
182
183 delete stream_reader_forwarder;
184 }
185
186
187 RTI_RoutingServiceStreamReaderExt* native()
188 {
189 return &this->native_;
190 }
191
192
193private:
194
195 StreamReaderForwarder(
196 const RTI_RoutingServiceStreamReaderListenerExt *native_listener) :
197 stream_reader_(NULL),
198 listener_(native_listener)
199 {
200 RTIOsapiMemory_zero(&native_, sizeof(native_));
201 native_.stream_reader_data =
202 static_cast<void *>(this);
203 native_.take =
204 StreamReaderForwarder::take;
205 native_.read =
206 StreamReaderForwarder::read;
207 native_.take_w_selector =
208 StreamReaderForwarder::take_with_selector;
209 native_.read_w_selector =
210 StreamReaderForwarder::read_with_selector;
211 native_.return_loan =
212 StreamReaderForwarder::return_loan;
213 native_.create_content_query =
214 StreamReaderForwarder::create_content_query;
215 native_.delete_content_query =
216 StreamReaderForwarder::delete_content_query;
217 native_.update =
218 StreamReaderForwarder::update;
219 }
220
221 ~StreamReaderForwarder()
222 {
223 // delete holders
224 for (std::list<SamplesHolder*>::iterator it = holder_pool_.begin();
225 it != holder_pool_.end();
226 ++it) {
227 delete (*it);
228 }
229 }
230
231
232 static void take(
233 void *native_stream_reader_data,
234 RTI_RoutingServiceSample **sample_array,
235 RTI_RoutingServiceSampleInfo **sample_info_array,
236 int *array_length,
237 RTI_RoutingServiceEnvironment *environment)
238 {
239 proxy_read<false, false>(
240 native_stream_reader_data,
241 sample_array,
242 sample_info_array,
243 array_length,
244 NULL,
245 environment);
246 }
247
248 static void read(
249 void *native_stream_reader_data,
250 RTI_RoutingServiceSample **sample_array,
251 RTI_RoutingServiceSampleInfo **sample_info_array,
252 int *array_length,
253 RTI_RoutingServiceEnvironment *environment)
254 {
255 proxy_read<true, false>(
256 native_stream_reader_data,
257 sample_array,
258 sample_info_array,
259 array_length,
260 NULL,
261 environment);
262 }
263
264 static void take_with_selector(
265 void *native_stream_reader_data,
266 RTI_RoutingServiceSample **sample_array,
267 RTI_RoutingServiceSampleInfo **sample_info_array,
268 int *array_length,
269 const struct RTI_RoutingServiceSelectorState *native_selector,
270 RTI_RoutingServiceEnvironment *environment)
271 {
272 proxy_read<false, true>(
273 native_stream_reader_data,
274 sample_array,
275 sample_info_array,
276 array_length,
277 native_selector,
278 environment);
279 }
280
281 static void read_with_selector(
282 void *native_stream_reader_data,
283 RTI_RoutingServiceSample **sample_array,
284 RTI_RoutingServiceSampleInfo **sample_info_array,
285 int *array_length,
286 const struct RTI_RoutingServiceSelectorState *native_selector,
287 RTI_RoutingServiceEnvironment *environment)
288 {
289 proxy_read<true, true>(
290 native_stream_reader_data,
291 sample_array,
292 sample_info_array,
293 array_length,
294 native_selector,
295 environment);
296 }
297
298 static void return_loan(
299 void *native_stream_reader_data,
300 RTI_RoutingServiceSample *native_samples,
301 RTI_RoutingServiceSampleInfo *,
302 int count,
303 RTI_RoutingServiceEnvironment *environment)
304 {
305 StreamReaderForwarder *forwarder =
306 static_cast<StreamReaderForwarder*>(native_stream_reader_data);
307 assert(forwarder != NULL);
308
309 SamplesHolder *holder = static_cast<SamplesHolder*>(
310 native_samples[count]);
311 assert(holder != NULL);
312
313
314 try {
315 forwarder->stream_reader_->return_loan(
316 holder->sample_seq_,
317 holder->info_seq_);
318 } catch (const std::exception& ex) {
319 RTI_RoutingServiceEnvironment_set_error(
320 environment,
321 "%s",
322 ex.what());
323 } catch (...) {
324 RTI_RoutingServiceEnvironment_set_error(
325 environment,
326 "unexpected exception");
327 }
328
329 forwarder->return_holder(holder);
330 }
331
332 static void update(
333 void *native_stream_reader_data,
334 const struct RTI_RoutingServiceProperties * native_properties,
335 RTI_RoutingServiceEnvironment * environment)
336 {
337
338 StreamReaderForwarder *stream_reader_forwarder =
339 static_cast<StreamReaderForwarder*>(native_stream_reader_data);
340
341 rti::routing::detail::UpdatableEntityForwarder::update(
342 stream_reader_forwarder->stream_reader_,
343 native_properties,
344 environment);
345 }
346
347 static void* create_content_query(
348 void *native_stream_reader_data,
349 RTI_RoutingServiceSelectorStateQueryData old_query_data,
350 const struct RTI_RoutingServiceSelectorContent *content,
351 RTI_RoutingServiceEnvironment *environment)
352 {
353 StreamReaderForwarder *forwarder =
354 static_cast<StreamReaderForwarder*>(native_stream_reader_data);
355
356 void *query_data = NULL;
357 try {
358 dds::topic::Filter filter(
359 content->expression == NULL ? "" : content->expression,
360 rti::core::native_conversions::from_native<std::string>(
361 content->expression_parameters));
362 query_data = forwarder->stream_reader_->create_content_query(
363 old_query_data,
364 filter);
365 } catch (const std::exception& ex) {
366 RTI_RoutingServiceEnvironment_set_error(
367 environment,
368 "%s",
369 ex.what());
370 } catch (...) {
371 RTI_RoutingServiceEnvironment_set_error(
372 environment,
373 "unexpected exception");
374 }
375
376 return query_data;
377 }
378
379 static void delete_content_query(
380 void *native_stream_reader_data,
381 RTI_RoutingServiceSelectorStateQueryData query_data,
382 RTI_RoutingServiceEnvironment *environment)
383 {
384 StreamReaderForwarder *forwarder =
385 static_cast<StreamReaderForwarder*>(native_stream_reader_data);
386 try {
387 forwarder->stream_reader_->delete_content_query(query_data);
388 } catch (const std::exception& ex) {
389 RTI_RoutingServiceEnvironment_set_error(
390 environment,
391 "%s",
392 ex.what());
393 } catch (...) {
394 RTI_RoutingServiceEnvironment_set_error(
395 environment,
396 "unexpected exception");
397 }
398 }
399
400 static StreamReaderForwarder* from_native(
401 RTI_RoutingServiceStreamReaderExt *native)
402 {
403 return static_cast<StreamReaderForwarder*>(native->stream_reader_data);
404 }
405
406 template <bool ReadOrTake, bool HasSelector>
407 static void proxy_read(
408 void *native_stream_reader_data,
409 RTI_RoutingServiceSample **sample_array,
410 RTI_RoutingServiceSampleInfo **sample_info_array,
411 int *array_length,
412 const struct RTI_RoutingServiceSelectorState *native_selector,
413 RTI_RoutingServiceEnvironment *environment)
414 {
415 StreamReaderForwarder *forwarder =
416 static_cast<StreamReaderForwarder*>(native_stream_reader_data);
417 assert(forwarder != NULL);
418
419 SamplesHolder *holder = forwarder->get_holder();
420
421 *array_length = 0;
422 try {
423 read_or_take<ReadOrTake, HasSelector>::read(
424 *forwarder->stream_reader_,
425 holder->sample_seq_,
426 holder->info_seq_,
427 native_selector);
428 if (!holder->info_seq_.empty()
429 && (holder->info_seq_.size() != holder->sample_seq_.size())) {
430 throw dds::core::PreconditionNotMetError(
431 "sample and info sequences length mismatch");
432 }
433 /*
434 * We reserve a hidden slot in the sample_seq to keep the original
435 * loaned returned from the native StreamReader. This allows us
436 * to quickly access it on return_loan(), instead of having to search
437 * in a list.
438 */
439 holder->sample_seq_.reserve(holder->sample_seq_.size() + 1);
440 *(&holder->sample_seq_[0] + holder->sample_seq_.size()) = holder;
441 } catch (const std::exception& ex) {
442 RTI_RoutingServiceEnvironment_set_error(
443 environment,
444 "%s",
445 ex.what());
446 forwarder->return_holder(holder);
447 return;
448 } catch (...) {
449 RTI_RoutingServiceEnvironment_set_error(
450 environment,
451 "unexpected exception");
452 forwarder->return_holder(holder);
453 return;
454 }
455
456 // direct mapping
457 *array_length = static_cast<int>(holder->sample_seq_.size());
458 *sample_array = &holder->sample_seq_[0];
459 if (holder->info_seq_.empty()) {
460 *sample_info_array = NULL;
461 } else {
462 *sample_info_array = &holder->info_seq_[0];
463 }
464 }
465
466 SamplesHolder* get_holder()
467 {
468 if (holder_pool_.size() == 0) {
469 return new SamplesHolder();
470 }
471
472 SamplesHolder *holder = holder_pool_.front();
473 holder_pool_.pop_front();
474
475 return holder;
476 }
477
478 void return_holder(SamplesHolder *holder)
479 {
480 holder->sample_seq_.clear();
481 holder->info_seq_.clear();
482 holder_pool_.push_front(holder);
483 }
484
485private:
486
487 RTI_RoutingServiceStreamReaderExt native_;
488 StreamReader *stream_reader_;
489 StreamReaderListener listener_;
490 std::list<SamplesHolder*> holder_pool_;
491};
492
493}}}}
494
495#endif // RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
Listener representation used by StreamReader to notify RTI Routing Service when new data is available...
Provides a way to read samples of a specific type from a data domain. In the XML configuration file,...
Definition: StreamReader.hpp:45
virtual void take(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq)=0
Takes a collection of all data samples and info samples available from an input stream.
virtual void read(std::vector< SamplePtr > &sample_seq, std::vector< InfoPtr > &info_seq)=0
Variation of StreamReader::take where the returned samples will remain in the StreamReader's cache,...