Request-Reply and Remote Procedure Calls
As applications become more complex, it often becomes necessary to use other communication patterns in addition to publish-subscribe. Sometimes an application needs to get a one-time snapshot of information; for example, to make a query into a database or retrieve configuration parameters that never change. Other times an application needs to ask a remote application to perform an action on its behalf; for example, to invoke a remote procedure call or a service.
The request-reply pattern has two roles: the rti.rpc.Requester (service consumer or client) sends a request message and waits for a reply message. The rti.rpc.Replier (service provider) receives the request message and responds with a reply message.
The Remote Procedure Call pattern is a specialization of the Request-Reply one. In this pattern a client and a server communicate using a functional interface. The client application calls functions on the interface, and the server application implements the functions.
The Connext Core Libraries User’s Manual provides more information about these communication patterns in Alternative Communication Models.
A full request-reply example application is available in the Connext Examples repository.
Requesters and Repliers
To create a Requester or a Replier, you need the following:
A DomainParticipant (see DomainParticipant)
The request type and the reply type (see Data Types)
A service name, or alternatively, you can specify the request topic name and the reply topic name.
A Requester and a Replier will communicate when they join the same domain id and use the same service name (or same topic names), compatible request and reply types, and compatible QoS.
The following code creates a rti.rpc.Requester:
import rti.connextdds as dds
from rti.rpc import Requester
participant = dds.DomainParticipant(domain_id=0)
requester = Requester(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName"
)
The same or a different application can create a rti.rpc.Replier as follows:
from rti.rpc import Replier
replier = Replier(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName"
)
For this example, MyRequestType and MyReplyType are defined as follows:
import rti.types as idl

@idl.struct
class MyRequestType:
    x: int = 0
    y: int = 0

@idl.struct
class MyReplyType:
    result: int = 0
The request_type or reply_type arguments can also be instances of rti.connextdds.DynamicType or one of the built-in types (rti.types.builtin).
There are also a number of optional parameters you can specify, including the quality of service.
Requester: sending requests and receiving replies
To send a request, create an instance of the request type and send it:
request = MyRequestType(x=1, y=2)
requester.send_request(request)
After sending a request, the Requester will typically wait for one or more replies:
replies = requester.receive_replies(dds.Duration(seconds=20))
for data, info in replies:
    print(data)
The receive_replies operation is equivalent to wait_for_replies followed by take_replies.
Replier: receiving requests and sending replies
The replier will typically process requests in a loop. The following example receives requests, processes each of them to produce a reply, and sends it.
while True:
    requests = replier.receive_requests(dds.Duration(seconds=20))
    for data, info in requests:
        reply = MyReplyType(result=data.x + data.y)
        replier.send_reply(reply, info)
Note how send_reply expects the SampleInfo from the request. This allows the reply to be delivered only to the requester that sent the request. Using the same info object, multiple replies can be sent for the same request, and replies can be sent in any order: the data and info objects can be saved for later use.
SimpleReplier
The rti.rpc.SimpleReplier class is a convenience class that allows for an easy implementation of the replier application using a callback function.
The following example shows how to create a SimpleReplier that implements the same behavior as the Replier above:
from rti.rpc import SimpleReplier
simple_replier = SimpleReplier(
    request_type=MyRequestType,
    reply_type=MyReplyType,
    participant=participant,
    service_name="MyServiceName",
    handler=lambda request: MyReplyType(result=request.x + request.y)
)
The handler argument is a function that receives a single argument of the type request_type and must return an instance of the type reply_type. A SimpleReplier is only suitable for applications where each request generates exactly one reply and when the process to generate the reply is fast (otherwise the handler function would hold internal Connext threads).
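Because the handler is an ordinary function, it can be written as a named function and exercised on its own before it is passed to a SimpleReplier. The following is a minimal sketch under that assumption: the two dataclasses are stand-ins for the @idl.struct types above so that the snippet runs without Connext, and add_handler is a hypothetical name.

```python
from dataclasses import dataclass


# Stand-ins for the @idl.struct types above (assumption: used here only so
# that the sketch runs without Connext installed).
@dataclass
class MyRequestType:
    x: int = 0
    y: int = 0


@dataclass
class MyReplyType:
    result: int = 0


def add_handler(request: MyRequestType) -> MyReplyType:
    """Produce exactly one reply per request, without blocking."""
    return MyReplyType(result=request.x + request.y)


# The handler can be checked in isolation, with no participant or topics.
print(add_handler(MyRequestType(x=1, y=2)))
```

With the real types, the same function would be passed as handler=add_handler in place of the lambda.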
Waiting for a reply to a specific request
A Requester can send multiple requests and then wait for replies to any of them or to a specific one.
When sending requests, save the returned request id:
request_id1 = requester.send_request(MyRequestType(x=1, y=1))
request_id2 = requester.send_request(MyRequestType(x=2, y=2))
Then, when waiting for and taking the replies, you can specify the request id:
if not requester.wait_for_replies(
        related_request_id=request_id2,
        max_wait=dds.Duration(seconds=20)):
    raise Exception("Timeout waiting for replies")
replies = requester.take_replies(related_request_id=request_id2)
for data, info in replies:
    print(data)  # Prints MyReplyType(result=4)
The first operation, wait_for_replies, will wait until at least one reply for request_id2 is received. The second operation, take_replies, will take all replies for request_id2.
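The effect of related_request_id can be pictured with a small in-memory model. This is only an illustration, not how Connext stores or matches replies; Reply, ReplyInbox, and the integer ids are hypothetical stand-ins.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Reply:
    related_request_id: int  # hypothetical stand-in for the request identity
    result: int


class ReplyInbox:
    """Toy model of a reply queue (assumption: not the Connext implementation)."""

    def __init__(self):
        self._replies = []

    def deliver(self, reply):
        self._replies.append(reply)

    def take(self, related_request_id=None):
        """Remove and return matching replies; all replies if no id is given."""
        matching, remaining = [], []
        for reply in self._replies:
            if related_request_id is None or reply.related_request_id == related_request_id:
                matching.append(reply)
            else:
                remaining.append(reply)
        self._replies = remaining
        return matching


inbox = ReplyInbox()
inbox.deliver(Reply(related_request_id=1, result=2))
inbox.deliver(Reply(related_request_id=2, result=4))
print(inbox.take(related_request_id=2))  # only the reply to request 2
print(inbox.take())                      # whatever is left
```

In this model, replies to other requests stay queued until they are taken with their own id or with no id at all.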
Accessing the underlying readers and writers
Requesters and Repliers each contain one DataWriter and one DataReader for the request and reply topics. These can be accessed for advanced use cases.
For example, the Replier’s reader can be used to asynchronously read requests, which allows simplifying the replier loop as follows:
async def process_requests(replier: Replier):
    async for data, info in replier.request_datareader.take_async():
        reply = MyReplyType(result=data.x + data.y)
        replier.send_reply(reply, info)
For more information, see Accessing Underlying DataWriters and DataReaders in the Connext Core Libraries User’s Manual.
Remote Procedure Calls
Warning
This feature is experimental and subject to change in future releases. It is included in this release to gather customer interest and feedback. For this reason, do not deploy any applications using Remote Procedure Calls in production. The Requester and Replier APIs described above can be used in production.
For support, you may contact support@rti.com.
With this pattern, a service interface allows a client to make remote function calls into a service. A service interface can be defined as follows:
import abc
import rti.types as idl
import rti.rpc as rpc

@idl.struct
class Coordinates:
    x: int = 0
    y: int = 0

@rpc.service
class RobotControl(abc.ABC):
    @rpc.operation
    async def walk_to(self, destination: Coordinates,
                      speed: idl.float32) -> Coordinates: ...

    @rpc.operation
    async def get_speed(self) -> idl.float32: ...
A service interface such as RobotControl is an abstract base class (ABC) decorated with the @rpc.service decorator and with a set of async abstract methods decorated with the @rpc.operation decorator. The names and types of each operation and its parameters are used to synthesize the DDS topic-types that allow the client and service to communicate. The parameters and return values of an operation can be other IDL-based types (see Data Types).
The Python class RobotControl is derived from this IDL definition:
struct Coordinates {
    int64 x;
    int64 y;
};

@service
interface RobotControl {
    Coordinates walk_to(Coordinates destination, float speed);
    float get_speed();
};
Warning
Code generation from IDL interface or exception definitions to Python is not supported in this release. Other features, such as out parameters or attributes, are not supported either.
From the RobotControl class you can define a client and a service.
The client class is defined simply as:
class RobotControlClient(RobotControl, rpc.ClientBase): ...
The base class rpc.ClientBase provides the implementation of all the methods decorated with @rpc.operation. The RobotControlClient class doesn’t have any implementation of its own.
A RobotControlClient can be instantiated in a DomainParticipant with a service name:
import rti.connextdds as dds
participant = dds.DomainParticipant(domain_id=0)
client = RobotControlClient(participant, "My RobotControl")
And then used to make remote calls:
result = await client.walk_to(destination=Coordinates(x=10, y=10), speed=50.0)
print(result)
The call to walk_to will send a request to the server and return a coroutine that will complete when the reply is received, providing the return value.
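Since the call is awaited like any other coroutine, standard asyncio tools apply to it. For instance, a caller that does not want to wait indefinitely for a reply could wrap the call in asyncio.wait_for. The sketch below uses a hypothetical stand-in coroutine, slow_remote_call, instead of a real client so that it runs without Connext; whether the client offers its own timeout setting is not covered here.

```python
import asyncio


async def slow_remote_call() -> str:
    """Hypothetical stand-in for an awaited client operation such as walk_to."""
    await asyncio.sleep(0.2)  # pretend the reply takes 0.2 seconds to arrive
    return "done"


async def call_with_timeout(timeout: float):
    """Return the call's result, or None if no reply arrives in time."""
    try:
        return await asyncio.wait_for(slow_remote_call(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


print(asyncio.run(call_with_timeout(0.01)))  # too short: prints None
print(asyncio.run(call_with_timeout(1.0)))   # long enough: prints done
```

Note that asyncio.wait_for cancels the wrapped coroutine on timeout; what that means for a request already sent to the service is not addressed by this sketch.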
On the service side, an implementation of RobotControl with the server-side logic must be provided. For example, we can define RobotControlExample as follows:
import asyncio

class RobotControlExample(RobotControl):
    def __init__(self):
        self.position = Coordinates(0, 0)
        self.speed = 0.0

    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        # check the requested speed (self.speed is always 0.0 while idle)
        if speed <= 0.0:
            return self.position  # return previously known position
        if speed > 100.0:
            self.speed = 100.0  # maximum speed
        else:
            self.speed = speed
        # sleep for 0 to 10 seconds, depending on the speed
        await asyncio.sleep(10.0 - 0.1 * self.speed)
        self.speed = 0.0
        self.position = destination
        return destination

    def get_speed(self) -> idl.float32:
        return self.speed
The implementation of each method can be asynchronous or synchronous.
To start receiving function calls into RobotControlExample we need to create a rpc.Service with an instance of RobotControlExample, a DomainParticipant and a service name:
participant = dds.DomainParticipant(domain_id=0)
robot_control = RobotControlExample()
service = rpc.Service(robot_control, participant, "My RobotControl")
await service.run() # runs forever
While service is running it will receive calls from matching clients (those with the same service name and compatible interfaces and QoS), call the corresponding method in robot_control and send the result of the operation back to the client that called it.
Concurrency model
Both the service and the client provide async methods and therefore need to be run in an asyncio event loop.
All of the client methods derived from the service interface are async; for each operation (such as walk_to) the client uses a Requester to send a request and returns a coroutine that completes when the reply from the service is received, providing the return value.
The server uses the asyncio event loop and a Replier to run the following tasks when run() is called:
A read task that receives requests using the Replier.
One or more process tasks that call the methods in the service implementation and send the result back using the Replier.
If a method is asynchronous (for example RobotControlExample.walk_to in the example above), the process task will await the method until it completes. If the method is not asynchronous (RobotControlExample.get_speed), it should return a value immediately so that it does not block the event loop.
When all the process tasks are busy, the read task will stop receiving requests. The number of process tasks can be configured with the task_count Service parameter:
server = rpc.Service(robot_control, participant, "My RobotControl",
                     task_count=10)
The default value is 4. Note that with a value of 1, the server will be forced to process requests sequentially in the order they are received.
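The effect of the number of process tasks on ordering can be illustrated with plain asyncio. The following is a simplified model, not the rpc.Service implementation: run_model and its arguments are hypothetical, an asyncio.Queue stands in for the incoming requests, and asyncio.sleep stands in for awaiting a service method. It models only the process tasks, not the read task pausing when all of them are busy.

```python
import asyncio


async def run_model(task_count, delays):
    """Return the order in which the requests finish processing."""
    queue = asyncio.Queue()
    finished = []

    async def process_task():
        while True:
            request_id, delay = await queue.get()
            await asyncio.sleep(delay)  # stand-in for awaiting a service method
            finished.append(request_id)
            queue.task_done()

    workers = [asyncio.create_task(process_task()) for _ in range(task_count)]
    for request_id, delay in enumerate(delays):  # stand-in for the read task
        queue.put_nowait((request_id, delay))
    await queue.join()  # wait until every request has been processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return finished


delays = [0.2, 0.01, 0.01]  # request 0 is slow, requests 1 and 2 are fast
print(asyncio.run(run_model(1, delays)))  # [0, 1, 2]: strictly sequential
print(asyncio.run(run_model(2, delays)))  # [1, 2, 0]: fast requests overtake
```

With a single process task, a slow operation delays every request queued behind it; with more tasks, short operations can complete while a long one is still awaited.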
The service’s run() method runs until it is cancelled. For example:
service = rpc.Service(robot_control, participant, "My RobotControl")
await service.run() # runs forever
# or
service = rpc.Service(robot_control, participant, "My RobotControl")
service_task = asyncio.create_task(service.run())
await asyncio.sleep(10.0) # run for 10 seconds
service_task.cancel()
await service_task
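In plain asyncio, awaiting a task after cancelling it raises asyncio.CancelledError unless the task's coroutine handles the cancellation itself. The text above does not say which of the two run() does, so a defensive caller may want to catch the exception. A minimal sketch, with a hypothetical stand-in coroutine run_forever in place of service.run():

```python
import asyncio


async def run_forever():
    """Hypothetical stand-in for service.run(); loops until cancelled."""
    while True:
        await asyncio.sleep(0.01)


async def run_for(seconds: float) -> bool:
    """Run the stand-in service for a while, then shut it down cleanly."""
    task = asyncio.create_task(run_forever())
    await asyncio.sleep(seconds)
    task.cancel()
    try:
        await task  # wait for the cancellation to take effect
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return task.cancelled()


print(asyncio.run(run_for(0.05)))  # True
```

If the awaited coroutine absorbs the cancellation and returns normally, the except branch is simply never taken.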
Note that asyncio event loops are single-threaded. If your service implementation needs to run an operation in a separate thread, you can use the event loop's run_in_executor method. For example, the following implementation of walk_to runs the synchronous function move_robot_impl in a separate thread:
class RobotControlExample(RobotControl):
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        # ...
        self.speed = speed
        # run move_robot_impl in a separate thread
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, move_robot_impl, destination, speed)
        self.speed = 0.0
        self.position = destination
        return destination
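The benefit of run_in_executor can be observed without Connext. In the sketch below, blocking_move is a hypothetical stand-in for move_robot_impl, and a heartbeat task counts how often the event loop gets to run while the blocking call is in progress on a worker thread.

```python
import asyncio
import time


def blocking_move(duration: float) -> str:
    """Hypothetical stand-in for move_robot_impl: blocks the calling thread."""
    time.sleep(duration)
    return "arrived"


async def demo():
    loop = asyncio.get_running_loop()
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1  # only advances if the event loop is not blocked

    heartbeat_task = asyncio.create_task(heartbeat())
    # None selects the loop's default thread pool executor
    result = await loop.run_in_executor(None, blocking_move, 0.2)
    heartbeat_task.cancel()
    return result, ticks


result, ticks = asyncio.run(demo())
print(result, ticks > 0)
```

Calling blocking_move directly inside the coroutine would instead freeze the loop for the whole duration, and the heartbeat would not advance.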
If this concurrency model doesn’t fit your needs, you can use the Requester and Replier classes directly, which provide more flexibility.
Exceptions
Operations in a service interface may throw exceptions. Exceptions are @idl.struct classes that inherit from Exception. Operations need to declare all the exceptions that they may throw. When a service implementation throws an exception, the rpc.Service will catch it and send it to the client, which will rethrow it.
For example, to have the walk_to operation throw a TooFast exception when the speed is too high, we can define the exception as follows:
@idl.struct
class TooFast(Exception):
    speed: idl.float32
And declare it in the walk_to operation:
@rpc.service
class RobotControl(abc.ABC):
    @rpc.operation(raises=[TooFast])
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates: ...
We can now modify RobotControlExample to throw TooFast:
class RobotControlExample(RobotControl):
    async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
        if speed > 100.0:
            raise TooFast(speed)
        # ...
The client can now catch the exception:
try:
    await client.walk_to(destination, speed)
except TooFast as e:
    print(f"Too fast: {e.speed}")
Exceptions that are not declared in the raises parameter of an operation will be propagated and raised in the client as rpc.RemoteUnknownExceptionError.