How to subscribe to a topic and work with Subscribers and DataReaders.
See also Entity examples for examples that affect all DDS Entities, including Subscribers and DataReaders.
Creating and configuring a DataReader
Example: create a DataReader using default QoS
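DomainParticipantFactory factory = DomainParticipantFactory.Instance;
DomainParticipant participant = factory.CreateParticipant(domainId: 0);
Topic<Shape> topic = participant.CreateTopic<Shape>("Square");
DataReader<Shape> reader = participant.ImplicitSubscriber.CreateDataReader(topic);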
The IDL definition of the Shape type used in the examples is the following:
struct Shape {
    @key string<128> color;
    int32 x;
    int32 y;
    int32 shapesize;
};
Example: configure the DataReader QoS
Topic<Shape> topic = participant.CreateTopic<Shape>("Square");
DataReaderQos readerQos = participant.DefaultDataReaderQos
    .WithSubscriptionName(policy => policy.Name = "MySquareReader")
    .WithTimeBasedFilter(
        policy => policy.MinimumSeparation = Duration.FromMilliseconds(100));
DataReader<Shape> reader =
    participant.ImplicitSubscriber.CreateDataReader(topic, readerQos);
See also Configuring a Publisher; a Subscriber can be configured similarly.
Reading data
There are several ways to wait for data:
Example: Handling the DataAvailable event
reader.DataAvailable += _ =>
{
    using (LoanedSamples<Shape> samples = reader.Take())
    {
        foreach (var sample in samples)
        {
            if (sample.Info.ValidData)
            {
                Shape shape = sample.Data;
                Console.WriteLine(
                    $"Received {shape.color} square ({shape.x}, {shape.y})");
            }
        }
    }
};
Example: Using a ReadCondition and a WaitSet to read data
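ReadCondition condition = reader.CreateReadCondition(DataState.Any);
// The handler runs in the context of WaitSet.Dispatch() when the condition triggers
condition.Triggered += _ =>
{
    using LoanedSamples<Shape> samples = reader.Take();
    foreach (var sample in samples)
    {
        if (sample.Info.ValidData)
        {
            Console.WriteLine(sample.Data);
        }
    }
};
var waitset = new WaitSet();
waitset.AttachCondition(condition);
while (running)
{
    waitset.Dispatch(Duration.FromSeconds(4));
}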
Example: Store the data for future access
var myRedShapes = new List<Shape>();
using (LoanedSamples<Shape> samples = reader.Take())
{
    foreach (var sample in samples)
    {
        if (sample.Info.ValidData)
        {
            Shape shape = sample.Data;
            if (shape.color == "RED")
            {
                // Copy the data so that it outlives the loan
                myRedShapes.Add(new Shape(shape));
            }
        }
    }
}
// The loan has been returned; the copies can still be used
foreach (Shape redShape in myRedShapes)
{
    // Process redShape
}
Example: Store the data for future access using LINQ
var myRedShapes = new List<Shape>();
using (var samples = reader.Take())
{
    myRedShapes.AddRange(samples
        .ValidData()
        .Where(shape => shape.color == "RED")
        .Select(shape => new Shape(shape)));
}
The Group Coherent Presentation example from the rticonnextdds-examples GitHub repository shows additional ways to read and wait for data.
Selecting data
The Read() and Take() operations access all the data that has been received. The Select() operation allows specifying a series of conditions on what to read.
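The difference between Read() and Take() can be illustrated with a minimal sketch. It uses only calls that appear elsewhere on this page (Read(), Take() and ValidData()) and relies on the standard DDS behavior that a read leaves the samples in the reader while a take removes them. The class and method names (ReadVersusTakeSketch, Inspect) are hypothetical, and Shape is assumed to be the C# type generated from the IDL shown earlier.

```csharp
using System;
using Rti.Dds.Subscription;

public static class ReadVersusTakeSketch
{
    // Shape is assumed to be the type generated from the IDL shown earlier.
    public static void Inspect(DataReader<Shape> reader)
    {
        // Read() loans the received samples and leaves them in the reader,
        // so they remain available to later Read() or Take() calls.
        using (var peeked = reader.Read())
        {
            foreach (Shape shape in peeked.ValidData())
            {
                Console.WriteLine($"Read: {shape.color} ({shape.x}, {shape.y})");
            }
        }

        // Take() loans the samples (including the ones just read) and
        // removes them from the reader.
        using (var taken = reader.Take())
        {
            foreach (Shape shape in taken.ValidData())
            {
                Console.WriteLine($"Taken: {shape.color} ({shape.x}, {shape.y})");
            }
        }
    }
}
```

Under these assumptions, every sample printed by the first loop is printed again by the second, and a further Take() would return only samples received afterwards.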
Example: selecting data by state and instance
InstanceHandle greenInstance = reader.LookupInstance(
    keyHolder: new Shape { color = "GREEN" });
using LoanedSamples<Shape> samples = reader.Select()
    .WithInstance(greenInstance)
    .Read();
foreach (Shape sample in samples.ValidData())
{
    Console.WriteLine(sample);
}
Example: selecting data by content using a QueryCondition
QueryCondition condition = reader.CreateQueryCondition(
    "x < 100 and y < 100",
    DataState.Any);
// The handler runs in the context of WaitSet.Dispatch() when matching data is available
condition.Triggered += _ =>
{
    using var samples = reader.Select()
        .WithCondition(condition)
        .Take();
    foreach (Shape shape in samples.ValidData())
    {
        Console.WriteLine(shape);
    }
};
var waitset = new WaitSet();
waitset.AttachCondition(condition);
while (running)
{
    waitset.Dispatch(Duration.FromSeconds(4));
}
Reading data asynchronously
- Note
- This section requires the package Rti.ConnextDds.Extra and an explicit using Rti.Dds.Subscription directive to have access to the TakeAsync extension method. The examples also use the package System.Linq.Async.
Example: asynchronously print all data received by a reader until cancelled
public static async Task PrintData(
    DataReader<Shape> reader,
    System.Threading.CancellationToken cancellationToken)
{
    // Iterate over every sample, including those that carry only metadata
    await foreach (LoanedSample<Shape> sample in reader.TakeAsync()
        .WithCancellation(cancellationToken))
    {
        if (sample.Info.ValidData)
        {
            Console.WriteLine(sample.Data);
        }
        else
        {
            Console.WriteLine($"Instance state update: {sample.Info.State.Instance}");
        }
    }
    // Variant: ValidData() yields only the data of the valid samples
    await foreach (Shape shape in reader.TakeAsync()
        .ValidData()
        .WithCancellation(cancellationToken))
    {
        Console.WriteLine(shape);
    }
}
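The example above runs until its CancellationToken is cancelled. A minimal sketch of one way to drive that cancellation follows. The helper name RunWithTimeoutAsync and the class CancellationSketch are hypothetical and not part of the Connext API. The sketch also assumes that the asynchronous enumeration reports cancellation by throwing OperationCanceledException, which is the usual .NET convention but is not confirmed by this page.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Runs 'work' and requests cancellation once 'timeout' has elapsed.
    // Returns true if the work ended because it was cancelled, and false
    // if it completed on its own first.
    public static async Task<bool> RunWithTimeoutAsync(
        Func<CancellationToken, Task> work,
        TimeSpan timeout)
    {
        using var cancellation = new CancellationTokenSource();
        cancellation.CancelAfter(timeout);
        try
        {
            await work(cancellation.Token);
            return false;
        }
        catch (OperationCanceledException)
        {
            // Assumption: the work signals cancellation with this exception type
            return true;
        }
    }
}
```

With this helper, printing the received data for ten seconds could be written as await CancellationSketch.RunWithTimeoutAsync(token => PrintData(reader, token), TimeSpan.FromSeconds(10)). Keeping the helper independent of DataReader means the same pattern applies to any of the asynchronous examples in this section.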
Example: asynchronously copy a number of data samples into a list, waiting as needed until they are received
public static async ValueTask<List<Shape>> TakeN(DataReader<Shape> reader, int n)
{
    return await reader.TakeAsync()
        .ValidData()
        .Select(data => new Shape(data))
        .Take(n)
        .ToListAsync();
}
TakeAsync and DataReader.Select() can be combined.
Example: asynchronously process an instance until it is disposed
public static async Task ProcessInstance(
    DataReader<Shape> reader,
    System.Threading.CancellationToken cancellationToken)
{
    InstanceHandle greenShapeInstance = reader.LookupInstance(
        new Shape { color = "GREEN" });
    // Stop iterating as soon as the instance is reported as disposed
    await foreach (LoanedSample<Shape> sample in reader.Select()
        .WithInstance(greenShapeInstance)
        .TakeAsync()
        .TakeWhile(
            sample => sample.Info.State.Instance != InstanceState.NotAliveDisposed)
        .WithCancellation(cancellationToken))
    {
        Console.WriteLine("Green Shape update: " + sample);
    }
}
- See also
- Rti.Dds.Subscription.DataReaderAsyncExtensions
Subscribing to a content-filtered topic
Example: create a content-filtered topic to subscribe to a subset of the published data
Topic<Shape> topic = participant.CreateTopic<Shape>("Square");
ContentFilteredTopic<Shape> filteredTopic = participant.CreateContentFilteredTopic(
    name: "FilteredSquare",
    relatedTopic: topic,
    filter: "color = 'BLUE'");
DataReader<Shape> reader =
    participant.ImplicitSubscriber.CreateDataReader(filteredTopic);
The Multichannel example from the rticonnextdds-examples GitHub repository shows an advanced usage of content filtering.
Looking up matched publications
Example: different ways to look up matched publications
// Get the discovery data of every matched publication
foreach (var publication in reader.GetMatchedPublicationData())
{
    string typeName = publication.TypeName;
    bool isReliable = publication.Reliability.Kind ==
        ReliabilityKind.Reliable;
}
// Look up the publication that wrote each sample
using var samples = reader.Read();
foreach (var sample in samples)
{
    var publication = reader.GetMatchedPublicationData(
        sample.Info.PublicationHandle);
    Console.WriteLine(
        $"Received {sample.Data} from publication {publication.PublicationName.Name}");
}
// Iterate over the matched publications
foreach (var publication in reader.MatchedPublications)
{
    // Process each matched publication
}
See also Looking up a matched subscription and Accessing Information on Entity Discovery.