// RTI Routing Service Version 7.2.0
// StreamWriterForwarder.hpp
/*
 * (c) Copyright, Real-Time Innovations, 2017.
 * All rights reserved.
 *
 * No duplications, whole or partial, manual or electronic, may be made
 * without express written permission. Any such copies, or
 * revisions thereof, must display this notice unaltered.
 * This code contains trade secrets of Real-Time Innovations, Inc.
 */
10
11#ifndef RTI_ROUTING_ADAPTER_DETAIL_STREAM_WRITER_FORWARDER_HPP_
12#define RTI_ROUTING_ADAPTER_DETAIL_STREAM_WRITER_FORWARDER_HPP_
13
14#include <rti/core/Exception.hpp>
15
16#include <rti/routing/adapter/StreamWriter.hpp>
17#include <rti/routing/detail/UpdatableEntityForwarder.hpp>
18#include <rti/routing/detail/ForwarderUtils.hpp>
19
20namespace rti { namespace routing { namespace adapter { namespace detail {
21
22
23class StreamWriterForwarder {
24public:
25
26 static RTI_RoutingServiceStreamWriterExt* create_native(
27 Connection *connection,
28 Session *session,
29 const struct RTI_RoutingServiceStreamInfo *native_stream_info,
30 const struct RTI_RoutingServiceProperties *native_properties,
31 RTI_RoutingServiceEnvironment *environment)
32 {
33 try {
34 using rti::routing::detail::ScopedForwarder;
35
36 StreamInfo stream_info(*native_stream_info);
37
38 std::map<std::string, std::string> properties;
39 rti::routing::PropertyAdapter::add_properties_from_native(
40 properties,
41 native_properties);
42
43 StreamWriterForwarder *forwarder = new StreamWriterForwarder(
44 NULL);
45 ScopedForwarder<Connection, StreamWriterForwarder> scoped(
46 connection,
47 forwarder,
48 environment);
49 forwarder->stream_writer_ = connection->create_stream_writer(
50 session,
51 stream_info,
52 properties);
53 RTI_ROUTING_THROW_ON_NULL(forwarder->stream_writer_);
54
55 scoped.release();
56
57 // TRUST-134: the forwarder is not leaked here. It might be leaked
58 // if delete_native() were not to be called, but Routing Service
59 // will make sure that is not the case
60
61 /* coverity[leaked_storage : FALSE] */
62 return forwarder->native();
63 } catch(const std::exception& ex) {
64 RTI_RoutingServiceEnvironment_set_error(
65 environment,
66 "%s",
67 ex.what());
68 return NULL;
69 } catch (...) {
70 RTI_RoutingServiceEnvironment_set_error(
71 environment,
72 "unexpected exception");
73 return NULL;
74 }
75
76 }
77
78 static void delete_native(
79 Connection *connection,
80 RTI_RoutingServiceStreamWriterExt *native_stream_writer,
81 RTI_RoutingServiceEnvironment *environment)
82 {
83 StreamWriterForwarder *stream_writer_forwarder =
84 static_cast<StreamWriterForwarder*>(
85 native_stream_writer->stream_writer_data);
86 try {
87 if (stream_writer_forwarder->stream_writer_ != NULL) {
88 connection->delete_stream_writer(
89 stream_writer_forwarder->stream_writer_);
90 stream_writer_forwarder->stream_writer_ = NULL;
91 }
92 } catch(const std::exception& ex) {
93 RTI_RoutingServiceEnvironment_set_error(
94 environment,
95 "%s",
96 ex.what());
97 } catch (...) {
98 }
99
100 delete stream_writer_forwarder;
101 }
102
103
104 static int write(
105 void *native_stream_writer_data,
106 const RTI_RoutingServiceSample *sample_array,
107 const RTI_RoutingServiceSampleInfo *sample_info_array,
108 int array_length,
109 RTI_RoutingServiceEnvironment *environment)
110 {
111
112 StreamWriterForwarder *forwarder =
113 static_cast<StreamWriterForwarder*>(native_stream_writer_data);
114
115 try {
116 RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
117 forwarder->sample_seq_,
118 sample_array,
119 array_length);
120 if (sample_info_array != NULL) {
121 RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
122 forwarder->info_seq_,
123 sample_info_array,
124 array_length);
125 } else {
126 forwarder->info_seq_.clear();
127 }
128 return forwarder->stream_writer_->write(
129 forwarder->sample_seq_,
130 forwarder->info_seq_);
131 } catch (const std::exception& ex) {
132 RTI_RoutingServiceEnvironment_set_error(
133 environment,
134 "%s",
135 ex.what());
136 } catch (...) {
137 RTI_RoutingServiceEnvironment_set_error(
138 environment,
139 "unexpected exception");
140 }
141
142 return 0;
143 }
144
145 static void update(
146 void *native_stream_writer_data,
147 const struct RTI_RoutingServiceProperties * native_properties,
148 RTI_RoutingServiceEnvironment * environment)
149 {
150
151 StreamWriterForwarder *stream_writer_forwarder =
152 static_cast<StreamWriterForwarder*> (native_stream_writer_data);
153
154 rti::routing::detail::UpdatableEntityForwarder::update(
155 stream_writer_forwarder->stream_writer_,
156 native_properties,
157 environment);
158 }
159
160 RTI_RoutingServiceStreamWriterExt* native()
161 {
162 return &this->native_;
163 }
164
165private:
166
167 StreamWriterForwarder(
168 StreamWriter *stream_writer) :
169 stream_writer_(stream_writer)
170 {
171 RTIOsapiMemory_zero(&native_, sizeof(native_));
172 native_.stream_writer_data =
173 static_cast<void*>(this);
174 native_.write =
175 StreamWriterForwarder::write;
176 native_.update =
177 StreamWriterForwarder::update;
178
179 }
180
181 ~StreamWriterForwarder()
182 {
183 }
184
185private:
186 RTI_RoutingServiceStreamWriterExt native_;
187 StreamWriter *stream_writer_;
188 std::vector<StreamWriter::SamplePtr> sample_seq_;
189 std::vector<StreamWriter::InfoPtr> info_seq_;
190
191};
192
193}}}}
194
195#endif // RTI_ROUTING_ADAPTER_DETAIL_STREAM_WRITER_FORWARDER_HPP_