RTI Routing Service Version 7.6.0
ConnectionForwarder.hpp
1/*
2 * (c) Copyright, Real-Time Innovations, 2017.
3 * All rights reserved.
4 *
5 * No duplications, whole or partial, manual or electronic, may be made
6 * without express written permission. Any such copies, or
7 * revisions thereof, must display this notice unaltered.
8 * This code contains trade secrets of Real-Time Innovations, Inc.
9 */
10
11#ifndef RTI_ROUTING_ADAPTER_DETAIL_CONNECTION_FORWARDER_HPP_
12#define RTI_ROUTING_ADAPTER_DETAIL_CONNECTION_FORWARDER_HPP_
13
14#include <rti/core/Exception.hpp>
15
16#include <osapi/osapi_heap.h>
17#include <routingservice/routingservice_adapter_new.h>
18
19#include <rti/routing/adapter/detail/SessionForwarder.hpp>
20#include <rti/routing/adapter/detail/StreamReaderForwarder.hpp>
21#include <rti/routing/adapter/detail/StreamWriterForwarder.hpp>
22#include <rti/routing/adapter/detail/DiscoveryStreamReaderForwarder.hpp>
23#include <rti/routing/detail/UpdatableEntityForwarder.hpp>
24#include <rti/routing/detail/ForwarderUtils.hpp>
25
26namespace rti { namespace routing { namespace adapter { namespace detail {
27
28class ConnectionForwarder {
29public:
30 static RTI_RoutingServiceConnectionExt * create_native(
31 AdapterPlugin *adapter,
32 const char *,
33 const char *,
34 const RTI_RoutingServiceStreamReaderListenerExt *native_output_stream_listener,
35 const RTI_RoutingServiceStreamReaderListenerExt *native_input_stream_listener,
36 const RTI_RoutingServiceTypeInfo **,
37 int,
38 const RTI_RoutingServiceProperties *native_properties,
39 RTI_RoutingServiceEnvironment *environment)
40 {
41 try {
42 using rti::routing::detail::ScopedForwarder;
43
44 // Set properties
45 std::map<std::string, std::string> properties;
46 rti::routing::PropertyAdapter::add_properties_from_native(
47 properties,
48 native_properties);
49
50 ConnectionForwarder *forwarder = new ConnectionForwarder(
51 native_output_stream_listener,
52 native_input_stream_listener);
53 ScopedForwarder<AdapterPlugin, ConnectionForwarder> scoped(
54 adapter,
55 forwarder,
56 environment);
57 // Create connection
58 forwarder->connection_ = adapter->create_connection(
59 &forwarder->input_discovery_listener_,
60 &forwarder->output_discovery_listener_,
61 properties);
62 RTI_ROUTING_THROW_ON_NULL(forwarder->connection_);
63
64 // Create built-in SRs
65 forwarder->input_stream_discovery_forwarder_ =
66 new DiscoveryStreamReaderForwarder(
67 forwarder->connection_
68 ->input_stream_discovery_reader());
69
70 forwarder->output_stream_discovery_forwarder_ =
71 new DiscoveryStreamReaderForwarder(
72 forwarder->connection_
73 ->output_stream_discovery_reader());
74
75 scoped.release();
76
77 // TRUST-134: the forwarder is not leaked here. It might be leaked
78 // if delete_native() were not to be called, but Routing Service
79 // will make sure that is not the case
80
81 /* coverity[leaked_storage : FALSE] */
82 return forwarder->native();
83 } catch (const std::exception& ex) {
84 RTI_RoutingServiceEnvironment_set_error(
85 environment,
86 "%s",
87 ex.what());
88 return NULL;
89 } catch (...) {
90 RTI_RoutingServiceEnvironment_set_error(
91 environment,
92 "unexpected exception");
93 return NULL;
94 }
95 }
96
97 static void delete_native(
98 AdapterPlugin *adapter,
99 RTI_RoutingServiceConnectionExt *native_connection,
100 RTI_RoutingServiceEnvironment *environment)
101 {
102 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
103 native_connection->connection_data);
104 try {
105 // delete built-in SRs
106 delete forwarder->input_stream_discovery_forwarder_;
107 delete forwarder->output_stream_discovery_forwarder_;
108
109 // delete connection
110 if (forwarder->connection_ != NULL) {
111 adapter->delete_connection(forwarder->connection_);
112 forwarder->connection_ = NULL;
113 }
114 } catch(const std::exception& ex) {
115 RTI_RoutingServiceEnvironment_set_error(
116 environment,
117 "%s",
118 ex.what());
119 } catch (...) {
120 }
121
122 delete forwarder;
123 }
124
125
126 static RTI_RoutingServiceStreamReaderExt *
127 get_input_stream_discovery_reader(
128 void *native_connection_data,
129 RTI_RoutingServiceEnvironment *)
130 {
131 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
132 native_connection_data);
133 return forwarder->input_stream_discovery_forwarder_->native();
134 }
135
136 static RTI_RoutingServiceStreamReaderExt *
137 get_output_stream_discovery_reader(
138 void *native_connection_data,
139 RTI_RoutingServiceEnvironment *)
140 {
141 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
142 native_connection_data);
143 return forwarder->output_stream_discovery_forwarder_->native();
144 }
145
146 static RTI_RoutingServiceSessionExt * create_session(
147 void *native_connection_data,
148 const struct RTI_RoutingServiceProperties * native_properties,
149 RTI_RoutingServiceEnvironment *environment)
150 {
151
152 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
153 native_connection_data);
154
155 return SessionForwarder::create_native(
156 forwarder->connection_,
157 native_properties,
158 environment);
159 }
160
161 static void delete_session(
162 void *native_connection_data,
163 RTI_RoutingServiceSessionExt *native_session,
164 RTI_RoutingServiceEnvironment *environment)
165 {
166 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*> (
167 native_connection_data);
168
169 SessionForwarder::delete_native(
170 forwarder->connection_,
171 native_session,
172 environment);
173 }
174
175 static RTI_RoutingServiceStreamReaderExt* create_stream_reader(
176 void *native_connection_data,
177 void *native_session_data,
178 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
179 const struct RTI_RoutingServiceProperties *native_properties,
180 const struct RTI_RoutingServiceStreamReaderListenerExt *native_listener,
181 RTI_RoutingServiceEnvironment *environment)
182 {
183 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
184 native_connection_data);
185 SessionForwarder *session_forwarder = static_cast<SessionForwarder*>(
186 native_session_data);
187
188 return StreamReaderForwarder::create_native(
189 forwarder->connection_,
190 session_forwarder->session(),
191 native_stream_info,
192 native_properties,
193 native_listener,
194 environment);
195 }
196
197 static void delete_stream_reader(
198 void *native_connection_data,
199 RTI_RoutingServiceStreamReaderExt *native_reader,
200 RTI_RoutingServiceEnvironment *environment)
201 {
202 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
203 native_connection_data);
204 StreamReaderForwarder::delete_native(
205 forwarder->connection_,
206 native_reader,
207 environment);
208 }
209
210 static RTI_RoutingServiceStreamWriterExt* create_stream_writer(
211 void *native_connection_data,
212 void *native_session_data,
213 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
214 const struct RTI_RoutingServiceProperties *native_properties,
215 const struct RTI_RoutingServiceStreamWriterListenerExt *,
216 RTI_RoutingServiceEnvironment *environment)
217 {
218 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
219 native_connection_data);
220 SessionForwarder *session_forwarder = static_cast<SessionForwarder*>(
221 native_session_data);
222
223 return StreamWriterForwarder::create_native(
224 forwarder->connection_,
225 session_forwarder->session(),
226 native_stream_info,
227 native_properties,
228 environment);
229 }
230
231 static void delete_stream_writer(
232 void *native_connection_data,
233 RTI_RoutingServiceStreamWriterExt *native_writer,
234 RTI_RoutingServiceEnvironment *environment)
235 {
236 ConnectionForwarder *forwarder = static_cast<ConnectionForwarder*>(
237 native_connection_data);
238 StreamWriterForwarder::delete_native(
239 forwarder->connection_,
240 native_writer,
241 environment);
242 }
243
244 static void update(
245 void *native_connection_data,
246 const struct RTI_RoutingServiceProperties *native_properties,
247 RTI_RoutingServiceEnvironment *environment)
248 {
249 ConnectionForwarder *forwarder =
250 static_cast<ConnectionForwarder*> (native_connection_data);
251
252 rti::routing::detail::UpdatableEntityForwarder::update(
253 forwarder->connection_,
254 native_properties,
255 environment);
256 }
257
258
259 Connection * connection()
260 {
261 return connection_;
262 }
263
264 RTI_RoutingServiceConnectionExt* native()
265 {
266 return &this->native_;
267 }
268
269 RTI_RoutingServiceStreamReaderExt* native_input_discovery_reader()
270 {
271 return input_stream_discovery_forwarder_->native();
272 }
273
274 RTI_RoutingServiceStreamReaderExt* native_output_discovery_reader()
275 {
276 return output_stream_discovery_forwarder_->native();
277 }
278
279private:
280
281 ConnectionForwarder(
282 const RTI_RoutingServiceStreamReaderListenerExt *native_output_stream_listener,
283 const RTI_RoutingServiceStreamReaderListenerExt *native_input_stream_listener) :
284 connection_(NULL),
285 output_stream_discovery_forwarder_(NULL),
286 output_discovery_listener_(native_output_stream_listener),
287 input_stream_discovery_forwarder_(NULL),
288 input_discovery_listener_(native_input_stream_listener)
289 {
290 RTIOsapiMemory_zero(&native_, sizeof(native_));
291
292 native_.connection_data = static_cast<void*>(this);
293 native_.create_session = ConnectionForwarder::create_session;
294 native_.delete_session = ConnectionForwarder::delete_session;
295 native_.create_stream_reader =
296 ConnectionForwarder::create_stream_reader;
297 native_.delete_stream_reader =
298 ConnectionForwarder::delete_stream_reader;
299 native_.create_stream_writer =
300 ConnectionForwarder::create_stream_writer;
301 native_.delete_stream_writer =
302 ConnectionForwarder::delete_stream_writer;
303 native_.get_input_stream_discovery_reader =
304 ConnectionForwarder::get_input_stream_discovery_reader;
305 native_.get_output_stream_discovery_reader =
306 ConnectionForwarder::get_output_stream_discovery_reader;
307 native_.update = ConnectionForwarder::update;
308 }
309
310 ~ConnectionForwarder()
311 {
312 }
313
314private:
315 RTI_RoutingServiceConnectionExt native_;
316 Connection *connection_;
317 DiscoveryStreamReaderForwarder *output_stream_discovery_forwarder_;
318 StreamReaderListener output_discovery_listener_;
319 DiscoveryStreamReaderForwarder *input_stream_discovery_forwarder_;
320 StreamReaderListener input_discovery_listener_;
321
322};
323
324}}}}
325
326#endif // RTI_ROUTING_ADAPTER_DETAIL_CONNECTION_FORWARDER_HPP_
Listener representation used by StreamReader to notify RTI Routing Service when new data is available...