rti.rpc

rti.rpc is the package containing the RTI Connext Request-Reply and Remote Procedure Call APIs.

See Request-Reply and Remote Procedure Calls for an overview of the API.

class rti.rpc.Requester(request_type: Union[type, rti.connextdds.DynamicType], reply_type: Union[type, rti.connextdds.DynamicType], participant: rti.connextdds.DomainParticipant, service_name: Optional[str] = None, request_topic: Optional[Union[rti.connextdds.Topic, rti.connextdds.DynamicData.Topic, str, object]] = None, reply_topic: Optional[Union[rti.connextdds.Topic, rti.connextdds.DynamicData.Topic, str, object]] = None, datawriter_qos: Optional[rti.connextdds.DataWriterQos] = None, datareader_qos: Optional[rti.connextdds.DataReaderQos] = None, publisher: Optional[rti.connextdds.Publisher] = None, subscriber: Optional[rti.connextdds.Subscriber] = None, on_reply_available: Optional[Callable[[object], object]] = None)

A Requester sends requests and receives replies.

Parameters
  • request_type – The type of the request data. It can be an @idl.struct, an @idl.union, or a dds.DynamicType. (See Data Types.)

  • reply_type – The type of the reply data.

  • participant – The DomainParticipant that will hold the request writer and reply reader.

  • service_name – Name that will be used to derive the topic name, defaults to None (rely only on custom topics).

  • request_topic – Topic object or name that will be used for the request data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • reply_topic – Topic object or name that will be used for the reply data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • datawriter_qos – QoS object to use for request writer, defaults to None (use default RequestReply QoS).

  • datareader_qos – QoS object to use for reply reader, defaults to None (use default RequestReply QoS).

  • publisher – Publisher used to hold request writer, defaults to None (use participant builtin publisher).

  • subscriber – Subscriber used to hold reply reader, defaults to None (use participant builtin subscriber).

  • on_reply_available – The callback that handles incoming replies.
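The interplay between service_name, request_topic and reply_topic described above can be sketched as a small pre-flight check. This is only an illustration of the documented rule: check_topic_arguments is a hypothetical helper, not part of rti.rpc, and the exception the library itself raises for a bad combination is not stated here.

```python
from typing import Optional


def check_topic_arguments(
    service_name: Optional[str] = None,
    request_topic: Optional[object] = None,
    reply_topic: Optional[object] = None,
) -> dict:
    """Hypothetical helper (not part of rti.rpc).

    Returns the keyword arguments that were actually provided, after
    checking the documented rule: when service_name is None, both
    request_topic and reply_topic must be set.
    """
    if service_name is None and (request_topic is None or reply_topic is None):
        raise ValueError(
            "set service_name, or set both request_topic and reply_topic"
        )
    candidates = {
        "service_name": service_name,
        "request_topic": request_topic,
        "reply_topic": reply_topic,
    }
    # Arguments left as None fall back to the constructor defaults.
    return {name: value for name, value in candidates.items() if value is not None}
```

The returned dictionary could then be unpacked into the constructor call, for example Requester(request_type, reply_type, participant, **kwargs).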

__init__(request_type: Union[type, rti.connextdds.DynamicType], reply_type: Union[type, rti.connextdds.DynamicType], participant: rti.connextdds.DomainParticipant, service_name: Optional[str] = None, request_topic: Optional[Union[rti.connextdds.Topic, rti.connextdds.DynamicData.Topic, str, object]] = None, reply_topic: Optional[Union[rti.connextdds.Topic, rti.connextdds.DynamicData.Topic, str, object]] = None, datawriter_qos: Optional[rti.connextdds.DataWriterQos] = None, datareader_qos: Optional[rti.connextdds.DataReaderQos] = None, publisher: Optional[rti.connextdds.Publisher] = None, subscriber: Optional[rti.connextdds.Subscriber] = None, on_reply_available: Optional[Callable[[object], object]] = None) None

close() None

Close the resources for this request-reply object.

property closed: bool

Whether this request-reply object has been closed.

Getter

Returns True if the object has been closed.

classmethod is_final_reply(reply_info: Union[rti.connextdds.SampleInfo, object]) bool

Check whether a reply is the last of the sequence.

Parameters

reply_info – The reply info with the flags to check.

Returns

Boolean indicating whether reply is the last for a request.
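When a replier sends several replies for one request, is_final_reply can drive a collection loop. The sketch below is a hedged illustration: collect_until_final is a hypothetical helper, and it assumes that iterating the object returned by receive_replies yields (data, info) pairs, which this reference does not confirm.

```python
def collect_until_final(requester, request_id, max_wait):
    """Hypothetical helper: gather reply data for one request until the
    reply flagged as final arrives."""
    collected = []
    while True:
        # Per the reference, receive_replies raises dds.TimeoutError when
        # min_count replies do not arrive within max_wait; it propagates here.
        replies = requester.receive_replies(
            max_wait, related_request_id=request_id
        )
        # ASSUMPTION: each item unpacks as a (data, info) pair.
        for data, info in replies:
            collected.append(data)
            if requester.is_final_reply(info):
                return collected
```

Because is_final_reply is a classmethod, it can be called on either the class or an instance, as done here.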

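classmethod is_related_reply(request_id: rti.connextdds.SampleIdentity, reply_info: Union[rti.connextdds.SampleInfo, object]) bool

Check a request id against a reply’s metadata for correlation.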

Parameters
  • request_id – The request id used to correlate replies.

  • reply_info – The reply info used for the correlation check.

Returns

Boolean indicating whether the request and reply are correlated.

property matched_replier_count: int

The number of discovered matched repliers.

Getter

Returns the number of matched repliers.
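One possible use of matched_replier_count is to hold back the first request until at least one replier has been discovered. The following is a minimal polling sketch; wait_for_replier and its timeout_s and poll_s parameters are hypothetical names chosen for illustration.

```python
import time


def wait_for_replier(requester, timeout_s=5.0, poll_s=0.1):
    """Hypothetical helper: poll matched_replier_count until a replier is
    discovered or timeout_s seconds elapse. Returns True on discovery."""
    deadline = time.monotonic() + timeout_s
    while requester.matched_replier_count == 0:
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
    return True
```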

property on_reply_available: Optional[Callable[[object], object]]

The listener callback used to process received replies.

Getter

Returns the callback function.

Setter

Set the callback function.

read_replies(related_request_id: Optional[rti.connextdds.SampleIdentity] = None) Union[rti.connextdds.DataReader.LoanedSamples, rti.connextdds.DynamicData.LoanedSamples]

Read received replies.

Parameters

related_request_id – The id used to correlate replies to a specific request, default None (read any replies).

Returns

A loaned samples object containing the replies.

receive_replies(max_wait: rti.connextdds.Duration, min_count: int = 1, related_request_id: Optional[rti.connextdds.SampleIdentity] = None) Union[rti.connextdds.DataReader.LoanedSamples, rti.connextdds.DynamicData.LoanedSamples, object]

Wait for replies and take them.

Parameters
  • max_wait – Maximum time to wait for replies before timing out.

  • min_count – Minimum number of replies to receive, default 1.

  • related_request_id – The request id used to correlate replies, default None (receive any replies).

Raises

dds.TimeoutError – Raised if min_count replies are not received within max_wait.

Returns

A loaned samples object containing the replies.
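A basic request-reply round trip combines send_request with receive_replies, using the returned identity to correlate. The helper name request_once is hypothetical; the calls it makes follow the signatures listed in this reference.

```python
def request_once(requester, request, max_wait):
    """Hypothetical helper: send one request and block until at least one
    correlated reply is available."""
    # send_request returns the identity used to correlate replies.
    request_id = requester.send_request(request)
    # Raises dds.TimeoutError (per the reference) if no reply arrives in time.
    return requester.receive_replies(
        max_wait, min_count=1, related_request_id=request_id
    )
```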

property reply_datareader: Union[rti.connextdds.DataReader, rti.connextdds.DynamicData.DataReader, object]

The DataReader used to receive reply data.

Getter

Returns the reply DataReader.

property request_datawriter: Union[rti.connextdds.DataWriter, rti.connextdds.DynamicData.DataWriter]

The DataWriter used to send request data.

Getter

Returns the request DataWriter.

send_request(request: Union[object, rti.connextdds.DynamicData], params: Optional[rti.connextdds.WriteParams] = None) rti.connextdds.SampleIdentity

Send a request and return the identity of the request for correlating received replies.

Parameters
  • request – The request to send.

  • params – Parameters used for writing the request.

Returns

The identity of the request.

take_replies(related_request_id: Optional[rti.connextdds.SampleIdentity] = None) Union[rti.connextdds.DataReader.LoanedSamples, rti.connextdds.DynamicData.LoanedSamples]

Take received replies.

Parameters

related_request_id – The id used to correlate replies to a specific request, default None (take any replies).

Returns

A loaned samples object containing the replies.

wait_for_replies(max_wait: rti.connextdds.Duration, min_count: int = 1, related_request_id: Optional[rti.connextdds.SampleIdentity] = None) bool

Wait for received replies.

Parameters
  • max_wait – Maximum time to wait for replies before timing out.

  • min_count – Minimum number of replies to receive, default 1.

  • related_request_id – The request id used to correlate replies, default None (wait for any replies).

Returns

Boolean indicating whether min_count replies were received within max_wait time.
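Unlike receive_replies, wait_for_replies reports a timeout through its return value, so it pairs naturally with take_replies. A minimal sketch, with take_if_ready as a hypothetical helper name:

```python
def take_if_ready(requester, request_id, max_wait):
    """Hypothetical helper: return the correlated replies if they arrive
    within max_wait, otherwise None."""
    if requester.wait_for_replies(max_wait, related_request_id=request_id):
        return requester.take_replies(request_id)
    return None
```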

async wait_for_replies_async(max_wait: rti.connextdds.Duration, min_count: int = 1, related_request_id: Optional[rti.connextdds.SampleIdentity] = None) bool

Wait for received replies asynchronously.

Parameters
  • max_wait – Maximum time to wait for replies before timing out.

  • min_count – Minimum number of replies to receive, default 1.

  • related_request_id – The request id used to correlate replies, default None (wait for any replies).

Returns

Boolean indicating whether min_count replies were received within max_wait time.
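Inside an asyncio application, the same wait-then-take pattern can be written without blocking the event loop. This is a sketch under the signatures listed above; request_async is a hypothetical helper name.

```python
import asyncio


async def request_async(requester, request, max_wait):
    """Hypothetical helper: send a request and await its replies without
    blocking the event loop. Returns None on timeout."""
    request_id = requester.send_request(request)
    ready = await requester.wait_for_replies_async(
        max_wait, related_request_id=request_id
    )
    return requester.take_replies(request_id) if ready else None
```

From synchronous code the coroutine could be driven with asyncio.run(request_async(requester, request, max_wait)).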

class rti.rpc.Replier(request_type: Union[type, rti.connextdds.DynamicType], reply_type: Union[type, rti.connextdds.DynamicType], participant: rti.connextdds.DomainParticipant, service_name: Optional[str] = None, request_topic: Optional[Union[rti.connextdds.DynamicData.Topic, rti.connextdds.DynamicData.ContentFilteredTopic, str, object]] = None, reply_topic: Optional[Union[rti.connextdds.DynamicData.Topic, str, object]] = None, datawriter_qos: Optional[rti.connextdds.DataWriterQos] = None, datareader_qos: Optional[rti.connextdds.DataReaderQos] = None, publisher: Optional[rti.connextdds.Publisher] = None, subscriber: Optional[rti.connextdds.Subscriber] = None, on_request_available: Optional[Callable[[object], object]] = None)

A replier object for handling request-reply interactions with DDS.

Parameters
  • request_type – The type of the request data.

  • reply_type – The type of the reply data.

  • participant – The DomainParticipant that will hold the reply writer and request reader.

  • service_name – Name that will be used to derive the topic name, defaults to None (rely only on custom topics).

  • request_topic – Topic object or name that will be used for the request data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • reply_topic – Topic object or name that will be used for the reply data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • datawriter_qos – QoS object to use for reply writer, defaults to None (use default RequestReply QoS).

  • datareader_qos – QoS object to use for request reader, defaults to None (use default RequestReply QoS).

  • publisher – Publisher used to hold reply writer, defaults to None (use participant builtin publisher).

  • subscriber – Subscriber used to hold request reader, defaults to None (use participant builtin subscriber).

  • on_request_available – The callback that handles incoming requests.

__init__(request_type: Union[type, rti.connextdds.DynamicType], reply_type: Union[type, rti.connextdds.DynamicType], participant: rti.connextdds.DomainParticipant, service_name: Optional[str] = None, request_topic: Optional[Union[rti.connextdds.DynamicData.Topic, rti.connextdds.DynamicData.ContentFilteredTopic, str, object]] = None, reply_topic: Optional[Union[rti.connextdds.DynamicData.Topic, str, object]] = None, datawriter_qos: Optional[rti.connextdds.DataWriterQos] = None, datareader_qos: Optional[rti.connextdds.DataReaderQos] = None, publisher: Optional[rti.connextdds.Publisher] = None, subscriber: Optional[rti.connextdds.Subscriber] = None, on_request_available: Optional[Callable[[object], object]] = None) None

close() None

Close the resources for this request-reply object.

property closed: bool

Whether this request-reply object has been closed.

Getter

Returns True if the object has been closed.

property matched_requester_count: int

The number of discovered matched requesters.

Getter

Returns the number of matched requesters.

property on_request_available

The listener callback used to process received requests.

Getter

Returns the callback function.

Setter

Set the callback function.

Type

Optional[Callable[[Replier]]]
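A callback for on_request_available might drain the pending requests and answer each one. The sketch below rests on two assumptions: that the callback is invoked with the Replier as its only argument, as the Type entry suggests, and that iterating the result of take_requests yields (data, info) pairs. make_request_callback and handler are hypothetical names.

```python
def make_request_callback(handler):
    """Hypothetical factory: build a callback that replies to every pending
    request with handler(request)."""

    def on_request_available(replier):
        # ASSUMPTION: each item unpacks as a (data, info) pair.
        for request, info in replier.take_requests():
            # The request's SampleInfo is one of the documented types that
            # send_reply accepts for correlation.
            replier.send_reply(handler(request), info)

    return on_request_available
```

The resulting function could be passed as on_request_available when constructing the Replier, or assigned to the property later.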

read_requests() Union[rti.connextdds.DataReader.LoanedSamples, rti.connextdds.DynamicData.LoanedSamples]

Read received requests.

Returns

A loaned samples object containing the requests.

receive_requests(max_wait: rti.connextdds.Duration, min_count: int = 1) Union[rti.connextdds.DataReader.LoanedSamples, rti.connextdds.DynamicData.LoanedSamples]

Receive a minimum number of requests within a timeout period.

Parameters
  • max_wait – Maximum time to wait for requests before timing out.

  • min_count – Minimum number of requests to receive, default 1.

Raises

dds.TimeoutError – Raised if min_count requests are not received within max_wait.

Returns

A loaned samples object containing the requests.

property reply_datawriter: Union[rti.connextdds.DataWriter, rti.connextdds.DynamicData.DataWriter]

The DataWriter used to send reply data.

Getter

Returns the reply DataWriter.

property request_datareader: Union[rti.connextdds.DataReader, rti.connextdds.DynamicData.DataReader]

The DataReader used to receive request data.

Getter

Returns the request DataReader.

send_reply(reply: Union[rti.connextdds.DynamicData, object], param: Union[rti.connextdds.SampleIdentity, rti.connextdds.SampleInfo, rti.connextdds.WriteParams], final: bool = True) None

Send a reply to a received request.

Parameters
  • reply – The reply to send.

  • param – The request's SampleIdentity or SampleInfo, or the WriteParams, used to correlate the reply with its request.

  • final – Indicates whether this is the final reply for a specific request, default True.

Raises

dds.InvalidArgumentError – Thrown if param is not a type that can be used for correlation.
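The final flag lets a replier answer one request with several replies. A minimal sketch of that pattern follows; reply_in_chunks is a hypothetical helper, and request_info stands for whatever correlation value (sample identity, sample info, or write params) was obtained from the request.

```python
def reply_in_chunks(replier, request_info, chunks):
    """Hypothetical helper: send every item of chunks as a reply to the same
    request, flagging only the last one as final."""
    chunks = list(chunks)
    for index, chunk in enumerate(chunks):
        is_last = index == len(chunks) - 1
        replier.send_reply(chunk, request_info, final=is_last)
```

With an empty chunks sequence nothing is sent, so a caller that must always terminate the reply stream would need to handle that case separately.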

take_requests() Union[rti.connextdds.DataReader.LoanedSamples, rti.connextdds.DynamicData.LoanedSamples]

Take received requests.

Returns

A loaned samples object containing the requests.

Return type

Union[dds.DynamicData.LoanedSamples, object]

wait_for_requests(max_wait: rti.connextdds.Duration, min_count: int = 1) bool

Wait for a minimum number of requests within a timeout period.

Parameters
  • max_wait – Maximum time to wait for requests before timing out.

  • min_count – Minimum number of requests to receive, default 1.

Returns

Boolean indicating whether min_count requests were received within max_wait time.

async wait_for_requests_async(max_wait: rti.connextdds.Duration, min_count: Optional[int] = 1) bool

Wait asynchronously for a minimum number of requests within a timeout period.

Parameters
  • max_wait – Maximum time to wait for requests before timing out.

  • min_count – Minimum number of requests to receive, default 1.

Returns

Boolean indicating whether min_count requests were received within max_wait time.

class rti.rpc.SimpleReplier(request_type: Union[rti.connextdds.DynamicType, type], reply_type: Union[rti.connextdds.DynamicType, type], participant: rti.connextdds.DomainParticipant, handler: Callable[[object], object], service_name: Optional[str] = None, request_topic: Optional[Union[rti.connextdds.DynamicData.Topic, rti.connextdds.DynamicData.ContentFilteredTopic, str, object]] = None, reply_topic: Optional[Union[rti.connextdds.DynamicData.Topic, str, object]] = None, datawriter_qos: Optional[rti.connextdds.DataWriterQos] = None, datareader_qos: Optional[rti.connextdds.DataReaderQos] = None, publisher: Optional[rti.connextdds.Publisher] = None, subscriber: Optional[rti.connextdds.Subscriber] = None)

A special replier that uses a user callback to produce one reply per request.

Parameters
  • request_type – The type of the request data.

  • reply_type – The type of the reply data.

  • participant – The DomainParticipant that will hold the request reader and reply writer.

  • handler – The callback that handles incoming requests and returns a reply. The callback must have a single argument of type request_type and must return an instance of type reply_type.

  • service_name – Name that will be used to derive the topic name, defaults to None (rely only on custom topics).

  • request_topic – Topic object or name that will be used for the request data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • reply_topic – Topic object or name that will be used for the reply data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • datawriter_qos – QoS object to use for reply writer, defaults to None (use default RequestReply QoS).

  • datareader_qos – QoS object to use for request reader, defaults to None (use default RequestReply QoS).

  • publisher – Publisher used to hold reply writer, defaults to None (use participant builtin publisher).

  • subscriber – Subscriber used to hold request reader, defaults to None (use participant builtin subscriber).
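The handler contract (one request in, one reply out) can be illustrated with plain Python types. In the sketch below AddRequest and AddReply are hypothetical dataclasses standing in for real request and reply types; an actual application would define them as @idl.struct types or DynamicTypes, as the request_type parameter requires.

```python
from dataclasses import dataclass


@dataclass
class AddRequest:
    """Hypothetical stand-in for an IDL request type."""
    a: int = 0
    b: int = 0


@dataclass
class AddReply:
    """Hypothetical stand-in for an IDL reply type."""
    total: int = 0


def add_handler(request: AddRequest) -> AddReply:
    # Takes a single request_type argument and returns a reply_type instance,
    # which is the shape the handler parameter asks for.
    return AddReply(total=request.a + request.b)
```

A function of this shape would be passed as the handler argument of SimpleReplier, together with the matching request_type and reply_type.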

__init__(request_type: Union[rti.connextdds.DynamicType, type], reply_type: Union[rti.connextdds.DynamicType, type], participant: rti.connextdds.DomainParticipant, handler: Callable[[object], object], service_name: Optional[str] = None, request_topic: Optional[Union[rti.connextdds.DynamicData.Topic, rti.connextdds.DynamicData.ContentFilteredTopic, str, object]] = None, reply_topic: Optional[Union[rti.connextdds.DynamicData.Topic, str, object]] = None, datawriter_qos: Optional[rti.connextdds.DataWriterQos] = None, datareader_qos: Optional[rti.connextdds.DataReaderQos] = None, publisher: Optional[rti.connextdds.Publisher] = None, subscriber: Optional[rti.connextdds.Subscriber] = None) None

close() None

Close the resources for this request-reply object.

property closed: bool

Whether this request-reply object has been closed.

Getter

Returns True if the object has been closed.

property matched_requester_count: int

The number of discovered matched requesters.

Getter

Returns the number of matched requesters.

class rti.rpc.Service(service_instance: abc.ABC, participant: rti.connextdds.DomainParticipant, service_name: str, task_count: int = 4, datawriter_qos: Optional[rti.connextdds.DataWriterQos] = None, datareader_qos: Optional[rti.connextdds.DataReaderQos] = None, publisher: Optional[rti.connextdds.Publisher] = None, subscriber: Optional[rti.connextdds.Subscriber] = None)

A service allows running a service_instance in a DDS domain using asyncio.

The service uses a Replier to receive RPC calls and then dispatches them to the service_instance, calling the appropriate method. The value returned by the method is then sent back to the remote caller.

The service runs asynchronously (run method) until the task is cancelled.

close()

Closes the DDS entities used by this service.

property matched_client_count: int

The number of RPC clients that match this service.

async run(close_on_cancel: bool = False)

Starts receiving RPC calls (requests) and processing them.

This method runs until the task it returns is cancelled.

If close_on_cancel is True, the service will close the DDS entities when the task is cancelled. By default it is False, which means you can call run() again after a run() task is cancelled.

Exceptions raised during the execution of the service are logged as warnings and do not stop the execution of the service.
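Since run() only ends when its task is cancelled, a typical pattern is to wrap it in an asyncio task and cancel that task on shutdown. The following is a hedged sketch; run_service_for is a hypothetical helper, and the fixed sleep merely stands in for the application's real shutdown condition.

```python
import asyncio
import contextlib


async def run_service_for(service, seconds):
    """Hypothetical helper: run the service for a fixed time, then cancel
    the run() task and wait for it to finish."""
    # close_on_cancel=True asks the service to close its DDS entities
    # once the task is cancelled.
    task = asyncio.create_task(service.run(close_on_cancel=True))
    try:
        await asyncio.sleep(seconds)
    finally:
        task.cancel()
        # Awaiting a cancelled task raises CancelledError; suppress it so the
        # caller sees a clean shutdown.
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

The coroutine could be started with asyncio.run(run_service_for(service, 60)).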

class rti.rpc.ClientBase(participant: rti.connextdds.DomainParticipant, service_name: str, max_wait_per_call: rti.connextdds.Duration = <rti.connextdds.Duration object>, datawriter_qos: Optional[rti.connextdds.DataWriterQos] = None, datareader_qos: Optional[rti.connextdds.DataReaderQos] = None, publisher: Optional[rti.connextdds.Publisher] = None, subscriber: Optional[rti.connextdds.Subscriber] = None)

Base class for RPC clients.

An actual Client must inherit from a service interface and from this class, for example:

`class RobotClient(Robot, rpc.ClientBase): ...`

This base class injects an implementation for all the @operation methods found in Robot, which uses a Requester to make RPC calls and return the values it receives.

The base class also provides an __init__, close and other methods.

close()

Closes the DDS entities used by this client.

property matched_service_count: int

The number of RPC services that match this client.

@rti.rpc.service(cls=None, *, type_annotations=[], member_annotations={})

This decorator marks an abstract base class as a remote service interface.

A class annotated with this decorator can be used to create a Client or to define the implementation to be run in a Service.

The operations that will be remotely callable need to be marked with the @operation decorator.

@rti.rpc.operation(funcobj=None, *, raises=[], parameter_annotations={})

This decorator marks a method as a remote operation of a service interface.

It also marks it as an @abc.abstractmethod.

Only methods marked with this decorator will be callable using an RPC Client or an RPC Service.

exception rti.rpc.RemoteUnknownOperationError

Exception thrown by a client operation when the server indicates that the operation is unknown to the server.

exception rti.rpc.RemoteUnknownExceptionError

Exception thrown by a client operation when the server operation fails with an exception that is not declared in the interface.