Source code for easy_robot_control.gait_node

"""
This node is responsible for chosing targets and positions.

Author: Elian NEPPEL
Lab: SRL, Moonshot team
"""

from typing import Dict, Final, Literal, Optional, Sequence, overload

import numpy as np
import quaternion as qt
from motion_stack_msgs.srv import (
    ReturnJointState,
    ReturnTargetSet,
    ReturnVect3,
    SendTargetBody,
    TFService,
)
from geometry_msgs.msg import Transform
from numpy.typing import NDArray
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.time import Duration, Time
from rclpy.node import List, Union
from rclpy.publisher import Publisher
from rclpy.task import Future
from std_msgs.msg import Float64
from std_srvs.srv import Empty

from easy_robot_control.EliaNode import (
    Client,
    EliaNode,
    error_catcher,
    myMain,
    np2TargetSet,
    np2tf,
    rosTime2Float,
    targetSet2np,
    tf2np,
)
from easy_robot_control.utils.joint_state_util import (
    JointState,
    JState,
    js_from_ros,
    stateOrderinator3000,
)

float_formatter = "{:.1f}".format
np.set_printoptions(formatter={"float_kind": float_formatter})

AvailableMvt = Literal["shift", "transl", "rot", "hop"]

MVT2SRV: Final[Dict[AvailableMvt, str]] = {
    "shift": "shift",
    "transl": "rel_transl",
    "rot": "rot",
    "hop": "rel_hop",
}


[docs] class JointMini: def __init__(self, joint_name: str, prefix: str, parent_leg: "Leg"): self.name = joint_name self.leg = parent_leg self.node = parent_leg.parent self.state = JState(name=self.name) self.prefix = prefix self.speed_target: Optional[float] = None self.speed_set_time: Optional[Time] = None self.speed_set_angle: Optional[float] = None self.speed_set_angle_save: Optional[float] = None self.speedTMR = self.node.create_timer(0.05, self.speedTMRCBK) self.speedTMR.cancel() self.MAX_DELTA = np.deg2rad(7)
[docs] def is_active(self): if self.state.time is None: return False return rosTime2Float(self.node.getNow() - self.state.time) < 5
[docs] def self_report(self): header = f"{self.name}, " if self.state.time is None: return header + "NO sensor data" if not self.is_active(): return header + "OLD sensor data" return ""
@property def effort(self) -> Optional[float]: return self.state.effort @effort.setter def effort(self, value: Optional[float]): self.state.time = self.node.getNow() self.state.effort = value @property def speed(self) -> Optional[float]: return self.state.velocity @speed.setter def speed(self, value: Optional[float]): self.state.time = self.node.getNow() self.state.velocity = value @property def angle(self) -> Optional[float]: return self.state.position @angle.setter def angle(self, value: Optional[float]): self.state.time = self.node.getNow() self.state.position = value
[docs] def apply_speed_target(self, speed: Optional[float]) -> None: """Moves the joint at a given speed using position command. It sends everincreasing angle target (this is for safety, so it stops when nothing happens). The next angle target sent cannot be more than MAX_DELTA radian away from the current sensor angle (safe, because you can only do small movements with this method) If it is too far away, the speed value will decrease to match the max speed of the joint (not exactly, it will be a little higher). Args: speed: approx speed value (max speed if for 1) or None """ if speed == 0: speed = None if self.speed_target == speed: return self.speed_target = speed self.speed_set_angle = self.angle self.speed_set_angle_save = self.speed_set_angle self.speed_set_time = self.node.getNow() # compensate for starting late self.speed_set_time -= Duration(nanoseconds=self.speedTMR.timer_period_ns) if speed is None: self.__speed_tmr_off() self.node.execute_in_cbk_group(self.speedTMRCBK) else: self.__speed_tmr_on()
def __speed_tmr_on(self): """starts to publish angles based on speed""" # return if self.speedTMR.is_canceled(): # avoids a ros2 bug I think self.node.execute_in_cbk_group(self.speedTMR.reset) def __speed_tmr_off(self): """starts to publish angles based on speed""" return if not self.speedTMR.is_canceled(): # avoids a ros2 bug I think self.node.execute_in_cbk_group(self.speedTMR.cancel)
[docs] @error_catcher def speedTMRCBK(self): """Updates angle based on stored speed. Stops if speed is None""" if ( self.speed_target is None or self.angle is None or self.speed_set_angle is None or self.speed_set_angle_save is None or self.speed_set_time is None ): return now = self.node.getNow() delta_time = rosTime2Float(now - self.speed_set_time) delta_ang = self.speed_target * delta_time integrated_angle = self.speed_set_angle + delta_ang upper_bound = self.angle + self.MAX_DELTA lower_bound = self.angle - self.MAX_DELTA if not (lower_bound < integrated_angle < upper_bound): clipped = np.clip(integrated_angle, lower_bound, upper_bound) real_speed = (self.angle - self.speed_set_angle_save) / (delta_time) if abs(real_speed) < 0.0001: real_speed = 0.0002 if delta_time > 1 and abs(self.speed_target / real_speed) > 1.2: # will slowly converge towward real speed*1.05 self.speed_target = self.speed_target * 0.9 + (real_speed * 1.05) * 0.1 self.node.pwarn(f"Speed fast, reduced to {self.speed_target:.3f}") else: pass self.speed_set_angle = clipped - self.speed_target * delta_time integrated_angle = clipped self.__publish_angle_cmd(integrated_angle)
[docs] def apply_angle_target(self, angle: float) -> None: """Sets angle target for the joint, and cancels speed command Args: angle: angle target """ self.apply_speed_target(0) self.__publish_angle_cmd(angle)
def __publish_angle_cmd(self, angle: Optional[float]): js = JState(name=self.name, time=self.node.getNow(), position=angle) self.__publish_cmd(js) def __publish_cmd(self, js: JState): self.leg.send_joint_cmd([js])
[docs] class Leg: """Helps you use lvl 1-2-3 of a leg Attributes: number: leg number parent: parent node spinning joint_name_list: list of joints belonging to the leg """ def __init__(self, number: int, parent: EliaNode) -> None: self.number = number self.parent = parent self._ikSUB = self.parent.create_subscription( Transform, f"leg{number}/tip_pos", self._ikSUBCBK, 10 ) self.xyz_now: Optional[NDArray] = None self.quat_now: Optional[qt.quaternion] = None self._jointPUB = self.parent.create_publisher( JointState, f"leg{number}/joint_set", 10 ) self._ikPUB = self.parent.create_publisher( Transform, f"leg{number}/set_ik_target", 10 ) self._jointSUB = self.parent.create_subscription( JointState, f"leg{number}/joint_read", self._listen_jointsCBK, 10 ) self._mvt_clients: Dict[AvailableMvt, Client] = {} self.joint_name_list: Sequence[str] = [] self.joints: Dict[str, JointMini] = {} self._joint_pub: Sequence[Publisher] = [] self._joint_getterCLI = self.parent.create_client( ReturnJointState, f"leg{self.number}/advertise_joints" ) for mvt, srv in MVT2SRV.items(): self._mvt_clients[mvt] = self.parent.get_and_wait_Client( f"leg{self.number}/{srv}", TFService ) self.look_for_joints()
[docs] @staticmethod def do_i_exist(number: int, parent: EliaNode, timeout: float = 0.1): """Slow do not use to spam scan. Returns True if the leg is alive""" cli = parent.create_client(srv_type=Empty, srv_name=f"leg{number}/joint_alive") is_alive = cli.wait_for_service(timeout_sec=timeout) parent.destroy_client(cli) return is_alive
[docs] def self_report(self) -> str: msg = "" if self.xyz_now is None or self.quat_now is None: msg += "Issue [IK]: No end effector data received\n" for j in self.joints.values(): if j.self_report() != "": msg += f"Issue [J]: {j.self_report()}\n" msg_head = f"Leg {self.number} report: " if msg == "": return msg_head + "no issues :)" else: return msg_head + "\n" + msg
def _listen_jointsCBK(self, msg: JointState): states = js_from_ros(msg) self._update_joints_sensor(states) def _update_joints_sensor(self, states: List[JState]): for state in states: j = self.joints.get(state.name) if j is None: continue j.angle = state.position
[docs] def send_joint_cmd(self, states: List[JState]): msgs = stateOrderinator3000(states) for msg in msgs: self._jointPUB.publish(msg)
def _ikSUBCBK(self, msg: Transform): """recieves tip position form ik lvl2""" self.xyz_now, self.quat_now = tf2np(msg)
[docs] def look_for_joints(self): """scans and updates the list of joints of this leg""" fut = self._joint_getterCLI.call_async(ReturnJointState.Request()) def next_step(msg: Future): res: JointState = msg.result().js all_joints = res.name old_joint = self.joints.keys() new_joints = set(all_joints) - set(old_joint) for n in new_joints: self.joints[n] = JointMini(n, f"leg{self.number}/", self) self.joint_name_list = sorted(list(self.joints.keys())) fut.add_done_callback(next_step)
[docs] def go2zero(self): """sends angle target of 0 on all joints""" for j in self.joint_name_list: self.set_angle(angle=0, joint=j)
[docs] def get_joint_obj(self, joint: Union[int, str]) -> Optional[JointMini]: if isinstance(joint, int): if joint >= len(self.joint_name_list): self.parent.perror( f"[leg {self.number} object] " f"index {joint} out of range" ) return None jname = self.joint_name_list[joint] else: if joint not in self.joints.keys(): self.parent.perror( f"[leg {self.number} object] joint name {joint} not in joint list" ) return None jname = joint joint_obj = self.joints[jname] return joint_obj
[docs] def get_angle(self, joint: Union[int, str]) -> Optional[float]: """Gets an angle from a joint Args: joint: joint name or number (alphabetically ordered) Returns: last recieved angle as float """ joint_obj = self.get_joint_obj(joint) if joint_obj is None: return None return joint_obj.angle
[docs] def set_angle(self, angle: float, joint: Union[int, str]) -> bool: """Sends a angle to a joint Args: angle: rad joint: joint name or number (alphabetically ordered) Returns: True if message sent """ joint_obj = self.get_joint_obj(joint) if joint_obj is None: return False joint_obj.apply_angle_target(angle) return True
[docs] def ik( self, xyz: Union[None, NDArray, Sequence[float]] = None, quat: Optional[qt.quaternion] = None, ) -> None: """Publishes an ik target for the leg () relative to baselink. Motion stack lvl2 Args: xyz: quat: """ # self.parent.pinfo(f"{xyz} --- {quat}") msg = np2tf(coord=xyz, quat=quat, sendNone=True) self._ikPUB.publish(msg) return
@overload def move( self, xyz: Union[None, NDArray, Sequence[float]] = None, quat: Optional[qt.quaternion] = None, mvt_type: AvailableMvt = "shift", blocking: Literal[True] = True, ) -> Future: ... @overload def move( self, xyz: Union[None, NDArray, Sequence[float]] = None, quat: Optional[qt.quaternion] = None, mvt_type: AvailableMvt = "shift", blocking: Literal[False] = False, ) -> TFService.Response: ...
[docs] def move( self, xyz: Union[None, NDArray, Sequence[float]] = None, quat: Optional[qt.quaternion] = None, mvt_type: AvailableMvt = "shift", blocking: bool = True, ) -> Union[Future, TFService.Response]: """Calls the leg's movement service. Motion stack lvl3 Args: xyz: vector part of the tf quat: quat of the tf mvt_type: type of movement to call blocking: if false returns a Future. Else returns the response Returns: """ request = TFService.Request() request.tf = np2tf(coord=xyz, quat=quat, sendNone=True) shiftCMD = self._mvt_clients[mvt_type] # shiftCMD.wait_for_service(0.5) # laggy ??? if blocking: call = shiftCMD.call(request) else: call = shiftCMD.call_async(request) return call
[docs] class GaitNode(EliaNode): def __init__(self): # rclpy.init() super().__init__("gait_node") # type: ignore self.Alias = "G" self.wait_for_lower_level(["mover_alive"]) # V Params V # \ / # # \/ # self.declare_parameter("std_movement_time", 2.0) self.MVT_TIME = ( self.get_parameter("std_movement_time").get_parameter_value().double_value ) self.declare_parameter("robot_name", "None") self.ROBOT_NAME = ( self.get_parameter("robot_name").get_parameter_value().string_value ) self.declare_parameter("number_of_legs", 1) self.NUMBER_OF_LEG = ( self.get_parameter("number_of_legs").get_parameter_value().integer_value ) self.declare_parameter("leg_list", [0]) self.LEG_LIST: Sequence[int] = ( # type: ignore self.get_parameter("leg_list").get_parameter_value().integer_array_value ) self.legs: Dict[int, Leg] = {} for number in self.LEG_LIST: self.legs[number] = Leg(number, self) # /\ # # / \ # # ^ Params ^ # V Publishers V # \ / # # \/ # # /\ # # / \ # # ^ Publishers ^ # V Service client V # \ / # # \/ # self.returnTargetSet: Client = self.get_and_wait_Client( "get_targetset", ReturnTargetSet ) self.sendTargetBody: Client = self.get_and_wait_Client( "go2_targetbody", SendTargetBody ) # /\ # # / \ # # ^ Service client ^ # V Service server V # \ / # # \/ # # /\ # # / \ # # ^ Service server ^ self.firstSpin = self.create_timer( 1, self.firstSpinCBK, callback_group=MutuallyExclusiveCallbackGroup() )
[docs] @error_catcher def firstSpinCBK(self): self.pinfo("go") self.destroy_timer(self.firstSpin) tsnow = self.getTargetSetBlocking() # self.perror(self.ROBOT_NAME) if "hero_vehicle" == self.ROBOT_NAME: self.hero_vehicle() if "hero_dragon" == self.ROBOT_NAME: self.hero_dragon() if "mglimb_7dof" in self.ROBOT_NAME: self.gustavo() if "hero_7dof" in self.ROBOT_NAME: self.hero_arm() if self.ROBOT_NAME == "ur16_3f": self.ashutosh() return # self.randomPosLoop() # while 1: # self.mZeroBasicSetAndWalk() # self.goToTargetBodyBlocking(ts=np.array([[-1100, 0, 460]])) shiftcmd = self.get_and_wait_Client("leg_0_rel_transl", TFService) while 1: self.crawl1Wheel()
[docs] def hero_vehicle(self): wheel_j = [ "/leg13/spe_2wheel_left_joint_set", "/leg13/spe_2wheel_right_joint_set", "/leg14/spe_1wheel_left_joint_set", "/leg14/spe_1wheel_right_joint_set", ] wheel_spe: Sequence[Publisher] = [ self.create_publisher(Float64, name, 10) for name in wheel_j ] def forward(ang_speed: float): for p in wheel_spe: p.publish(Float64(data=float(ang_speed))) main_leg_ind = self.LEG_LIST[0] # default for all moves main_leg = self.legs[main_leg_ind] # default for all moves default_coord = [0, -1000, 0] main_leg.ik(xyz=default_coord, quat=qt.one) self.sleep(2) for direction in [1, -1]: forward(-0.5) rot = qt.from_rotation_vector(np.array([0, 0, 0.5 * direction])) main_leg.move(quat=rot) self.goToTargetBody(bodyQuat=rot) forward(0) unrot = 1 / rot self.goToTargetBody(bodyQuat=unrot) origin = qt.from_rotation_vector(np.array([0, 0, 0])) main_leg.move(xyz=default_coord, quat=origin, mvt_type="transl")
[docs] def hero_dragon(self): wheel_j = [ "/leg13/spe_2wheel_left_joint_set", "/leg13/spe_2wheel_right_joint_set", "/leg14/spe_1wheel_left_joint_set", "/leg14/spe_1wheel_right_joint_set", ] wheel_s_pub: Sequence[Publisher] = [ self.create_publisher(Float64, name, 10) for name in wheel_j ] def forward(ang_speed: float): for p in wheel_s_pub: p.publish(Float64(data=float(ang_speed))) main_leg_ind = self.LEG_LIST[0] # default for all moves manip_leg_ind = self.LEG_LIST[1] main_leg = self.legs[main_leg_ind] # default for all moves manip_leg = self.legs[manip_leg_ind] # quit() manip_leg.set_angle(-np.pi / 2, 0) manip_leg.set_angle(np.pi, 5) self.sleep(0.1) rot = qt.from_rotation_vector(np.array([1, 0, 0]) * np.pi / 2) rot = qt.from_rotation_vector(np.array([0, 1, 0]) * np.pi / 2) * rot manip_leg.ik(xyz=[-100, 500, 700], quat=rot) main_leg.ik(xyz=[0, -1200, 0], quat=qt.one) self.sleep(2) for direction in [1, -1]: forward(-0.5) rot = qt.from_rotation_vector(np.array([0, 0, 0.5 * direction])) main_leg.move(quat=rot) self.goToTargetBody(bodyQuat=rot) forward(0) unrot = 1 / rot self.goToTargetBody(bodyQuat=unrot) origin = qt.from_rotation_vector(np.array([0, 0, 0])) main_leg.move(quat=origin, mvt_type="transl")
[docs] def hero_arm(self): wheel_j = [ # I wanna publish speeds directly to wheel joints "/leg11/spe_11wheel_left_joint_set", "/leg11/spe_11wheel_right_joint_set", "/leg12/spe_12wheel_left_joint_set", "/leg12/spe_12wheel_right_joint_set", "/leg13/spe_13wheel_left_joint_set", "/leg13/spe_13wheel_right_joint_set", "/leg14/spe_14wheel_left_joint_set", "/leg14/spe_14wheel_right_joint_set", ] wheel_s_pub: Sequence[Publisher] = [ self.create_publisher(Float64, name, 10) for name in wheel_j ] def forward(ang_speed: float): for p in wheel_s_pub: p.publish(Float64(data=float(ang_speed))) tsnow = self.getTargetSetBlocking() # gets current tip xyz for all legs # I send an lvl2 IK call to all legs # leg 1 will not move, then leg 2 a bit more, 3 more, 4 will be on end_mov/rot call_list: Sequence[Future] = [] end_mov = np.array([200, 0, -200], dtype=float) end_rot = np.pi / 4 for ind, leg in enumerate(self.legs.values()): x = ind / (len(self.legs.values()) - 0) # x in [0, 1] movement = end_mov * x rot_axis = np.array([0, 1, 0], dtype=float) rot_axis = rot_axis / np.linalg.norm(rot_axis) rot_magnitude = end_rot * x rot_vec = rot_magnitude * rot_axis rotation: qt.quaternion = qt.from_rotation_vector(rot_vec) current_xyz = tsnow[ind, :] # sorry we have to do that manually current_quat = qt.from_rotation_matrix( [ [0, 0, -1], # z is oposite to x0 [0, 1, 0], # y is aligned with y0 [1, 0, 0], # x is aligned with z0 ] ) leg.ik( xyz=current_xyz + movement + np.array([-100, 0, 0]), quat=current_quat * rotation, ) self.sleep(6) movement = np.array([100, 0, 0], dtype=float) rot_axis = np.array([1, 0, 0], dtype=float) rot_axis = rot_axis / np.linalg.norm(rot_axis) rot_magnitude = 0 rot_vec = rot_magnitude * rot_axis rotation: qt.quaternion = qt.from_rotation_vector(rot_vec) while 1: forward(0.5) self.goToTargetBody( bodyXYZ=movement, ) forward(0) self.goToTargetBody( bodyXYZ=-movement, ) forward(-0.5) call_list: Sequence[Future] = [] for leg in self.legs.values(): call = leg.move(xyz=movement, blocking=False) call_list.append(call) self.wait_on_futures(call_list) forward(0) call_list: Sequence[Future] = [] for leg in self.legs.values(): call = leg.move(xyz=-movement, blocking=False) call_list.append(call) self.wait_on_futures(call_list)
[docs] def gustavo(self): # get the current end effector position your_arm_number = self.LEG_LIST[0] get_tipCMD: Client = self.get_and_wait_Client( f"leg{your_arm_number}/tip_pos", ReturnVect3 ) tip_pos_result: ReturnVect3.Response = get_tipCMD.call( ReturnVect3.Request(), ) # this blocks until recieved tip_pos = np.array( [ tip_pos_result.vector.x, tip_pos_result.vector.y, tip_pos_result.vector.z, ], dtype=float, ) # get your data # let's go to the same postion but y=300, z=0, and reset the ee quaternion target = tip_pos.copy() target[1] += 500 target[2] = 0 request = TFService.Request() request.tf = np2tf( coord=target, quat=qt.one, sendNone=True, ) shiftCMD = self.get_and_wait_Client(f"leg{your_arm_number}/rel_transl", TFService) shiftCMD.call(request) # let's go y+200, x+200, and rotate a little movement = np.array([200, 200, 0], dtype=float) rot_axis = np.array([0, 1, 0], dtype=float) rot_axis = rot_axis / np.linalg.norm(rot_axis) rot_magnitude = -np.pi / 2 rot_vec = rot_magnitude * rot_axis rotation: qt.quaternion = qt.from_rotation_vector(rot_vec) request = TFService.Request() request.tf = np2tf( coord=movement, quat=rotation, sendNone=True, ) shiftCMD = self.get_and_wait_Client("leg0/shift", TFService) shiftCMD.call(request) # let's go x-200mm, wait 1 s then y-200 movement = np.array([-200, 0, 0], dtype=float) request = TFService.Request() request.tf = np2tf( coord=movement, quat=None, sendNone=True, ) call1: Future = shiftCMD.call_async(request) self.sleep(1) movement = np.array([0, -200, 0], dtype=float) request = TFService.Request() request.tf = np2tf( coord=movement, quat=None, sendNone=True, ) call2: Future = shiftCMD.call_async(request) self.wait_on_futures([call1, call2]) # will block until all your futures are done quit()
[docs] def ashutosh(self, res=None) -> None: arm = Leg(0, self) arm.ik(xyz=[500, 0, 500], quat=qt.one) self.sleep(2) arm.move(xyz=[200, 0, 0], quat=qt.one, mvt_type="shift", blocking=True) arm.move(xyz=[-200, 200, 0], quat=qt.one, mvt_type="shift", blocking=True) return # get the current end effector position get_tipCMD: Client = self.get_and_wait_Client("leg0/tip_pos", ReturnVect3) tip_pos_result: ReturnVect3.Response = get_tipCMD.call( ReturnVect3.Request(), ) # this blocks until recieved tip_pos = np.array( [ tip_pos_result.vector.x, tip_pos_result.vector.y, tip_pos_result.vector.z, ], dtype=float, ) # get your data # let's go to the same postion but y=-200, 100m backward, and reset to ee quaternion target = tip_pos.copy() target[0] += 100 target[1] = -200 request = TFService.Request() request.tf = np2tf( coord=target, quat=qt.one, sendNone=True, ) shiftCMD = self.get_and_wait_Client("leg0/rel_transl", TFService) shiftCMD.call(request) # let's go down 200mm and back 400mm, and rotate the ee movement = np.array([400, 0, -200], dtype=float) rot_axis = np.array([1, 0, 0], dtype=float) rot_axis = rot_axis / np.linalg.norm(rot_axis) rot_magnitude = -np.pi / 2 rot_vec = rot_magnitude * rot_axis rotation: qt.quaternion = qt.from_rotation_vector(rot_vec) request = TFService.Request() request.tf = np2tf( coord=movement, quat=rotation, sendNone=True, ) shiftCMD = self.get_and_wait_Client("leg0/shift", TFService) shiftCMD.call(request) # let's go forward 400mm, wait 1 s then go left 300mm movement = np.array([-400, 0, 0], dtype=float) request = TFService.Request() request.tf = np2tf( coord=movement, quat=None, sendNone=True, ) call1: Future = shiftCMD.call_async(request) self.sleep(1) movement = np.array([0, -300, 0], dtype=float) request = TFService.Request() request.tf = np2tf( coord=movement, quat=None, sendNone=True, ) call2: Future = shiftCMD.call_async(request) self.wait_on_futures([call1, call2]) # will block until all your futures are done # lets go back to the some position, but not by doing a trajectory. Simply # executing the IK immediately self.sleep(0.3) arm = list(self.legs.values())[0] arm.ik([-600, -300, 800], qt.one) self.sleep(1.5) self.ashutosh()
[docs] def shiftCrawlDebbug(self, res=None): raise Exception("fix that shit") tsnow = self.getTargetSetBlocking() mvt = np.array([100, 0, 0], dtype=float) ts_next = tsnow + mvt.reshape(1, 3) self.goToTargetBodyBlocking(ts=ts_next) fut: Future = self.goToTargetBody(bodyXYZ=mvt) fut.add_done_callback(self.shiftCrawlDebbug)
[docs] @error_catcher def crawl1Wheel(self): """for moonbot hero one arm+wheel only""" tsnow = self.getTargetSetBlocking() shiftcmd = self.get_and_wait_Client("leg0/shift", TFService) mvt = np.array([400, 0, 0], dtype=float) speed = np.linalg.norm(mvt) / self.MVT_TIME rollcmd: Publisher = self.create_publisher( Float64, f"leg0/smart_roll", 10, ) rollcmd.publish(Float64(data=speed)) shiftcmd.call( TFService.Request( tf=np2tf( coord=mvt, ) ) ) rollcmd.publish(Float64(data=0.0)) self.goToTargetBodyBlocking(bodyXYZ=mvt)
[docs] @error_catcher def mZeroBasicSetAndWalk(self): """for moonbot 0""" height = 220 width = 250 targetSet = np.array( [ [width, 0, -height], [0, width, -height], [-width, 0, -height], [0, -width, -height], ], dtype=float, ) self.sendTargetBody.call( SendTargetBody.Request( target_body=TargetBody( target_set=np2TargetSet(targetSet), body=np2tf(), ) ) ) self.sendTargetBody.call( SendTargetBody.Request( target_body=TargetBody( # target_set=np2TargetSet(ts), body=np2tf(np.array([-50.0, 0.0, 0.0], None)), ) ) ) for l in range(4): newTS = np.empty(shape=(4, 3), dtype=float) newTS[:, :] = np.nan newTS[l, :] = targetSet[l, :] self.sendTargetBody.call( SendTargetBody.Request( target_body=TargetBody( target_set=np2TargetSet(newTS), body=np2tf(np.array([0.0, 0.0, 0.0], None)), ) ) )
[docs] def randomPosLoop(self): """for multi legged robots""" self.goToTargetBody(bodyXYZ=np.array([-200, 0, 0], dtype=float)) d = 400 v = np.random.random(size=(3,)) * 0 while 1: dir = np.random.random(size=(3,)) vnew = (dir - 0.5) * d self.goToTargetBody(bodyXYZ=vnew - v) v = vnew
[docs] def getTargetSet(self) -> Future: call = self.returnTargetSet.call_async(ReturnTargetSet.Request()) processFuture = Future() @error_catcher def processMessage(f: Future) -> None: response: Optional[ReturnTargetSet.Response] = f.result() assert response is not None ts = targetSet2np(response.target_set) processFuture.set_result(ts) call.add_done_callback(processMessage) return processFuture
[docs] def getTargetSetBlocking(self) -> NDArray: response: ReturnTargetSet.Response = self.returnTargetSet.call( ReturnTargetSet.Request() ) return targetSet2np(response.target_set)
@overload def goToTargetBody( self, ts: Optional[NDArray] = None, bodyXYZ: Optional[NDArray] = None, bodyQuat: Optional[qt.quaternion] = None, blocking: Literal[False] = False, ) -> SendTargetBody.Response: ... @overload def goToTargetBody( self, ts: Optional[NDArray] = None, bodyXYZ: Optional[NDArray] = None, bodyQuat: Optional[qt.quaternion] = None, blocking: Literal[True] = True, ) -> Future: ...
[docs] def goToTargetBody( self, ts: Optional[NDArray] = None, bodyXYZ: Optional[NDArray] = None, bodyQuat: Optional[qt.quaternion] = None, blocking: bool = True, ) -> Union[Future, SendTargetBody.Response]: target = TargetBody( target_set=np2TargetSet(ts), body=np2tf(bodyXYZ, bodyQuat), ) request = SendTargetBody.Request(target_body=target) if blocking: call = self.sendTargetBody.call(request) return call else: call = self.sendTargetBody.call_async(request) return call
[docs] def crawlToTargetSet(self, NDArray) -> None: ...
[docs] def goToDefault(self): ...
[docs] def stand(self): ...
[docs] def main(args=None): myMain(GaitNode, multiThreaded=True)
if __name__ == "__main__": main()