rti.rpc

rti.rpc is the package containing the RTI Connext Request-Reply and Remote Procedure APIs.

See Request-Reply and Remote Procedure Calls for an overview of the API.

class rti.rpc.ByRef(value: T)

This class is designed to emulate pass-by-reference behavior in Python, which natively uses pass-by-value semantics for primitive types (like int, float, and str). It is used to implement the rpc.out_param and rpc.inout_param for the primitive types.

For example for the following operation: ` void op(out long param); `

The Python equivalent will be: ` def op(self, param: rpc.ByRef[idl.int32]): `

This allows the modification of the parameter value in the service implementation, and this change will be reflected in the client side variable used to call the operation. `param.value = 10`

class rti.rpc.ClientBase(participant: ~rti.connextdds.DomainParticipant, service_name: str, max_wait_per_call: ~rti.connextdds.Duration = <Duration(sec=10, nanosec=0)>, datawriter_qos: ~typing.Optional[~rti.connextdds.DataWriterQos] = None, datareader_qos: ~typing.Optional[~rti.connextdds.DataReaderQos] = None, publisher: ~typing.Optional[~rti.connextdds.Publisher] = None, subscriber: ~typing.Optional[~rti.connextdds.Subscriber] = None, require_matching_service_on_send_request: bool = True)

Base class for RPC clients.

An actual Client must inherit from a service interface and from this class, for example:

` class RobotClient(Robot, rpc.ClientBase): ... `

This base class injects an implementation for all the @operation methods found in Robot, which uses a Requester to make RPC calls and return the values it receives.

The base class also provides an __init__, close and other methods.

close()

Closes the DDS entities used by this client.

property matched_service_count: int

The number of RPC services that match this client.

async wait_for_service_async(max_wait: Duration) bool

Waits for a service to be discovered.

This method returns True if a service is discovered and ready to receive requests within the specified maximum wait. It returns False if it times out.

exception rti.rpc.RemoteError

Base class for all built-in exceptions that can be reported by a service and thrown by a client. This doesn’t include user-defined exceptions.

exception rti.rpc.RemoteInvalidArgumentError

Exception thrown by a client operation when the server indicates that the operation failed because of an invalid argument.

exception rti.rpc.RemoteOutOfResourcesError

Exception thrown by a client operation when the server indicates that the operation failed because the server ran out of resources.

exception rti.rpc.RemoteUnknownExceptionError

Exception thrown by a client operation when the server operation fails with an exception that is not declared in the interface.

exception rti.rpc.RemoteUnknownOperationError

Exception thrown by a client operation when the server indicates that the operation is unknown to the server.

exception rti.rpc.RemoteUnsupportedOperationError

Exception thrown by a client operation when the server indicates that the operation is not supported.

class rti.rpc.Replier(request_type: Union[type, DynamicType], reply_type: Union[type, DynamicType], participant: DomainParticipant, service_name: Optional[str] = None, request_topic: Optional[Union[Topic, ContentFilteredTopic, str, object]] = None, reply_topic: Optional[Union[Topic, str, object]] = None, datawriter_qos: Optional[DataWriterQos] = None, datareader_qos: Optional[DataReaderQos] = None, publisher: Optional[Publisher] = None, subscriber: Optional[Subscriber] = None, on_request_available: Optional[Callable[[object], object]] = None)

A replier object for handling request-reply interactions with DDS.

Parameters:
  • request_type – The type of the request data.

  • reply_type – The type of the reply data.

  • participant – The DomainParticipant that will hold the reply writer and request reader.

  • service_name – Name that will be used to derive the topic name, defaults to None (rely only on custom topics).

  • request_topic – Topic object or name that will be used for the request data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • reply_topic – Topic object or name that will be used for the reply data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • datawriter_qos – QoS object to use for reply writer, defaults to None (use the QoS defined by the built-in profile dds.BuiltinProfiles.pattern_rpc).

  • datareader_qos – QoS object to use for request reader, defaults to None (use the QoS defined by the built-in profile dds.BuiltinProfiles.pattern_rpc).

  • publisher – Publisher used to create the request writer, defaults to None (use participant.builtin_publisher).

  • subscriber – Subscriber used to create the reply reader, defaults to None (use participant.builtin_subscriber).

  • on_reply_available – The callback that handles incoming requests (optional).

async wait_for_requests_async(max_wait: Duration, min_count: Optional[int] = 1) bool

Wait asynchronously for a minimum number of requests within a timeout period.

Parameters:
  • max_wait – Maximum time to wait for requests before timing out. .

  • min_count – Minimum number of requests to receive, default 1.

Returns:

Boolean indicating whether min_count requests were received within max_wait time.

class rti.rpc.Requester(request_type: Union[type, DynamicType], reply_type: Union[type, DynamicType], participant: DomainParticipant, service_name: Optional[str] = None, request_topic: Optional[Union[Topic, Topic, str, object]] = None, reply_topic: Optional[Union[Topic, Topic, str, object]] = None, datawriter_qos: Optional[DataWriterQos] = None, datareader_qos: Optional[DataReaderQos] = None, publisher: Optional[Publisher] = None, subscriber: Optional[Subscriber] = None, on_reply_available: Optional[Callable[[object], object]] = None, require_matching_service_on_send_request: bool = True)

A Requester allows sending requests and receiving replies

Parameters:
  • request_type – The type of the request data. It can be an @idl.struct, an @idl.union, or a dds.DynamicType. (See Data Types.)

  • reply_type – The type of the reply data.

  • participant – The DomainParticipant that will hold the request writer and reply reader.

  • service_name – Name that will be used to derive the topic name, defaults to None (rely only on custom topics).

  • request_topic – Topic object or name that will be used for the request data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • reply_topic – Topic object or name that will be used for the reply data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • datawriter_qos – QoS object to use for request writer, defaults to None (use the QoS defined by the built-in profile dds.BuiltinProfiles.pattern_rpc).

  • datareader_qos – QoS object to use for reply reader, defaults to None (use the QoS defined by the built-in profile dds.BuiltinProfiles.pattern_rpc).

  • publisher – Publisher used to create the request writer, defaults to None (use participant.builtin_publisher).

  • subscriber – Subscriber used to create the reply reader, defaults to None (use participant.builtin_subscriber).

  • on_reply_available – The callback that handles incoming replies.

  • require_matching_service_on_send_request – Indicates whether send_request fails if no service has been discovered, default True.

async wait_for_replies_async(max_wait: Duration, min_count: int = 1, related_request_id: Optional[SampleIdentity] = None) bool

Wait for received replies asynchronously.

Parameters:
  • max_wait – Maximum time to wait for replies before timing out.

  • min_count – Minimum number of replies to receive, default 1.

  • related_request_id – The request id used to correlate replies, default None (receive any replies).

Returns:

Boolean indicating whether min_count replies were received within max_wait time.

async wait_for_service_async(max_wait: Duration) bool

Asynchronously waits for a Replier to be discovered.

This method returns True if a Replier is discovered and ready to receive requests within the specified maximum wait. It returns False if it times out.

class rti.rpc.Service(service_instance: ABC, participant: DomainParticipant, service_name: str, task_count: int = 4, datawriter_qos: Optional[DataWriterQos] = None, datareader_qos: Optional[DataReaderQos] = None, publisher: Optional[Publisher] = None, subscriber: Optional[Subscriber] = None)

A service allows running a service_instance in a DDS domain using asyncio.

The service useses a Replier to receive RPC calls and then dispatches them to the service_instance, calling the appropriate method. The value returned by the method is then sent back to the remote caller.

The service runs asynchronously (run method) until the task is cancelled.

close()

Closes the DDS entities used by this service.

property matched_client_count: int

The number of RPC clients that match this service.

async run(close_on_cancel: bool = False)

Starts receiving RPC calls (requests) and processing them.

This method runs until the task it returns is cancelled.

If close_on_cancel is True, the service will close the DDS entities when the task is canceled. By default it is False, which means you can call run() again after a run() task is cancelled.

Exceptions raised during the execution of the service are logged as warnings module and do not stop the execution of the service.

class rti.rpc.SimpleReplier(request_type: Union[DynamicType, type], reply_type: Union[DynamicType, type], participant: DomainParticipant, handler: Callable[[object], object], service_name: Optional[str] = None, request_topic: Optional[Union[Topic, ContentFilteredTopic, str, object]] = None, reply_topic: Optional[Union[Topic, str, object]] = None, datawriter_qos: Optional[DataWriterQos] = None, datareader_qos: Optional[DataReaderQos] = None, publisher: Optional[Publisher] = None, subscriber: Optional[Subscriber] = None)

A special replier that uses a user callback to produce one reply per request.

Parameters:
  • request_type – The type of the request data.

  • reply_type – The type of the reply data.

  • participant – The DomainParticipant that will hold the request reader and reply writer.

  • handler – The callback that handles incoming requests and returns a reply. The callback must have a single argument of type request_type and must return an instance of type reply_type.

  • service_name – Name that will be used to derive the topic name, defaults to None (rely only on custom topics).

  • request_topic – Topic object or name that will be used for the request data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • reply_topic – Topic object or name that will be used for the reply data, must be set if service_name is None, otherwise overrides service_name, defaults to None (use service_name).

  • datawriter_qos – QoS object to use for reply writer, defaults to None (use default RequestReply QoS).

  • datareader_qos – QoS object to use for request reader, defaults to None (use default RequestReply QoS).

  • publisher – Publisher used to hold reply writer, defaults to None (use participant builtin publisher).

  • subscriber – Subscriber used to hold request reader, defaults to None (use participant builtin subscriber).

property matched_requester_count: int

The number of discovered matched requesters.

Getter:

Returns the number of matched requesters.

rti.rpc.get_interface_types(interface: type) List[type]

Returns a list of all the IDL types used by this interface (call, return, in, out, result)

rti.rpc.operation(funcobj=None, *, raises=[], operation_annotations=[], parameter_annotations={})

This decorator marks a method as an remote operation of a service interface.

It also marks it as an @abc.abstractmethod.

Only methods marked with this decorator will be callable using an RPC Client or an RPC Service.

rti.rpc.service(cls=None, *, type_annotations=[], member_annotations={})

This decorator marks an abstract base class as a remote service interface.

A class annotated with this decorator can be used to create a Client or to define the implementation to be run in a Service.

The operations that will be remotely callable need to be marked with the @operation decorator.