Source code for motion_stack.ros2.ros2_asyncio.ros2_asyncio

from rclpy.task import Future
import time
from rclpy.node import Node
from rclpy.task import Future, Task
import rclpy
from typing import Union
from collections.abc import Awaitable
import asyncio
import inspect


[docs] def ensure_future(node, coro_or_future: Union[Future, Awaitable]) -> Task: """Wrap a coroutine or an awaitable in a Ros2 future.""" # If the argument is a Future, it is returned directly. if isinstance(coro_or_future, Future): return coro_or_future if not asyncio.iscoroutine(coro_or_future): if inspect.isawaitable(coro_or_future): async def _wrap_awaitable(awaitable): return await awaitable coro_or_future = _wrap_awaitable(coro_or_future) else: raise TypeError('An rclpy.Future, a coroutine or an awaitable ' 'is required') try: assert node and node.executor # create a task from the coroutine and return the Future object return node.executor.create_task(coro_or_future) except RuntimeError: raise
[docs] async def sleep(node: Node, timeout_sec: float): """Ros2 compatible coroutine sleep function. It works similar to asyncio.sleep() but it is compatible with the Ros2 event loop. @see https://www.notion.so/aescape/Asynchronous-programming-with-ROS-2-Python-Api-f26cb40c0f634528816abea0d66d968b """ my_future = Future(executor=node.executor) def _on_done(): my_future.set_result(True) timer = node.create_timer(timeout_sec, callback=_on_done, callback_group=rclpy.callback_groups.ReentrantCallbackGroup()) try: await my_future finally: # Important! Clean up the timer properly node.destroy_timer(timer)
[docs] async def wait_for(node: Node, coro_or_future: Union[Future, Awaitable], timeout_sec: float): """Ros2 compatible coroutine to wait for a future with timeout. It works similar to asyncio.wait_for() but it is compatible with the Ros2 event loop. @see https://www.notion.so/aescape/Asynchronous-programming-with-ROS-2-Python-Api-f26cb40c0f634528816abea0d66d968b """ start_time = time.time() coro_or_future = ensure_future(node, coro_or_future) if not coro_or_future.done(): wait_future = Future() def _on_finished(f=None): wait_future.set_result(True) # create a timer to check in timeout_sec if the future is done timer = node.create_timer(timeout_sec, callback=_on_finished, callback_group=rclpy.callback_groups.ReentrantCallbackGroup()) try: # register a callback to stop early if the future finished before the timeout is reached coro_or_future.add_done_callback(_on_finished) # yield control to the Ros2 event loop. await wait_future if not coro_or_future.done() and (time.time() - start_time > timeout_sec): raise TimeoutError( "Timeout while waiting for future to complete") if coro_or_future.exception() is not None: raise coro_or_future.exception() finally: node.destroy_timer(timer) return coro_or_future.result()
[docs] def gather(node: Node, *coros_or_futures, return_exceptions=False): """Return a future aggregating results from the given coroutines/futures. Based on Python's asyncio/tasks.py """ # check for empty input and behave similar to asyncio.gather if not coros_or_futures: outer = Future(executor=node.executor) outer.set_result([]) return outer def _done_callback(fut): nonlocal nfinished nfinished += 1 if nfinished == nfuts: # All futures are done; create a list of results # and set it to the 'outer' future. results = [] for fut in children: res = fut.exception() if res is None: res = fut.result() results.append(res) outer.set_result(results) arg_to_fut = {} children = [] nfuts = 0 nfinished = 0 outer = None for arg in coros_or_futures: if arg not in arg_to_fut: # Wrap the coroutine to intercept exceptions. # Unfortunately, the rclpy executor immediately raises an exeption if it encounters one. # In order to "silently" handle them, we need to catch them async def _intercept_exception(awaitable): try: return await awaitable except Exception as e: if not return_exceptions: # Pass on the exception outer.set_exception(e) else: # Treat the exception as an ordinary result return e if isinstance(arg, Future) and return_exceptions: raise ValueError( "Cannot return exceptions if the input is a Future object") fut = ensure_future(node, _intercept_exception(arg)) nfuts += 1 arg_to_fut[arg] = fut fut.add_done_callback(_done_callback) else: # There's a duplicate Future object in coros_or_futures. fut = arg_to_fut[arg] children.append(fut) outer = Future(executor=node.executor) return outer