Source code for motion_stack.high_level_louis

"""This gives example of a high level node using the motion stack API

Warning:
    To make this example as easy as possible, async/await is heavily used. 
    This is unusual, you do not need and even, should not use async/await with Ros2. 
    The motion stack uses generic Future and callback, async/await style 
    is not required for the motion stack.

    In this example every time ``await``, is used (on a ros2 Future, or python awaitable), 
    the code pauses until the awaitable finishes, however it does not block the ros2 executor. 
    Basically, this ``await`` sleeps/waits without blocking ros2 operations 
    (incomming/outgoing messages).

    async/await is easier to read, however much more reliable and performant code is 
    possible using ros2 future+callback and especially timers.

"""

from typing import Coroutine

import numpy as np
from rclpy.node import Node

pass
import motion_stack.ros2.ros2_asyncio.ros2_asyncio as rao
from motion_stack.api.ik_syncer import XyzQuat
from motion_stack.api.ros2.ik_api import IkHandler, IkSyncerRos
from motion_stack.api.ros2.joint_api import JointHandler, JointSyncerRos
from motion_stack.core.utils.math import patch_numpy_display_light, qt
from motion_stack.core.utils.pose import Pose
from motion_stack.ros2.utils.conversion import ros_now
from motion_stack.ros2.utils.executor import error_catcher, my_main


# lighter numpy display
patch_numpy_display_light()


[docs] class TutoNode(Node): #: list of limbs number that are controlled LIMBS = [3, 201] def __init__(self) -> None: super().__init__("test_node") print("initing TutoNode") self.create_timer(1 / 30, self.exec_loop) # regular execution self.startTMR = self.create_timer(0.1, self.startup) # executed once # API objects: # Handles ros2 joints lvl1 (subscribers, publishers and more) self.joint_handlers = [JointHandler(self, l) for l in self.LIMBS] # Syncronises several joints self.joint_syncer = JointSyncerRos(self.joint_handlers) # Handles ros2 ik lvl2 self.ik_handlers = [IkHandler(self, l) for l in self.LIMBS] # Syncronises several IK self.ik_syncer = IkSyncerRos( self.ik_handlers, interpolation_delta=XyzQuat(20, np.inf), on_target_delta=XyzQuat(2, np.inf), ) self.get_logger().info("init done")
[docs] @error_catcher async def main(self): # wait for all handlers to be ready await self.joints_ready() await self.ik_ready() # send to all angle at 0.0 print("angles to zero starting") await self.angles_to_zero() print("angles to zero done") # send to all angle at pi/3 await self.angles_to_test(3.14 / 3 ) await self.angles_to_zero() # send end effectors to a delta pose using IK delta_pose = Pose( time=ros_now(self), xyz=np.array([-100, -100, -100], dtype=float), quat=qt.one, ) await self.delta_ee(delta_pose) print("finished")
[docs] async def joints_ready(self): """Returns once all joints are ready""" ready_tasks = [jh.ready for jh in self.joint_handlers] try: print("Waiting for joints.") fused_task = rao.gather(self, *ready_tasks) await rao.wait_for(self, fused_task, timeout_sec=100) print(f"Joints ready.") strlist = "\n".join( [f"limb {jh.limb_number}: {jh.tracked}" for jh in self.joint_handlers] ) print(f"Joints are:\n{strlist}") return except TimeoutError: raise TimeoutError("Joint data unavailable after 100 sec")
[docs] async def ik_ready(self): """Returns once all ik are ready""" ready_tasks = [ih.ready for ih in self.ik_handlers] try: print("Waiting for ik.") fused_task = rao.gather(self, *ready_tasks) await rao.wait_for(self, fused_task, timeout_sec=100) print(f"Ik ready.") strlist = "\n".join( [f"limb {ih.limb_number}: {ih.ee_pose}" for ih in self.ik_handlers] ) print(f"EE poses are:\n{strlist}") return except TimeoutError: raise TimeoutError("Ik data unavailable after 100 sec")
[docs] def angles_to_zero(self) -> Coroutine: """sends all joints to 0.0""" target = {} for jh in self.joint_handlers: target.update({jname: 0.0 for jname in jh.tracked}) task = self.joint_syncer.asap(target) return rao.wait_for(self, task, timeout_sec=100)
[docs] def angles_to_test(self, test_angle) -> Coroutine: """sends all joints to 0.0""" target = {} for jh in self.joint_handlers: target.update({jname: test_angle for jname in jh.tracked}) task = self.joint_syncer.asap(target) return rao.wait_for(self, task, timeout_sec=100)
[docs] def delta_ee(self, delta_pose:Pose) -> Coroutine: # """moves the end effectors by a dela Pose using IK""" ee_poses = [ih.ee_pose for ih in self.ik_handlers] target = { leg_num: pose - delta_pose for leg_num, pose in zip(self.LIMBS, ee_poses) } task = self.ik_syncer.lerp(target) return rao.wait_for(self, task, timeout_sec=100)
[docs] @error_catcher def startup(self): """Execute once at startup""" # Ros2 will executor will handle main() rao.ensure_future(self, self.main()) # destroys timer self.destroy_timer(self.startTMR) print("Startup done.")
[docs] @error_catcher def exec_loop(self): """Regularly executes the syncers""" self.joint_syncer.execute() self.ik_syncer.execute()
[docs] def main(*args): my_main(TutoNode)