11 #ifndef RTI_ROUTING_PROCESSOR_INPUT_HPP_
12 #define RTI_ROUTING_PROCESSOR_INPUT_HPP_
14 #include <dds/core/Value.hpp>
15 #include <dds/core/SafeEnumeration.hpp>
16 #include <rti/core/NativeValueType.hpp>
17 #include <rti/core/Entity.hpp>
18 #include <dds/sub/DataReader.hpp>
19 #include <dds/core/xtypes/DynamicData.hpp>
21 #include "routingservice/routingservice_processor.h"
22 #include "routingservice/routingservice_adapter_new.h"
23 #include <rti/routing/StreamInfo.hpp>
24 #include <rti/routing/adapter/StreamReader.hpp>
25 #include <rti/routing/processor/LoanedSamples.hpp>
26 #include <rti/routing/processor/Query.hpp>
28 namespace rti {
namespace routing {
namespace processor {
32 template <
typename Data,
typename Info>
class TypedInput;
36 RTI_RoutingServiceStreamReaderExt *native,
37 RTI_RoutingServiceRoute *native_route)
40 *RTI_RoutingServiceRoute_get_input_stream_info(native_route, native));
// Returns, as a std::string, the value produced by
// RTI_RoutingServiceRoute_get_input_name.
//
// @param native        Native stream reader that backs the input.
// @param native_route  Native route the input belongs to.
//
// NOTE(review): the argument list of the native call is cut off in this
// excerpt — presumably (native_route, native), as in from_native_input;
// confirm against the full header.
44 std::string name_from_native_input(
45 RTI_RoutingServiceStreamReaderExt *native,
46 RTI_RoutingServiceRoute *native_route)
48 return RTI_RoutingServiceRoute_get_input_name(
53 template <
typename Data,
typename Info = dds::sub::SampleInfo>
// Wraps a native stream reader as a processor Input.
//
// @param native        Native stream reader extension for this input.
// @param native_route  Native route that owns the input.
// @param native_env    Native environment stored for later native calls.
//
// stream_info_ and name_ are computed once here from (native, native_route)
// via from_native_input() and name_from_native_input().
// NOTE(review): the initializer for native_ and any initialization of
// native_topic_ are not visible in this excerpt — confirm in the full header.
75 Input(RTI_RoutingServiceStreamReaderExt *native,
77 RTI_RoutingServiceRoute *native_route,
78 RTI_RoutingServiceEnvironment *native_env)
81 native_route_(native_route),
82 native_env_(native_env),
83 stream_info_(from_native_input(native, native_route)),
84 name_(name_from_native_input(native, native_route))
99 const std::string& name()
const
122 template <
typename Data,
typename Info>
137 template <
typename Data>
164 std::vector<SamplePtr>& sample_seq,
165 std::vector<InfoPtr>& info_seq) RTI_FINAL
167 using namespace rti::routing::adapter;
169 detail::NativeSamples native_samples;
171 native_->stream_reader_data,
172 &native_samples.sample_array_,
173 &native_samples.info_array_,
174 &native_samples.length_,
176 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
177 get_samples_from_native(sample_seq, info_seq, native_samples);
181 std::vector<SamplePtr>& sample_seq,
182 std::vector<InfoPtr>& info_seq) RTI_FINAL
184 using namespace rti::routing::adapter;
186 detail::NativeSamples native_samples;
188 native_->stream_reader_data,
189 &native_samples.sample_array_,
190 &native_samples.info_array_,
191 &native_samples.length_,
193 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
194 get_samples_from_native(sample_seq, info_seq, native_samples);
198 std::vector<SamplePtr>& sample_seq,
199 std::vector<InfoPtr>& info_seq,
202 detail::NativeSamples native_samples;
203 native_->take_w_selector(
204 native_->stream_reader_data,
205 &native_samples.sample_array_,
206 &native_samples.info_array_,
207 &native_samples.length_,
208 &selector_state.native(),
210 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
211 get_samples_from_native(sample_seq, info_seq, native_samples);
215 std::vector<SamplePtr>& sample_seq,
216 std::vector<InfoPtr>& info_seq,
219 detail::NativeSamples native_samples;
220 native_->read_w_selector(
221 native_->stream_reader_data,
222 &native_samples.sample_array_,
223 &native_samples.info_array_,
224 &native_samples.length_,
225 &selector_state.native(),
227 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
228 get_samples_from_native(sample_seq, info_seq, native_samples);
232 std::vector<SamplePtr>& sample_seq,
233 std::vector<InfoPtr>& info_seq) RTI_FINAL
// Rebuilds a NativeSamples descriptor from the two vectors and hands it to
// native_->return_loan; errors left in native_env_ are raised afterwards by
// RTI_ROUTING_THROW_ON_ENV_ERROR.
235 detail::NativeSamples native_samples;
236 native_samples.length_ = sample_seq.size();
// The native sample array pointer is read from the slot at index size(),
// i.e. one past the last element. get_samples_from_native() in this class
// writes that slot after reserving length_ + 1 entries.
// NOTE(review): indexing a std::vector at size() is formally out of bounds;
// this depends on the vector's reserved storage being left untouched
// between the read/take call and this one — confirm with callers.
237 native_samples.sample_array_ =
static_cast<RTI_RoutingServiceSample*
>
238 (sample_seq[native_samples.length_]);
// The info array is recovered only when info_seq is non-empty, using the same
// one-past-the-end slot convention.
239 if (info_seq.size() > 0) {
240 native_samples.info_array_ =
static_cast<RTI_RoutingServiceSample*
>
241 (info_seq[info_seq.size()]);
244 native_->return_loan(
245 native_->stream_reader_data,
246 native_samples.sample_array_,
247 native_samples.info_array_,
248 native_samples.length_,
250 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
// Creates a content query on the native stream reader from a DDS filter.
//
// @param old_query_data  Opaque query state supplied by the caller.
//        NOTE(review): how it is forwarded to the native call is not visible
//        in this excerpt.
// @param filter          Provides the filter expression and its parameters.
// @return Opaque query data produced by the native create_content_query
//         callback (the return statement itself is outside this excerpt).
// @throws dds::core::UnsupportedError when the native reader does not
//         provide a create_content_query callback.
256 void* create_content_query(
257 void* old_query_data,
258 const dds::topic::Filter& filter) RTI_FINAL
260 if (native_->create_content_query == NULL) {
261 throw dds::core::UnsupportedError(
"Unsupported operation: create_content_query");
264 RTI_RoutingServiceSelectorContent content =
265 RTI_RoutingServiceSelectorContent_INITIALIZER;
// The native struct takes a non-const char*, so constness is cast away; the
// buffer is borrowed from the filter, not copied.
// NOTE(review): this is only valid if filter.expression() returns a
// reference to storage that outlives the native call — confirm.
266 content.expression = (
char *) filter.expression().c_str();
// Converts the filter parameters into the native string sequence.
267 rti::core::native_conversions::to_native(
268 content.expression_parameters,
269 filter->parameters());
270 void* query_data = native_->create_content_query(
271 native_->stream_reader_data,
// The parameter sequence is finalized before the environment is checked, so
// it is released even when the native call reported an error.
275 DDS_StringSeq_finalize(&content.expression_parameters);
276 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
// Deletes a content query through the native stream reader.
//
// @param query_data  Opaque query state to delete.
// @throws dds::core::UnsupportedError when the native reader does not
//         provide a delete_content_query callback. The message uses the same
//         "Unsupported operation: " prefix as create_content_query.
// Errors left in native_env_ by the native call are raised by
// RTI_ROUTING_THROW_ON_ENV_ERROR.
280 void delete_content_query(
void* query_data) RTI_FINAL
282 if (native_->delete_content_query == NULL) {
283 throw dds::core::UnsupportedError(
"Unsupported operation: delete_content_query");
286 native_->delete_content_query(
287 native_->stream_reader_data,
290 RTI_ROUTING_THROW_ON_ENV_ERROR(native_env_);
// Property-update hook. The parameter is unnamed and unused, and the visible
// body throws dds::core::PreconditionNotMetError with no condition around it.
294 virtual void update(
const std::map<std::string, std::string>&) RTI_FINAL
296 throw dds::core::PreconditionNotMetError(
297 "update not applicable within a Processor notification context");
// Transfers a native loan (sample array, optional info array, length) into
// the caller's vectors.
//
// @param sample_seq      Receives the samples.
// @param info_seq        Receives the infos; filled only when the native info
//                        array is non-NULL.
// @param native_samples  Loan descriptor filled by a native read/take call.
302 void get_samples_from_native(
303 std::vector<SamplePtr>& sample_seq,
304 std::vector<InfoPtr>& info_seq,
305 detail::NativeSamples& native_samples)
// One extra slot is reserved beyond the loaned length; it is used below to
// carry the native array pointer itself.
307 sample_seq.reserve(native_samples.length_ + 1);
308 RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
310 native_samples.sample_array_,
311 native_samples.length_);
// Stores the native array pointer at index length_. return_loan() in this
// class reads it back from index size().
// NOTE(review): presumably the copy macro leaves size() == length_, making
// this a write one past the last element into reserved-but-unconstructed
// storage — formally out of bounds for operator[]; confirm the macro and
// that callers never reallocate or copy the vector before returning the loan.
312 sample_seq[native_samples.length_] =
313 native_samples.sample_array_;
314 if (native_samples.info_array_ != NULL) {
315 info_seq.reserve(native_samples.length_ + 1);
316 RTI_ROUTING_SAMPLE_VECTOR_COPY_FROM_NATIVE(
318 native_samples.info_array_,
319 native_samples.length_);
// Same convention for the info array, expressed through info_seq.size().
320 info_seq[info_seq.size()] =
321 native_samples.info_array_;
327 RTI_RoutingServiceStreamReaderExt* native()
332 template <
typename Data,
typename Info>
friend class TypedInput;
334 friend class std::allocator<Input>;
335 RTI_RoutingServiceStreamReaderExt *native_;
337 RTI_RoutingServiceRoute *native_route_;
338 RTI_RoutingServiceEnvironment *native_env_;
341 DDS_Entity *native_topic_;
344 template <
typename Data,
typename Info = dds::sub::SampleInfo>
355 template <
typename Data,
typename Info>
376 const std::string& name()
const;
431 const dds::topic::Filter& filter);
// Returns the DDS DataReader behind this input as a dds::sub::DataReader<Data>.
//
// The native reader comes from RTI_RoutingServiceRoute_get_dds_reader and is
// wrapped with rti::core::detail::create_from_native_entity.
//
// @throws dds::core::InvalidArgumentError when the route returns a NULL
//         native reader for this input.
490 dds::sub::DataReader<Data> dds_data_reader()
// NOTE(review): this using-declaration looks redundant — the call below
// spells out the fully qualified name.
492 using rti::core::detail::create_from_native_entity;
494 DDS_DataReader *native_reader = RTI_RoutingServiceRoute_get_dds_reader(
495 input_->native_route_,
497 if (native_reader == NULL) {
498 throw dds::core::InvalidArgumentError(
499 "invalid argument: input does not hold a DDS StreamReader");
502 typedef dds::sub::DataReader<Data> data_reader_type;
503 return rti::core::detail::create_from_native_entity<data_reader_type>(
557 template <
typename Data,
typename Info>
570 : typed_input_(input), query_(dds::core::null)
579 : typed_input_(other.typed_input_),
580 state_(other.state_),
598 const dds::sub::status::DataState& the_state)
600 state_.
state(the_state);
637 const dds::core::InstanceHandle& the_handle)
673 const dds::core::InstanceHandle& the_handle)
693 const dds::topic::Filter& the_filter)
695 state_.
filter(the_filter);
718 state_.content(the_query.delegate().get()->query_data_);
734 return typed_input_.take(state_);
749 return typed_input_.read(state_);
759 template <
typename Data,
typename Info>
760 struct create_data_from_input {
762 static Data
get(TypedInput<Data, Info>& )
// Partial specialization of create_data_from_input for
// dds::core::xtypes::DynamicData. get() builds a DynamicData from the type
// attached to the input's stream info.
//
// @throws dds::core::PreconditionNotMetError when the stream's type
//         representation kind is not TypeRepresentationKind::DYNAMIC_TYPE.
768 template <
typename Info>
769 struct create_data_from_input<dds::core::xtypes::DynamicData, Info> {
771 static dds::core::xtypes::DynamicData
get(
772 TypedInput<dds::core::xtypes::DynamicData, Info>& input)
// The kind is checked before the cast below, which treats the type
// representation as a DynamicType.
774 if (input->stream_info().type_info().type_representation_kind()
775 != TypeRepresentationKind::DYNAMIC_TYPE) {
776 throw dds::core::PreconditionNotMetError(
777 "inconsistent data representation kind");
// type_representation() is cast to DynamicType* and dereferenced without a
// null check.
// NOTE(review): presumably non-null whenever the kind is DYNAMIC_TYPE —
// confirm.
779 dds::core::xtypes::DynamicType *type_code =
780 static_cast<dds::core::xtypes::DynamicType *
> (
781 input->stream_info().type_info().type_representation());
782 return dds::core::xtypes::DynamicData(*type_code);
786 template <
typename Data,
typename Info>
787 TypedInput<Data, Info>::TypedInput(Input *input)
792 template <
typename Data,
typename Info>
795 return input_->stream_info_;
798 template <
typename Data,
typename Info>
801 return input_->name_;
804 template <
typename Data,
typename Info>
807 detail::NativeSamples native_samples;
808 input_->native_->take(
809 input_->native_->stream_reader_data,
810 &native_samples.sample_array_,
811 &native_samples.info_array_,
812 &native_samples.length_,
813 input_->native_env_);
814 RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
818 input_->native_env_);
821 template <
typename Data,
typename Info>
824 detail::NativeSamples native_samples;
825 input_->native_->read(
826 input_->native_->stream_reader_data,
827 &native_samples.sample_array_,
828 &native_samples.info_array_,
829 &native_samples.length_,
830 input_->native_env_);
831 RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
835 input_->native_env_);
838 template <
typename Data,
typename Info>
845 template <
typename Data,
typename Info>
847 const dds::topic::Filter& filter)
850 &input_->stream_reader(),
855 template <
typename Data,
typename Info>
859 detail::NativeSamples native_samples;
860 input_->native_->take_w_selector(
861 input_->native_->stream_reader_data,
862 &native_samples.sample_array_,
863 &native_samples.info_array_,
864 &native_samples.length_,
865 &selector_state.native(),
866 input_->native_env_);
867 RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
868 return LoanedSamples<Data, Info>(
871 input_->native_env_);
874 template <
typename Data,
typename Info>
878 detail::NativeSamples native_samples;
879 input_->native_->read_w_selector(
880 input_->native_->stream_reader_data,
881 &native_samples.sample_array_,
882 &native_samples.info_array_,
883 &native_samples.length_,
884 &selector_state.native(),
885 input_->native_env_);
886 RTI_ROUTING_THROW_ON_ENV_ERROR(input_->native_env_);
887 return LoanedSamples<Data, Info>(
890 input_->native_env_);
893 template <
typename Data,
typename Info>
896 return RTI_RoutingServiceRoute_is_input_active(
897 input_->native_route_,
898 input_->native_) ?
true :
false;
901 template <
typename Data,
typename Info>
904 return create_data_from_input<Data, Info>::get(*
this);
// Shorthand for a DataReader whose data type is dds::core::xtypes::DynamicData.
915 typedef dds::sub::DataReader<dds::core::xtypes::DynamicData> DynamicDataReader;
920 #endif // RTI_ROUTING_PROCESSOR_INPUT_HPP_