.. py:currentmodule:: rti.rpc

Request-Reply and Remote Procedure Calls
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

As applications become more complex, it often becomes necessary to use other
communication patterns in addition to publish-subscribe. Sometimes an
application needs to get a one-time snapshot of information; for example, to
make a query into a database or retrieve configuration parameters that never
change. Other times an application needs to ask a remote application to perform
an action on its behalf; for example, to invoke a remote procedure call or a
service.

The request-reply pattern has two roles: the :class:`rti.rpc.Requester`
(service consumer or client) sends a request message and waits for a reply
message. The :class:`rti.rpc.Replier` (service provider) receives the request
message and responds with a reply message.

The Remote Procedure Call pattern is a specialization of the Request-Reply one.
In this pattern a client and a server communicate using a functional interface.
The client application calls functions on the interface, and the server
application implements the functions.

The Connext Core Libraries User's Manual provides more information about these
communication patterns in `Alternative Communication Models `_.

A full request-reply example application is available in the
`Connext Examples repository `_.

Requesters and Repliers
-----------------------

To create a *Requester* or a *Replier*, you need the following:

* A *DomainParticipant* (see :ref:`participant:DomainParticipant`)
* The *request type* and the *reply type* (see :ref:`types:Data Types`)
* A *service name*, or alternatively, you can specify the *request topic name*
  and the *reply topic name*.

A *Requester* and a *Replier* will communicate when they join the same
*domain id* and use the same *service name* (or same topic names), compatible
*request* and *reply* types, and compatible *QoS*.

The following code creates a :class:`rti.rpc.Requester`:

.. code-block:: python

    import rti.connextdds as dds
    from rti.rpc import Requester

    participant = dds.DomainParticipant(domain_id=0)

    requester = Requester(
        request_type=MyRequestType,
        reply_type=MyReplyType,
        participant=participant,
        service_name="MyServiceName"
    )

The same or a different application can create a :class:`rti.rpc.Replier` as
follows:

.. code-block:: python

    from rti.rpc import Replier

    replier = Replier(
        request_type=MyRequestType,
        reply_type=MyReplyType,
        participant=participant,
        service_name="MyServiceName"
    )

For this example, ``MyRequestType`` and ``MyReplyType`` are defined as follows:

.. code-block:: python

    import rti.types as idl

    @idl.struct
    class MyRequestType:
        x: int = 0
        y: int = 0

    @idl.struct
    class MyReplyType:
        result: int = 0

The ``request_type`` or ``reply_type`` arguments can also be instances of
:class:`rti.connextdds.DynamicType` or one of the built-in types
(:ref:`rti.types:rti.types.builtin`).

There are also a number of optional parameters you can specify, including the
quality of service.

Requester: sending requests and receiving replies
=================================================

To send a request, create an instance of the request type and send it:

.. code-block:: python

    request = MyRequestType(x=1, y=2)
    requester.send_request(request)

After sending a request, the Requester will typically wait for one or more
replies:

.. code-block:: python

    replies = requester.receive_replies(dds.Duration(seconds=20))
    for data, info in replies:
        print(data)

The ``receive_replies`` operation is equivalent to ``wait_for_replies``
followed by ``take_replies``.

Replier: receiving requests and sending replies
===============================================

The replier will typically process requests in a loop. The following example
receives requests, processes each of them to produce a reply, and sends it.

.. code-block:: python

    while True:
        requests = replier.receive_requests(dds.Duration(seconds=20))
        for data, info in requests:
            reply = MyReplyType(result=data.x + data.y)
            replier.send_reply(reply, info)

Note how ``send_reply`` expects the ``SampleInfo`` from the request. This
allows the reply to be delivered only to the requester that sent the request.

Using the same ``info`` object, multiple replies can be sent for the same
request. Replies can also be sent in any order, because the ``data`` and
``info`` objects can be saved for later use.

SimpleReplier
=============

The :class:`rti.rpc.SimpleReplier` class is a convenience class that allows for
an easy implementation of the replier application using a callback function.

The following example shows how to create a SimpleReplier that implements the
same behavior as the Replier above:

.. code-block:: python

    from rti.rpc import SimpleReplier

    simple_replier = SimpleReplier(
        request_type=MyRequestType,
        reply_type=MyReplyType,
        participant=participant,
        service_name="MyServiceName",
        handler=lambda request: MyReplyType(result=request.x + request.y)
    )

The ``handler`` argument is a function that receives a single argument of the
type ``request_type`` and must return an instance of the type ``reply_type``.

A SimpleReplier is only suitable for applications where each request generates
exactly one reply and when the process to generate the reply is fast (otherwise
the ``handler`` function would hold internal Connext threads).

Waiting for a reply to a specific request
=========================================

A *Requester* can send multiple requests and then wait for replies to any of
them or to a specific one.

When sending requests, save the returned request id:

.. code-block:: python

    request_id1 = requester.send_request(MyRequestType(x=1, y=1))
    request_id2 = requester.send_request(MyRequestType(x=2, y=2))

Then, when waiting for and taking the replies, you can specify the request id:

.. code-block:: python

    if not requester.wait_for_replies(
        related_request_id=request_id2,
        max_wait=dds.Duration(seconds=20)):
        raise Exception("Timeout waiting for replies")

    replies = requester.take_replies(related_request_id=request_id2)
    for data, info in replies:
        print(data) # Prints MyReplyType(result=4)

The first operation, ``wait_for_replies``, will wait until at least one reply
for ``request_id2`` is received. The second operation, ``take_replies``, will
take all replies for ``request_id2``.

Accessing the underlying readers and writers
============================================

*Requesters* and *Repliers* each contain one *DataWriter* and one *DataReader*
for the request and reply topics. These can be accessed for advanced use cases.

For example, the *Replier's* reader can be used to asynchronously read
requests, which allows simplifying the replier loop as follows:

.. code-block:: python

    async def process_requests(replier: Replier):
        async for data, info in replier.request_datareader.take_async():
            reply = MyReplyType(result=data.x + data.y)
            replier.send_reply(reply, info)

For more information, see
`Accessing Underlying DataWriters and DataReaders `_
in the Connext Core Libraries User's Manual.

Remote Procedure Calls
----------------------

.. warning::

    This feature is experimental and subject to change in future releases. It
    is included in this release to gather customer interest and feedback. For
    this reason, do not deploy any applications using Remote Procedure Calls in
    production. The Requester and Replier APIs described above can be used in
    production. For support, you may contact support@rti.com.

With this pattern, a service interface allows a client to make remote function
calls into a service.

A service interface can be defined as follows:

.. code-block:: python

    import abc
    import rti.types as idl
    import rti.rpc as rpc

    @idl.struct
    class Coordinates:
        x: int = 0
        y: int = 0

    @rpc.service
    class RobotControl(abc.ABC):
        @rpc.operation
        async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
            ...

        @rpc.operation
        async def get_speed(self) -> idl.float32:
            ...

A service interface such as ``RobotControl`` is an abstract base class (ABC)
decorated with the ``@rpc.service`` decorator and with a set of ``async``
abstract methods decorated with the ``@rpc.operation`` decorator.

The names and types of each operation and its parameters are used to synthesize
the DDS topic-types that allow the client and service to communicate.

The parameters and return values of an operation can be other IDL-based types
(see :ref:`types:Data Types`).

The Python class ``RobotControl`` is derived from this IDL definition:

.. code-block:: idl

    struct Coordinates {
        int64 x;
        int64 y;
    };

    @service
    interface RobotControl {
        Coordinates walk_to(Coordinates destination, float speed);
        float get_speed();
    };

.. warning::

    Code generation from IDL ``interface`` or ``exception`` definitions to
    Python is not supported in this release. Other features, such as out
    parameters or attributes, are not supported either.

From the ``RobotControl`` class you can define a client and a service.

The client class is defined simply as:

.. code-block:: python

    class RobotControlClient(RobotControl, rpc.ClientBase):
        ...

The base class ``rpc.ClientBase`` provides the implementation of all the
methods decorated with ``@rpc.operation``. The ``RobotControlClient`` class
doesn't have any implementation of its own.

A ``RobotControlClient`` can be instantiated in a DomainParticipant with a
service name:

.. code-block:: python

    import rti.connextdds as dds

    participant = dds.DomainParticipant(domain_id=0)
    client = RobotControlClient(participant, "My RobotControl")

And then used to make remote calls:

.. code-block:: python

    result = await client.walk_to(destination=Coordinates(x=10, y=10), speed=50.0)
    print(result)

The call to ``walk_to`` will send a request to the server and return a
coroutine that will complete when the reply is received, providing the return
value.

On the service side, you must provide an implementation of ``RobotControl``
that contains the server-side logic. For example, we can define
``RobotControlExample`` as follows:

.. code-block:: python

    import asyncio

    class RobotControlExample(RobotControl):
        def __init__(self):
            self.position = Coordinates(0, 0)
            self.speed = 0.0

        async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
            if speed <= 0.0:
                return self.position # return previously known position

            if speed > 100.0:
                self.speed = 100.0 # maximum speed
            else:
                self.speed = speed

            # sleep for 0 to 10 seconds, depending on the speed
            await asyncio.sleep(10.0 - 0.1 * self.speed)

            self.speed = 0.0
            self.position = destination
            return destination

        def get_speed(self) -> idl.float32:
            return self.speed

The implementation of each method can be asynchronous or synchronous.

To start receiving function calls into ``RobotControlExample`` we need to
create a ``rpc.Service`` with an instance of ``RobotControlExample``, a
``DomainParticipant`` and a service name:

.. code-block:: python

    participant = dds.DomainParticipant(domain_id=0)
    robot_control = RobotControlExample()
    service = rpc.Service(robot_control, participant, "My RobotControl")
    await service.run() # runs forever

While ``service`` is running it will receive calls from matching clients (those
with the same service name and compatible interfaces and QoS), call the
corresponding method in ``robot_control`` and send the result of the operation
back to the client that called it.

Concurrency model
=================

Both the service and the client provide ``async`` methods and therefore need to
be run in an ``asyncio`` event loop.
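
As a minimal sketch of what running in an event loop means in practice, the
following stdlib-only example starts a loop with ``asyncio.run`` and awaits two
calls concurrently. ``FakeRobotControlClient`` is a hypothetical stand-in that
only simulates the wait for a reply; it is not part of the Connext API, and a
real application would await the methods of ``RobotControlClient`` instead.

.. code-block:: python

    import asyncio


    class FakeRobotControlClient:
        """Hypothetical stand-in for a client; it does not use Connext."""

        async def walk_to(self, destination: tuple, speed: float) -> tuple:
            # Simulate the time spent waiting for the remote reply
            await asyncio.sleep(0.01)
            return destination

        async def get_speed(self) -> float:
            await asyncio.sleep(0.01)
            return 0.0


    async def main() -> list:
        client = FakeRobotControlClient()
        # Both coroutines are awaited concurrently in the same event loop
        return await asyncio.gather(
            client.walk_to(destination=(10, 10), speed=50.0),
            client.get_speed(),
        )


    if __name__ == "__main__":
        # asyncio.run creates the event loop, runs main() and closes the loop
        print(asyncio.run(main()))

Because the calls are coroutines, several of them can be in flight at once
without additional threads; ``asyncio.gather`` returns their results in the
order in which the calls were passed.
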

All of the client methods derived from the service interface are ``async``; for
each operation (such as ``walk_to``) the client uses a ``Requester`` to send a
request and returns a coroutine that completes when the reply from the service
is received, providing the return value.

The server uses the ``asyncio`` event loop and a ``Replier`` to run the
following tasks when ``run()`` is called:

* A *read* task that receives requests using the ``Replier``.
* One or more *process* tasks that call the methods in the service
  implementation and send the result back using the ``Replier``. If a method is
  asynchronous (for example ``RobotControlExample.walk_to`` in the example
  above), the *process* task will await the method until it completes. If the
  method is not asynchronous (``RobotControlExample.get_speed``), it should
  return a value immediately so that it does not block the event loop.

When all the *process* tasks are busy, the *read* task will stop receiving
requests.

The number of *process* tasks can be configured with the ``task_count`` Service
parameter:

.. code-block:: python

    server = rpc.Service(robot_control, participant, "My RobotControl", task_count=10)

The default value is 4. Note that with a value of 1, the server will be forced
to process requests sequentially in the order they are received.

The service's ``run()`` method runs until it is cancelled. For example:

.. code-block:: python

    service = rpc.Service(robot_control, participant, "My RobotControl")
    await service.run() # runs forever

    # or

    service = rpc.Service(robot_control, participant, "My RobotControl")
    service_task = asyncio.create_task(service.run())
    await asyncio.sleep(10.0) # run for 10 seconds
    service_task.cancel()
    await service_task

Note that ``asyncio`` event loops are single-threaded. If your service
implementation needs to run an operation in a separate thread, you can use the
``run_in_executor`` method of the ``asyncio`` event loop.
For example, the following implementation of ``walk_to`` runs the synchronous
function ``move_robot_impl`` in a separate thread:

.. code-block:: python

    class RobotControlExample(RobotControl):
        async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
            # ...
            self.speed = speed

            # run move_robot_impl in a separate thread
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, move_robot_impl, destination, speed)

            self.speed = 0.0
            self.position = destination
            return destination

If this concurrency model doesn't fit your needs, you can use the ``Requester``
and ``Replier`` classes directly, which provide more flexibility.

Exceptions
==========

Operations in a service interface may throw exceptions. Exceptions are
``@idl.struct`` classes that inherit from ``Exception``. Operations need to
declare all the exceptions that they may throw.

When a service implementation throws an exception, the ``rpc.Service`` will
catch it and send it to the client, which will rethrow it.

For example, to have the ``walk_to`` operation throw a ``TooFast`` exception
when the speed is too high, we can define the exception as follows:

.. code-block:: python

    @idl.struct
    class TooFast(Exception):
        speed: idl.float32

And declare it in the ``walk_to`` operation:

.. code-block:: python

    @rpc.service
    class RobotControl(abc.ABC):
        @rpc.operation(raises=[TooFast])
        async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
            ...

We can now modify ``RobotControlExample`` to throw ``TooFast``:

.. code-block:: python

    class RobotControlExample(RobotControl):
        async def walk_to(self, destination: Coordinates, speed: idl.float32) -> Coordinates:
            if speed > 100.0:
                raise TooFast(speed)
            # ...

The client can now catch the exception:

.. code-block:: python

    try:
        await client.walk_to(destination, speed)
    except TooFast as e:
        print(f"Too fast: {e.speed}")

Exceptions that are not declared in the ``raises`` parameter of an operation
will be propagated and raised in the client as
``rpc.RemoteUnknownExceptionError``.
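
The difference between declared and undeclared exceptions can be illustrated
with a toy, stdlib-only model. This is only a sketch of the rule described
above, not how Connext implements it: ``call``, ``DECLARED`` and the local
``TooFast`` and ``RemoteUnknownExceptionError`` classes are hypothetical
stand-ins defined here for illustration.

.. code-block:: python

    import asyncio


    class TooFast(Exception):
        """Stand-in for the declared exception type."""

        def __init__(self, speed: float):
            super().__init__(speed)
            self.speed = speed


    class RemoteUnknownExceptionError(Exception):
        """Stand-in for rpc.RemoteUnknownExceptionError."""


    DECLARED = (TooFast,)  # plays the role of raises=[TooFast]


    async def walk_to(speed: float) -> str:
        if speed > 100.0:
            raise TooFast(speed)
        if speed < 0.0:
            raise ValueError("negative speed")  # not declared
        return "arrived"


    async def call(speed: float) -> str:
        """Toy dispatcher: declared exceptions pass through, others are wrapped."""
        try:
            return await walk_to(speed)
        except DECLARED:
            raise
        except Exception as e:
            raise RemoteUnknownExceptionError(repr(e)) from e


    async def main() -> list:
        outcomes = []
        for speed in (50.0, 150.0, -1.0):
            try:
                outcomes.append(await call(speed))
            except TooFast as e:
                outcomes.append(f"too fast: {e.speed}")
            except RemoteUnknownExceptionError:
                outcomes.append("unknown remote error")
        return outcomes

A client that wants to handle both cases can therefore catch the declared
exception types first and the generic remote error afterwards, as ``main``
does in this sketch.
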