Request-Reply and Remote Procedure Calls

As applications become more complex, it often becomes necessary to use other communication patterns in addition to publish-subscribe. Sometimes an application needs to get a one-time snapshot of information; for example, to make a query into a database or retrieve configuration parameters that never change. Other times an application needs to ask a remote application to perform an action on its behalf; for example, to invoke a remote procedure call or a service.

The request-reply pattern has two roles: the rti.rpc.Requester (service consumer or client) sends a request message and waits for a reply message. The rti.rpc.Replier (service provider) receives the request message and responds with a reply message.

The Remote Procedure Call pattern is a specialization of the Request-Reply one. In this pattern a client and a server communicate using a functional interface. The client application calls functions on the interface, and the server application implements the functions.

The Connext Core Libraries User’s Manual provides more information about these communication patterns in Alternative Communication Models.

A full request-reply example application is available in the Connext Examples repository.

Requesters and Repliers

To create a Requester or a Replier, you need the following:

  • A DomainParticipant (see DomainParticipant)

  • The request type and the reply type (see Data Types)

  • A service name, or alternatively, you can specify the request topic name and the reply topic name.

A Requester and a Replier will communicate when they join the same domain id and use the same service name (or same topic names), compatible request and reply types, and compatible QoS.

The following code creates a rti.rpc.Requester:

import rti.connextdds as dds
from rti.rpc import Requester

participant = dds.DomainParticipant(domain_id=0)

requester = Requester(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName"
)

The same or a different application can create a rti.rpc.Replier as follows:

from rti.rpc import Replier

replier = Replier(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName"
)

For this example MyRequestType and MyReplyType are defined as follows:

import rti.types as idl

@idl.struct
class MyRequestType:
    x: int = 0
    y: int = 0

@idl.struct
class MyReplyType:
    result: int = 0

The request_type or reply_type arguments can also be instances of rti.connextdds.DynamicType or one of the built-in types (rti.types.builtin).

There also are a number of optional parameters you can specify, including the quality of service.

Requester: sending requests and receiving replies

To send a request, create an instance of the request type and send it:

request = MyRequestType(x=1, y=2)
requester.send_request(request)

After sending a request, the Requester will typically wait for one or more replies:

replies = requester.receive_replies(dds.Duration(seconds=20))
for data, info in replies:
    print(data)

The receive_replies operation is equivalent to wait_for_replies followed by take_replies.

Replier: receiving requests and sending replies

The replier will typically process requests in a loop. The following example receives requests, processes each of them to produce a reply, and sends it.

while True:
    requests = replier.receive_requests(dds.Duration(seconds=20))
    for data, info in requests:
        reply = MyReplyType(result=data.x + data.y)
        replier.send_reply(reply, info)

Note how send_reply expects the SampleInfo from the request. This will allow delivering the reply only to the requester that sent the request. Using the same info object, multiple replies can be sent for the same request; and replies can be sent in any order: the data and info objects can be saved for later use.

SimpleReplier

The rti.rpc.SimpleReplier class is a convenience class that allows for an easy implementation of the replier application using a callback function.

The following example shows how to create a SimpleReplier that implements the same behavior as the Replier above:

from rti.rpc import SimpleReplier

simple_replier = SimpleReplier(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName",
    handler=lambda request: MyReplyType(result=request.x + request.y)
)

The handler argument is a function that receives a single argument of the type request_type and must return an instance of the type reply_type. A SimpleReplier is only suitable for applications where each request generates exactly one reply and when the process to generate the reply is fast (otherwise the handler function would hold internal Connext threads).

Waiting for a reply to a specific request

A Requester can send multiple requests and then wait for replies to any of them or to a specific one.

When sending requests, save the returned request id:

request_id1 = requester.send_request(MyRequestType(x=1, y=1))
request_id2 = requester.send_request(MyRequestType(x=2, y=2))

Then, when waiting for and taking the replies, you can specify the request id:

if not requester.wait_for_replies(
    related_request_id=request_id2,
    max_wait=dds.Duration(seconds=20)):

    raise Exception("Timeout waiting for replies")

replies = requester.take_replies(related_request_id=request_id2)
for data, info in replies:
    print(data) # Prints MyReplyType(result=4)

The first operation, wait_for_replies, will wait until at least one reply for request_id2 is received. The second operation, take_replies, will take all replies for request_id2.

Accessing the underlying readers and writers

Requesters and Repliers each contain one DataWriter and one DataReader for the request and reply topics. These can be accessed for advanced use cases.

For example, the Replier’s reader can be used to asynchronously read requests, which allows simplifying the replier loop as follows:

async def process_requests(replier: Replier):
    async for data, info in replier.request_datareader.take_async():
        reply = MyReplyType(result=data.x + data.y)
        replier.send_reply(reply, info)

For more information, see Accessing Underlying DataWriters and DataReaders in the Connext Core Libraries User’s Manual.

Remote Procedure Calls

Warning

This feature is experimental and subject to change in future releases. It is included in this release to gather customer interest and feedback. For this reason, do not deploy any applications using Remote Procedure Calls in production. The Requester and Replier APIs described above can be used in production.

For support, you may contact support@rti.com.

With this pattern, a service interface allows a client to make remote function calls into a service. A service interface can be defined as follows:

import abc
import rti.types as idl
import rti.rpc as rpc

@idl.struct
class Coordinates:
    x: int = 0
    y: int = 0

@rpc.service
class RobotControl(abc.ABC):
    @rpc.operation
    async def walk_to(self, destination: Coordinates,
                speed: idl.float32) -> Coordinates: ...

    @rpc.operation
    async def get_speed(self) -> idl.float32: ...

A service interface such as RobotControl is an abstract base class (ABC) decorated with the @rpc.service decorator and with a set of async abstract methods decorated with the @rpc.operation decorator. The names and types of each operation and its parameters are used to synthesize the DDS topic-types that allow the client and service to communicate. The parameters and return values of an operation can be other IDL-based types (see Data Types).

The Python class RobotControl is derived from this IDL definition:

struct Coordinates {
    int64 x;
    int64 y;
};


@service
interface RobotControl {
    Coordinates walk_to(Coordinates destination, float speed);
    float get_speed();
};

Warning

Code generation from IDL interface or exception definitions to Python is not supported in this release. Other features, such as out parameters or attributes are not supported either.

From the RobotControl class you can define a client and a service.

The client class is defined simply as:

class RobotControlClient(RobotControl, rpc.ClientBase): ...

The base class rpc.ClientBase provides the implementation of all the methods decorated with @rpc.operation. The RobotControlClient class doesn’t have any implementation of its own.

A RobotControlClient can be instantiated in a DomainParticipant with a service name:

import rti.connextdds as dds

participant = dds.DomainParticipant(domain_id=0)
client = RobotControlClient(participant, "My RobotControl")

And then used to make remote calls:

result = await client.walk_to(destination=Coordinates(x=10, y=10), speed=50.0)
print(result)

The call to walk_to will send a request to the server and return a coroutine that will complete when the reply is received, providing the return value.

On the service side, an implementation of RobotControl, with the server-side logic must be implemented. For example, we can define RobotControlExample as follows:

class RobotControlExample(RobotControl):
    def __init__(self):
        self.position = Coordinates(0, 0)
        self.speed = 0.0

    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        if self.speed <= 0.0:
            return self.position # return previously known position

        if speed > 100.0:
            self.speed = 100 # maximum speed
        else:
            self.speed = speed

        # sleep for 0 to 10 seconds, depending on the speed
        await asyncio.sleep(10.0 - 0.1 * self.speed)

        self.speed = 0.0
        self.position = destination
        return destination

    def get_speed(self) -> idl.float32:
        return self.speed

The implementation of each method can be asynchronous or synchronous.

To start receiving function calls into RobotControlExample we need to create a rpc.Service with an instance of RobotControlExample, a DomainParticipant and a service name:

participant = dds.DomainParticipant(domain_id=0)
robot_control = RobotControlExample()
service = rpc.Service(robot_control, participant, "My RobotControl")
await service.run() # runs forever

While service is running it will receive calls from matching clients (those with the same service name and compatible interfaces and QoS), call The corresponding method in robot_control and send the result of the operation back to the client that called it.

Concurrency model

Both the service and the client provide async methods and therefore need to be run in an asyncio event loop.

All of the client methods derived from the service interface are async; for each operation (such as walk_to) the client uses a Requester to send a request and returns a coroutine that completes when the reply from the service is received, providing the return value.

The server uses the asyncio event loop and a Replier to run the following tasks when run() is called:

  • A read task that receives requests using the Replier.

  • One or more process tasks that call the methods in the service implementation and send the result back using the Replier.

If a method is asynchronous (for example RobotControlExample.walk_to in the example above), the process task will await for the method to complete. If the method is not asynchronous (RobotControlExample.get_speed), it should return a value immediately to not block the event loop.

When all the process tasks are busy, the read task will stop receiving requests. The number of process tasks can be configured with the task_count Service parameter:

server = rpc.Service(robot_control, participant, "My RobotControl",
                     task_count=10)

The default value is 4. Note that with a value of 1, the server will be forced to process requests sequentially in the order they are received.

The service’s run() method runs until it is cancelled. For example:

service = rpc.Service(robot_control, participant, "My RobotControl")
await service.run() # runs forever

# or

service = rpc.Service(robot_control, participant, "My RobotControl")
service_task = asyncio.create_task(service.run())
await asyncio.sleep(10.0) # run for 10 seconds
service_task.cancel()
await service_task

Note that asyncio event loops are single-threaded. If your service implementation needs to run an operation in a separate thread, you can use the asyncio.run_in_executor function. For example, the following implementation of walk_to runs the synchronous function move_robot_impl in a separate thread:

class RobotControlExample(RobotControl):
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        # ...
        self.speed = speed

        # run move_robot_impl in a separate thread
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, move_robot_impl, destination, speed)

        self.speed = 0.0
        self.position = destination
        return destination

If this concurrency model doesn’t fit your needs, you can use the Requester and Replier classes directly, which provide more flexibility.

Exceptions

Operations in a service interface may throw exceptions. Exceptions are @idl.struct classes that inherit from Exception. Operations need to declare all the exceptions that they may throw. When a service implementation throws an exception, the rpc.Service will catch it and send it to the client, which will rethrow it.

For example, to have the walk_to operation throw a TooFast exception when the speed is too high, we can define the exception as follows:

@idl.struct
class TooFast(Exception):
    speed: idl.float32

And declare it in the walk_to operation:

@rpc.service
class RobotControl:
    @rpc.operation(raises=[TooFast])
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates: ...

We can now modify ExampleRobotControl to throw TooFast:

class RobotControlExample(RobotControl):
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        if speed > 100.0:
            raise TooFast(speed)

        # ...

The client can now catch the exception:

try:
    await client.walk_to(destination, speed)
except TooFast as e:
    print(f"Too fast: {e.speed}")

Exceptions that are not declared in the raises parameter of an operation will be propagated and raised in the client as rpc.RemoteUnknownExceptionError.