Source code for motion_stack.api.ros2.state_to_topic

"""Provides StatesToTopics, to be injected in a Node.
see the class docstring for details
"""

from typing import Callable, Dict, Final, Iterable, List, Optional, Union

from rclpy.node import Node, Publisher
from sensor_msgs.msg import JointState
from std_msgs.msg import Float64

from motion_stack.core.utils.robot_parsing import replace_incompatible_char_ros2
from motion_stack.ros2.base_node.lvl1 import Lvl1Node
from motion_stack.ros2.utils.joint_state import JState, ros2js

JSAtrr = str
JointName = str


[docs] def default_joint_to_topic_name(attribute: str, joint_name: str) -> str: """Return the topic name associated with an attribute and joint. Note: This is the default implementation. You might want to make your own. Args: attribute: position, velocity or effort joint_name: name of the joint Returns: name of the associated topic """ topic_name = f"driver/{joint_name}/{attribute}" return topic_name
[docs] class StatesToTopic: """Publishes joint states onto individual topics. Features: - Publishes a list of JState or a JointStates onto individual Float64 topics - Provide joint_to_topic_name with the the naming convention you need - Lazily creates the topics as they are published - topics will not be created at startup, but the first time they are used - publish a state with np.nan instead of None to force the creation. Args: ros_node: ROS2 node joint_to_topic_name: Function, Args: [`attribute`, `joint_name`] Return: [`topic_name`]. default function: :py:func:`.state_to_topic.default_joint_to_topic_name` """ def __init__( self, ros_node: Node, joint_to_topic_name: Callable[[str, str], str] = default_joint_to_topic_name, ) -> None: self._node = ros_node JS_ATTRIBUTES: Final[set[str]] = set(JState.__annotations__.keys()) - { "name", "time", } self.attributes: Iterable[str] = set(JS_ATTRIBUTES) self.make_topic_name = joint_to_topic_name self._pub2_dict: Dict[JSAtrr, Dict[JointName, Publisher]] = {} self._create_publisher = self._node.create_publisher
[docs] @classmethod def setup_lvl0_command( cls, lvl1_ros_node: Lvl1Node, joint_to_topic_name: Callable[[str, str], str] = default_joint_to_topic_name, ) -> "StatesToTopic": """All joints will have their own individual float topic. Applies :py:class:`.state_to_topic.StatesToTopic` to outgoing motor commands of lvl1. Args: lvl1_ros_node: ROS2 node running lvl1 joint_to_topic_name: Function returning the topic name associated with an attribute and joint. """ s2t = cls(lvl1_ros_node, joint_to_topic_name) lvl1_ros_node.core.send_to_lvl0_callbacks.append(s2t.publish) return s2t
def _make_topic_name(self, attribute: str, joint_name: str) -> str: # attribute = replace_incompatible_char_ros2(attribute) # joint_name = replace_incompatible_char_ros2(joint_name) raw = self.make_topic_name(attribute, joint_name) sanitized = replace_incompatible_char_ros2(raw) if raw != sanitized: self._node.get_logger().warn( f"Invalid ros2 topic name. Renamed: '{raw}' -> '{sanitized}'" ) return sanitized def _get_create_motor_pub(self, attr: str, name: str) -> Publisher: """Return the publisher correspondong to the joint and attribute (pos, vel..). If does not exists, creates it Args: attr: position, velocity or effort name: joint name Returns: pub on topic "make_topic_name(attr, name)" """ pudic: Optional[Dict[str, Publisher]] = self._pub2_dict.get(attr) if pudic is None: pudic = {} self._pub2_dict[attr] = pudic pub = pudic.get(name) if pub is None: pub = self._create_publisher( Float64, self._make_topic_name(attr, name), 10, ) self._pub2_dict[attr][name] = pub return pub def _pub_attribute(self, attr: str, state: JState): """publishes the given attribute of the state""" value = getattr(state, attr, None) if value is None: # if no data, does not publish return pub = self._get_create_motor_pub(attr, state.name) pub.publish(Float64(data=float(value)))
[docs] def publish(self, states: Union[Iterable[JState], JointState]): """publishes a list of JState over float topics (lazily created).""" if isinstance(states, JointState): states = ros2js(states) for state in states: if state.name is None: continue for attr in self.attributes: self._pub_attribute(attr, state)