Hi all,
I am currently trying to build a small example application with the following architecture:
A Python application, pyapp, uses the RTI Connext Python API to trigger another application, pubexp, which then sends messages endlessly to a third application, subexp.
So subexp is subscribing/listening to pubexp, and pubexp is subscribing/listening to pyapp.
Main code of pubexp:
class TriggerListener : public dds::sub::NoOpDataReaderListener<ExpTopic::TriggerData>
{
public:
    TriggerListener(dds::pub::DataWriter<ExpTopic::OutData>& dataWriter) :
        dataWriter_(dataWriter),
        timeStep_(0)
    {}
    virtual void on_data_available(dds::sub::DataReader<ExpTopic::TriggerData>& dataReader)
    {
        dataReader.take();
        while (true)
            writeData();
    }
private:
    dds::pub::DataWriter<ExpTopic::OutData>& dataWriter_;
    long timeStep_;
    void writeData()
    {
        std::cout << "cycle " << timeStep_++ << std::endl;
        int numberEntities = 100;
        ExpTopic::OutData entity;
        for (unsigned int i = 0; i < numberEntities; ++i)
        {
            entity.id(i);
            dataWriter_.write(entity);
        }
        std::cout << "written " << numberEntities << " entities" << std::endl;
    }
};
int main(int argc, char* argv[])
{
    dds::domain::DomainParticipant domainParticipant(0);
    dds::topic::Topic<ExpTopic::TriggerData> topicTrigger(domainParticipant, "Trigger");
    dds::topic::Topic<ExpTopic::OutData> topicOut(domainParticipant, "LotsOfData");
    dds::core::QosProvider qosProvider(getDefaultQoSFilepath(), "DefLib::DefProfile");
    dds::pub::qos::PublisherQos qosPublisher = qosProvider.publisher_qos();
    dds::sub::qos::SubscriberQos qosSubscriber = qosProvider.subscriber_qos();
    dds::pub::Publisher publisher(domainParticipant, qosPublisher);
    dds::sub::Subscriber subscriber(domainParticipant, qosSubscriber);
    dds::pub::qos::DataWriterQos qos = qosProvider.datawriter_qos();
    dds::sub::qos::DataReaderQos qosR = qosProvider.datareader_qos();
    dds::pub::DataWriter<ExpTopic::OutData> dataWriter(publisher, topicOut, qos);
    dds::sub::DataReader<ExpTopic::TriggerData> dataReaderTrigger(subscriber, topicTrigger, qosR);
    TriggerListener listener(dataWriter);
    dataReaderTrigger.listener(&listener, dds::core::status::StatusMask::data_available());
    while (true) {}
    return 0;
}
Main code of subexp:
class DataListener : public dds::sub::NoOpDataReaderListener<ExpTopic::OutData>
{
public:
    virtual void on_data_available(dds::sub::DataReader<ExpTopic::OutData>& dataReader)
    {
        auto samples = dataReader.take();
    }
};
int main(int argc, char* argv[])
{
    dds::domain::DomainParticipant domainParticipant(0);
    dds::topic::Topic<ExpTopic::OutData> topicData(domainParticipant, "LotsOfData");
    dds::core::QosProvider qosProvider(getDefaultQoSFilepath(), "DefLib::DefProfile");
    dds::sub::qos::SubscriberQos qosSubscriber = qosProvider.subscriber_qos();
    dds::sub::qos::DataReaderQos qos = qosProvider.datareader_qos();
    dds::sub::Subscriber subscriber(domainParticipant, qosSubscriber);
    dds::sub::DataReader<ExpTopic::OutData> dataReader(subscriber, topicData, qos);
    DataListener listener;
    dataReader.listener(&listener, dds::core::status::StatusMask::data_available());
    while (true) {}
    return 0;
}
Used QoS:
<dds>
    <qos_library name="DefLib">
        <qos_profile name="DefProfile" base_name="BuiltinQosLib::Generic.StrictReliable" is_default_qos="true">
            <base_name>
                <element>BuiltinQosSnippetLib::QosPolicy.History.KeepAll</element>
                <element>BuiltinQosSnippetLib::QosPolicy.Durability.TransientLocal</element>
            </base_name>
        </qos_profile>
    </qos_library>
</dds>
However, this fails at runtime with:
cycle 0
[0x01015891,0xAC7973A0,0x1E6741A3:0x80000004{E=DR,I=21}|RECEIVE FROM 0x0101C567,0xF3977BEA,0x8F8E8B7A:0x80000003] data_available_forward:!write
Is this due to some Exclusive Area violation? If yes, is there a straightforward way to fix this?
Thanks a lot!
No, this is not an EA issue. When an Exclusive Area (EA) problem occurs, the error message says so explicitly.
I think your problem is here:
    virtual void on_data_available(dds::sub::DataReader<ExpTopic::TriggerData>& dataReader)
    {
        dataReader.take();
        while (true)
            writeData();
    }
You are writing data in an infinite loop inside a listener callback. The listener is called by an RTI receive thread, so the callback should not block, do lengthy processing, or otherwise cause significant delay. Doing so prevents the RTI receive thread from doing its own work.