RTI Recording Service  Version 6.0.1
 All Data Structures Namespaces Files Functions Typedefs Enumerations Enumerator Groups
StorageStreamWriterForwarder.hpp
1 /*
2  * (c) Copyright, Real-Time Innovations, 2017.
3  * All rights reserved.
4  *
5  * No duplications, whole or partial, manual or electronic, may be made
6  * without express written permission. Any such copies, or
7  * revisions thereof, must display this notice unaltered.
8  * This code contains trade secrets of Real-Time Innovations, Inc.
9  */
10 
11 #ifndef RTI_RECORDING_STORAGE_DETAIL_STORAGE_STREAM_WRITER_FORWARDER_HPP_
12 #define RTI_RECORDING_STORAGE_DETAIL_STORAGE_STREAM_WRITER_FORWARDER_HPP_
13 
14 #include <map>
15 #include <string>
16 
17 #include "log/log_common.h"
18 #include "recordingservice/recordingservice_storagewriter.h"
19 
20 #include "rtiboost/smart_ptr/shared_ptr.hpp"
21 #include "rti/routing/detail/ForwarderUtils.hpp"
22 
24 #include "rti/recording/storage/StorageStreamWriter.hpp"
25 
26 namespace rti { namespace recording { namespace storage { namespace detail {
27 
34 public:
35  StorageStreamWriterDeleter(StorageWriter *storage_writer_parent)
36  : storage_writer_parent_(storage_writer_parent)
37  {
38  RTIBOOST_ASSERT(storage_writer_parent != NULL);
39  }
40 
41  void operator ()(StorageStreamWriter *stream_writer_instance)
42  {
43  if (stream_writer_instance != NULL) {
44  storage_writer_parent_->delete_stream_writer(
45  stream_writer_instance);
46  }
47  }
48 
49 private:
50  StorageWriter *storage_writer_parent_;
51 };
52 
53 class StorageStreamWriterForwarder :
54  public RTI_RecordingServiceStorageStreamWriter {
55 public:
56 
57  StorageStreamWriterForwarder(
58  StorageStreamWriter *stream_writer,
59  StorageWriter *storage_writer_parent)
60  : storage_stream_writer_(
61  stream_writer,
62  StorageStreamWriterDeleter(storage_writer_parent))
63  {
64  store = StorageStreamWriterForwarder::store_fwd;
65  stream_writer_data = this;
66  }
67 
68  ~StorageStreamWriterForwarder()
69  {
70  }
71 
72  static void store_fwd(
73  void *stream_writer_data,
74  const RTI_RoutingServiceSample *samples,
75  const RTI_RoutingServiceSampleInfo *sample_infos,
76  const int count)
77  {
78  try {
79  StorageStreamWriterForwarder *forwarder =
80  static_cast<StorageStreamWriterForwarder *>(
81  stream_writer_data);
82  // Fill in internal vector with incoming samples
83  forwarder->sample_seq_.assign(samples, samples + count);
84  // Fill in internal vector with incoming sample infos, the sample
85  // info array is optional so check for validity before doing
86  // anything
87  if (sample_infos != NULL) {
88  forwarder->info_seq_.assign(sample_infos, sample_infos + count);
89  }
90  // Forward the call to the StorageStreamWriter implementation
91  forwarder->storage_stream_writer_->store(
92  forwarder->sample_seq_,
93  forwarder->info_seq_);
94  } catch (const std::exception& ex) {
95  RTILog_printContextAndMsg(
96  RTI_LOG_BIT_EXCEPTION,
97  RTI_FUNCTION_NAME,
98  &RTI_LOG_ANY_s,
99  ex.what());
100  } catch (...) {
101  RTILog_printContextAndMsg(
102  RTI_LOG_BIT_EXCEPTION,
103  RTI_FUNCTION_NAME,
104  &RTI_LOG_ANY_s,
105  "unknown exception");
106  }
107  }
108 
109  StorageStreamWriter * stream_writer()
110  {
111  return storage_stream_writer_.get();
112  }
113 
114 private:
115  rtiboost::shared_ptr<StorageStreamWriter> storage_stream_writer_;
116 
117  std::vector<StorageStreamWriter::SamplePtr> sample_seq_;
118  std::vector<StorageStreamWriter::InfoPtr> info_seq_;
119 };
120 
131 template<typename NativeStreamWriterType, typename NativeStoredType>
132 class StorageGenericStreamWriterForwarder : public NativeStreamWriterType {
133 public:
134 
136  StorageStreamWriter *stream_writer,
137  StorageWriter *storage_writer_parent)
138  : storage_stream_writer_(
139  stream_writer,
140  StorageStreamWriterDeleter(storage_writer_parent))
141  {
142  }
143 
145  {
146  }
147 
148  static void store_fwd(
149  void *stream_writer_data,
150  const NativeStoredType **samples,
151  const RTI_RoutingServiceSampleInfo *sample_infos,
152  const int count)
153  {
154  try {
157  stream_writer_data);
158  /*
159  * In the case of the generic stream writer forwarder, used for the
160  * built-in DDS discovery topics, we are not guaranteed that the
161  * stream writer will not be null. This can happen if the user
162  * doesn't override the default implementations, which return null.
163  * The user may not have interest in recording one or any of the
164  * topics. We should account for that case.
165  */
166  if (forwarder->storage_stream_writer_.get() == NULL) {
167  return;
168  }
169  // Fill in internal vector with incoming samples
170  StorageStreamWriter::SamplePtr *untyped_samples =
171  (StorageStreamWriter::SamplePtr *) samples;
172  forwarder->sample_seq_.assign(
173  untyped_samples,
174  untyped_samples + count);
175  /*
176  * Fill in internal vector with incoming sample infos, the sample
177  * info array is optional so check for validity before doing
178  * anything
179  */
180  if (sample_infos != NULL) {
181  forwarder->info_seq_.assign(sample_infos, sample_infos + count);
182  }
183  // Forward the call to the StorageStreamWriter implementation
184  forwarder->storage_stream_writer_->store(
185  forwarder->sample_seq_,
186  forwarder->info_seq_);
187  } catch (const std::exception& ex) {
188  RTILog_printContextAndMsg(
189  RTI_LOG_BIT_EXCEPTION,
190  RTI_FUNCTION_NAME,
191  &RTI_LOG_ANY_s,
192  ex.what());
193  } catch (...) {
194  RTILog_printContextAndMsg(
195  RTI_LOG_BIT_EXCEPTION,
196  RTI_FUNCTION_NAME,
197  &RTI_LOG_ANY_s,
198  "unknown exception");
199  }
200  }
201 
202  StorageStreamWriter * stream_writer()
203  {
204  return storage_stream_writer_.get();
205  }
206 
207 private:
208  rtiboost::shared_ptr<StorageStreamWriter> storage_stream_writer_;
209 
210  std::vector<StorageStreamWriter::SamplePtr> sample_seq_;
211  std::vector<StorageStreamWriter::InfoPtr> info_seq_;
212 
213 };
214 
215 class StorageParticipantWriterForwarder :
217  RTI_RecordingServiceStorageParticipantWriter,
218  DDS_ParticipantBuiltinTopicData> {
219 public:
220 
221  StorageParticipantWriterForwarder(
222  StorageStreamWriter *stream_writer,
223  StorageWriter *storage_writer_parent) :
225  RTI_RecordingServiceStorageParticipantWriter,
226  DDS_ParticipantBuiltinTopicData>(
227  stream_writer,
228  storage_writer_parent)
229  {
230  store = store_fwd;
231  stream_writer_data = this;
232  }
233 };
234 
235 class StoragePublicationWriterForwarder :
236  public StorageGenericStreamWriterForwarder<
237  RTI_RecordingServiceStoragePublicationWriter,
238  DDS_PublicationBuiltinTopicData> {
239 public:
240 
241  StoragePublicationWriterForwarder(
242  StorageStreamWriter *stream_writer,
243  StorageWriter *storage_writer_parent) :
244  StorageGenericStreamWriterForwarder<
245  RTI_RecordingServiceStoragePublicationWriter,
246  DDS_PublicationBuiltinTopicData>(
247  stream_writer,
248  storage_writer_parent)
249  {
250  store = store_fwd;
251  stream_writer_data = this;
252  }
253 };
254 
255 class StorageSubscriptionWriterForwarder :
256  public StorageGenericStreamWriterForwarder<
257  RTI_RecordingServiceStorageSubscriptionWriter,
258  DDS_SubscriptionBuiltinTopicData> {
259 public:
260 
261  StorageSubscriptionWriterForwarder(
262  StorageStreamWriter *stream_writer,
263  StorageWriter *storage_writer_parent) :
264  StorageGenericStreamWriterForwarder<
265  RTI_RecordingServiceStorageSubscriptionWriter,
266  DDS_SubscriptionBuiltinTopicData>(
267  stream_writer,
268  storage_writer_parent)
269  {
270  store = store_fwd;
271  stream_writer_data = this;
272  }
273 };
274 
275 } } } } // rti::recording::storage::detail
276 
277 #endif // RTI_RECORDING_STORAGE_DETAIL_STORAGE_STREAM_WRITER_FORWARDER_HPP_

RTI Recording Service Version 6.0.1 Copyright © Sun Nov 17 2019 Real-Time Innovations, Inc