Source code for motion_stack.ros2.base_node.lvl1

"""Template for a ROS2 node running the python core of lvl1."""

from abc import ABC, abstractmethod
from typing import Any, Callable, List

import rclpy
from rclpy.node import Node

from ...core.lvl1_joint import JointCore
from ...core.utils.joint_state import JState
from ..utils.executor import Ros2Spinner, error_catcher, my_main
from ..utils.linking import link_startup_action


[docs] class Lvl1Node(Node, ABC): """Abstract base class for ros2, to be completed by the user. To see the default behavior implemented using this template, refer to :py:class:`.ros2.default_node.lvl1`. """ _spinner: Ros2Spinner core_class = JointCore #: Class from which the core is instantiated. Overwrite this with a modified core to change behavior not related to ROS2. core: core_class #: Instance of the python core. def __init__(self): super().__init__("lvl1") self._spinner = Ros2Spinner(self) self.core = self.core_class(self._spinner) self._spinner.wait_for_lower_level() self._link_publishers() self._link_subscribers() self._link_timers() self._link_sensor_check() self._on_startup()
[docs] @abstractmethod def subscribe_to_lvl0(self, lvl0_input: Callable[[List[JState]], Any]): r"""Starts transmitting incomming **sensor data** to the python core. \ ``lvl0_input`` is a function that must be called when new sensor data is available. The data type must be a list of JState. Tipical steps: - Subscribe to a topic. - In the subscription callback: - Convert the incomming messages to a List[JState]. - Call ``lvl0_input`` using your processed messages. .. Important:: This function is called **once** at startup to setup some kind of continuous process. This continuous process must then call ``lvl0_input``. .. Note:: \ ``lvl0_input`` is typically :py:meth:`.JointCore.coming_from_lvl0` Args: lvl0_input: Interface function of the joint core, to call (in a callback) when new sensor data is available. """ ...
[docs] @abstractmethod def subscribe_to_lvl2(self, lvl2_input: Callable[[List[JState]], Any]): r"""Starts transmitting incomming **joint targets** to the python core. \ ``lvl2_input`` is a function that must be called when new joint targets (typically resulting from IK) is available. The data type must be a list of JState. Tipical steps: - Subscribe to a topic. - In the subscription callback: - Convert the incomming messages to a List[JState]. - Call ``lvl2_input`` using your processed messages. .. Important:: This function is called **once** at startup to setup some kind of continuous process. This continuous process must then call ``lvl2_input``. .. Note:: \ ``lvl2_input`` is typically :py:meth:`.JointCore.coming_from_lvl2` Args: lvl0_input: Interface function of the joint core, to call (in a callback) when new joint targets are available. """ ...
def _link_subscribers(self): self.subscribe_to_lvl0(lvl0_input=error_catcher(self.core.coming_from_lvl0)) self.subscribe_to_lvl2(lvl2_input=error_catcher(self.core.coming_from_lvl2))
[docs] @abstractmethod def publish_to_lvl0(self, states: List[JState]): r"""This method is called every time some **motor commands** need to be sent to lvl0. ``states`` should be processed then sent onto the next step (published by ROS2). Tipical steps: - Make a publisher in the __init__. - Process ``state`` in a message. - call publisher.publish with your message .. Note:: \ This method will typically be called by :py:meth:`.JointCore.send_to_lvl0` Args: states: Joint states to be sent. """ ...
[docs] @abstractmethod def publish_to_lvl2(self, states: List[JState]): r"""This method is called every time some **joint states** need to be sent to lvl2. ``states`` should be processed then sent onto the next step (published by ROS2). Tipical steps: - Make a publisher in the __init__. - Process ``state`` in a message. - call publisher.publish with your message .. Note:: \ This method will typically be called by :py:meth:`.JointCore.send_to_lvl2` Args: states: Joint states to be sent. """ ...
def _link_publishers(self): self.core.send_to_lvl0_callbacks.append(self.publish_to_lvl0) self.core.send_to_lvl2_callbacks.append(self.publish_to_lvl2)
[docs] @abstractmethod def frequently_send_to_lvl2(self, send_function: Callable[[], None]): r"""Starts executing ``send_function`` regularly. Fresh sensor states must be send regularly to lvl2 (IK) using send_function. When using speed mode, it is also necessary to regularly send speed. Tipical steps: - Make a timer. - Call send_function in the timer. .. Note:: \ ``send_function`` is typically :py:meth:`.JointCore.send_sensor_up` and :py:meth:`.JointCore.send_command_down` Args: send_function: Function sending fresh sensor states to lvl2 """ ...
def _link_timers(self): def execute(): self.core.send_sensor_up() self.core.send_command_down() self.frequently_send_to_lvl2(error_catcher(execute)) def _link_sensor_check(self): fut = rclpy.Future() @error_catcher def execute(): all_done = self.core.sensor_check_verbose() if all_done: fut.set_result(True) tmr = self.create_timer(1 / 10, execute) fut.add_done_callback(lambda *_: self.destroy_timer(tmr))
[docs] @abstractmethod def startup_action(self, core: JointCore): """This will be executed *once* during the first ros spin of the node. You can keep this empty, but typically: - a message with only joint names and no data is sent to initialise lvl0 (if using Rviz this step is pretty much necessary). - "alive" services are started to signal that the node is ready. """ ...
def _on_startup(self): link_startup_action(self, self.startup_action, self.core)
[docs] @classmethod def spin(cls): """Spins the node""" my_main(cls, multi_threaded=False)