Source code for motion_stack.ros2.base_node.lvl2

"""Template for a ROS2 node running the python core of lvl2."""

from abc import ABC, abstractmethod
from typing import Any, Callable, List

import rclpy
from rclpy.node import Node
from rclpy.task import Future

from motion_stack.core.lvl2_ik import IKCore
from motion_stack.core.utils.pose import Pose

from ...core.utils.joint_state import JState
from ..utils.executor import Ros2Spinner, error_catcher, my_main
from ..utils.linking import link_startup_action


[docs] class Lvl2Node(Node, ABC): """Abstract base class for ros2, to be completed by the user. To see the default behavior implemented using this template, refer to :py:class:`.ros2.default_node.lvl1.DefaultLvl2`. """ _spinner: Ros2Spinner core_class = IKCore #: Class from which the core is instantiated. Overwrite this with a modified core to change behavior not related to ROS2. core: core_class #: Instance of the python core. def __init__(self): super().__init__("lvl2") self._spinner = Ros2Spinner(self) self.core = self.core_class(self._spinner) # self._spinner.wait_for_lower_level() self._link_publishers() self._link_subscribers() self._link_sensor_check() self._on_startup()
[docs] @abstractmethod def subscribe_to_lvl1(self, lvl1_input: Callable[[List[JState]], Any]): r"""Starts transmitting incomming **state data** to the python core. \ ``lvl1_input`` is a function that must be called when new state data is available. The data type must be a list of JState. Tipical steps: - Subscribe to a topic. - In the subscription callback: - Convert the incomming messages to a List[JState]. - Call ``lvl1_input`` using your processed messages. .. Important:: This function is called **once** at startup to setup some kind of continuous process. This continuous process must then call ``lvl0_input``. .. Note:: \ ``lvl1_input`` is typically :py:meth:`.IKCore.state_from_lvl1` Args: lvl1_input: Interface function of the ik core, to call (in a callback) when new state data is available. """ ...
[docs] @abstractmethod def subscribe_to_lvl3(self, lvl3_input: Callable[[Pose], Any]): r"""Starts transmitting incomming **ik targets** to the python core. \ ``lvl3_input`` is a function that must be called when new ik target is available. The data type must be a Pose. Tipical steps: - Subscribe to a topic. - In the subscription callback: - Convert the incomming messages to a Pose. - Call ``lvl3_input`` with the processed messages. .. Important:: This function is called **once** at startup to setup some kind of continuous process. This continuous process must then call ``lvl3_input``. .. Note:: \ ``lvl3_input`` is typically :py:meth:`.IKCore.ik_target` Args: lvl3_input: Interface function of the joint core, to call (in a callback) when new ik target is available. """ ...
def _link_subscribers(self): self.subscribe_to_lvl1(lvl1_input=error_catcher(self.core.state_from_lvl1)) self.subscribe_to_lvl3(lvl3_input=error_catcher(self.core.ik_target))
[docs] @abstractmethod def publish_to_lvl1(self, states: List[JState]): r"""This method is called every time some **joint targets** need to be sent to lvl1. ``states`` should be processed then sent onto the next step (published by ROS2). Tipical steps: - Make a publisher in the __init__. - Process ``state`` into a message. - call publisher.publish with your message .. Note:: \ This method will typically be called by :py:meth:`.IKCore.send_to_lvl1` Args: states: Joint states to be sent. """ ...
[docs] @abstractmethod def publish_to_lvl3(self, pose: Pose): r"""This method is called every time some **end effector pose** needs to be sent to lvl3. ``pose`` should be processed then sent onto the next step (published by ROS2). Tipical steps: - Make a publisher in the __init__. - Process ``pose`` into a message. - call publisher.publish with your message .. Note:: \ This method will typically be called by :py:meth:`.IKCore.send_to_lvl3` Args: states: Joint states to be sent. """ ...
def _link_publishers(self): self.core.send_to_lvl1_callbacks.append(self.publish_to_lvl1) self.core.send_to_lvl3_callbacks.append(self.publish_to_lvl3) def _link_sensor_check(self): fut = Future() @error_catcher def execute(): job_done = self.core.verbose_check() if job_done: fut.set_result(True) tmr = self.create_timer(1 / 10, execute) fut.add_done_callback(lambda *_: self.destroy_timer(tmr))
[docs] @abstractmethod def startup_action(self, lvl2: IKCore): """This will be executed *once* during the first ros spin of the node. You can keep this empty, but typically: - a message with only joint names and no data is sent to initialise lvl0 (if using Rviz this step is pretty much necessary). - "alive" services are started to signal that the node is ready. """ ...
def _on_startup(self): link_startup_action(self, self.startup_action, self.core)
[docs] @classmethod def spin(cls): """Spins the node""" my_main(cls, multi_threaded=False)