// This example shows how to receive DDS data using real-time (RTSJ) threads.
package com.rti.ndds.rtsj.example.nodestatus;
import javax.realtime.Clock;
import javax.realtime.ImmortalMemory;
import javax.realtime.MemoryArea;
import javax.realtime.MemoryParameters;
import javax.realtime.PeriodicParameters;
import javax.realtime.PriorityParameters;
import javax.realtime.ProcessingGroupParameters;
import javax.realtime.RealtimeThread;
import javax.realtime.RelativeTime;
import javax.realtime.ReleaseParameters;
import com.rti.dds.domain.DomainParticipant;
import com.rti.dds.domain.DomainParticipantFactory;
import com.rti.dds.infrastructure.RETCODE_NO_DATA;
import com.rti.dds.infrastructure.ResourceLimitsQosPolicy;
import com.rti.dds.infrastructure.StatusKind;
import com.rti.dds.subscription.DataReader;
import com.rti.dds.subscription.DataReaderAdapter;
import com.rti.dds.subscription.DataReaderListener;
import com.rti.dds.subscription.InstanceStateKind;
import com.rti.dds.subscription.SampleInfo;
import com.rti.dds.subscription.SampleInfoSeq;
import com.rti.dds.subscription.SampleStateKind;
import com.rti.dds.subscription.Subscriber;
import com.rti.dds.subscription.ViewStateKind;
import com.rti.dds.topic.Topic;
import com.rti.ndds.rtsj.RtsjProperty_t;
import com.rti.ndds.rtsj.RtsjThreadSupport;
import com.rti.ndds.rtsj.ThreadKind;
public class NodeStatusSubscriber {
/**
 * Program entry point: parses the optional command-line arguments and runs
 * the subscriber on a newly started {@link RealtimeThread}.
 *
 * @param args optional arguments: {@code args[0]} is the domain ID
 *             (default 0) and {@code args[1]} is the count passed to
 *             {@code subscriberMain} (default 0). A non-numeric value
 *             raises {@link NumberFormatException}.
 */
public static void main(String[] args) {
    // Both values must be final so the anonymous Runnable can capture them.
    final int domainID = (args.length >= 1) ? Integer.parseInt(args[0]) : 0;
    final int samples = (args.length >= 2) ? Integer.parseInt(args[1]) : 0;

    // The thread is created with immortal memory as its memory area.
    MemoryArea immortalArea = ImmortalMemory.instance();
    MemoryParameters memoryBudget =
            new MemoryParameters(2 * 1048576, 4 * 1048576);

    Runnable subscriberTask = new Runnable() {
        public void run() {
            subscriberMain(domainID, samples);
        }
    };

    // Scheduling, release and processing-group parameters are left null.
    Thread subscriberThread = new RealtimeThread(
            null, null, memoryBudget, immortalArea, null, subscriberTask);
    subscriberThread.start();
}
private NodeStatusSubscriber() {
super();
}
/**
 * Builds the RTSJ thread settings for the DDS middleware threads and
 * registers them with the participant factory via
 * {@link RtsjThreadSupport#register}.
 *
 * The event, database and receive threads all share the same scheduling,
 * release, memory and processing-group parameter objects. Every configured
 * thread (including the asynchronous publisher thread) uses immortal memory
 * and the no-heap real-time thread kind.
 */
private static void registerThreadFactories() {
    PriorityParameters priority = new PriorityParameters(16);

    // RelativeTime arguments are (milliseconds, nanoseconds).
    RelativeTime start = new RelativeTime(0, 0);
    RelativeTime period = new RelativeTime(0, 1000);
    RelativeTime cost = new RelativeTime(0, 1000);
    RelativeTime deadline = new RelativeTime(0, 1000);

    // No overrun or deadline-miss handlers are installed (both null).
    ReleaseParameters release = new PeriodicParameters(
            start, period, cost, deadline, null, null);
    MemoryParameters memory = new MemoryParameters(1048576, 1048576);
    ProcessingGroupParameters processingGroup =
            new ProcessingGroupParameters(
                    start, period, cost, deadline, null, null);

    ImmortalMemory immortal = ImmortalMemory.instance();
    RtsjProperty_t props = new RtsjProperty_t();

    // Event thread.
    props.event_thread.processing_group_params = processingGroup;
    props.event_thread.scheduling_params = priority;
    props.event_thread.memory_params = memory;
    props.event_thread.release_params = release;
    props.event_thread.memory_area = immortal;
    props.event_thread.thread_kind =
            ThreadKind.NO_HEAP_REALTIME_THREAD_KIND;

    // Database thread.
    props.database_thread.processing_group_params = processingGroup;
    props.database_thread.scheduling_params = priority;
    props.database_thread.memory_params = memory;
    props.database_thread.release_params = release;
    props.database_thread.memory_area = immortal;
    props.database_thread.thread_kind =
            ThreadKind.NO_HEAP_REALTIME_THREAD_KIND;

    // Receive thread.
    props.receive_thread.processing_group_params = processingGroup;
    props.receive_thread.scheduling_params = priority;
    props.receive_thread.memory_params = memory;
    props.receive_thread.release_params = release;
    props.receive_thread.memory_area = immortal;
    props.receive_thread.thread_kind =
            ThreadKind.NO_HEAP_REALTIME_THREAD_KIND;

    // Asynchronous publisher thread: only the memory area and thread kind
    // are set here; its other parameters keep whatever RtsjProperty_t
    // initialized them to.
    props.asynchronous_publisher_thread.memory_area = immortal;
    props.asynchronous_publisher_thread.thread_kind =
            ThreadKind.NO_HEAP_REALTIME_THREAD_KIND;

    RtsjThreadSupport.register(
            DomainParticipantFactory.TheParticipantFactory, props);
}
/**
 * Runs the subscriber: prints the real-time clock resolution, registers the
 * RTSJ thread settings, creates the DDS participant, subscriber, topic and
 * data reader (with a {@code NodeStatusListener} attached), then sleeps in
 * 4-second periods while the listener handles incoming data.
 *
 * If any entity cannot be created (the create call returns null), an error
 * is printed to {@code System.err} and the method returns early. In every
 * case, a participant that was created is deleted, together with its
 * contained entities, before the method exits.
 *
 * @param domainId    DDS domain to join
 * @param sampleCount number of 4-second wait periods before shutting down;
 *                    0 means keep waiting until the thread is interrupted.
 *                    Despite the name, this counts wait periods, not
 *                    received samples.
 */
private static void subscriberMain(int domainId, int sampleCount) {
    DomainParticipant participant = null;
    try {
        Clock clock = Clock.getRealtimeClock();
        RelativeTime resolution = clock.getResolution();
        System.out.println(
                "resolution = " + resolution.getMilliseconds() +
                "ms, " + resolution.getNanoseconds() + "ns");

        // Thread settings are registered before the participant is created.
        registerThreadFactories();

        participant = DomainParticipantFactory.TheParticipantFactory.
                create_participant(
                        domainId,
                        DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                        null, StatusKind.STATUS_MASK_NONE);
        if (participant == null) {
            System.err.println("create_participant error");
            return;
        }

        Subscriber subscriber = participant.create_subscriber(
                DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null,
                StatusKind.STATUS_MASK_NONE);
        if (subscriber == null) {
            System.err.println("create_subscriber error");
            return;
        }

        String typeName = NodeStatusTypeSupport.get_type_name();
        NodeStatusTypeSupport.register_type(participant, typeName);

        Topic topic = participant.create_topic(
                "Example NodeStatus",
                typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
                null, StatusKind.STATUS_MASK_NONE);
        if (topic == null) {
            System.err.println("create_topic error");
            return;
        }

        DataReaderListener listener = new NodeStatusListener();
        NodeStatusDataReader reader = (NodeStatusDataReader)
                subscriber.create_datareader(
                        topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
                        StatusKind.STATUS_MASK_ALL);
        if (reader == null) {
            System.err.println("create_datareader error");
            return;
        }

        final long receivePeriodSec = 4;
        System.out.println("waiting for data");

        // Data is handled by the listener; this thread only keeps the
        // participant alive for the requested number of wait periods.
        for (int count = 0;
                (sampleCount == 0) || (count < sampleCount);
                ++count) {
            try {
                Thread.sleep(receivePeriodSec * 1000);
            } catch (InterruptedException ix) {
                // Stop waiting and fall through to the cleanup below.
                System.err.println("INTERRUPTED");
                break;
            }
        }
    } finally {
        // Runs on normal exit, early return and exceptions alike.
        if (participant != null) {
            participant.delete_contained_entities();
            DomainParticipantFactory.TheParticipantFactory.
                    delete_participant(participant);
        }
    }
}
/**
 * Data reader listener that takes all available NodeStatus samples each
 * time it is notified, counts the ones carrying valid data, and prints a
 * progress message every 1024 valid samples.
 */
private static class NodeStatusListener extends DataReaderAdapter {
    // The sequences are fields, so the same objects are reused on every
    // callback rather than created per call.
    private final NodeStatusSeq dataSeq = new NodeStatusSeq();
    private final SampleInfoSeq infoSeq = new SampleInfoSeq();

    /** Running total of valid samples seen by this listener. */
    private int count = 0;

    /**
     * Takes every sample currently available on the reader and updates the
     * valid-sample counter. The loaned sequences are always returned to the
     * reader, whether or not the take produced data.
     *
     * @param reader the reader with data available; it is cast to
     *               {@code NodeStatusDataReader}
     */
    public void on_data_available(DataReader reader) {
        NodeStatusDataReader typedReader = (NodeStatusDataReader) reader;
        try {
            typedReader.take(
                    dataSeq, infoSeq,
                    ResourceLimitsQosPolicy.LENGTH_UNLIMITED,
                    SampleStateKind.ANY_SAMPLE_STATE,
                    ViewStateKind.ANY_VIEW_STATE,
                    InstanceStateKind.ANY_INSTANCE_STATE);
            System.out.println("take called");

            final int received = dataSeq.size();
            for (int index = 0; index < received; index++) {
                SampleInfo info = (SampleInfo) infoSeq.get(index);
                if (!info.valid_data) {
                    // Entry carries no data payload; do not count it.
                    continue;
                }
                count += 1;
                boolean atReportBoundary = (count % 1024) == 0;
                if (atReportBoundary) {
                    System.out.println("rec'd 1024 more samples");
                }
            }
        } catch (RETCODE_NO_DATA ignored) {
            // Nothing was available to take; there is nothing to process.
        } finally {
            typedReader.return_loan(dataSeq, infoSeq);
        }
    }
}
}