Source code for motion_stack.ros2.utils.linking

from typing import Any, Callable

from rclpy.node import Node
from rclpy.task import Future

from .executor import error_catcher


[docs] class CallablePublisher: def __init__( self, node: Node, topic_type: type, topic_name: str, qos: int = 10, *args, **kwargs ): self.__type = topic_type self.pub = node.create_publisher(topic_type, topic_name, 10, *args, **kwargs) def __call__(self, msg) -> None: assert isinstance(msg, self.__type) self.pub.publish(msg)