Source code for motion_stack.ros2.utils.linking

from typing import Any, Callable, Union

from rclpy.node import Node
from rclpy.qos import QoSProfile
from rclpy.task import Future

from motion_stack.ros2 import communication

from .executor import error_catcher


[docs] class CallablePublisher: def __init__( self, node: Node, topic_type: type, topic_name: str, qos_profile: Union[QoSProfile, int] = communication.DEFAULT_QOS, *args, **kwargs ): self.__type = topic_type self.pub = node.create_publisher( topic_type, topic_name, qos_profile, *args, **kwargs ) def __call__(self, msg) -> None: assert isinstance(msg, self.__type) self.pub.publish(msg)