RTI Routing Service  Version 6.0.0
 All Data Structures Files Functions Typedefs Enumerations Enumerator Groups Pages
StreamReaderForwarder.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
12 #define RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_
13 
14 #include <list>
15 #include <rti/core/Exception.hpp>
16 
17 #include <rti/routing/adapter/StreamReader.hpp>
18 #include <rti/routing/StreamInfo.hpp>
19 #include <rti/routing/detail/UpdatableEntityForwarder.hpp>
20 #include <rti/routing/detail/ForwarderUtils.hpp>
21 #include <rti/routing/adapter/detail/DiscoveryStreamReaderForwarder.hpp>
22 
23 
24 namespace rti { namespace routing { namespace adapter { namespace detail {
25 
26 template <bool ReadOrTake, bool HasSelector> struct read_or_take;
27 
28 
29 template<>
30 struct read_or_take<false, false> {
31 
32  static void read(
34  std::vector<StreamReader::SamplePtr>& sample_seq,
35  std::vector<StreamReader::InfoPtr>& info_seq,
36  const struct RTI_RoutingServiceSelectorState *)
37  {
38  stream_reader.take(sample_seq, info_seq);
39  }
40 };
41 
42 template<>
43 struct read_or_take<true, false> {
44 
45  static void read(
47  std::vector<StreamReader::SamplePtr>& sample_seq,
48  std::vector<StreamReader::InfoPtr>& info_seq,
49  const struct RTI_RoutingServiceSelectorState *)
50  {
51  stream_reader.read(sample_seq, info_seq);
52  }
53 };
54 
55 template<>
56 struct read_or_take<false, true> {
57 
58  static void read(
60  std::vector<StreamReader::SamplePtr>& sample_seq,
61  std::vector<StreamReader::InfoPtr>& info_seq,
62  const struct RTI_RoutingServiceSelectorState *native_selector)
63  {
64  stream_reader.take(
65  sample_seq,
66  info_seq,
67  SelectorState(*native_selector));
68  }
69 };
70 
71 
72 template<>
73 struct read_or_take<true, true> {
74 
75  static void read(
77  std::vector<StreamReader::SamplePtr>& sample_seq,
78  std::vector<StreamReader::InfoPtr>& info_seq,
79  const struct RTI_RoutingServiceSelectorState *native_selector)
80  {
81  stream_reader.read(
82  sample_seq,
83  info_seq,
84  SelectorState(*native_selector));
85  }
86 };
87 
88 
89 struct SamplesHolder {
90 public:
91 
92  SamplesHolder()
93  {
94  }
95 
96  std::vector<StreamReader::SamplePtr> sample_seq_;
97  std::vector<StreamReader::InfoPtr> info_seq_;
98 };
99 
100 class StreamReaderForwarder {
101 public:
102  static RTI_RoutingServiceStreamReaderExt* create_native(
103  Connection *connection,
104  Session *session,
105  const struct RTI_RoutingServiceStreamInfo *native_stream_info,
106  const struct RTI_RoutingServiceProperties *native_properties,
107  const struct RTI_RoutingServiceStreamReaderListenerExt *native_listener,
108  RTI_RoutingServiceEnvironment *environment)
109  {
110 
111  try {
112  StreamInfo stream_info(*native_stream_info);
113 
114  std::map<std::string, std::string> properties;
115  rti::routing::PropertyAdapter::add_properties_from_native(
116  properties,
117  native_properties);
118 
119  StreamReaderForwarder *forwarder = new StreamReaderForwarder(
120  native_listener);
121  ScopedForwarder<Connection, StreamReaderForwarder> scoped(
122  connection,
123  forwarder,
124  environment);
125  forwarder->stream_reader_ = connection->create_stream_reader(
126  session,
127  stream_info,
128  properties,
129  &forwarder->listener_);
130  RTI_ROUTING_THROW_ON_NULL(forwarder->stream_reader_);
131 
132  scoped.release();
133  return forwarder->native();
134 
135  } catch(const std::exception& ex) {
136  RTI_RoutingServiceEnvironment_set_error(
137  environment,
138  "%s",
139  ex.what());
140  return NULL;
141  } catch (...) {
142  RTI_RoutingServiceEnvironment_set_error(
143  environment,
144  "unexpected exception");
145  return NULL;
146  }
147 
148 
149  }
150 
151  static void delete_native(
152  Connection *connection,
153  RTI_RoutingServiceStreamReaderExt *native_stream_reader,
154  RTI_RoutingServiceEnvironment *environment)
155  {
156  StreamReaderForwarder *stream_reader_forwarder =
157  from_native(native_stream_reader);
158  try {
159  if (stream_reader_forwarder->stream_reader_ != NULL) {
160  connection->delete_stream_reader(
161  stream_reader_forwarder->stream_reader_);
162  stream_reader_forwarder->stream_reader_ = NULL;
163  }
164  } catch(const std::exception& ex) {
165  RTI_RoutingServiceEnvironment_set_error(
166  environment,
167  "%s",
168  ex.what());
169  } catch (...) {
170  RTI_RoutingServiceEnvironment_set_error(
171  environment,
172  "unexpected exception");
173  }
174 
175  delete stream_reader_forwarder;
176  }
177 
178 
179  RTI_RoutingServiceStreamReaderExt* native()
180  {
181  return &this->native_;
182  }
183 
184 
185 private:
186 
187  StreamReaderForwarder(
188  const RTI_RoutingServiceStreamReaderListenerExt *native_listener) :
189  stream_reader_(NULL),
190  listener_(native_listener)
191  {
192  RTIOsapiMemory_zero(&native_, sizeof(native_));
193  native_.stream_reader_data =
194  static_cast<void *>(this);
195  native_.take =
196  StreamReaderForwarder::take;
197  native_.read =
198  StreamReaderForwarder::read;
199  native_.take_w_selector =
200  StreamReaderForwarder::take_with_selector;
201  native_.read_w_selector =
202  StreamReaderForwarder::read_with_selector;
203  native_.return_loan =
204  StreamReaderForwarder::return_loan;
205  native_.create_content_query =
206  StreamReaderForwarder::create_content_query;
207  native_.delete_content_query =
208  StreamReaderForwarder::delete_content_query;
209  native_.update =
210  StreamReaderForwarder::update;
211  }
212 
213  ~StreamReaderForwarder()
214  {
215  // delete holders
216  for (std::list<SamplesHolder*>::iterator it = holder_pool_.begin();
217  it != holder_pool_.end();
218  ++it) {
219  delete (*it);
220  }
221  }
222 
223 
224  static void take(
225  void *native_stream_reader_data,
226  RTI_RoutingServiceSample **sample_array,
227  RTI_RoutingServiceSampleInfo **sample_info_array,
228  int *array_length,
229  RTI_RoutingServiceEnvironment *environment)
230  {
231  proxy_read<false, false>(
232  native_stream_reader_data,
233  sample_array,
234  sample_info_array,
235  array_length,
236  NULL,
237  environment);
238  }
239 
240  static void read(
241  void *native_stream_reader_data,
242  RTI_RoutingServiceSample **sample_array,
243  RTI_RoutingServiceSampleInfo **sample_info_array,
244  int *array_length,
245  RTI_RoutingServiceEnvironment *environment)
246  {
247  proxy_read<true, false>(
248  native_stream_reader_data,
249  sample_array,
250  sample_info_array,
251  array_length,
252  NULL,
253  environment);
254  }
255 
256  static void take_with_selector(
257  void *native_stream_reader_data,
258  RTI_RoutingServiceSample **sample_array,
259  RTI_RoutingServiceSampleInfo **sample_info_array,
260  int *array_length,
261  const struct RTI_RoutingServiceSelectorState *native_selector,
262  RTI_RoutingServiceEnvironment *environment)
263  {
264  proxy_read<false, true>(
265  native_stream_reader_data,
266  sample_array,
267  sample_info_array,
268  array_length,
269  native_selector,
270  environment);
271  }
272 
273  static void read_with_selector(
274  void *native_stream_reader_data,
275  RTI_RoutingServiceSample **sample_array,
276  RTI_RoutingServiceSampleInfo **sample_info_array,
277  int *array_length,
278  const struct RTI_RoutingServiceSelectorState *native_selector,
279  RTI_RoutingServiceEnvironment *environment)
280  {
281  proxy_read<true, true>(
282  native_stream_reader_data,
283  sample_array,
284  sample_info_array,
285  array_length,
286  native_selector,
287  environment);
288  }
289 
290  static void return_loan(
291  void *native_stream_reader_data,
292  RTI_RoutingServiceSample *native_samples,
293  RTI_RoutingServiceSampleInfo *,
294  int count,
295  RTI_RoutingServiceEnvironment *environment)
296  {
297  StreamReaderForwarder *forwarder =
298  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
299  RTIBOOST_ASSERT(forwarder != NULL);
300 
301  SamplesHolder *holder = static_cast<SamplesHolder*>(
302  native_samples[count]);
303  RTIBOOST_ASSERT(holder != NULL);
304 
305 
306  try {
307  forwarder->stream_reader_->return_loan(
308  holder->sample_seq_,
309  holder->info_seq_);
310  } catch (const std::exception& ex) {
311  RTI_RoutingServiceEnvironment_set_error(
312  environment,
313  "%s",
314  ex.what());
315  } catch (...) {
316  RTI_RoutingServiceEnvironment_set_error(
317  environment,
318  "unexpected exception");
319  }
320 
321  forwarder->return_holder(holder);
322  }
323 
324  static void update(
325  void *native_stream_reader_data,
326  const struct RTI_RoutingServiceProperties * native_properties,
327  RTI_RoutingServiceEnvironment * environment)
328  {
329 
330  StreamReaderForwarder *stream_reader_forwarder =
331  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
332 
333  rti::routing::detail::UpdatableEntityForwarder::update(
334  stream_reader_forwarder->stream_reader_,
335  native_properties,
336  environment);
337  }
338 
339  static void* create_content_query(
340  void *native_stream_reader_data,
341  RTI_RoutingServiceSelectorStateQueryData old_query_data,
342  const struct RTI_RoutingServiceSelectorContent *content,
343  RTI_RoutingServiceEnvironment *environment)
344  {
345  StreamReaderForwarder *forwarder =
346  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
347 
348  void *query_data = NULL;
349  try {
350  dds::topic::Filter filter(
351  content->expression == NULL ? "" : content->expression,
352  rti::core::native_conversions::from_native<std::string>(
353  content->expression_parameters));
354  query_data = forwarder->stream_reader_->create_content_query(
355  old_query_data,
356  filter);
357  } catch (const std::exception& ex) {
358  RTI_RoutingServiceEnvironment_set_error(
359  environment,
360  "%s",
361  ex.what());
362  } catch (...) {
363  RTI_RoutingServiceEnvironment_set_error(
364  environment,
365  "unexpected exception");
366  }
367 
368  return query_data;
369  }
370 
371  static void delete_content_query(
372  void *native_stream_reader_data,
373  RTI_RoutingServiceSelectorStateQueryData query_data,
374  RTI_RoutingServiceEnvironment *environment)
375  {
376  StreamReaderForwarder *forwarder =
377  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
378  try {
379  forwarder->stream_reader_->delete_content_query(query_data);
380  } catch (const std::exception& ex) {
381  RTI_RoutingServiceEnvironment_set_error(
382  environment,
383  "%s",
384  ex.what());
385  } catch (...) {
386  RTI_RoutingServiceEnvironment_set_error(
387  environment,
388  "unexpected exception");
389  }
390  }
391 
392  static StreamReaderForwarder* from_native(
393  RTI_RoutingServiceStreamReaderExt *native)
394  {
395  return static_cast<StreamReaderForwarder*>(native->stream_reader_data);
396  }
397 
398  template <bool ReadOrTake, bool HasSelector>
399  static void proxy_read(
400  void *native_stream_reader_data,
401  RTI_RoutingServiceSample **sample_array,
402  RTI_RoutingServiceSampleInfo **sample_info_array,
403  int *array_length,
404  const struct RTI_RoutingServiceSelectorState *native_selector,
405  RTI_RoutingServiceEnvironment *environment)
406  {
407  StreamReaderForwarder *forwarder =
408  static_cast<StreamReaderForwarder*>(native_stream_reader_data);
409  RTIBOOST_ASSERT(forwarder != NULL);
410 
411  SamplesHolder *holder = forwarder->get_holder();
412 
413  *array_length = 0;
414  try {
415  read_or_take<ReadOrTake, HasSelector>::read(
416  *forwarder->stream_reader_,
417  holder->sample_seq_,
418  holder->info_seq_,
419  native_selector);
420  if (!holder->info_seq_.empty()
421  && (holder->info_seq_.size() != holder->sample_seq_.size())) {
422  throw dds::core::PreconditionNotMetError(
423  "sample and info sequences length mismatch");
424  }
425  holder->sample_seq_.reserve(holder->sample_seq_.size() + 1);
426  holder->sample_seq_[holder->sample_seq_.size()] = holder;
427  } catch (const std::exception& ex) {
428  RTI_RoutingServiceEnvironment_set_error(
429  environment,
430  "%s",
431  ex.what());
432  forwarder->return_holder(holder);
433  return;
434  } catch (...) {
435  RTI_RoutingServiceEnvironment_set_error(
436  environment,
437  "unexpected exception");
438  forwarder->return_holder(holder);
439  return;
440  }
441 
442  // direct mapping
443  *array_length = holder->sample_seq_.size();
444  *sample_array = &holder->sample_seq_[0];
445  if (holder->info_seq_.empty()) {
446  *sample_info_array = NULL;
447  } else {
448  *sample_info_array = &holder->info_seq_[0];
449  }
450  }
451 
452  SamplesHolder* get_holder()
453  {
454  if (holder_pool_.size() == 0) {
455  return new SamplesHolder();
456  }
457 
458  SamplesHolder *holder = holder_pool_.front();
459  holder_pool_.pop_front();
460 
461  return holder;
462  }
463 
464  void return_holder(SamplesHolder *holder)
465  {
466  holder->sample_seq_.clear();
467  holder->info_seq_.clear();
468  holder_pool_.push_front(holder);
469  }
470 
471 private:
472 
473  RTI_RoutingServiceStreamReaderExt native_;
474  StreamReader *stream_reader_;
475  StreamReaderListener listener_;
476  std::list<SamplesHolder*> holder_pool_;
477 };
478 
479 }}}}
480 
481 #endif // RTI_ROUTING_ADAPTER_DETAIL_STREAM_READER_FORWARDER_HPP_

RTI Routing Service Version 6.0.0 Copyright © Sun Mar 3 2019 Real-Time Innovations, Inc