RTI Connext RTSJ Extension Kit  Version 5.1.0
NodeStatusSubscriber.java

Node Status Subscription Example

This example shows how to read data from a real-time thread.

The corresponding publication code example can be found in the Node Status Publication Example.

Source Code

/* NodeStatusSubscriber.java
A subscription of data of type NodeStatus.
This file is derived from code automatically generated by the rtiddsgen
command:
rtiddsgen -language java -example <arch> NodeStatus.idl
Example subscription of type NodeStatus automatically generated by
'rtiddsgen'. To test it, follow these steps:
(1) Compile this file and the example publication.
(2) Start the subscription on the same domain used for RTI Data Distribution
Service with the command
java NodeStatusSubscriber <domain_id> <sample_count>
(3) Start the publication on the same domain used for RTI Data Distribution
Service with the command
java NodeStatusPublisher <domain_id> <sample_count>
(4) [Optional] Specify the list of discovery initial peers and
multicast receive addresses via an environment variable or a file
(in the current working directory) called NDDS_DISCOVERY_PEERS.
You can run any number of publishers and subscribers programs, and can
add and remove them dynamically from the domain.
Example:
To run the example application on domain <domain_id>:
Ensure that $(NDDSHOME)/lib/<arch> is on the dynamic library path for
Java.
On Unix:
add $(NDDSHOME)/lib/<arch> to the 'LD_LIBRARY_PATH' environment
variable
On Windows:
add $(NDDSHOME)\lib\<arch> to the 'Path' environment variable
Run the Java applications:
java -Djava.ext.dirs=$NDDSHOME/class NodeStatusPublisher <domain_id>
java -Djava.ext.dirs=$NDDSHOME/class NodeStatusSubscriber <domain_id>
*/
package com.rti.ndds.rtsj.example.nodestatus;
import javax.realtime.Clock;
import javax.realtime.ImmortalMemory;
import javax.realtime.MemoryArea;
import javax.realtime.MemoryParameters;
import javax.realtime.PeriodicParameters;
import javax.realtime.PriorityParameters;
import javax.realtime.ProcessingGroupParameters;
import javax.realtime.RealtimeThread;
import javax.realtime.RelativeTime;
import javax.realtime.ReleaseParameters;
import com.rti.dds.domain.DomainParticipant;
import com.rti.dds.domain.DomainParticipantFactory;
import com.rti.dds.infrastructure.RETCODE_NO_DATA;
import com.rti.dds.infrastructure.ResourceLimitsQosPolicy;
import com.rti.dds.infrastructure.StatusKind;
import com.rti.dds.subscription.DataReader;
import com.rti.dds.subscription.DataReaderAdapter;
import com.rti.dds.subscription.DataReaderListener;
import com.rti.dds.subscription.InstanceStateKind;
import com.rti.dds.subscription.SampleInfo;
import com.rti.dds.subscription.SampleInfoSeq;
import com.rti.dds.subscription.SampleStateKind;
import com.rti.dds.subscription.Subscriber;
import com.rti.dds.subscription.ViewStateKind;
import com.rti.dds.topic.Topic;
import com.rti.ndds.rtsj.RtsjProperty_t;
import com.rti.ndds.rtsj.RtsjThreadSupport;
import com.rti.ndds.rtsj.ThreadKind;
// ===========================================================================
public class NodeStatusSubscriber {
// -----------------------------------------------------------------------
// Public Methods
// -----------------------------------------------------------------------
/**
 * Program entry point: reads the optional command-line arguments and runs
 * the subscriber on a {@link RealtimeThread} whose memory area is
 * {@link ImmortalMemory}.
 *
 * @param args optional arguments: {@code args[0]} is the domain ID
 *             (0 when omitted) and {@code args[1]} is the maximum loop
 *             count (0 when omitted, meaning loop forever)
 * @throws NumberFormatException if a supplied argument is not an integer
 */
public static void main(String[] args) {
    // --- Domain ID: first argument, 0 when omitted --- //
    final int domainId =
        (args.length >= 1) ? Integer.parseInt(args[0]) : 0;

    // --- Max loop count: second argument; 0 means infinite loop --- //
    final int sampleCount =
        (args.length >= 2) ? Integer.parseInt(args[1]) : 0;

    /* Uncomment this to turn on additional logging
    Logger.get_instance().set_verbosity_by_category(
        LogCategory.NDDS_CONFIG_LOG_CATEGORY_API,
        LogVerbosity.NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL);
    */

    // --- Run --- //
    // The subscriber logic is handed to the real-time thread as a Runnable.
    Runnable subscriberTask = new Runnable() {
        public void run() {
            subscriberMain(domainId, sampleCount);
        }
    };

    MemoryParameters threadMemory =
        new MemoryParameters(2 * 1048576, 4 * 1048576);
    Thread subscriberThread = new RealtimeThread(
        null, null, threadMemory, ImmortalMemory.instance(), null,
        subscriberTask);
    subscriberThread.start();
}
// -----------------------------------------------------------------------
// Private Methods
// -----------------------------------------------------------------------
// --- Constructors: -----------------------------------------------------
private NodeStatusSubscriber() {
super();
}
/**
 * Builds the RTSJ properties for the middleware threads and registers them
 * with the participant factory.
 *
 * The event, database and receive threads all share the same scheduling,
 * release, memory and processing-group parameters. Those three threads and
 * the asynchronous publisher thread are configured as no-heap real-time
 * threads with ImmortalMemory as their memory area.
 */
private static void registerThreadFactories() {
    // Set up the thread attributes shared by the middleware threads.
    PriorityParameters scheduling = new PriorityParameters(16);

    // The same RelativeTime instances feed both the release parameters
    // and the processing-group parameters.
    RelativeTime start = new RelativeTime(0, 0);
    RelativeTime period = new RelativeTime(0, 1000);
    RelativeTime cost = new RelativeTime(0, 1000);
    RelativeTime deadline = new RelativeTime(0, 1000);

    ReleaseParameters release = new PeriodicParameters(
        start, period, cost, deadline, null, null);
    MemoryParameters memory = new MemoryParameters(1048576, 1048576);
    ProcessingGroupParameters processingGroup =
        new ProcessingGroupParameters(
            start, period, cost, deadline, null, null);

    RtsjProperty_t props = new RtsjProperty_t();
    ImmortalMemory immortal = ImmortalMemory.instance();

    // --- Event thread --- //
    props.event_thread.processing_group_params = processingGroup;
    props.event_thread.scheduling_params = scheduling;
    props.event_thread.memory_params = memory;
    props.event_thread.release_params = release;
    props.event_thread.memory_area = immortal;
    props.event_thread.thread_kind =
        ThreadKind.NO_HEAP_REALTIME_THREAD_KIND;

    // --- Database thread --- //
    props.database_thread.processing_group_params = processingGroup;
    props.database_thread.scheduling_params = scheduling;
    props.database_thread.memory_params = memory;
    props.database_thread.release_params = release;
    props.database_thread.memory_area = immortal;
    props.database_thread.thread_kind =
        ThreadKind.NO_HEAP_REALTIME_THREAD_KIND;

    // --- Receive thread --- //
    props.receive_thread.processing_group_params = processingGroup;
    props.receive_thread.scheduling_params = scheduling;
    props.receive_thread.memory_params = memory;
    props.receive_thread.release_params = release;
    props.receive_thread.memory_area = immortal;
    props.receive_thread.thread_kind =
        ThreadKind.NO_HEAP_REALTIME_THREAD_KIND;

    // --- Asynchronous publisher thread --- //
    // NOTE(review): only the memory area and thread kind are set for this
    // thread; confirm that leaving out the scheduling, release, memory
    // and processing-group parameters is intentional.
    props.asynchronous_publisher_thread.memory_area = immortal;
    props.asynchronous_publisher_thread.thread_kind =
        ThreadKind.NO_HEAP_REALTIME_THREAD_KIND;

    // Register the thread properties with the participant factory.
    RtsjThreadSupport.register(
        DomainParticipantFactory.TheParticipantFactory, props);
}
// -----------------------------------------------------------------------
/**
 * Creates the DDS entities for the NodeStatus subscription, waits while the
 * attached listener handles incoming data, and deletes the participant on
 * the way out.
 *
 * Each entity-creation call is checked for a null result; on failure an
 * error is printed and the method returns after the usual cleanup.
 *
 * @param domainId    ID of the DDS domain to join
 * @param sampleCount number of 4-second wait periods to sleep through
 *                    before shutting down; 0 means wait forever. Despite
 *                    the name, it does not count received samples.
 */
private static void subscriberMain(int domainId, int sampleCount) {
    DomainParticipant participant = null;
    try {
        // Get the real-time clock and report its resolution.
        Clock clock = Clock.getRealtimeClock();
        RelativeTime resolution = clock.getResolution();
        System.out.println(
            "resolution = " + resolution.getMilliseconds() +
            "ms, " + resolution.getNanoseconds() + "ns");

        // Create the required thread parameters and register the thread
        // factories with the participant factory. This is done before the
        // participant is created.
        registerThreadFactories();

        // --- Create participant --- //
        /* To customize participant QoS, use
           DomainParticipantFactory.TheParticipantFactory.
           get_default_participant_qos() instead */
        participant = DomainParticipantFactory.TheParticipantFactory.
            create_participant(
                domainId, DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                null /* listener */, StatusKind.STATUS_MASK_NONE);
        if (participant == null) {
            System.err.println("create_participant error");
            return;
        }

        // --- Create subscriber --- //
        /* To customize subscriber QoS, use
           participant.get_default_subscriber_qos() instead */
        Subscriber subscriber = participant.create_subscriber(
            DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
            StatusKind.STATUS_MASK_NONE);
        if (subscriber == null) {
            System.err.println("create_subscriber error");
            return;
        }

        // --- Create topic --- //
        /* Register type before creating topic */
        String typeName = NodeStatusTypeSupport.get_type_name();
        NodeStatusTypeSupport.register_type(participant, typeName);

        /* To customize topic QoS, use
           participant.get_default_topic_qos() instead */
        Topic topic = participant.create_topic(
            "Example NodeStatus",
            typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
            null /* listener */, StatusKind.STATUS_MASK_NONE);
        if (topic == null) {
            System.err.println("create_topic error");
            return;
        }

        // --- Create reader --- //
        DataReaderListener listener = new NodeStatusListener();

        /* To customize data reader QoS, use
           subscriber.get_default_datareader_qos() instead */
        NodeStatusDataReader reader = (NodeStatusDataReader)
            subscriber.create_datareader(
                topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
                StatusKind.STATUS_MASK_ALL);
        if (reader == null) {
            System.err.println("create_datareader error");
            return;
        }

        // --- Wait for data --- //
        // Samples are handled by the listener; this thread only sleeps.
        final long receivePeriodSec = 4;
        System.out.println("waiting for data");
        for (int count = 0;
             (sampleCount == 0) || (count < sampleCount);
             ++count) {
            try {
                Thread.sleep(receivePeriodSec * 1000); // in millisec
            } catch (InterruptedException ix) {
                // Treat an interrupt as a request to stop waiting and
                // fall through to the shutdown code below.
                System.err.println("INTERRUPTED");
                break;
            }
        }
    } finally {
        // --- Shutdown --- //
        // Runs on every exit path, including the early returns above.
        if (participant != null) {
            participant.delete_contained_entities();
            DomainParticipantFactory.TheParticipantFactory.
                delete_participant(participant);
        }
        /* RTI Data Distribution Service provides finalize_instance()
           method for people who want to release memory used by the
           participant factory singleton. Uncomment the following block of
           code for clean destruction of the participant factory
           singleton. */
        //DomainParticipantFactory.finalize_instance();
    }
}
// -----------------------------------------------------------------------
// Private Types
// -----------------------------------------------------------------------
// =======================================================================
/**
 * Reader listener that takes all available NodeStatus samples on each
 * data-available callback and keeps a running count of the valid ones.
 */
private static class NodeStatusListener extends DataReaderAdapter {
    // The sequences are created once with the listener and reused on
    // every callback.
    private final NodeStatusSeq samples = new NodeStatusSeq();
    private final SampleInfoSeq sampleInfos = new SampleInfoSeq();

    // Number of valid samples seen so far.
    private int validSamples = 0;

    /**
     * Takes every available sample from the reader, counts the valid
     * ones, and returns the loaned sequences to the reader.
     *
     * @param reader the reader with data available; it is cast to
     *               NodeStatusDataReader
     */
    public void on_data_available(DataReader reader) {
        NodeStatusDataReader statusReader = (NodeStatusDataReader) reader;

        try {
            statusReader.take(
                samples, sampleInfos,
                ResourceLimitsQosPolicy.LENGTH_UNLIMITED,
                SampleStateKind.ANY_SAMPLE_STATE,
                ViewStateKind.ANY_VIEW_STATE,
                InstanceStateKind.ANY_INSTANCE_STATE);
            System.out.println("take called");

            for (int index = 0; index < samples.size(); index++) {
                SampleInfo info = (SampleInfo) sampleInfos.get(index);
                if (!info.valid_data) {
                    continue;
                }

                // Printing the counter value itself would allocate
                // memory, so only a fixed message is printed every
                // 1024 valid samples.
                validSamples++;
                if (validSamples % 1024 == 0) {
                    System.out.println("rec'd 1024 more samples");
                }
            }
        } catch (RETCODE_NO_DATA noData) {
            // No samples were available: nothing to do.
        } finally {
            // Return the sequences on every path, including the
            // no-data case.
            statusReader.return_loan(samples, sampleInfos);
        }
    }
}
}

RTI Connext RTSJ Extension Kit Version 5.1.0 Copyright © Tue Feb 4 2014 Real-Time Innovations, Inc