from typing import Any, Callable
from rclpy.node import Node
from rclpy.task import Future
from .executor import error_catcher
[docs]
class CallablePublisher:
def __init__(
self,
node: Node,
topic_type: type,
topic_name: str,
qos: int = 10,
*args,
**kwargs
):
self.__type = topic_type
self.pub = node.create_publisher(topic_type, topic_name, 10, *args, **kwargs)
def __call__(self, msg) -> None:
assert isinstance(msg, self.__type)
self.pub.publish(msg)
[docs]
def link_startup_action(node: Node, startup_callback: Callable, argument: Any) -> Future:
"""Creates a callback to be execute on the node's startup given arguments.
Args:
node: spinning node
lvl1: lvl1 core
"""
future = Future()
@error_catcher
def execute():
if future.done():
return
startup_callback(argument)
future.set_result(True)
timer = node.create_timer(1, execute)
future.add_done_callback(lambda fut: node.destroy_timer(timer))
return future