import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import com.rti.dds.domain.*;
import com.rti.dds.infrastructure.*;
import com.rti.dds.subscription.*;
import com.rti.dds.topic.*;
import com.rti.ndds.config.*;
public class MessageDDSSubscriber {
private int domainId;
private boolean end;
public MessageDDSSubscriber(int domainId) {
this.domainId = domainId;
}
public boolean run() {
DomainParticipant participant = null;
Subscriber subscriber = null;
Topic topic = null;
DataReaderListener listener = null;
MessageDataReader reader = null;
try {
participant = DomainParticipantFactory.TheParticipantFactory.
create_participant(
domainId, DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
null , StatusKind.STATUS_MASK_NONE);
if (participant == null) {
throw new Exception("Error creating participant");
}
subscriber = participant.create_subscriber(
DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null ,
StatusKind.STATUS_MASK_NONE);
if (subscriber == null) {
throw new Exception("Error creating subscriber");
}
String typeName = MessageTypeSupport.get_type_name();
MessageTypeSupport.register_type(participant, typeName);
topic = participant.create_topic(
"Message",
typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
null , StatusKind.STATUS_MASK_NONE);
listener = new MessageListener();
reader = (MessageDataReader)
subscriber.create_datareader(
topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
StatusKind.STATUS_MASK_ALL);
System.out.println("DDS Subscriber is now ready to receive requests...");
while (!((MessageListener)listener).end) {
Thread.sleep(1000);
}
return true;
} catch (Exception e) {
return false;
} finally {
if(participant != null) {
participant.delete_contained_entities();
DomainParticipantFactory.TheParticipantFactory.
delete_participant(participant);
}
}
}
private static class MessageListener extends DataReaderAdapter {
public boolean end;
MessageSeq _dataSeq = new MessageSeq();
SampleInfoSeq _infoSeq = new SampleInfoSeq();
Message msg;
public MessageListener() {
end = false;
}
public void on_data_available(DataReader reader) {
MessageDataReader MessageReader =
(MessageDataReader)reader;
try {
MessageReader.take(
_dataSeq, _infoSeq,
ResourceLimitsQosPolicy.LENGTH_UNLIMITED,
SampleStateKind.ANY_SAMPLE_STATE,
ViewStateKind.ANY_VIEW_STATE,
InstanceStateKind.ANY_INSTANCE_STATE);
for(int i = 0; i < _dataSeq.size(); ++i) {
SampleInfo info = (SampleInfo)_infoSeq.get(i);
if (info.valid_data) {
msg = (Message)_dataSeq.get(i);
if (msg.msg.equals("quit")) {
end = true;
} else if (!end) {
System.out.println(
"[" + msg.time.month + "/" +
msg.time.day + "/" +
msg.time.year + " " +
msg.time.hour + ":" +
msg.time.minute + ":" +
msg.time.second +
"] " + msg.msg);
}
}
}
} catch (RETCODE_NO_DATA noData) {
} finally {
MessageReader.return_loan(_dataSeq, _infoSeq);
}
}
}
}