RTI Routing Service  Version 6.0.1
 All Data Structures Files Functions Typedefs Enumerations Enumerator Groups Pages
ProcessorForwarder.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_ROUTING_PROCESSOR_DETAIL_PROCESSOR_FORWARDER_HPP_
12 #define RTI_ROUTING_PROCESSOR_DETAIL_PROCESSOR_FORWARDER_HPP_
13 
14 #include <rti/core/Exception.hpp>
15 
16 #include "routingservice/routingservice_processor_impl.h"
17 #include <rti/routing/detail/ForwarderUtils.hpp>
18 #include <rti/routing/detail/UpdatableEntityForwarder.hpp>
19 #include <rti/routing/processor/Route.hpp>
20 #include <rti/routing/processor/Input.hpp>
21 #include <rti/routing/processor/Output.hpp>
22 
23 namespace rti { namespace routing { namespace processor { namespace detail {
24 
25 class ProcessorForwarder {
26 
27 private:
28 
29  template <typename PORT, typename NATIVE>
30  class ScopedPort {
31 
32  public:
33  ScopedPort(Route& route, int32_t index, NATIVE *native)
34  : route_(route),
35  port_(new PORT(native, index, route.native_route_, route.native_env_))
36  {
37  }
38 
39  PORT* get()
40  {
41  return port_;
42  }
43 
44  void clear()
45  {
46  port_ = NULL;
47  }
48 
49  ~ScopedPort()
50  {
51  if (port_ != NULL) {
52  delete port_;
53  port_ = NULL;
54  }
55  }
56 
57  private:
58  Route& route_;
59  PORT *port_;
60  };
61 
62 public:
63 
64  static RTI_RoutingServiceProcessor * create_native(
65  ProcessorPlugin *plugin,
66  RTI_RoutingServiceRoute *native_route,
67  const struct RTI_RoutingServiceProperties *native_properties,
68  RTI_RoutingServiceEnvironment *environment)
69  {
70  try {
71  using rti::routing::detail::ScopedForwarder;
72 
73  std::map<std::string, std::string> properties;
74  rti::routing::PropertyAdapter::add_properties_from_native(
75  properties,
76  native_properties);
77 
78  ProcessorForwarder *forwarder = new ProcessorForwarder(
79  native_route,
80  environment);
81  ScopedForwarder<ProcessorPlugin, ProcessorForwarder> scoped(
82  plugin,
83  forwarder,
84  environment);
85  forwarder->processor_ = plugin->create_processor(
86  forwarder->route(),
87  properties);
88  RTI_ROUTING_THROW_ON_NULL(forwarder->processor_);
89 
90  scoped.release();
91  return forwarder->native();
92  } catch(const std::exception& ex) {
93  RTI_RoutingServiceEnvironment_set_error(
94  environment,
95  "%s",
96  ex.what());
97  return NULL;
98  } catch (...) {
99  RTI_RoutingServiceEnvironment_set_error(
100  environment,
101  "unexpected exception");
102  return NULL;
103  }
104 
105 
106  }
107 
108  static void delete_native(
109  ProcessorPlugin *plugin,
110  RTI_RoutingServiceProcessor *native_processor,
111  RTI_RoutingServiceEnvironment *environment)
112  {
113  ProcessorForwarder *processor_forwarder =
114  static_cast<ProcessorForwarder*>(native_processor->processor_data);
115  try {
116  if (processor_forwarder->processor_ != NULL) {
117  plugin->delete_processor(
118  processor_forwarder->route_,
119  processor_forwarder->processor_);
120  processor_forwarder->processor_ = NULL;
121  }
122  } catch(const std::exception& ex) {
123  RTI_RoutingServiceEnvironment_set_error(
124  environment,
125  "%s",
126  ex.what());
127  } catch (...) {
128  RTI_RoutingServiceEnvironment_set_error(
129  environment,
130  "unexpected exception");
131  }
132 
133  delete processor_forwarder;
134  }
135 
136 
137  static void forward_on_route_event(
138  void *native_processor_data,
139  RTI_RoutingServiceRouteEvent *native_route_event,
140  RTI_RoutingServiceEnvironment *environment)
141  {
142 
143  ProcessorForwarder *forwarder =
144  static_cast<ProcessorForwarder*>(native_processor_data);
145 
146  try {
147 
148  // build up wrapper objects based on the event
149  switch (RTI_RoutingServiceRouteEvent_get_kind(native_route_event)) {
150 
151  case RTI_ROUTING_SERVICE_ROUTE_EVENT_DATA_ON_INPUTS:
152  forwarder->processor_->on_data_available(forwarder->route());
153  break;
154 
155  case RTI_ROUTING_SERVICE_ROUTE_EVENT_PERIODIC_ACTION:
156 
157  forwarder->processor_->on_periodic_action(forwarder->route());
158  break;
159 
160  case RTI_ROUTING_SERVICE_ROUTE_EVENT_INPUT_ENABLED:
161  {
162  void *affected_entity =
163  RTI_RoutingServiceRouteEvent_get_affected_entity(native_route_event);
164  void *index =
165  RTI_RoutingServiceRouteEvent_get_event_data(native_route_event);
166  ScopedPort<Input, RTI_RoutingServiceStreamReaderExt> port(
167  forwarder->route_,
168  *(static_cast<int32_t*>(index)),
169  static_cast<RTI_RoutingServiceStreamReaderExt *>(affected_entity));
170  forwarder->processor_->on_input_enabled(
171  forwarder->route(),
172  *port.get());
173  RTI_RoutingServiceRoute_set_stream_port_user_data(
174  forwarder->route_.native_route_,
175  static_cast<RTI_RoutingServiceStreamReaderExt *>(affected_entity)->stream_reader_data,
176  port.get());
177  port.clear();
178 
179  }
180  break;
181 
182  case RTI_ROUTING_SERVICE_ROUTE_EVENT_INPUT_DISABLED:
183  {
184 
185  void *affected_entity =
186  RTI_RoutingServiceRouteEvent_get_affected_entity(native_route_event);
187  RTI_RoutingServiceStreamReaderExt *native_input =
188  static_cast<RTI_RoutingServiceStreamReaderExt *>(affected_entity);
189  Input *input = reinterpret_cast<Input*>(
190  RTI_RoutingServiceRoute_get_stream_port_user_data(
191  forwarder->route_.native_route_,
192  native_input->stream_reader_data));
193  forwarder->processor_->on_input_disabled(
194  forwarder->route(),
195  *input);
196  RTI_RoutingServiceRoute_set_stream_port_user_data(
197  forwarder->route_.native_route_,
198  native_input->stream_reader_data,
199  NULL);
200  delete input;
201  }
202  break;
203 
204  case RTI_ROUTING_SERVICE_ROUTE_EVENT_OUTPUT_ENABLED:
205  {
206  void *affected_entity =
207  RTI_RoutingServiceRouteEvent_get_affected_entity(native_route_event);
208  void *index =
209  RTI_RoutingServiceRouteEvent_get_event_data(native_route_event);
210  ScopedPort<Output, RTI_RoutingServiceStreamWriterExt> port(
211  forwarder->route_,
212  *(static_cast<int32_t*>(index)),
213  static_cast<RTI_RoutingServiceStreamWriterExt *>(affected_entity));
214  forwarder->processor_->on_output_enabled(
215  forwarder->route(),
216  *port.get());
217  RTI_RoutingServiceRoute_set_stream_port_user_data(
218  forwarder->route_.native_route_,
219  static_cast<RTI_RoutingServiceStreamWriterExt *>(affected_entity)->stream_writer_data,
220  port.get());
221  port.clear();
222 
223  }
224  break;
225 
226  case RTI_ROUTING_SERVICE_ROUTE_EVENT_OUTPUT_DISABLED:
227  {
228  void *affected_entity =
229  RTI_RoutingServiceRouteEvent_get_affected_entity(native_route_event);
230  RTI_RoutingServiceStreamWriterExt *native_output =
231  static_cast<RTI_RoutingServiceStreamWriterExt *>(affected_entity);
232  Output *output = reinterpret_cast<Output*>(
233  RTI_RoutingServiceRoute_get_stream_port_user_data(
234  forwarder->route_.native_route_,
235  native_output->stream_writer_data));
236  forwarder->processor_->on_output_disabled(
237  forwarder->route(),
238  *output);
239  RTI_RoutingServiceRoute_set_stream_port_user_data(
240  forwarder->route_.native_route_,
241  native_output->stream_writer_data,
242  NULL);
243  delete output;
244  }
245  break;
246 
247  case RTI_ROUTING_SERVICE_ROUTE_EVENT_ROUTE_STARTED:
248 
249  forwarder->processor_->on_start(forwarder->route());
250  break;
251 
252  case RTI_ROUTING_SERVICE_ROUTE_EVENT_ROUTE_STOPPED:
253 
254  forwarder->processor_->on_stop(forwarder->route());
255  break;
256 
257  case RTI_ROUTING_SERVICE_ROUTE_EVENT_ROUTE_RUNNING:
258 
259  forwarder->processor_->on_run(forwarder->route());
260  break;
261 
262  case RTI_ROUTING_SERVICE_ROUTE_EVENT_ROUTE_PAUSED:
263 
264  forwarder->processor_->on_pause(forwarder->route());
265  break;
266 
267  default:
268  break;
269  };
270 
271  } catch (const std::exception& ex) {
272  RTI_RoutingServiceEnvironment_set_error(
273  environment,
274  "%s",
275  ex.what());
276  } catch (...) {
277  RTI_RoutingServiceEnvironment_set_error(
278  environment,
279  "%s",
280  "unexpected exception");
281  }
282  }
283 
284  static void forward_update(
285  void *native_processor_data,
286  const struct RTI_RoutingServiceProperties *native_properties,
287  RTI_RoutingServiceEnvironment *environment)
288  {
289 
290  ProcessorForwarder *processorForwarder =
291  static_cast<ProcessorForwarder*> (native_processor_data);
292 
293  rti::routing::detail::UpdatableEntityForwarder::update(
294  processorForwarder->processor_,
295  native_properties,
296  environment);
297  }
298 
299  RTI_RoutingServiceProcessor* native()
300  {
301  return &native_;
302  }
303 
304 private:
305 
306  ProcessorForwarder(
307  RTI_RoutingServiceRoute *native_route,
308  RTI_RoutingServiceEnvironment *native_env) :
309  processor_(NULL),
310  route_(native_route, native_env)
311  {
312  RTIOsapiMemory_zero(&native_, sizeof(native_));
313  /* initialize native implementation */
314  native_.processor_data =
315  static_cast<void*>(this);
316  native_.on_route_event =
317  ProcessorForwarder::forward_on_route_event;
318  native_.update =
319  ProcessorForwarder::forward_update;
320  }
321 
323  {
324  return route_;
325  }
326 
327  ~ProcessorForwarder()
328  {
329  }
330 
331 private:
332  RTI_RoutingServiceProcessor native_;
333  Processor *processor_;
335 
336 };
337 
338 }}}}
339 
340 #endif // RTI_ROUTING_PROCESSOR_DETAIL_PROCESSOR_FORWARDER_HPP_

RTI Routing Service Version 6.0.1 Copyright © Sun Nov 17 2019 Real-Time Innovations, Inc