11 #ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12 #define RTI_ROUTING_PROCESSOR_INPUT_HPP_
14 #include <dds/core/Value.hpp>
15 #include <dds/core/SafeEnumeration.hpp>
16 #include <rti/core/NativeValueType.hpp>
18 #include "routingservice/routingservice_processor.h"
19 #include "routingservice/routingservice_adapter_new.h"
20 #include <rti/routing/StreamInfo.hpp>
21 #include <rti/routing/adapter/StreamReader.hpp>
22 #include <rti/routing/processor/LoanedSamples.hpp>
23 #include <rti/routing/processor/Query.hpp>
25 namespace rti {
namespace routing {
namespace processor {
29 template <
typename Data,
typename Info>
class TypedInput;
33 RTI_RoutingServiceStreamReaderExt *native,
34 RTI_RoutingServiceRoute *native_route)
37 *RTI_RoutingServiceRoute_get_input_stream_info(native_route, native));
41 std::string name_from_native_input(
42 RTI_RoutingServiceStreamReaderExt *native,
43 RTI_RoutingServiceRoute *native_route)
45 return RTI_RoutingServiceRoute_get_input_name(
50 template <
typename Data,
typename Info = dds::sub::SampleInfo>
72 Input(RTI_RoutingServiceStreamReaderExt *native,
74 RTI_RoutingServiceRoute *native_route,
75 RTI_RoutingServiceEnvironment *native_env)
78 native_route_(native_route),
79 native_env_(native_env),
80 stream_info_(from_native_input(native, native_route)),
81 name_(name_from_native_input(native, native_route))
96 const std::string& name()
const
119 template <
typename Data,
typename Info>
134 template <
typename Data>
161 std::vector<SamplePtr>& sample_seq,
162 std::vector<InfoPtr>& info_seq) RTI_FINAL
164 using namespace rti::routing::adapter;
166 detail::NativeSamples native_samples;
168 native_->stream_reader_data,
169 &native_samples.sample_array_,
170 &native_samples.info_array_,
171 &native_samples.length_,
173 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
174 get_samples_from_native(sample_seq, info_seq, native_samples);
178 std::vector<SamplePtr>& sample_seq,
179 std::vector<InfoPtr>& info_seq) RTI_FINAL
181 using namespace rti::routing::adapter;
183 detail::NativeSamples native_samples;
185 native_->stream_reader_data,
186 &native_samples.sample_array_,
187 &native_samples.info_array_,
188 &native_samples.length_,
190 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
191 get_samples_from_native(sample_seq, info_seq, native_samples);
195 std::vector<SamplePtr>& sample_seq,
196 std::vector<InfoPtr>& info_seq,
199 detail::NativeSamples native_samples;
200 native_->take_w_selector(
201 native_->stream_reader_data,
202 &native_samples.sample_array_,
203 &native_samples.info_array_,
204 &native_samples.length_,
205 &selector_state.native(),
207 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
208 get_samples_from_native(sample_seq, info_seq, native_samples);
212 std::vector<SamplePtr>& sample_seq,
213 std::vector<InfoPtr>& info_seq,
216 detail::NativeSamples native_samples;
217 native_->read_w_selector(
218 native_->stream_reader_data,
219 &native_samples.sample_array_,
220 &native_samples.info_array_,
221 &native_samples.length_,
222 &selector_state.native(),
224 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
225 get_samples_from_native(sample_seq, info_seq, native_samples);
229 std::vector<SamplePtr>& sample_seq,
230 std::vector<InfoPtr>& info_seq) RTI_FINAL
232 detail::NativeSamples native_samples;
233 native_samples.length_ = sample_seq.size();
234 native_samples.sample_array_ =
static_cast<RTI_RoutingServiceSample*
>
235 (sample_seq[native_samples.length_]);
236 if (info_seq.size() > 0) {
237 native_samples.info_array_ =
static_cast<RTI_RoutingServiceSample*
>
238 (info_seq[info_seq.size()]);
241 native_->return_loan(
242 native_->stream_reader_data,
243 native_samples.sample_array_,
244 native_samples.info_array_,
245 native_samples.length_,
247 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
253 void* create_content_query(
254 void* old_query_data,
255 const dds::topic::Filter& filter) RTI_FINAL
257 if (native_->create_content_query == NULL) {
258 throw dds::core::UnsupportedError(
"Unsupported operation: create_content_query");
261 RTI_RoutingServiceSelectorContent content =
262 RTI_RoutingServiceSelectorContent_INITIALIZER;
263 content.expression = (
char *) filter.expression().c_str();
264 rti::core::native_conversions::to_native(
265 content.expression_parameters,
266 filter->parameters());
267 void* query_data = native_->create_content_query(
268 native_->stream_reader_data,
272 DDS_StringSeq_finalize(&content.expression_parameters);
273 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
277 void delete_content_query(
void* query_data) RTI_FINAL
279 if (native_->delete_content_query == NULL) {
280 throw dds::core::UnsupportedError(
"delete_content_query");
283 native_->delete_content_query(
284 native_->stream_reader_data,
287 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
291 virtual void update(
const std::map<std::string, std::string>&) RTI_FINAL
293 throw dds::core::PreconditionNotMetError(
294 "update not applicable within a Processor notification context");
299 void get_samples_from_native(
300 std::vector<SamplePtr>& sample_seq,
301 std::vector<InfoPtr>& info_seq,
302 detail::NativeSamples& native_samples)
304 sample_seq.reserve(native_samples.length_ + 1);
305 RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
307 native_samples.sample_array_,
308 native_samples.length_);
309 sample_seq[native_samples.length_] =
310 native_samples.sample_array_;
311 if (native_samples.info_array_ != NULL) {
312 info_seq.reserve(native_samples.length_ + 1);
313 RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
315 native_samples.info_array_,
316 native_samples.length_);
317 info_seq[info_seq.size()] =
318 native_samples.info_array_;
324 RTI_RoutingServiceStreamReaderExt* native()
329 template <
typename Data,
typename Info>
friend class TypedInput;
331 friend class std::allocator<Input>;
332 RTI_RoutingServiceStreamReaderExt *native_;
334 RTI_RoutingServiceRoute *native_route_;
335 RTI_RoutingServiceEnvironment *native_env_;
340 template <
typename Data,
typename Info = dds::sub::SampleInfo>
351 template <
typename Data,
typename Info>
372 const std::string& name()
const;
427 const dds::topic::Filter& filter);
519 template <
typename Data,
typename Info>
532 : typed_input_(input), query_(dds::core::null)
541 : typed_input_(other.typed_input_),
542 state_(other.state_),
560 const dds::sub::status::DataState& the_state)
562 state_.
state(the_state);
599 const dds::core::InstanceHandle& the_handle)
635 const dds::core::InstanceHandle& the_handle)
655 const dds::topic::Filter& the_filter)
657 state_.
filter(the_filter);
680 state_.content(the_query.delegate().get()->query_data_);
696 return typed_input_.take(state_);
711 return typed_input_.read(state_);
721 template <
typename Data,
typename Info>
722 struct create_data_from_input {
724 static Data
get(TypedInput<Data, Info>& )
730 template <
typename Info>
731 struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
733 static dds::core::xtypes::DynamicData
get(
734 TypedInput<dds::core::xtypes::DynamicData, Info>& input)
736 if (input->stream_info().type_info().type_representation_kind()
737 != TypeRepresentationKind::DYNAMIC_TYPE) {
738 throw dds::core::PreconditionNotMetError(
739 "inconsistent data representation kind");
741 dds::core::xtypes::DynamicType *type_code =
742 static_cast<dds::core::xtypes::DynamicType *
> (
743 input->stream_info().type_info().type_representation());
744 return dds::core::xtypes::DynamicData(*type_code);
748 template <
typename Data,
typename Info>
749 TypedInput<Data, Info>::TypedInput(Input *input)
754 template <
typename Data,
typename Info>
757 return input_->stream_info_;
760 template <
typename Data,
typename Info>
763 return input_->name_;
766 template <
typename Data,
typename Info>
769 detail::NativeSamples native_samples;
770 input_->native_->take(
771 input_->native_->stream_reader_data,
772 &native_samples.sample_array_,
773 &native_samples.info_array_,
774 &native_samples.length_,
775 input_->native_env_);
776 RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
780 input_->native_env_);
783 template <
typename Data,
typename Info>
786 detail::NativeSamples native_samples;
787 input_->native_->read(
788 input_->native_->stream_reader_data,
789 &native_samples.sample_array_,
790 &native_samples.info_array_,
791 &native_samples.length_,
792 input_->native_env_);
793 RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
797 input_->native_env_);
800 template <
typename Data,
typename Info>
807 template <
typename Data,
typename Info>
809 const dds::topic::Filter& filter)
812 &input_->stream_reader(),
817 template <
typename Data,
typename Info>
821 detail::NativeSamples native_samples;
822 input_->native_->take_w_selector(
823 input_->native_->stream_reader_data,
824 &native_samples.sample_array_,
825 &native_samples.info_array_,
826 &native_samples.length_,
827 &selector_state.native(),
828 input_->native_env_);
829 RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
830 return LoanedSamples<Data, Info>(
833 input_->native_env_);
836 template <
typename Data,
typename Info>
840 detail::NativeSamples native_samples;
841 input_->native_->read_w_selector(
842 input_->native_->stream_reader_data,
843 &native_samples.sample_array_,
844 &native_samples.info_array_,
845 &native_samples.length_,
846 &selector_state.native(),
847 input_->native_env_);
848 RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
849 return LoanedSamples<Data, Info>(
852 input_->native_env_);
855 template <
typename Data,
typename Info>
858 return RTI_RoutingServiceRoute_is_input_active(
859 input_->native_route_,
860 input_->native_) ?
true :
false;
863 template <
typename Data,
typename Info>
866 return create_data_from_input<Data, Info>::get(*
this);
881 #endif // RTI_ROUTING_PROCESSOR_INPUT_HPP_