RTI Connext C# API  6.1.2
Subscription examples

How to subscribe to a topic and work with Subscribers and DataReaders.

How to subscribe to a topic and work with Subscribers and DataReaders.

Sections:

See also Entity examples for examples that affect all DDS Entities, including Subscribers and DataReaders.

Creating and configuring a DataReader

Example: create a DataReader using default QoS

// Create a DomainParticipant and a Topic
using Rti.Dds.Domain.DomainParticipant participant =
factory.CreateParticipant(domainId: 0);
Rti.Dds.Topics.Topic<Shape> topic =
participant.CreateTopic<Shape>("Square");
// In many cases there is no need to explicitly create a Subscriber;
// the ImplicitSubscriber can be used to create the DataReader.
Rti.Dds.Subscription.DataReader<Shape> reader =
participant.ImplicitSubscriber.CreateDataReader(topic);
Singleton that manages the creation of DomainParticipant objects.
Definition: DomainParticipantFactory.cs:25
static DomainParticipantFactory? Instance
Gets the singleton instance of this class.
Definition: DomainParticipantFactory.cs:39
Container for all other Entity objects.
Definition: DomainParticipant.cs:39
Allows the application to: (1) declare the data it wishes to receive (i.e. make a subscription) and (...
Definition: DataReader.cs:27
The most basic description of the data to be published and subscribed.
Definition: Topic.cs:33
Contains DomainParticipant and related classes.
Definition: DomainParticipant.cs:33
Contains the classes to support subscribing to topics.
Definition: Namespaces.cs:81
Contains classes for defining topics.
Definition: Namespaces.cs:67
Contains the RTI Connext DDS C# API.
Definition: AsyncWaitSetProperty.cs:18
Contains the RTI Connext C# API.
Definition: Logger.cs:20

The IDL definition of the Shape type used in the example is the following:

module Example {
struct Shape {
@key string<128> color;
int32 x;
int32 y;
int32 shapesize;
};
};
Definition: MyType.cs:17

Example: configure the DataReader QoS

// Create a DomainParticipant and a Topic
using DomainParticipant participant = factory.CreateParticipant(domainId: 0);
Topic<Shape> topic = participant.CreateTopic<Shape>("Square");
// Configure the QoS starting from the default and changing the
// Reliability and TimeBasedFilter QoS policies. We also set a
// subscription name, which helps debugging the system.
DataReaderQos readerQos = participant.DefaultDataReaderQos
.WithSubscriptionName(policy => policy.Name = "MySquareReader")
.WithReliability(policy => policy.Kind = ReliabilityKind.Reliable)
.WithTimeBasedFilter(
policy => policy.MinimumSeparation = Duration.FromMilliseconds(100));
DataReader<Shape> reader =
participant.ImplicitSubscriber.CreateDataReader(topic, readerQos);
DomainParticipant CreateParticipant(int domainId)
Creates a new DomainParticipant with default QoS.
Topics.IAnyTopic CreateTopic(string name, Type topicType)
Creates a Topic<T> specifying its type reflectively
Definition: DomainParticipant.cs:284
ReliabilityKind
Kinds of reliability
Definition: Reliability.cs:172

See also Configuring a Publisher; a Subscriber can be configured similarly.

Reading data

There are several ways to wait for data:

Example: Handling the DataAvailable event

// The DataAvailabe event triggers when the DataReader receives new data;
// the handler is executed by the same internal thread that receives
// the data. To avoid blocking that thread, use the DataAvailable
// handler only when the data processing is simple and fast.
reader.DataAvailable += _ =>
{
// LoanedSamples uses internal resources that need to be disposed.
// The data in the collection cannot be used after it has been disposed.
using (LoanedSamples<Shape> samples = reader.Take())
{
foreach (var sample in samples)
{
// sample.Info contains meta-data
if (sample.Info.ValidData)
{
// sample.Data contains the actual data
Shape shape = sample.Data;
Console.WriteLine(
$"Received {shape.color} square ({shape.x}, {shape.y})");
}
}
}
};

Example: Using a ReadCondition and a WaitSet to read data

// Create a ReadCondition that will activate when data with any
// DataState is available.
reader.CreateReadCondition(DataState.Any);
// Handle the event when the condition triggers
condition.Triggered += _ =>
{
// When the data is taken, the condition's trigger value is reset
using LoanedSamples<Shape> samples = reader.Take();
foreach (var sample in samples)
{
if (sample.Info.ValidData)
{
Console.WriteLine(sample.Data);
}
}
};
// Create a WaitSet to wait on this condition (and possibly others)
var waitset = new Rti.Dds.Core.WaitSet();
waitset.AttachCondition(condition);
// ... you can add more conditions from other entities to a waitset
while (running)
{
// waitset.Dispatch executes (in the current thread) the conditions'
// handler for the Triggered event or returns after 4 seconds if no
// condition has been triggered.
waitset.Dispatch(Duration.FromSeconds(4));
}
TriggeredEventHandler Triggered
Event triggered in the context of WaitSet.Dispatch() when TriggerValue becomes true.
Definition: Condition.cs:39
Allows an application to wait until one or more of the attached Condition objects have a trigger valu...
Definition: WaitSet.cs:23
void AttachCondition(Condition condition)
Attaches a Condition to the WaitSet.
Conditions specifically dedicated to read operations and attached to one DataReader<T>.
Definition: ReadCondition.cs:32
Contains infrastructure types.
Definition: AsyncWaitSetProperty.cs:18

Example: Store the data for future access

var myRedShapes = new List<Shape>(); // we will store some data we receive
// ...
using (LoanedSamples<Shape> samples = reader.Take())
{
foreach (var sample in samples)
{
if (sample.Info.ValidData)
{
Shape shape = sample.Data;
if (shape.color == "RED")
{
// DO NOT store the loaned object directly:
// myRedShapes.Add(shape); // WRONG!
// You can copy the full sample to access it later
myRedShapes.Add(new Shape(shape)); // CORRECT
}
}
}
}
// ...
foreach (Shape redShape in myRedShapes) // access the data we copied
{
// ...
}

Example: Store the data for future access using LINQ

var myRedShapes = new List<Shape>();
// ...
using (var samples = reader.Take())
{
// The same can be achieved more succinctly using ValidData() and
// System.Linq
myRedShapes.AddRange(samples
.ValidData() // ignore invalid data and the SampleInfo
.Where(shape => shape.color == "RED")
.Select(shape => new Shape(shape)));
}
// ...

The Group Coherent Presentation example from the rticonnextdds-examples GitHub repository shows additional ways to read and wait for data such as:

Selecting data

The Read() and Take() operations access all the data that has been received. The Select() operation allows specifying a series of conditions on what to read.

Example: selecting data by state and instance

// LookupInstance returns the handle that identifies an instance. In this
// case we're getting the handle for an instance identified by Shape's
// single key field, "color"
InstanceHandle greenInstance = reader.LookupInstance(
keyHolder: new Shape { color = "GREEN" });
using LoanedSamples<Shape> samples = reader.Select()
// Read samples for the green instance only
.WithInstance(greenInstance)
// And read only unread samples of an alive instance
.WithState(DataState.With(SampleState.NotRead, ViewState.Any, InstanceState.Alive))
.Read(); // Read(), unlike Take(), doesn't remove the data
// You can use samples.ValidData() to iterate over the data, ignoring
// the SampleInfo.
foreach (Shape sample in samples.ValidData())
{
Console.WriteLine(sample);
}
ViewState
Indicates whether or not an instance is new.
Definition: DataState.cs:49
SampleState
Indicates whether or not a sample has ever been read.
Definition: DataState.cs:20
InstanceState
Indicates if the samples are from a live DataWriter or not.
Definition: DataState.cs:79

Example: selecting data by content using a QueryCondition

// Create a QueryCondition that will activate when shapes within
// certain coordinates are available
Rti.Dds.Subscription.QueryCondition condition = reader.CreateQueryCondition(
"x < 100 and y < 100",
DataState.Any);
// Handle the event when the condition triggers
condition.Triggered += _ =>
{
// Take only the data selected by the QueryCondition
using var samples = reader.Select()
.WithCondition(condition)
.Take();
foreach (Shape shape in samples.ValidData())
{
Console.WriteLine(shape);
}
};
// Create a WaitSet to wait on the QueryCondition
var waitset = new WaitSet();
waitset.AttachCondition(condition);
while (running)
{
// Dispatch executes the conditions' Triggered events in the
// current thread or return after 4 seconds if no condition has
// been triggered.
waitset.Dispatch(Duration.FromSeconds(4));
}
A specialized ReadCondition that allows specifying a filter on the locally available data.
Definition: QueryCondition.cs:22

Reading data asynchronously

Note
This section requires the package Rti.ConnextDds.Extra and to explicitly add using Rti.Dds.Subscription to have access to the TakeAsync extension method. The examples also use the package System.Linq.Async.

Example: asynchronously print all data received by a reader until cancelled

public static async Task PrintData(
DataReader<Shape> reader,
System.Threading.CancellationToken cancellationToken)
{
await foreach (LoanedSample<Shape> sample in reader.TakeAsync()
.WithCancellation(cancellationToken))
{
if (sample.Info.ValidData)
{
Console.WriteLine(sample.Data);
}
else
{
Console.WriteLine($"Instance state update: {sample.Info.State.Instance}");
}
// Important: the sample contents are only valid in the current
// iteration. As soon as the iterator advances or is disposed, the
// sample cannot be used (unless the data is copied).
}
// Or if you don't need the SampleInfo:
await foreach (Shape shape in reader.TakeAsync()
.ValidData()
.WithCancellation(cancellationToken))
{
Console.WriteLine(shape);
// Like before, the current shape is valid until the iterator advances.
}
}

Example: asynchronously copy a number of data samples into a list, waiting as needed until they are received

public static async ValueTask<List<Shape>> TakeN(DataReader<Shape> reader, int n)
{
return await reader.TakeAsync()
.ValidData() // ignore invalid data and the SampleInfo
// The following extension methods require the the System.Linq.Async package
.Select(data => new Shape(data)) // copy the data (since sample is loaned)
.Take(n) // advance the iterator n times
.ToListAsync(); // add the result to a list
}

TakeAsync and DataReader.Select() can be combined.

Example: asynchronously process an instance until it is disposed

public static async Task ProcessInstance(
DataReader<Shape> reader,
System.Threading.CancellationToken cancellationToken)
{
InstanceHandle greenShapeInstance = reader.LookupInstance(
new Shape { color = "GREEN" });
await foreach (LoanedSample<Shape> sample in reader.Select()
.WithInstance(greenShapeInstance) // only take samples for this instance
.TakeAsync()
.TakeWhile( // stop iterating when the instance is no longer alive
sample => sample.Info.State.Instance == InstanceState.Alive)
.WithCancellation(cancellationToken))
{
Console.WriteLine("Green Shape update: " + sample);
}
}
See also
Rti.Dds.Subscription.DataReaderAsyncExtensions

Subscribing to a content-filtered topic

Example: create a content-filtered topic to subscribe to a subset of the published data

// Create a regular Topic
Topic<Shape> topic = participant.CreateTopic<Shape>("Square");
// Create a ContentFilteredTopic for a subset of the data from the
// previous topic
ContentFilteredTopic<Shape> filteredTopic = participant.CreateContentFilteredTopic(
name: "FilteredSquare", // this name is local only
relatedTopic: topic,
filter: "color = 'BLUE'");
// Create a reader that will only receive blue squares
DataReader<Shape> reader =
participant.ImplicitSubscriber.CreateDataReader(filteredTopic);
// ...

The Multichannel example from the rticonnextdds-examples GitHub repository shows an advanced usage of content filtering.

Looking up matched publications

Example: different ways to look up matched publications

// Look up all the publications:
foreach (var publication in reader.GetMatchedPublicationData())
{
string typeName = publication.TypeName;
bool isReliable = publication.Reliability.Kind == ReliabilityKind.Reliable;
// ...
}
// Look up a specific publication by the instance handle that
// identifies it. You can get the publication handle, for example, from
// a data sample:
using var samples = reader.Read();
foreach (var sample in samples)
{
var publication = reader.GetMatchedPublicationData(
sample.Info.PublicationHandle);
Console.WriteLine(
$"Received {sample.Data} from publication {publication.PublicationName.Name}");
}
// You can get all the publication handles as well:
foreach (var publication in reader.MatchedPublications)
{
// ...
}

See also Looking up a matched subscription and Entity Discovery.