RTI Routing Service Version 7.2.0
ConnectionForwarder.hpp
1/*
2 * (c) Copyright, Real-Time Innovations, 2017.
3 * All rights reserved.
4 *
5 * No duplications, whole or partial, manual or electronic, may be made
6 * without express written permission. Any such copies, or
7 * revisions thereof, must display this notice unaltered.
8 * This code contains trade secrets of Real-Time Innovations, Inc.
9 */
10
11#ifndef RTI_ROUTING_ADAPTER_DETAIL_CONNECTION_FORWARDER_HPP_
12#define RTI_ROUTING_ADAPTER_DETAIL_CONNECTION_FORWARDER_HPP_
13
14#include <rti/core/Exception.hpp>
15
16#include <osapi/osapi_heap.h>
17#include <routingservice/routingservice_adapter_new.h>
18
20#include <rti/routing/adapter/detail/SessionForwarder.hpp>
21#include <rti/routing/adapter/detail/StreamReaderForwarder.hpp>
22#include <rti/routing/adapter/detail/StreamWriterForwarder.hpp>
23#include <rti/routing/adapter/detail/DiscoveryStreamReaderForwarder.hpp>
24#include <rti/routing/detail/UpdatableEntityForwarder.hpp>
25#include <rti/routing/detail/ForwarderUtils.hpp>
26
27namespace rti { namespace routing { namespace adapter { namespace detail {
28
29class ConnectionForwarder {
30public:
31 static RTI_RoutingServiceConnectionExt * create_native(
32 AdapterPlugin *adapter,
33 const char *,
34 const char *,
35 const RTI_RoutingServiceStreamReaderListenerExt *native_output_stream_listener,
36 const RTI_RoutingServiceStreamReaderListenerExt *native_input_stream_listener,
37 const RTI_RoutingServiceTypeInfo **,
38 int,
39 const RTI_RoutingServiceProperties *native_properties,
40 RTI_RoutingServiceEnvironment *environment)
41 {
42 try {
43 using rti::routing::detail::ScopedForwarder;
44
45 // Set properties
46 std::map<std::string, std::string> properties;
47 rti::routing::PropertyAdapter::add_properties_from_native(
48 properties,
49 native_properties);
50
51 ConnectionForwarder *forwarder = new ConnectionForwarder(
52 native_output_stream_listener,
53 native_input_stream_listener);
54 ScopedForwarder<AdapterPlugin, ConnectionForwarder> scoped(
55 adapter,
56 forwarder,
57 environment);
58 // Create connection
59 forwarder->connection_ = adapter->create_connection(
60 &forwarder->input_discovery_listener_,
61 &forwarder->output_discovery_listener_,
62 properties);
63 RTI_ROUTING_THROW_ON_NULL(forwarder->connection_);
64
65 // Create built-in SRs
66 forwarder->input_stream_discovery_forwarder_ =
67 new DiscoveryStreamReaderForwarder(
68 forwarder->connection_
69 ->input_stream_discovery_reader());
70
71 forwarder->output_stream_discovery_forwarder_ =
72 new DiscoveryStreamReaderForwarder(
73 forwarder->connection_
74 ->output_stream_discovery_reader());
75
76 scoped.release();
77
78 // TRUST-134: the forwarder is not leaked here. It might be leaked
79 // if delete_native() were not to be called, but Routing Service
80 // will make sure that is not the case
81
82 /* coverity[leaked_storage : FALSE] */
83 return forwarder->native();
84 } catch (const std::exception& ex) {
85 RTI_RoutingServiceEnvironment_set_error(
86 environment,
87 "%s",
88 ex.what());
89 return NULL;
90 } catch (...) {
91 RTI_RoutingServiceEnvironment_set_error(
92 environment,
93 "unexpected exception");
94 return NULL;
95 }
96 }
97
98 static void delete_native(
99 AdapterPlugin *adapter,
100 RTI_RoutingServiceConnectionExt *native_connection,
101 RTI_RoutingServiceEnvironment *environment)
102 {
103 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
104 native_connection->connection_data);
105 try {
106 // delete built-in SRs
107 delete forwarder->input_stream_discovery_forwarder_;
108 delete forwarder->output_stream_discovery_forwarder_;
109
110 // delete connection
111 if (forwarder->connection_ != NULL) {
112 adapter->delete_connection(forwarder->connection_);
113 forwarder->connection_ = NULL;
114 }
115 } catch(const std::exception& ex) {
116 RTI_RoutingServiceEnvironment_set_error(
117 environment,
118 "%s",
119 ex.what());
120 } catch (...) {
121 }
122
123 delete forwarder;
124 }
125
126
127 static RTI_RoutingServiceStreamReaderExt *
128 get_input_stream_discovery_reader(
129 void *native_connection_data,
130 RTI_RoutingServiceEnvironment *)
131 {
132 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
133 native_connection_data);
134 return forwarder->input_stream_discovery_forwarder_->native();
135 }
136
137 static RTI_RoutingServiceStreamReaderExt *
138 get_output_stream_discovery_reader(
139 void *native_connection_data,
140 RTI_RoutingServiceEnvironment *)
141 {
142 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
143 native_connection_data);
144 return forwarder->output_stream_discovery_forwarder_->native();
145 }
146
147 static RTI_RoutingServiceSessionExt * create_session(
148 void *native_connection_data,
149 const struct RTI_RoutingServiceProperties * native_properties,
150 RTI_RoutingServiceEnvironment *environment)
151 {
152
153 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
154 native_connection_data);
155
156 return SessionForwarder::create_native(
157 forwarder->connection_,
158 native_properties,
159 environment);
160 }
161
162 static void delete_session(
163 void *native_connection_data,
164 RTI_RoutingServiceSessionExt *native_session,
165 RTI_RoutingServiceEnvironment *environment)
166 {
167 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*> (
168 native_connection_data);
169
170 SessionForwarder::delete_native(
171 forwarder->connection_,
172 native_session,
173 environment);
174 }
175
176 static RTI_RoutingServiceStreamReaderExt* create_stream_reader(
177 void *native_connection_data,
178 void *native_session_data,
179 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
180 const struct RTI_RoutingServiceProperties *native_properties,
181 const struct RTI_RoutingServiceStreamReaderListenerExt *native_listener,
182 RTI_RoutingServiceEnvironment *environment)
183 {
184 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
185 native_connection_data);
186 SessionForwarder *session_forwarder = static_cast<SessionForwarder*>(
187 native_session_data);
188
189 return StreamReaderForwarder::create_native(
190 forwarder->connection_,
191 session_forwarder->session(),
192 native_stream_info,
193 native_properties,
194 native_listener,
195 environment);
196 }
197
198 static void delete_stream_reader(
199 void *native_connection_data,
200 RTI_RoutingServiceStreamReaderExt *native_reader,
201 RTI_RoutingServiceEnvironment *environment)
202 {
203 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
204 native_connection_data);
205 StreamReaderForwarder::delete_native(
206 forwarder->connection_,
207 native_reader,
208 environment);
209 }
210
211 static RTI_RoutingServiceStreamWriterExt* create_stream_writer(
212 void *native_connection_data,
213 void *native_session_data,
214 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
215 const struct RTI_RoutingServiceProperties *native_properties,
216 const struct RTI_RoutingServiceStreamWriterListenerExt *,
217 RTI_RoutingServiceEnvironment *environment)
218 {
219 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
220 native_connection_data);
221 SessionForwarder *session_forwarder = static_cast<SessionForwarder*>(
222 native_session_data);
223
224 return StreamWriterForwarder::create_native(
225 forwarder->connection_,
226 session_forwarder->session(),
227 native_stream_info,
228 native_properties,
229 environment);
230 }
231
232 static void delete_stream_writer(
233 void *native_connection_data,
234 RTI_RoutingServiceStreamWriterExt *native_writer,
235 RTI_RoutingServiceEnvironment *environment)
236 {
237 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
238 native_connection_data);
239 StreamWriterForwarder::delete_native(
240 forwarder->connection_,
241 native_writer,
242 environment);
243 }
244
245 static void update(
246 void *native_connection_data,
247 const struct RTI_RoutingServiceProperties *native_properties,
248 RTI_RoutingServiceEnvironment *environment)
249 {
250 ConnectionForwarder *forwarder =
251 static_cast<ConnectionForwarder*> (native_connection_data);
252
253 rti::routing::detail::UpdatableEntityForwarder::update(
254 forwarder->connection_,
255 native_properties,
256 environment);
257 }
258
259
260 Connection * connection()
261 {
262 return connection_;
263 }
264
265 RTI_RoutingServiceConnectionExt* native()
266 {
267 return &this->native_;
268 }
269
270 RTI_RoutingServiceStreamReaderExt* native_input_discovery_reader()
271 {
272 return input_stream_discovery_forwarder_->native();
273 }
274
275 RTI_RoutingServiceStreamReaderExt* native_output_discovery_reader()
276 {
277 return output_stream_discovery_forwarder_->native();
278 }
279
280private:
281
282 ConnectionForwarder(
283 const RTI_RoutingServiceStreamReaderListenerExt *native_output_stream_listener,
284 const RTI_RoutingServiceStreamReaderListenerExt *native_input_stream_listener) :
285 connection_(NULL),
286 output_stream_discovery_forwarder_(NULL),
287 output_discovery_listener_(native_output_stream_listener),
288 input_stream_discovery_forwarder_(NULL),
289 input_discovery_listener_(native_input_stream_listener)
290 {
291 RTIOsapiMemory_zero(&native_, sizeof(native_));
292
293 native_.connection_data = static_cast<void*>(this);
294 native_.create_session = ConnectionForwarder::create_session;
295 native_.delete_session = ConnectionForwarder::delete_session;
296 native_.create_stream_reader =
297 ConnectionForwarder::create_stream_reader;
298 native_.delete_stream_reader =
299 ConnectionForwarder::delete_stream_reader;
300 native_.create_stream_writer =
301 ConnectionForwarder::create_stream_writer;
302 native_.delete_stream_writer =
303 ConnectionForwarder::delete_stream_writer;
304 native_.get_input_stream_discovery_reader =
305 ConnectionForwarder::get_input_stream_discovery_reader;
306 native_.get_output_stream_discovery_reader =
307 ConnectionForwarder::get_output_stream_discovery_reader;
308 native_.update = ConnectionForwarder::update;
309 }
310
311 ~ConnectionForwarder()
312 {
313 }
314
315private:
316 RTI_RoutingServiceConnectionExt native_;
317 Connection *connection_;
318 DiscoveryStreamReaderForwarder *output_stream_discovery_forwarder_;
319 StreamReaderListener output_discovery_listener_;
320 DiscoveryStreamReaderForwarder *input_stream_discovery_forwarder_;
321 StreamReaderListener input_discovery_listener_;
322
323};
324
325}}}}
326
327#endif // RTI_ROUTING_ADAPTER_DETAIL_CONNECTION_FORWARDER_HPP_
RTI Routing Service C++ Adapter API.
Listener representation used by StreamReader to notify RTI Routing Service when new data is available...