Request-Reply and Remote Procedure Calls

As applications become more complex, it often becomes necessary to use other communication patterns in addition to publish-subscribe. Sometimes an application needs to get a one-time snapshot of information; for example, to make a query into a database or retrieve configuration parameters that never change. Other times an application needs to ask a remote application to perform an action on its behalf; for example, to invoke a remote procedure call or a service.

The request-reply pattern has two roles: the rti.rpc.Requester (service consumer or client) sends a request message and waits for a reply message. The rti.rpc.Replier (service provider) receives the request message and responds with a reply message.

The Remote Procedure Call (RPC) pattern is a specialization of the request-reply pattern. In this pattern, a client and a server communicate through a functional interface: the client application calls functions on the interface, and the server application implements them.

For more information, see Request-Reply Exchanges and Remote Procedure Calls in the RTI Connext Core Libraries User’s Manual.

A full request-reply example application is available in the Connext Examples repository.

Requesters and Repliers

To create a Requester or a Replier, you need the following:

  • A DomainParticipant (see DomainParticipant)

  • The request type and the reply type (see Data Types)

  • A service name or, alternatively, the request topic name and the reply topic name.

A Requester and a Replier communicate when they join the same domain, use the same service name (or the same topic names), and have compatible request and reply types and compatible QoS.

The following code creates a rti.rpc.Requester:

import rti.connextdds as dds
from rti.rpc import Requester

participant = dds.DomainParticipant(domain_id=0)

requester = Requester(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName"
)

The same or a different application can create a rti.rpc.Replier as follows:

from rti.rpc import Replier

replier = Replier(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName"
)

For this example MyRequestType and MyReplyType are defined as follows:

import rti.types as idl

@idl.struct
class MyRequestType:
    x: int = 0
    y: int = 0

@idl.struct
class MyReplyType:
    result: int = 0

The request_type and reply_type arguments can also be instances of rti.connextdds.DynamicType or one of the built-in types (rti.types.builtin).

There are also several optional parameters you can specify, such as the quality of service (QoS).

Requester: sending requests and receiving replies

First wait for the requester to discover a replier using rti.rpc.Requester.wait_for_service_async():

await requester.wait_for_service_async(dds.Duration(10))

Once a replier is discovered, create an instance of the request type and send it:

request = MyRequestType(x=1, y=2)
requester.send_request(request)

After sending a request, the Requester will typically wait for one or more replies:

replies = requester.receive_replies(dds.Duration(seconds=20))
for data, info in replies:
    print(data)

The receive_replies operation is equivalent to wait_for_replies followed by take_replies.
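To make that equivalence concrete, here is a minimal pure-Python model of a reply cache. It is a sketch, not the Connext implementation: ReplyInbox, deliver(), and the float max_wait (in seconds, instead of a dds.Duration) are hypothetical; only the three method names mirror the Requester operations described above.

```python
import threading


class ReplyInbox:
    """Hypothetical model of a Requester's reply cache (not the Connext API)."""

    def __init__(self):
        self._replies = []
        self._cond = threading.Condition()

    def deliver(self, reply):
        # Called when a reply arrives; wakes up any waiting thread.
        with self._cond:
            self._replies.append(reply)
            self._cond.notify_all()

    def wait_for_replies(self, max_wait: float, min_count: int = 1) -> bool:
        # Block until min_count replies are cached; False if max_wait elapses first.
        with self._cond:
            return self._cond.wait_for(
                lambda: len(self._replies) >= min_count, timeout=max_wait)

    def take_replies(self) -> list:
        # Remove and return every cached reply.
        with self._cond:
            taken, self._replies = self._replies, []
            return taken

    def receive_replies(self, max_wait: float, min_count: int = 1) -> list:
        # wait_for_replies followed by take_replies; on timeout this returns
        # whatever has arrived so far, possibly an empty list.
        self.wait_for_replies(max_wait, min_count)
        return self.take_replies()


inbox = ReplyInbox()
threading.Timer(0.05, inbox.deliver, args=("reply-1",)).start()
print(inbox.receive_replies(max_wait=1.0))   # ['reply-1']
print(inbox.receive_replies(max_wait=0.05))  # []
```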

Replier: receiving requests and sending replies

The replier will typically process requests in a loop. The following example receives requests, processes each of them to produce a reply, and sends it.

while True:
    requests = replier.receive_requests(dds.Duration(seconds=20))
    for data, info in requests:
        reply = MyReplyType(result=data.x + data.y)
        replier.send_reply(reply, info)

Note that send_reply takes the SampleInfo of the request. This allows the reply to be delivered only to the Requester that sent that request. The same info object can be used to send multiple replies to the same request, and replies don’t have to be sent in the order the requests arrived: the data and info objects can be saved and used later.
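The following pure-Python sketch illustrates why the request's identity matters. RequestIdentity and the wire list are hypothetical stand-ins; their fields are illustrative and are not the actual SampleInfo attributes. Two requests from different Requesters happen to share request id 1, yet each reply is still tagged for the right Requester; the replier also answers out of order and sends a second reply to one request.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RequestIdentity:
    # Hypothetical stand-in for what a Replier gets from the request's
    # SampleInfo: which Requester sent it and that request's id.
    requester: str
    request_id: int


wire = []  # replies "published" so far: (requester, related_request_id, reply)


def send_reply(reply, info: RequestIdentity):
    # The reply carries the request's identity, so only the Requester that
    # sent that request accepts it.
    wire.append((info.requester, info.request_id, reply))


# Two requests received and saved as (data, info) pairs for later.
saved = [
    ({"x": 1, "y": 2}, RequestIdentity("requester-A", 1)),
    ({"x": 3, "y": 4}, RequestIdentity("requester-B", 1)),
]

# Reply later, in reverse order of arrival...
for data, info in reversed(saved):
    send_reply({"result": data["x"] + data["y"]}, info)

# ...and send a second reply to the first request using the same info.
send_reply({"status": "done"}, saved[0][1])

for entry in wire:
    print(entry)
```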

SimpleReplier

The rti.rpc.SimpleReplier class is a convenience class that makes it easy to implement a replier application with a callback function.

The following example shows how to create a SimpleReplier that implements the same behavior as the Replier above:

from rti.rpc import SimpleReplier

simple_replier = SimpleReplier(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName",
    handler=lambda request: MyReplyType(result=request.x + request.y)
)

The handler argument is a function that receives a single argument of type request_type and must return an instance of reply_type. A SimpleReplier is suitable only for applications where each request produces exactly one reply and the reply can be produced quickly; otherwise, the handler function would hold internal Connext threads.

Waiting for a reply to a specific request

A Requester can send multiple requests and then wait for replies to any of them or to a specific one.

When sending requests, save the returned request id:

request_id1 = requester.send_request(MyRequestType(x=1, y=1))
request_id2 = requester.send_request(MyRequestType(x=2, y=2))

Then, when waiting for and taking the replies, you can specify the request id:

if not requester.wait_for_replies(
    related_request_id=request_id2,
    max_wait=dds.Duration(seconds=20)):

    raise Exception("Timeout waiting for replies")

replies = requester.take_replies(related_request_id=request_id2)
for data, info in replies:
    print(data) # Prints MyReplyType(result=4)

The first operation, wait_for_replies, will wait until at least one reply for request_id2 is received. The second operation, take_replies, will take all replies for request_id2.
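The correlation behind related_request_id can be modeled in a few lines of plain Python. CorrelatingInbox and on_reply() are hypothetical, and this model uses plain integers as request ids to keep things simple; it only illustrates that taking replies for one request leaves the replies for other requests cached.

```python
import itertools


class CorrelatingInbox:
    """Hypothetical model of matching replies to request ids (not the Connext API)."""

    def __init__(self):
        self._next_id = itertools.count(1)
        self._cache = []  # (related_request_id, reply) in arrival order

    def send_request(self, request) -> int:
        # A real Requester would publish `request` here; this model only
        # assigns and returns the id the caller should keep.
        return next(self._next_id)

    def on_reply(self, related_request_id: int, reply):
        self._cache.append((related_request_id, reply))

    def take_replies(self, related_request_id=None) -> list:
        # Without an id, take everything; with an id, take only the matching
        # replies and keep the rest cached for later calls.
        if related_request_id is None:
            taken, self._cache = self._cache, []
        else:
            taken = [e for e in self._cache if e[0] == related_request_id]
            self._cache = [e for e in self._cache if e[0] != related_request_id]
        return [reply for _, reply in taken]


inbox = CorrelatingInbox()
id1 = inbox.send_request({"x": 1, "y": 1})
id2 = inbox.send_request({"x": 2, "y": 2})
inbox.on_reply(id2, {"result": 4})  # replies may arrive in any order
inbox.on_reply(id1, {"result": 2})
print(inbox.take_replies(related_request_id=id2))  # [{'result': 4}]
print(inbox.take_replies())                        # [{'result': 2}]
```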

Accessing the underlying readers and writers

Requesters and Repliers each contain one DataWriter and one DataReader for the request and reply topics. These can be accessed for advanced use cases.

For example, the Replier’s reader can be used to read requests asynchronously, which simplifies the replier loop:

async def process_requests(replier: Replier):
    async for data, info in replier.request_datareader.take_async():
        reply = MyReplyType(result=data.x + data.y)
        replier.send_reply(reply, info)
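If the async for pattern is unfamiliar, the following plain-asyncio sketch models it with an asyncio.Queue. Here take_async() is a hypothetical async generator, not the DataReader method, and the None sentinel exists only so the demo terminates; a real request stream keeps waiting for new samples.

```python
import asyncio


async def take_async(queue: asyncio.Queue):
    # Hypothetical stand-in for an async sample stream: yields (data, info)
    # pairs as they arrive and stops at a None sentinel.
    while (item := await queue.get()) is not None:
        yield item


async def process_requests(queue: asyncio.Queue, send_reply):
    # Same shape as the replier loop above: one reply per request.
    async for data, info in take_async(queue):
        send_reply({"result": data["x"] + data["y"]}, info)


async def main():
    queue = asyncio.Queue()
    replies = []
    worker = asyncio.create_task(
        process_requests(queue, lambda reply, info: replies.append((info, reply))))
    for i, (x, y) in enumerate([(1, 2), (3, 4)]):
        await queue.put(({"x": x, "y": y}, f"info-{i}"))
    await queue.put(None)  # end of stream, only for this demo
    await worker
    return replies


print(asyncio.run(main()))  # [('info-0', {'result': 3}), ('info-1', {'result': 7})]
```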

For more information, see Accessing Underlying DataWriters and DataReaders in the Connext Core Libraries User’s Manual.

Remote Procedure Calls

With this pattern, a service interface allows a client to make remote function calls into a service. A service interface can be defined as follows:

import abc
import rti.types as idl
import rti.rpc as rpc

@idl.struct
class Coordinates:
    x: int = 0
    y: int = 0

@rpc.service
class RobotControl(abc.ABC):
    @rpc.operation
    async def walk_to(self, destination: Coordinates,
                speed: idl.float32) -> Coordinates: ...

    @rpc.operation
    async def get_speed(self) -> idl.float32: ...

A service interface such as RobotControl is an abstract base class (ABC) decorated with the @rpc.service decorator and with a set of async abstract methods decorated with the @rpc.operation decorator. The names and types of each operation and its parameters are used to synthesize the DDS topic-types that allow the client and service to communicate. The parameters and return values of an operation can be other IDL-based types (see Data Types).
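As a rough illustration of the information an operation signature carries, the following standard-library sketch lists each operation's parameter names, parameter types, and return type. It does not reproduce how @rpc.service synthesizes topic types; the interface is copied without the rti decorators, with a dataclass standing in for @idl.struct and float standing in for idl.float32.

```python
import abc
import inspect
import typing
from dataclasses import dataclass


@dataclass
class Coordinates:
    x: int = 0
    y: int = 0


class RobotControl(abc.ABC):
    # Plain-Python copy of the interface, without the rti.rpc decorators.
    @abc.abstractmethod
    async def walk_to(self, destination: Coordinates, speed: float) -> Coordinates: ...

    @abc.abstractmethod
    async def get_speed(self) -> float: ...


def describe_operations(interface) -> dict:
    """Map each operation name to ([(param, type name), ...], return type name)."""
    ops = {}
    for name, func in inspect.getmembers(interface, inspect.isfunction):
        hints = typing.get_type_hints(func)
        params = [
            (p, hints[p].__name__)
            for p in inspect.signature(func).parameters
            if p != "self"
        ]
        ops[name] = (params, hints["return"].__name__)
    return ops


for name, (params, ret) in describe_operations(RobotControl).items():
    print(name, params, "->", ret)
# get_speed [] -> float
# walk_to [('destination', 'Coordinates'), ('speed', 'float')] -> Coordinates
```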

The Python class RobotControl corresponds to this IDL definition:

struct Coordinates {
    int64 x;
    int64 y;
};


@service
interface RobotControl {
    Coordinates walk_to(Coordinates destination, float speed);
    float get_speed();
};

From the RobotControl class you can define a client and a service.

The client class is defined simply as:

class RobotControlClient(RobotControl, rpc.ClientBase): ...

The base class rpc.ClientBase provides the implementation of all the methods decorated with @rpc.operation. The RobotControlClient class doesn’t have any implementation of its own.

A RobotControlClient is created with a DomainParticipant and a service name:

import rti.connextdds as dds

participant = dds.DomainParticipant(domain_id=0)
client = RobotControlClient(participant, "My RobotControl")

First, wait to discover a service using rti.rpc.ClientBase.wait_for_service_async():

await client.wait_for_service_async(dds.Duration(10))

Once the service is discovered, the client can make remote calls:

result = await client.walk_to(destination=Coordinates(x=10, y=10), speed=50.0)
print(result)

The call to walk_to will send a request to the server and return a coroutine that will complete when the reply is received, providing the return value.

On the service side, you must provide an implementation of RobotControl that contains the server-side logic. For example, we can define RobotControlExample as follows:

import asyncio

class RobotControlExample(RobotControl):
    def __init__(self):
        self.position = Coordinates(0, 0)
        self.speed = 0.0

    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        if speed <= 0.0:
            return self.position # can't move; return previously known position

        if speed > 100.0:
            self.speed = 100.0 # maximum speed
        else:
            self.speed = speed

        # sleep for 0 to 10 seconds, depending on the speed
        await asyncio.sleep(10.0 - 0.1 * self.speed)

        self.speed = 0.0
        self.position = destination
        return destination

    def get_speed(self) -> idl.float32:
        return self.speed

The implementation of each method can be asynchronous or synchronous.
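One way a dispatcher can accept both kinds of method is to call the method and await the result only when it is awaitable. The dispatch() function below is a hypothetical sketch of that idea, not the rpc.Service implementation, and this RobotControlExample is a simplified stand-in without the Connext base class.

```python
import asyncio
import inspect


class RobotControlExample:
    # Simplified stand-in for the service implementation above.
    def __init__(self):
        self.speed = 0.0

    async def walk_to(self, destination, speed):
        await asyncio.sleep(0.01)  # stands in for real asynchronous work
        return destination

    def get_speed(self):
        return self.speed  # synchronous: returns immediately


async def dispatch(impl, operation: str, *args):
    # Hypothetical dispatcher: call the method, and await the result only
    # when the method was a coroutine function.
    result = getattr(impl, operation)(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


async def main():
    impl = RobotControlExample()
    print(await dispatch(impl, "walk_to", (10, 10), 50.0))  # (10, 10)
    print(await dispatch(impl, "get_speed"))                # 0.0


asyncio.run(main())
```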

To start receiving function calls into RobotControlExample, create an rpc.Service with an instance of RobotControlExample, a DomainParticipant, and a service name:

participant = dds.DomainParticipant(domain_id=0)
robot_control = RobotControlExample()
service = rpc.Service(robot_control, participant, "My RobotControl")
await service.run() # runs forever

While the service is running, it receives calls from matching clients (those with the same service name and compatible interfaces and QoS), calls the corresponding method in robot_control, and sends the result of the operation back to the client that made the call.

Concurrency model

Both the service and the client provide async methods and therefore need to be run in an asyncio event loop.

All of the client methods derived from the service interface are async; for each operation (such as walk_to) the client uses a Requester to send a request and returns a coroutine that completes when the reply from the service is received, providing the return value.
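The client-side flow (send a request, then suspend until the matching reply arrives) can be modeled with one asyncio.Future per request id. ToyClient, toy_service, and the queue used as a transport are hypothetical; the sketch only illustrates the concurrency pattern described above, not rpc.ClientBase.

```python
import asyncio
import contextlib
import itertools


class ToyClient:
    """Hypothetical model of an async RPC client (not rpc.ClientBase)."""

    def __init__(self, requests: asyncio.Queue):
        self._requests = requests
        self._ids = itertools.count(1)
        self._pending = {}  # request id -> Future completed by the reply

    async def call(self, operation: str, *args):
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        await self._requests.put((request_id, operation, args))  # "send request"
        return await future  # suspends until on_reply() provides the value

    def on_reply(self, request_id: int, value):
        # Called when a reply arrives; resumes the matching caller.
        self._pending.pop(request_id).set_result(value)


async def toy_service(requests: asyncio.Queue, client: ToyClient):
    # Stand-in for the remote service; every operation is treated as "add".
    while True:
        request_id, _operation, args = await requests.get()
        client.on_reply(request_id, sum(args))


async def main():
    requests = asyncio.Queue()
    client = ToyClient(requests)
    service = asyncio.create_task(toy_service(requests, client))
    # Two calls in flight at once; each awaits its own reply.
    results = await asyncio.gather(client.call("add", 1, 2), client.call("add", 3, 4))
    service.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await service
    return results


print(asyncio.run(main()))  # [3, 7]
```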

The server uses the asyncio event loop and a Replier to run the following tasks when run() is called:

  • A read task that receives requests using the Replier.

  • One or more process tasks that call the methods in the service implementation and send the result back using the Replier.

If a method is asynchronous (for example, RobotControlExample.walk_to in the example above), the process task awaits its completion. If the method is not asynchronous (RobotControlExample.get_speed), it should return quickly so it doesn’t block the event loop.

When all the process tasks are busy, the read task will stop receiving requests. The number of process tasks can be configured with the task_count Service parameter:

service = rpc.Service(robot_control, participant, "My RobotControl",
                      task_count=10)

The default value is 4. Note that with a value of 1, the server will be forced to process requests sequentially in the order they are received.
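The back-pressure behavior can be sketched in plain asyncio with a semaphore of task_count slots: the read loop acquires a slot before starting a process task and waits when none is free. serve() and slow_double() are hypothetical, and the real Service may be structured differently; the model only shows why task_count=1 forces sequential processing.

```python
import asyncio


async def serve(requests, handler, task_count: int):
    """Hypothetical model of a read loop feeding at most task_count process tasks."""
    slots = asyncio.Semaphore(task_count)
    results = []
    active = peak = 0

    async def process(request):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        try:
            results.append(await handler(request))
        finally:
            active -= 1
            slots.release()  # a process task is free again

    tasks = []
    for request in requests:   # the "read task"
        await slots.acquire()  # waits while every process task is busy
        tasks.append(asyncio.create_task(process(request)))
    await asyncio.gather(*tasks)
    return results, peak


async def slow_double(x):
    await asyncio.sleep(0.01 * (5 - x))  # earlier requests take longer
    return 2 * x


print(asyncio.run(serve(range(5), slow_double, task_count=1)))  # ([0, 2, 4, 6, 8], 1)
print(asyncio.run(serve(range(5), slow_double, task_count=4)))
```

With task_count=1 the results come back in request order even though earlier requests take longer; with a larger task_count, up to that many requests are processed concurrently and finish in whatever order their work completes.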

The service’s run() method runs until it is cancelled. For example:

service = rpc.Service(robot_control, participant, "My RobotControl")
await service.run() # runs forever

# or

import asyncio

service = rpc.Service(robot_control, participant, "My RobotControl")
service_task = asyncio.create_task(service.run())
await asyncio.sleep(10.0) # run for 10 seconds
service_task.cancel()
try:
    await service_task
except asyncio.CancelledError:
    pass # expected: the task was cancelled

Note that asyncio event loops are single-threaded. If your service implementation needs to run a blocking operation in a separate thread, you can use the event loop’s run_in_executor method. For example, the following implementation of walk_to runs the synchronous function move_robot_impl in a separate thread:

import asyncio

class RobotControlExample(RobotControl):
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        # ...
        self.speed = speed

        # run move_robot_impl in a separate thread
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, move_robot_impl, destination, speed)

        self.speed = 0.0
        self.position = destination
        return destination
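The following self-contained asyncio sketch shows the effect of run_in_executor: while a blocking function runs in the default thread pool, other coroutines on the event loop keep running. move_robot_impl here is a hypothetical blocking stand-in, and the heartbeat task exists only to show that the loop stays responsive.

```python
import asyncio
import time


def move_robot_impl(destination, speed):
    # Hypothetical blocking function standing in for real robot control.
    time.sleep(0.2)
    return destination


async def heartbeat(ticks: list):
    # Records a timestamp every 20 ms for as long as the loop is free.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    loop = asyncio.get_running_loop()
    # The blocking call runs in the default ThreadPoolExecutor, so the
    # heartbeat keeps running on the event loop in the meantime.
    result = await loop.run_in_executor(None, move_robot_impl, (10, 10), 50.0)
    beat.cancel()
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count > 1)  # (10, 10) True
```

On Python 3.9 and later, await asyncio.to_thread(move_robot_impl, destination, speed) is a shorter equivalent that also uses the default executor.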

If this concurrency model doesn’t fit your needs, you can use the Requester and Replier classes directly, which provide more flexibility.

Exceptions

Operations in a service interface may raise exceptions. These exceptions are @idl.struct classes that inherit from Exception, and each operation must declare all the exceptions it may raise. When a service implementation raises a declared exception, the rpc.Service catches it and sends it to the client, which raises it again.

For example, to have the walk_to operation throw a TooFast exception when the speed is too high, we can define the exception as follows:

@idl.struct
class TooFast(Exception):
    speed: idl.float32

And declare it in the walk_to operation:

@rpc.service
class RobotControl(abc.ABC):
    @rpc.operation(raises=[TooFast])
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates: ...

We can now modify RobotControlExample to raise TooFast:

class RobotControlExample(RobotControl):
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        if speed > 100.0:
            raise TooFast(speed)

        # ...

The client can now catch the exception:

try:
    await client.walk_to(destination, speed)
except TooFast as e:
    print(f"Too fast: {e.speed}")

A call may also fail with one of the following built-in exceptions, which the client raises:

  • rti.rpc.RemoteUnknownOperationError, when the operation doesn’t exist in the service.

  • rti.rpc.RemoteUnknownExceptionError, when the service raises any other exception (one the operation doesn’t declare).
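The propagation rules above can be modeled without any networking: the service side turns the outcome of a call into a reply, and the client side either returns the value or raises. service_side(), client_side(), and the local RemoteUnknownExceptionError class are hypothetical stand-ins; a real implementation would serialize the declared exception's fields and construct a new instance on the client.

```python
from dataclasses import dataclass


@dataclass
class TooFast(Exception):
    speed: float


class RemoteUnknownExceptionError(Exception):
    """Local stand-in for rti.rpc.RemoteUnknownExceptionError."""


def walk_to(speed: float) -> str:
    if speed > 100.0:
        raise TooFast(speed)                # declared by the operation
    if speed < 0.0:
        raise ValueError("negative speed")  # not declared
    return "arrived"


def service_side(operation, declared, *args):
    # Turn the outcome of the call into a reply "message".
    try:
        return ("ok", operation(*args))
    except Exception as e:
        if isinstance(e, tuple(declared)):
            return ("declared", e)  # the exception's fields travel to the client
        return ("unknown", None)    # only the fact that it failed travels


def client_side(reply):
    kind, payload = reply
    if kind == "ok":
        return payload
    if kind == "declared":
        raise payload
    raise RemoteUnknownExceptionError()


for speed in (50.0, 150.0, -1.0):
    try:
        print(client_side(service_side(walk_to, [TooFast], speed)))
    except TooFast as e:
        print(f"Too fast: {e.speed}")
    except RemoteUnknownExceptionError:
        print("unknown remote exception")
```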

Parameters

Operations in a service may have in, out, and inout parameters. The out and inout parameters can be modified by the service implementation. The modified values are sent back to the client in the response to the remote procedure call.

To specify the kind of parameter, use rpc.in_param, rpc.out_param, or rpc.inout_param. If not specified, the default, rpc.in_param, is used.

Example operation using each type of parameter:

@rpc.service
class RobotControl(abc.ABC):
    @rpc.operation(
        parameter_annotations={
            "out_": [rpc.out_param],
            "inout_": [rpc.inout_param],
        },
    )
    async def operation(self, in_: Coordinates, out_: Coordinates, inout_: Coordinates):
        ...

To implement the above operation and modify the out and inout parameters:

class RobotControlExample(RobotControl):
    async def operation(self, in_: Coordinates, out_: Coordinates, inout_: Coordinates):
        # The in parameter flows from the client to the service.
        # The received value is the one the client used as a parameter.
        # Any modification won't affect the client parameter.
        print(in_)

        # The out parameter only flows from the service to the client.
        # The received value is not the one the client used as a
        # parameter; it is the default value.
        # Any modification will affect the client parameter.
        out_.x = 10

        # The inout parameter flows in both directions.
        # The received value is the one the client used as a parameter.
        # Any modification will affect the client parameter.
        inout_.x += 1

The client can now call the operation, and its out and inout arguments will be updated:

# The value we assign here doesn't matter because the service will work
# with the default value of Coordinates.
out_ = Coordinates(2, 2)

# The value we assign here does matter, because the service will receive
# this value.
inout_ = Coordinates(0, 2)

await client.operation(Coordinates(0, 0), out_, inout_)
assert out_ == Coordinates(10, 0)
assert inout_ == Coordinates(1, 2)
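To see what the client-side update amounts to, the following pure-Python sketch mimics the data flow: the service works on copies of in_ and inout_ and on a default-constructed out_, and afterward only out_ and inout_ are copied back into the caller's objects in place. fake_remote_call() and copy_into() are hypothetical; in Connext the values travel in the request and reply samples.

```python
import copy
import dataclasses
from dataclasses import dataclass


@dataclass
class Coordinates:
    x: int = 0
    y: int = 0


def copy_into(target, source):
    # Update the caller's object in place so every reference to it sees the change.
    for field in dataclasses.fields(source):
        setattr(target, field.name, getattr(source, field.name))


def fake_remote_call(impl, in_, out_, inout_):
    # "Request": the service gets copies of in_ and inout_, and a default out_.
    s_in, s_out, s_inout = copy.deepcopy(in_), Coordinates(), copy.deepcopy(inout_)
    impl(s_in, s_out, s_inout)
    # "Reply": only out_ and inout_ travel back and overwrite the client's objects.
    copy_into(out_, s_out)
    copy_into(inout_, s_inout)


def operation_impl(in_, out_, inout_):
    in_.x = 99     # has no effect on the client
    out_.x = 10
    inout_.x += 1


in_, out_, inout_ = Coordinates(0, 0), Coordinates(2, 2), Coordinates(0, 2)
fake_remote_call(operation_impl, in_, out_, inout_)
print(in_, out_, inout_)
# Coordinates(x=0, y=0) Coordinates(x=10, y=0) Coordinates(x=1, y=2)
```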

Complex types can be modified in place by an operation. However, primitive types (such as int, float, and str) are immutable in Python, so a method cannot change the caller’s value. Connext provides rpc.ByRef to wrap primitive values so that methods can modify them, which implements out and inout parameters for primitive types.

For example, in this operation the out and inout parameters are strings:

@rpc.service
class RobotControl(abc.ABC):
    @rpc.operation(
        parameter_annotations={
            "out_": [rpc.out_param],
            "inout_": [rpc.inout_param],
        },
    )
    async def operation(self, out_: rpc.ByRef[str], inout_: rpc.ByRef[str]):
        ...

In the implementation, you can modify out_.value and inout_.value, and the changes will be reflected in the client parameters:

class RobotControlExample(RobotControl):
    async def operation(self, out_: rpc.ByRef[str], inout_: rpc.ByRef[str]):
        out_.value = "Hello World!"
        inout_.value += " World!"

The client can now call the operation, and its out and inout arguments will be updated:

out_ = rpc.ByRef[str]("")
inout_ = rpc.ByRef[str]("Hello")

await client.operation(out_, inout_)
assert out_.value == "Hello World!"
assert inout_.value == "Hello World!"
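The reason a wrapper is needed for primitives is ordinary Python semantics, shown in the sketch below: augmented assignment to a str parameter only rebinds the local name, while assigning to an attribute of a shared holder is visible to the caller. Ref is a hypothetical holder written for illustration; it is not rpc.ByRef.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class Ref(Generic[T]):
    """Hypothetical mutable holder, similar in spirit to rpc.ByRef."""
    value: T


def rebind(s: str):
    s += " World!"  # rebinds the local name only; the caller's str is unchanged


def mutate(ref: Ref[str]):
    ref.value += " World!"  # changes the holder the caller also references


plain = "Hello"
rebind(plain)
holder = Ref[str]("Hello")
mutate(holder)
print(plain)         # Hello
print(holder.value)  # Hello World!
```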