RTI Routing Service  Version 6.0.1
 All Data Structures Files Functions Typedefs Enumerations Enumerator Groups Pages
StreamReaderForwarder.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
12 #define RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
13 
14 #include <list>
15 #include <rti/core/Exception.hpp>
16 
17 #include <rti/routing/adapter/StreamReader.hpp>
18 #include <rti/routing/StreamInfo.hpp>
19 #include <rti/routing/detail/UpdatableEntityForwarder.hpp>
20 #include <rti/routing/detail/ForwarderUtils.hpp>
21 #include <rti/routing/adapter/detail/DiscoveryStreamReaderForwarder.hpp>
22 
23 
24 namespace rti { namespace routing { namespace adapter { namespace detail {
25 
26 template <bool ReadOrTake, bool HasSelector> struct read_or_take;
27 
28 
29 template<>
30 struct read_or_take<false, false> {
31 
32  static void read(
34  std::vector<StreamReader::SamplePtr>& sample_seq,
35  std::vector<StreamReader::InfoPtr>& info_seq,
36  const struct RTI_RoutingServiceSelectorState *)
37  {
38  stream_reader.take(sample_seq, info_seq);
39  }
40 };
41 
42 template<>
43 struct read_or_take<true, false> {
44 
45  static void read(
47  std::vector<StreamReader::SamplePtr>& sample_seq,
48  std::vector<StreamReader::InfoPtr>& info_seq,
49  const struct RTI_RoutingServiceSelectorState *)
50  {
51  stream_reader.read(sample_seq, info_seq);
52  }
53 };
54 
55 template<>
56 struct read_or_take<false, true> {
57 
58  static void read(
60  std::vector<StreamReader::SamplePtr>& sample_seq,
61  std::vector<StreamReader::InfoPtr>& info_seq,
62  const struct RTI_RoutingServiceSelectorState *native_selector)
63  {
64  stream_reader.take(
65  sample_seq,
66  info_seq,
67  SelectorState(*native_selector));
68  }
69 };
70 
71 
72 template<>
73 struct read_or_take<true, true> {
74 
75  static void read(
77  std::vector<StreamReader::SamplePtr>& sample_seq,
78  std::vector<StreamReader::InfoPtr>& info_seq,
79  const struct RTI_RoutingServiceSelectorState *native_selector)
80  {
81  stream_reader.read(
82  sample_seq,
83  info_seq,
84  SelectorState(*native_selector));
85  }
86 };
87 
88 
89 struct SamplesHolder {
90 public:
91 
92  SamplesHolder()
93  {
94  }
95 
96  std::vector<StreamReader::SamplePtr> sample_seq_;
97  std::vector<StreamReader::InfoPtr> info_seq_;
98 };
99 
100 class StreamReaderForwarder {
101 public:
102  static RTI_RoutingServiceStreamReaderExt* create_native(
103  Connection *connection,
104  Session *session,
105  const struct RTI_RoutingServiceStreamInfo *native_stream_info,
106  const struct RTI_RoutingServiceProperties *native_properties,
107  const struct RTI_RoutingServiceStreamReaderListenerExt *native_listener,
108  RTI_RoutingServiceEnvironment *environment)
109  {
110 
111  try {
112  using rti::routing::detail::ScopedForwarder;
113 
114  StreamInfo stream_info(*native_stream_info);
115 
116  std::map<std::string, std::string> properties;
117  rti::routing::PropertyAdapter::add_properties_from_native(
118  properties,
119  native_properties);
120 
121  StreamReaderForwarder *forwarder = new StreamReaderForwarder(
122  native_listener);
123  ScopedForwarder<Connection, StreamReaderForwarder> scoped(
124  connection,
125  forwarder,
126  environment);
127  forwarder->stream_reader_ = connection->create_stream_reader(
128  session,
129  stream_info,
130  properties,
131  &forwarder->listener_);
132  RTI_ROUTING_THROW_ON_NULL(forwarder->stream_reader_);
133 
134  scoped.release();
135  return forwarder->native();
136 
137  } catch(const std::exception& ex) {
138  RTI_RoutingServiceEnvironment_set_error(
139  environment,
140  "%s",
141  ex.what());
142  return NULL;
143  } catch (...) {
144  RTI_RoutingServiceEnvironment_set_error(
145  environment,
146  "unexpected exception");
147  return NULL;
148  }
149 
150 
151  }
152 
153  static void delete_native(
154  Connection *connection,
155  RTI_RoutingServiceStreamReaderExt *native_stream_reader,
156  RTI_RoutingServiceEnvironment *environment)
157  {
158  StreamReaderForwarder *stream_reader_forwarder =
159  from_native(native_stream_reader);
160  try {
161  if (stream_reader_forwarder->stream_reader_ != NULL) {
162  connection->delete_stream_reader(
163  stream_reader_forwarder->stream_reader_);
164  stream_reader_forwarder->stream_reader_ = NULL;
165  }
166  } catch(const std::exception& ex) {
167  RTI_RoutingServiceEnvironment_set_error(
168  environment,
169  "%s",
170  ex.what());
171  } catch (...) {
172  RTI_RoutingServiceEnvironment_set_error(
173  environment,
174  "unexpected exception");
175  }
176 
177  delete stream_reader_forwarder;
178  }
179 
180 
181  RTI_RoutingServiceStreamReaderExt* native()
182  {
183  return &this->native_;
184  }
185 
186 
187 private:
188 
189  StreamReaderForwarder(
190  const RTI_RoutingServiceStreamReaderListenerExt *native_listener) :
191  stream_reader_(NULL),
192  listener_(native_listener)
193  {
194  RTIOsapiMemory_zero(&native_, sizeof(native_));
195  native_.stream_reader_data =
196  static_cast<void *>(this);
197  native_.take =
198  StreamReaderForwarder::take;
199  native_.read =
200  StreamReaderForwarder::read;
201  native_.take_w_selector =
202  StreamReaderForwarder::take_with_selector;
203  native_.read_w_selector =
204  StreamReaderForwarder::read_with_selector;
205  native_.return_loan =
206  StreamReaderForwarder::return_loan;
207  native_.create_content_query =
208  StreamReaderForwarder::create_content_query;
209  native_.delete_content_query =
210  StreamReaderForwarder::delete_content_query;
211  native_.update =
212  StreamReaderForwarder::update;
213  }
214 
215  ~StreamReaderForwarder()
216  {
217  // delete holders
218  for (std::list<SamplesHolder*>::iterator it = holder_pool_.begin();
219  it != holder_pool_.end();
220  ++it) {
221  delete (*it);
222  }
223  }
224 
225 
226  static void take(
227  void *native_stream_reader_data,
228  RTI_RoutingServiceSample **sample_array,
229  RTI_RoutingServiceSampleInfo **sample_info_array,
230  int *array_length,
231  RTI_RoutingServiceEnvironment *environment)
232  {
233  proxy_read<false, false>(
234  native_stream_reader_data,
235  sample_array,
236  sample_info_array,
237  array_length,
238  NULL,
239  environment);
240  }
241 
242  static void read(
243  void *native_stream_reader_data,
244  RTI_RoutingServiceSample **sample_array,
245  RTI_RoutingServiceSampleInfo **sample_info_array,
246  int *array_length,
247  RTI_RoutingServiceEnvironment *environment)
248  {
249  proxy_read<true, false>(
250  native_stream_reader_data,
251  sample_array,
252  sample_info_array,
253  array_length,
254  NULL,
255  environment);
256  }
257 
258  static void take_with_selector(
259  void *native_stream_reader_data,
260  RTI_RoutingServiceSample **sample_array,
261  RTI_RoutingServiceSampleInfo **sample_info_array,
262  int *array_length,
263  const struct RTI_RoutingServiceSelectorState *native_selector,
264  RTI_RoutingServiceEnvironment *environment)
265  {
266  proxy_read<false, true>(
267  native_stream_reader_data,
268  sample_array,
269  sample_info_array,
270  array_length,
271  native_selector,
272  environment);
273  }
274 
275  static void read_with_selector(
276  void *native_stream_reader_data,
277  RTI_RoutingServiceSample **sample_array,
278  RTI_RoutingServiceSampleInfo **sample_info_array,
279  int *array_length,
280  const struct RTI_RoutingServiceSelectorState *native_selector,
281  RTI_RoutingServiceEnvironment *environment)
282  {
283  proxy_read<true, true>(
284  native_stream_reader_data,
285  sample_array,
286  sample_info_array,
287  array_length,
288  native_selector,
289  environment);
290  }
291 
292  static void return_loan(
293  void *native_stream_reader_data,
294  RTI_RoutingServiceSample *native_samples,
295  RTI_RoutingServiceSampleInfo *,
296  int count,
297  RTI_RoutingServiceEnvironment *environment)
298  {
299  StreamReaderForwarder *forwarder =
300  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
301  RTIBOOST_ASSERT(forwarder != NULL);
302 
303  SamplesHolder *holder = static_cast<SamplesHolder*>(
304  native_samples[count]);
305  RTIBOOST_ASSERT(holder != NULL);
306 
307 
308  try {
309  forwarder->stream_reader_->return_loan(
310  holder->sample_seq_,
311  holder->info_seq_);
312  } catch (const std::exception& ex) {
313  RTI_RoutingServiceEnvironment_set_error(
314  environment,
315  "%s",
316  ex.what());
317  } catch (...) {
318  RTI_RoutingServiceEnvironment_set_error(
319  environment,
320  "unexpected exception");
321  }
322 
323  forwarder->return_holder(holder);
324  }
325 
326  static void update(
327  void *native_stream_reader_data,
328  const struct RTI_RoutingServiceProperties * native_properties,
329  RTI_RoutingServiceEnvironment * environment)
330  {
331 
332  StreamReaderForwarder *stream_reader_forwarder =
333  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
334 
335  rti::routing::detail::UpdatableEntityForwarder::update(
336  stream_reader_forwarder->stream_reader_,
337  native_properties,
338  environment);
339  }
340 
341  static void* create_content_query(
342  void *native_stream_reader_data,
343  RTI_RoutingServiceSelectorStateQueryData old_query_data,
344  const struct RTI_RoutingServiceSelectorContent *content,
345  RTI_RoutingServiceEnvironment *environment)
346  {
347  StreamReaderForwarder *forwarder =
348  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
349 
350  void *query_data = NULL;
351  try {
352  dds::topic::Filter filter(
353  content->expression == NULL ? "" : content->expression,
354  rti::core::native_conversions::from_native<std::string>(
355  content->expression_parameters));
356  query_data = forwarder->stream_reader_->create_content_query(
357  old_query_data,
358  filter);
359  } catch (const std::exception& ex) {
360  RTI_RoutingServiceEnvironment_set_error(
361  environment,
362  "%s",
363  ex.what());
364  } catch (...) {
365  RTI_RoutingServiceEnvironment_set_error(
366  environment,
367  "unexpected exception");
368  }
369 
370  return query_data;
371  }
372 
373  static void delete_content_query(
374  void *native_stream_reader_data,
375  RTI_RoutingServiceSelectorStateQueryData query_data,
376  RTI_RoutingServiceEnvironment *environment)
377  {
378  StreamReaderForwarder *forwarder =
379  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
380  try {
381  forwarder->stream_reader_->delete_content_query(query_data);
382  } catch (const std::exception& ex) {
383  RTI_RoutingServiceEnvironment_set_error(
384  environment,
385  "%s",
386  ex.what());
387  } catch (...) {
388  RTI_RoutingServiceEnvironment_set_error(
389  environment,
390  "unexpected exception");
391  }
392  }
393 
394  static StreamReaderForwarder* from_native(
395  RTI_RoutingServiceStreamReaderExt *native)
396  {
397  return static_cast<StreamReaderForwarder*>(native->stream_reader_data);
398  }
399 
400  template <bool ReadOrTake, bool HasSelector>
401  static void proxy_read(
402  void *native_stream_reader_data,
403  RTI_RoutingServiceSample **sample_array,
404  RTI_RoutingServiceSampleInfo **sample_info_array,
405  int *array_length,
406  const struct RTI_RoutingServiceSelectorState *native_selector,
407  RTI_RoutingServiceEnvironment *environment)
408  {
409  StreamReaderForwarder *forwarder =
410  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
411  RTIBOOST_ASSERT(forwarder != NULL);
412 
413  SamplesHolder *holder = forwarder->get_holder();
414 
415  *array_length = 0;
416  try {
417  read_or_take<ReadOrTake, HasSelector>::read(
418  *forwarder->stream_reader_,
419  holder->sample_seq_,
420  holder->info_seq_,
421  native_selector);
422  if (!holder->info_seq_.empty()
423  && (holder->info_seq_.size() != holder->sample_seq_.size())) {
424  throw dds::core::PreconditionNotMetError(
425  "sample and info sequences length mismatch");
426  }
427  holder->sample_seq_.reserve(holder->sample_seq_.size() + 1);
428  holder->sample_seq_[holder->sample_seq_.size()] = holder;
429  } catch (const std::exception& ex) {
430  RTI_RoutingServiceEnvironment_set_error(
431  environment,
432  "%s",
433  ex.what());
434  forwarder->return_holder(holder);
435  return;
436  } catch (...) {
437  RTI_RoutingServiceEnvironment_set_error(
438  environment,
439  "unexpected exception");
440  forwarder->return_holder(holder);
441  return;
442  }
443 
444  // direct mapping
445  *array_length = holder->sample_seq_.size();
446  *sample_array = &holder->sample_seq_[0];
447  if (holder->info_seq_.empty()) {
448  *sample_info_array = NULL;
449  } else {
450  *sample_info_array = &holder->info_seq_[0];
451  }
452  }
453 
454  SamplesHolder* get_holder()
455  {
456  if (holder_pool_.size() == 0) {
457  return new SamplesHolder();
458  }
459 
460  SamplesHolder *holder = holder_pool_.front();
461  holder_pool_.pop_front();
462 
463  return holder;
464  }
465 
466  void return_holder(SamplesHolder *holder)
467  {
468  holder->sample_seq_.clear();
469  holder->info_seq_.clear();
470  holder_pool_.push_front(holder);
471  }
472 
473 private:
474 
475  RTI_RoutingServiceStreamReaderExt native_;
476  StreamReader *stream_reader_;
477  StreamReaderListener listener_;
478  std::list<SamplesHolder*> holder_pool_;
479 };
480 
481 }}}}
482 
483 #endif // RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_

RTI Routing Service Version 6.0.1 Copyright © Sun Nov 17 2019 Real-Time Innovations, Inc