Source code for easy_robot_control.leg_node

"""
This node is responsible for recieving targets with trajectories of the
corresponding leg. It will performe smooth movements in the body center frame of reference
or relative to the current end effector position.
This node keeps track of the leg end effector to generate trajectories to the target.

Author: Elian NEPPEL
Lab: SRL, Moonshot team
"""

from typing import Callable, List, Optional, Tuple

import numpy as np
import quaternion as qt
from geometry_msgs.msg import Transform
from motion_stack_msgs.srv import ReturnVect3, TFService
from numpy.typing import NDArray
from rclpy.callback_groups import (
    CallbackGroup,
    MutuallyExclusiveCallbackGroup,
    ReentrantCallbackGroup,
)
from rclpy.node import Service
from rclpy.task import Future
from rclpy.time import Duration
from scipy.spatial import geometric_slerp
from std_msgs.msg import Float64
from std_srvs.srv import Empty

from easy_robot_control.EliaNode import EliaNode, Time, error_catcher, myMain, np2tf, tf2np
from easy_robot_control.ros2_numpy.transformations import quaternion_slerp

# import trajectories as tj


SUCCESS = ""
TARGET_OVERWRITE_DIST = 20
QUAT_OVERWRITE_DIST = 0.05
TARGET_TIMEOUT_BEFORE_OVERWRITE = 0.95  # seconds
WAIT_AFTER_MOTION = 0.1  # seconds
STEPSIZE = 100


[docs] class LegNode(EliaNode): def __init__(self): # rclpy.init() super().__init__(f"leg") # type: ignore self.wait_for_lower_level(["ik_alive"]) self.rollFlip = False # self.guard_test = self.create_guard_condition(self.guar_test_cbk) # V Parameters V # \ / # # \/ # self.WAIT_ANGLE_MES: Duration = Duration(seconds=1) self.WAIT_ANGLE_ABORT: Duration = Duration(seconds=3) self.declare_parameter("leg_number", 0) self.leg_num = ( self.get_parameter("leg_number").get_parameter_value().integer_value ) # if self.leg_num == 0: # self.Yapping = True # else: # self.Yapping = False self.Alias = f"L{self.leg_num}" self.declare_parameter("std_movement_time", 3.0) self.movementTime = ( self.get_parameter("std_movement_time").get_parameter_value().double_value ) self.declare_parameter("mvmt_update_rate", 30.0) self.movementUpdateRate = ( self.get_parameter("mvmt_update_rate").get_parameter_value().double_value ) # self.movementUpdateRate = 2.0 # /\ # # / \ # # ^ Parameters ^ self.lastTarget: Optional[NDArray] = None # self.lastTarget: Optional[NDArray] = np.zeros(3, dtype=float) self.lastQuat = qt.one.copy() self.currentTipXYZ: Optional[NDArray] = None # self.currentTipXYZ = np.zeros(3, dtype=float) self.trajectory_q_xyz: NDArray = np.zeros((0, 3), dtype=float) self.trajectory_q_quat: qt.quaternion = qt.from_vector_part( self.trajectory_q_xyz # zeros ) self.trajectory_q_roll: NDArray = np.zeros(0, dtype=float) # V Callback Groups V # \ / # # \/ # movement_cbk_group = ReentrantCallbackGroup() self.trajectory_safe_cbkgrp: CallbackGroup = MutuallyExclusiveCallbackGroup() # /\ # # / \ # # ^ Callback Groups ^ # V Publishers V # \ / # # \/ # self.ik_pub = self.create_publisher(Transform, f"set_ik_target", 10) self.roll_pub = self.create_publisher(Float64, f"roll", 10) # /\ # # / \ # # ^ Publishers ^ # V Subscribers V # \ / # # \/ # self.tip_pos_sub = self.create_subscription( Transform, f"tip_pos", self.tip_pos_received_cbk, 10, ) self.smart_roll_sub = self.create_subscription( Float64, f"smart_roll", self.smart_roll_cbk, 10, ) # /\ # # / \ # # ^ Subscribers ^ # V Service server V # \ / # # \/ # self.iAmAlive: Optional[Service] = None self.rel_transl_server = self.create_service( TFService, f"rel_transl", self.rel_transl_srv_cbk, callback_group=movement_cbk_group, ) self.rel_hop_server = self.create_service( TFService, f"rel_hop", self.rel_hop_srv_cbk, callback_group=movement_cbk_group, ) self.shift_server = self.create_service( TFService, f"shift", self.shift_cbk, callback_group=movement_cbk_group, ) self.rot_server = self.create_service( TFService, f"rot", self.rot_cbk, callback_group=movement_cbk_group, ) self.point_server = self.create_service(TFService, f"point", self.point_cbk) self.tipos_server = self.create_service( ReturnVect3, f"tip_pos", self.send_most_recent_tip, ) # /\ # # / \ # # ^ Service sever ^ # V Timers V # \ / # # \/ # self.trajectory_timer = self.create_timer( 1 / (self.movementUpdateRate * 1.0), self.trajectory_executor, callback_group=self.trajectory_safe_cbkgrp, ) self.trajectory_timer.cancel() self.overwriteTargetTimer = self.create_timer( TARGET_TIMEOUT_BEFORE_OVERWRITE, self.overwrite_target, callback_group=self.trajectory_safe_cbkgrp, ) self.overwriteTargetTimer.cancel() self.FisrtTS: Optional[Time] = None self.firstSpin = self.create_timer( timer_period_sec=0.1, callback=self.firstSpinCBK, callback_group=MutuallyExclusiveCallbackGroup(), ) self.last_exec = self.getNow() self.call_forward: List[Callable] = [] # /\ # # / \ # # ^ Timers ^
[docs] @error_catcher def firstSpinCBK(self): if self.FisrtTS is None: self.FisrtTS = self.get_clock().now() areUnknownAngle = self.lastTarget is None if areUnknownAngle: sinceLaunch: Duration = self.get_clock().now() - self.FisrtTS if sinceLaunch > self.WAIT_ANGLE_MES: self.get_logger().warn("Waiting for end effector coordinates", once=True) if sinceLaunch > self.WAIT_ANGLE_ABORT: self.get_logger().warn("Waited too long, EE assumed zero", once=True) self.lastTarget = np.zeros(shape=(3,), dtype=float) return # waits for the first EE position before being ready self.iAmAlive = self.create_service(Empty, f"leg_alive", (lambda req, res: res)) self.destroy_timer(self.firstSpin)
[docs] @error_catcher def publish_to_roll(self, roll: Optional[float] = None): """publishes roll value towards ik node Args: roll: """ if roll is None: # roll = np.nan return msg = Float64() msg.data = float(roll) self.roll_pub.publish(msg)
[docs] @error_catcher def smart_roll_cbk(self, msg: Float64): speed = msg.data if self.rollFlip: speed *= -1 self.roll_pub.publish(Float64(data=speed))
[docs] @error_catcher def publish_to_ik( self, xyz: Optional[NDArray] = None, quat: Optional[qt.quaternion] = None ): """publishes target towards ik node Args: target: np.array float(3,) """ if xyz is None: if self.lastTarget is None: self.perror("No known last leg position") xyz = self.lastTarget else: self.lastTarget = xyz.copy() if quat is None: quat = self.lastQuat else: self.lastQuat = quat.copy() msg = np2tf(xyz, quat) self.ik_pub.publish(msg)
[docs] @error_catcher def trajectory_executor(self) -> None: """pops target from trajectory queue and publishes it. This follows and handle the trajectory_timer""" # safe_copy = self.call_forward.copy() # self.call_forward = [] # for f in safe_copy: # f() now = self.getNow() # time_lost = self.trajectory_timer.time_since_last_call() # time_until_next = self.trajectory_timer.time_until_next_call() time_between = now - self.last_exec late = ( self.trajectory_timer.timer_period_ns - time_between.nanoseconds ) / self.trajectory_timer.timer_period_ns self.last_exec = now # self.pwarn(f"lateness: {late*100:.1f}%") xyz, quat = self.pop_xyzq_from_traj() roll = self.pop_roll_from_traj() if xyz is not None or quat is not None or roll is not None: self.publish_to_ik(xyz, quat) # self.publish_to_roll(roll) # self.pinfo(target) else: self.trajectory_finished_cbk()
[docs] @error_catcher def trajectory_finished_cbk(self) -> None: """executes after the last target in the trajectory is processed""" self.trajectory_timer.cancel() return
[docs] def queue_xyz_empty(self) -> bool: return self.trajectory_q_xyz.shape[0] <= 0
[docs] def queue_quat_empty(self) -> bool: return self.trajectory_q_quat.shape[0] <= 0
[docs] def queue_roll_empty(self) -> bool: return self.trajectory_q_roll.shape[0] <= 0
[docs] @error_catcher def pop_xyzq_from_traj( self, index: int = 0 ) -> Tuple[Optional[NDArray], Optional[qt.quaternion]]: """deletes and returns the first value of the trajectory_queue for coordinates and quaternions. Args: index: if you wanna pop not the first but somewhere else Returns: value: np.array float(3,) - popped value """ if self.queue_xyz_empty(): xyz = None else: xyz = self.trajectory_q_xyz[0, :].copy() self.trajectory_q_xyz = np.delete(self.trajectory_q_xyz, index, axis=0) if self.queue_quat_empty(): quat = None else: quat = self.trajectory_q_quat[0].copy() self.trajectory_q_quat = np.delete(self.trajectory_q_quat, index, axis=0) if not (xyz is None and quat is None): # self.pwarn( # f"xyz: {self.trajectory_q_xyz.shape[0]}, " # f"q: {self.trajectory_q_quat.shape[0]}" # ) pass return xyz, quat
[docs] @error_catcher def pop_roll_from_traj(self, index: int = 0) -> Optional[float]: """Deletes and returns the first value of the trajectory_queue for the roll. Args: index: if you wanna pop not the first but somewhere else Returns: value: np.array float(3,) - popped value """ if self.queue_roll_empty(): roll = None else: roll = self.trajectory_q_roll[0].copy() self.trajectory_q_roll = np.delete(self.trajectory_q_roll, index, axis=0) return roll
[docs] def get_final_xyz(self) -> NDArray: if self.queue_xyz_empty(): if self.lastTarget is None: self.perror("Unknown last leg position for trajectory. Assuming origin.") end_point = np.zeros(shape=3, dtype=float) else: end_point = self.lastTarget else: end_point = self.trajectory_q_xyz[-1, :] return end_point.copy()
[docs] def get_final_quat(self) -> qt.quaternion: if self.queue_quat_empty(): if self.lastQuat is None: self.perror("Unknown last leg quat for trajectory. Assuming origin.") end_quat = qt.one.copy() else: end_quat = self.lastQuat else: end_quat = self.trajectory_q_quat[-1] return end_quat.copy()
[docs] def get_final_target(self) -> Tuple[NDArray, qt.quaternion]: """returns the final position of where the leg is. Or will be at the end of the current trajectory_queue. Returns: end_point: np.array float(3,) - final coordinates """ return self.get_final_xyz(), self.get_final_quat()
[docs] @error_catcher def fuse_xyz_trajectory(self, xyz_traj: Optional[NDArray] = None): if xyz_traj is None: return queue_final_xyz = self.get_final_xyz() final_len = max(xyz_traj.shape[0], self.trajectory_q_xyz.shape[0]) fused_traj = np.zeros((final_len, 3), dtype=float) fused_traj[:, :] = queue_final_xyz fused_traj[: self.trajectory_q_xyz.shape[0], :] = self.trajectory_q_xyz fused_traj[: xyz_traj.shape[0] - 1, :] += xyz_traj[:-1, :] - queue_final_xyz fused_traj[xyz_traj.shape[0] - 1 :, :] += xyz_traj[-1, :] - queue_final_xyz self.trajectory_q_xyz = fused_traj
[docs] @error_catcher def fuse_quat_trajectory(self, quat_traj: Optional[qt.quaternion] = None): if quat_traj is None: return queue_final_quat: qt.quaternion = self.get_final_quat() final_len: int = max(quat_traj.shape[0], self.trajectory_q_quat.shape[0]) fused_traj = qt.from_vector_part(np.zeros((final_len, 3), dtype=float)) fused_traj[:] = queue_final_quat fused_traj[: self.trajectory_q_quat.shape[0]] = self.trajectory_q_quat fused_traj[: quat_traj.shape[0] - 1] *= 1 / queue_final_quat * quat_traj[:-1] fused_traj[quat_traj.shape[0] - 1 :] *= 1 / queue_final_quat * quat_traj[-1] self.trajectory_q_quat = fused_traj
[docs] @error_catcher def fuse_roll_trajectory(self, roll_traj: Optional[NDArray] = None): if roll_traj is None: return final_roll_len = max(roll_traj.shape[0], self.trajectory_q_roll.shape[0]) fused_traj_roll = np.zeros((final_roll_len,), dtype=float) fused_traj_roll[: self.trajectory_q_roll.shape[0]] = self.trajectory_q_roll fused_traj_roll[: roll_traj.shape[0]] += roll_traj self.trajectory_q_roll: NDArray = fused_traj_roll
[docs] @error_catcher def add_to_trajectory( self, xyz_traj: Optional[NDArray] = None, quat_traj: Optional[qt.quaternion] = None, ) -> None: """Adds a trajectory RELATIVE TO THE BODY CENTER to the trajectory queue Args: new_traj: np.array float(:, 3) - xyz trajectory RELATIVE TO THE BODY CENTER quat_traj: qt.quaternion shape(:) - quaternion trajectory RELATIVE TO THE BODY CENTER """ self.fuse_xyz_trajectory(xyz_traj) self.fuse_quat_trajectory(quat_traj)
[docs] @error_catcher def send_most_recent_tip( self, request: ReturnVect3.Request, response: ReturnVect3.Response ) -> ReturnVect3.Response: """Publish the last target sent Args: request: ReturnVect3.Request - Nothing response: ReturnVect3.Response - Vector3 (float x3) Returns: ReturnVect3 - Vector3 (float x3) """ if self.lastTarget is None: self.perror("No End Effector positon data. No angle data received yet.") return response response.vector.x = self.lastTarget[0] response.vector.y = self.lastTarget[1] response.vector.z = self.lastTarget[2] return response
[docs] def smoother(self, x: NDArray) -> NDArray: """smoothes the interval [0, 1] to have a soft start and end (derivative is zero) """ # x = (1 - np.cos(x * np.pi)) / 2 # x = (1 - np.cos(x * np.pi)) / 2 return x
[docs] @error_catcher def smoother_complement(self, x: NDArray) -> NDArray: """changes the interval [0, 1] to 0->1->0 and smoothes to have a soft start and end """ x = (1 - np.cos(x * np.pi)) / 2 # x = x * 1.1 - 0.1 z = np.sin(x * np.pi) return z
[docs] @error_catcher def rel_transl( self, xyz: Optional[np.ndarray] = None, quat: Optional[qt.quaternion] = None ) -> int: """performs translation to the target relative to body Args: target: np.array float(3,) - target relative to the body center quat: qt.quaternion - target quaternion relative to body center Returns: target: np.array float(3,) - target relative to the body center """ samples = int(self.movementTime * self.movementUpdateRate) start_target, start_quat = self.get_final_target() t = np.linspace(0 + 1 / samples, 1, num=samples - 0) # x: ]0->1] t = self.smoother(t) trajectory: Optional[NDArray] = None if xyz is not None: t3 = np.tile(t, (3, 1)).transpose() trajectory = xyz * t3 + start_target * (1 - t3) quat_traj: Optional[qt.quaternion] = None if quat is not None: quat_arr = np.empty((len(t), 4), dtype=float) for index, frac in enumerate(t): # TODO, improve that for loop quat_arr[index] = quaternion_slerp( quat0=qt.as_float_array(start_quat), quat1=qt.as_float_array(quat.copy()), fraction=frac, ) quat_traj = qt.as_quat_array(quat_arr) self.add_to_trajectory(trajectory, quat_traj) return samples
@error_catcher def rel_rotation( self, quat: qt.quaternion, center: NDArray = np.array([0, 0, 0], dtype=float), ) -> int: """performs rotation to the target relative to body Args: quaternion: qt.quaternion float(4,) - quaternion to rotate by center: np.array float(3,) - center of rotation Returns: steps until end of mvt """ samples = int(self.movementTime * self.movementUpdateRate) start_target, start_quat = self.get_final_target() x = np.linspace(0 + 1 / samples, 1, num=samples - 0) # x: ]0->1] x = self.smoother(x) quaternion_slerp_for_xyz = geometric_slerp( start=qt.as_float_array(qt.one.copy()), end=qt.as_float_array(quat.copy()), t=x, ) quaternion_slerp_for_xyz = qt.as_quat_array(quaternion_slerp_for_xyz) trajectory = ( qt.rotate_vectors(quaternion_slerp_for_xyz, start_target - center) + center ) quaternion_slerp = geometric_slerp( start=qt.as_float_array(start_quat), end=qt.as_float_array(quat.copy() * start_quat), # end=qt.as_float_array(start_quat * quat.copy()), t=x, ) quaternion_slerp = qt.as_quat_array(quaternion_slerp) self.add_to_trajectory(trajectory, quaternion_slerp) return samples
[docs] @staticmethod def point_with_quat(vect: NDArray): x = vect z_pure = np.array([0, 0, 1], dtype=float) y = np.cross(z_pure, x) # to define y in the horizontal plane z = np.cross(x, y) x_is_z_or_zero = np.isclose(np.linalg.norm(y), 0) if x_is_z_or_zero: return 0 x, y, z = x / np.linalg.norm(x), y / np.linalg.norm(y), z / np.linalg.norm(z) rot_matrix = np.empty((3, 3), dtype=float) rot_matrix[0, :] = x rot_matrix[1, :] = y rot_matrix[2, :] = z rot: qt.quaternion = 1 / qt.from_rotation_matrix(rot_matrix) # IDK why the -1 return rot
[docs] @error_catcher def point_toward(self, vect: NDArray) -> int: rotDirect = self.point_with_quat(vect) rotFlip = self.point_with_quat(-vect) deltaDirect = self.get_final_quat() / rotDirect deltaFlip = self.get_final_quat() / rotFlip if np.linalg.norm( qt.as_float_array(deltaDirect) - qt.as_float_array(qt.one) ) < np.linalg.norm(qt.as_float_array(deltaFlip) - qt.as_float_array(qt.one)): rot = rotDirect self.rollFlip = False else: rot = rotFlip self.rollFlip = True allready_oriented = qt.isclose(self.get_final_quat(), rot, atol=0.01) if allready_oriented: rot = self.get_final_quat() steps_until_done = 0 else: steps_until_done = self.rel_transl(xyz=None, quat=rot) return steps_until_done
[docs] @error_catcher def point_wheel(self, direction: NDArray) -> None: # this rotates the end effector and we get the final quaternion until_rot_done: int = self.point_toward(direction)
# distance = np.linalg.norm(direction) # samples = int(self.movementTime * self.movementUpdateRate) # traj = np.zeros(until_rot_done + samples) # traj[until_rot_done:] = distance / samples # self.fuse_roll_trajectory(traj)
[docs] @error_catcher def shift( self, shift: Optional[NDArray] = None, quat: Optional[qt.quaternion] = None ) -> int: """performs translation to the target relative to current position Args: target: np.array float(3,) - target relative to current position Returns: target: np.array float(3,) - target relative to current position """ finalXYZ, finalQUAT = self.get_final_target() newShift: Optional[NDArray] newQuat: Optional[NDArray] if shift is None: newShift = None else: newShift = finalXYZ + shift if quat is None: newQuat = None else: newQuat = quat * finalQUAT return self.rel_transl(newShift, newQuat)
[docs] @error_catcher def rel_hop(self, target: np.ndarray) -> NDArray: """performs jump to the target relative to body Args: target: np.array float(3,) - target relative to the body center Returns: target: np.array float(3,) - target relative to the body center """ samples = int(self.movementTime * self.movementUpdateRate) start_target, start_quat = self.get_final_target() x = np.linspace(0 + 1 / samples, 1, num=samples - 0) # x: ]0->1] z = self.smoother_complement(x) x = self.smoother(x) x = np.tile(x, (3, 1)).transpose() trajectory = target * x + start_target * (1 - x) trajectory[:, 2] += z * STEPSIZE self.add_to_trajectory(trajectory) return trajectory[-1, :].copy()
[docs] @error_catcher def tip_pos_received_cbk(self, msg: Transform) -> None: """callback when a new end effector position is received Args: msg (Vector3): real end effector position """ self.currentTipXYZ, self.currentTipQuat = tf2np(msg) self.check_divergence() return
[docs] @error_catcher def check_divergence(self) -> None: # TODO: quaternions also """If the real end effector is not on the last target that was sent. Launches the timer to overwrite the target with the real position. If target and end effector correpsond, cancel the timer to overwrite the target. """ if self.currentTipXYZ is None: self.pwarn("Tip positon unknown") return noTargetData = self.lastTarget is None if noTargetData: self.overwrite_target() if ( np.linalg.norm(self.currentTipXYZ - self.lastTarget) > TARGET_OVERWRITE_DIST # or not np.all( # qt.isclose(self.currentTipQuat, self.lastQuat, atol=QUAT_OVERWRITE_DIST) # ) # or np.linalg.norm(qt.as_float_array(self.currentTipQuat - self.lastQuat)) > QUAT_OVERWRITE_DIST ) and self.queue_xyz_empty(): if self.overwriteTargetTimer.is_canceled(): self.overwriteTargetTimer.reset() else: self.overwriteTargetTimer.cancel()
[docs] @error_catcher def overwrite_target(self) -> None: """ overwrites the last sent target with the real position of the end effector. This will happen on startup, and when there is a problem on the real leg (cannot perform the movement) Small divergences are expected (in position and time), hence the timer and the check_divergence function. Setting lastTarget to the currentTip all the time is a bas idea, it create overcorrections. """ if self.currentTipXYZ is None: self.pwarn("Tip positon unknown") return if self.lastTarget is None: self.pinfo( f"Initial EE coord: v{np.round(self.currentTipXYZ)}, q{np.round(qt.as_float_array(self.currentTipQuat), 2)}", force=True, ) else: pass # self.pinfo( # f"EE coord overwriten: v{np.round(self.currentTipXYZ)}, q{np.round(qt.as_float_array(self.currentTipQuat), 2)}", # force=True, # ) self.lastTarget = self.currentTipXYZ self.lastQuat = self.currentTipQuat self.overwriteTargetTimer.cancel()
[docs] def wait_end_of_motion(self) -> None: """waits for the trajectory to end. This function is very bad, but I don't need nor have time to do something better. We should keep track of which trajectory are beeing queued to improve""" # return self.sleep(self.movementTime + 0.0)
[docs] def append_trajectory(self, trajectory_function: Callable) -> Future: """The function will be executed before the next read of the trajectory sequentialy This avoids trajectory_function in multiple threads modifying indentical data simultaneously. Args: trajectory_function - function: function to execute Returns: future: holds the future results of the function if needed """ future = Future() def fun_with_future(): result = trajectory_function() if self.trajectory_timer.is_canceled(): self.trajectory_timer.reset() # this starts execution immediately, # not waiting for the first timer callback self.execute_in_cbk_group( self.trajectory_executor, self.trajectory_safe_cbkgrp ) future.set_result(result) return result self.execute_in_cbk_group(fun_with_future, self.trajectory_safe_cbkgrp) # self.trajectory_update_queue.append(fun_with_future) # self.call_forward.append(trajectory_function) # if self.trajectory_timer.is_canceled(): # self.trajectory_timer.reset() return future
[docs] @error_catcher def shift_cbk( self, request: TFService.Request, response: TFService.Response ) -> TFService.Response: """Callback for leg shift motion Args: request: target relative to current end effector position response: Returns: success = True all the time """ target: Optional[NDArray] quat: Optional[qt.quaternion] target, quat = tf2np(request.tf) if np.any(np.isnan(target)): target = None if np.any(np.isnan(qt.as_float_array(quat))): quat = None fun = lambda: self.shift(target, quat) self.append_trajectory(fun) self.wait_end_of_motion() response.success_str.data = SUCCESS return response
[docs] @error_catcher def rel_transl_srv_cbk( self, request: TFService.Request, response: TFService.Response ) -> TFService.Response: """Callback for leg translation motion Args: request: target relative to body response: Returns: success = True all the time """ target: Optional[NDArray] quat: Optional[qt.quaternion] target, quat = tf2np(request.tf) if np.any(np.isnan(target)): target = None if np.any(np.isnan(qt.as_float_array(quat))): quat = None fun = lambda: self.rel_transl(target, quat) self.append_trajectory(fun) self.wait_end_of_motion() response.success_str.data = SUCCESS return response
[docs] @error_catcher def rel_hop_srv_cbk( self, request: TFService.Request, response: TFService.Response ) -> TFService.Response: """Callback for leg hopping motion Args: request: target relative to body response: Returns: success = True all the time """ target: Optional[NDArray] quat: Optional[qt.quaternion] target, quat = tf2np(request.tf) if np.any(np.isnan(target)): target = None if np.any(np.isnan(qt.as_float_array(quat))): quat = None fun = lambda: self.rel_hop(target) self.append_trajectory(fun) self.wait_end_of_motion() response.success_str.data = SUCCESS return response
[docs] @error_catcher def rot_cbk( self, request: TFService.Request, response: TFService.Response ) -> TFService.Response: """Callback for leg rotation motion Args: request: TF for the rotation and center of the rotation response: Returns: success = True all the time """ center, quat = tf2np(request.tf) fun = lambda: self.rel_rotation(quat, center) self.append_trajectory(fun) self.wait_end_of_motion() response.success_str.data = SUCCESS return response
[docs] def point_cbk( self, request: TFService.Request, response: TFService.Response ) -> TFService.Response: roll_transl, quat = tf2np(request.tf) fun = lambda: self.point_wheel(roll_transl) self.append_trajectory(fun) self.wait_end_of_motion() response.success_str.data = SUCCESS return response
[docs] def main(args=None): myMain(LegNode, multiThreaded=True)
if __name__ == "__main__": main()