RTI Recording Service  Version 6.0.0
 All Data Structures Namespaces Files Functions Typedefs Enumerations Enumerator Groups
StorageStreamWriterForwarder.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_RECORDING_STORAGE_DETAIL_STORAGE_STREAM_WRITER_FORWARDER_HPP_
12 #define RTI_RECORDING_STORAGE_DETAIL_STORAGE_STREAM_WRITER_FORWARDER_HPP_
13 
14 #include <map>
15 #include <string>
16 
17 #include "log/log_common.h"
18 #include "recordingservice/recordingservice_storagewriter.h"
19 
20 #include "rtiboost/smart_ptr/shared_ptr.hpp"
21 #include "rti/routing/detail/ForwarderUtils.hpp"
22 
24 #include "rti/recording/storage/StorageStreamWriter.hpp"
25 
26 namespace rti { namespace recording { namespace storage { namespace detail {
27 
34 public:
35  StorageStreamWriterDeleter(StorageWriter *storage_writer_parent)
36  : storage_writer_parent_(storage_writer_parent)
37  {
38  RTIBOOST_ASSERT(storage_writer_parent != NULL);
39  }
40 
41  void operator ()(StorageStreamWriter *stream_writer_instance)
42  {
43  if (stream_writer_instance != NULL) {
44  storage_writer_parent_->delete_stream_writer(
45  stream_writer_instance);
46  }
47  }
48 
49 private:
50  StorageWriter *storage_writer_parent_;
51 };
52 
53 class StorageStreamWriterForwarder :
54  public RTI_RecordingServiceStorageStreamWriter {
55 public:
56 
57  StorageStreamWriterForwarder(
58  StorageStreamWriter *stream_writer,
59  StorageWriter *storage_writer_parent)
60  : storage_stream_writer_(
61  stream_writer,
62  StorageStreamWriterDeleter(storage_writer_parent))
63  {
64  store = StorageStreamWriterForwarder::store_fwd;
65  stream_writer_data = this;
66  }
67 
68  ~StorageStreamWriterForwarder()
69  {
70  }
71 
72  static void store_fwd(
73  void *stream_writer_data,
74  const RTI_RoutingServiceSample *samples,
75  const RTI_RoutingServiceSampleInfo *sample_infos,
76  const int count)
77  {
78  try {
79  StorageStreamWriterForwarder *forwarder =
80  static_cast<StorageStreamWriterForwarder *>(
81  stream_writer_data);
82  // Fill in internal vector with incoming samples
83  forwarder->sample_seq_.assign(samples, samples + count);
84  // Fill in internal vector with incoming sample infos, the sample
85  // info array is optional so check for validity before doing
86  // anything
87  if (sample_infos != NULL) {
88  forwarder->info_seq_.assign(sample_infos, sample_infos + count);
89  }
90  // Forward the call to the StorageStreamWriter implementation
91  forwarder->storage_stream_writer_->store(
92  forwarder->sample_seq_,
93  forwarder->info_seq_);
94  } catch (const std::exception& ex) {
95  RTILog_printContextAndMsg(
96  RTI_FUNCTION_NAME,
97  &RTI_LOG_ANY_s,
98  ex.what());
99  } catch (...) {
100  RTILog_printContextAndMsg(
101  RTI_FUNCTION_NAME,
102  &RTI_LOG_ANY_s,
103  "unknown exception");
104  }
105  }
106 
107  StorageStreamWriter * stream_writer()
108  {
109  return storage_stream_writer_.get();
110  }
111 
112 private:
113  rtiboost::shared_ptr<StorageStreamWriter> storage_stream_writer_;
114 
115  std::vector<StorageStreamWriter::SamplePtr> sample_seq_;
116  std::vector<StorageStreamWriter::InfoPtr> info_seq_;
117 };
118 
129 template<typename NativeStreamWriterType, typename NativeStoredType>
130 class StorageGenericStreamWriterForwarder : public NativeStreamWriterType {
131 public:
132 
134  StorageStreamWriter *stream_writer,
135  StorageWriter *storage_writer_parent)
136  : storage_stream_writer_(
137  stream_writer,
138  StorageStreamWriterDeleter(storage_writer_parent))
139  {
140  }
141 
143  {
144  }
145 
146  static void store_fwd(
147  void *stream_writer_data,
148  const NativeStoredType **samples,
149  const RTI_RoutingServiceSampleInfo *sample_infos,
150  const int count)
151  {
152  try {
155  stream_writer_data);
156  /*
157  * In the case of the generic stream writer forwarder, used for the
158  * built-in DDS discovery topics, we are not guaranteed that the
159  * stream writer will not be null. This can happen if the user
160  * doesn't override the default implementations, which return null.
161  * The user may not have interest in recording one or any of the
162  * topics. We should account for that case.
163  */
164  if (forwarder->storage_stream_writer_.get() == NULL) {
165  return;
166  }
167  // Fill in internal vector with incoming samples
168  StorageStreamWriter::SamplePtr *untyped_samples =
169  (StorageStreamWriter::SamplePtr *) samples;
170  forwarder->sample_seq_.assign(
171  untyped_samples,
172  untyped_samples + count);
173  /*
174  * Fill in internal vector with incoming sample infos, the sample
175  * info array is optional so check for validity before doing
176  * anything
177  */
178  if (sample_infos != NULL) {
179  forwarder->info_seq_.assign(sample_infos, sample_infos + count);
180  }
181  // Forward the call to the StorageStreamWriter implementation
182  forwarder->storage_stream_writer_->store(
183  forwarder->sample_seq_,
184  forwarder->info_seq_);
185  } catch (const std::exception& ex) {
186  RTILog_printContextAndMsg(
187  RTI_FUNCTION_NAME,
188  &RTI_LOG_ANY_s,
189  ex.what());
190  } catch (...) {
191  RTILog_printContextAndMsg(
192  RTI_FUNCTION_NAME,
193  &RTI_LOG_ANY_s,
194  "unknown exception");
195  }
196  }
197 
198  StorageStreamWriter * stream_writer()
199  {
200  return storage_stream_writer_.get();
201  }
202 
203 private:
204  rtiboost::shared_ptr<StorageStreamWriter> storage_stream_writer_;
205 
206  std::vector<StorageStreamWriter::SamplePtr> sample_seq_;
207  std::vector<StorageStreamWriter::InfoPtr> info_seq_;
208 
209 };
210 
211 class StorageParticipantWriterForwarder :
213  RTI_RecordingServiceStorageParticipantWriter,
214  DDS_ParticipantBuiltinTopicData> {
215 public:
216 
217  StorageParticipantWriterForwarder(
218  StorageStreamWriter *stream_writer,
219  StorageWriter *storage_writer_parent) :
221  RTI_RecordingServiceStorageParticipantWriter,
222  DDS_ParticipantBuiltinTopicData>(
223  stream_writer,
224  storage_writer_parent)
225  {
226  store = store_fwd;
227  stream_writer_data = this;
228  }
229 };
230 
231 class StoragePublicationWriterForwarder :
232  public StorageGenericStreamWriterForwarder<
233  RTI_RecordingServiceStoragePublicationWriter,
234  DDS_PublicationBuiltinTopicData> {
235 public:
236 
237  StoragePublicationWriterForwarder(
238  StorageStreamWriter *stream_writer,
239  StorageWriter *storage_writer_parent) :
240  StorageGenericStreamWriterForwarder<
241  RTI_RecordingServiceStoragePublicationWriter,
242  DDS_PublicationBuiltinTopicData>(
243  stream_writer,
244  storage_writer_parent)
245  {
246  store = store_fwd;
247  stream_writer_data = this;
248  }
249 };
250 
251 class StorageSubscriptionWriterForwarder :
252  public StorageGenericStreamWriterForwarder<
253  RTI_RecordingServiceStorageSubscriptionWriter,
254  DDS_SubscriptionBuiltinTopicData> {
255 public:
256 
257  StorageSubscriptionWriterForwarder(
258  StorageStreamWriter *stream_writer,
259  StorageWriter *storage_writer_parent) :
260  StorageGenericStreamWriterForwarder<
261  RTI_RecordingServiceStorageSubscriptionWriter,
262  DDS_SubscriptionBuiltinTopicData>(
263  stream_writer,
264  storage_writer_parent)
265  {
266  store = store_fwd;
267  stream_writer_data = this;
268  }
269 };
270 
271 } } } } // rti::recording::storage::detail
272 
273 #endif // RTI_RECORDING_STORAGE_DETAIL_STORAGE_STREAM_WRITER_FORWARDER_HPP_

RTI Recording Service Version 6.0.0 Copyright © Sun Mar 3 2019 Real-Time Innovations, Inc