Source code for motion_stack.core.lvl2_ik

"""
This node is responsible for recieving targets in the body reference frame, and send the
corresponding angles to the motors.

Author: Elian NEPPEL
Lab: SRL, Moonshot team
"""

from asyncio import Future

import matplotlib

matplotlib.use("Agg")  # fix for when there is no display

from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union

import numpy as np
import quaternion as qt
import roboticstoolbox as rtb
from numpy.typing import NDArray
from roboticstoolbox import ETS, Link, Robot
from roboticstoolbox.robot.ET import SE3
from roboticstoolbox.tools import Joint

from .rtb_fix.patch import patch
patch()

from ..api.joint_syncer import JointSyncer
from ..api.joint_syncer import _order_dict2arr, only_position
from .utils.joint_state import JState, impose_state
from .utils.pose import Pose
from .utils.printing import TCOL, list_cyanize
from .utils.robot_parsing import joint_by_joint_fk, load_set_urdf_raw, make_ee
from .utils.static_executor import FlexNode
from .utils.time import Time

float_formatter = "{:.2f}".format
np.set_printoptions(formatter={"float_kind": float_formatter})

[docs] class IKCore(FlexNode): #: duration after which verbose debug log is displayed for missing data SENS_VERBOSE_TIMEOUT: int = 3 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.send_to_lvl1_callbacks: List[Callable[[List[JState]], Any]] = [] self.send_to_lvl3_callbacks: List[Callable[[Pose], Any]] = [] self.RESET_LAST_SENT: Time = Time(sec=0.5) self._verbose_last_missing: Set[str] = set() # V Parameters V # \ / # # \/ # self.leg_num = self.ms_param["leg_number"] self.spinner.alias = f"IK{self.leg_num}" self.REFRESH_RATE = self.ms_param["mvmt_update_rate"] self.IGNORE_LIM = self.ms_param["ignore_limits"] self.MARGIN: float = self.ms_param["limit_margin"] self.SPEED_MODE: bool = self.ms_param["speed_mode"] self.ADD_JOINTS: List[str] = list(self.ms_param["add_joints"]) self.SYNCER_DELTA: float = self.ms_param["angle_syncer_delta"] self.urdf_raw = self.ms_param["urdf"] self.start_effector: str | None = self.ms_param["start_effector_name"] end_effector: str = self.ms_param["end_effector_name"] self.end_effector_name = make_ee(end_effector, self.leg_num) ( self.model, self.et_chain, self.joint_names, self.joints_objects, self.last_link, ) = load_set_urdf_raw( self.urdf_raw, self.end_effector_name, self.start_effector ) self.et_chain: ETS self.end_link: Link = self.last_link # type: ignore # self.ETchain = self.all_limits(self.ETchain, self.joints_objects) self.subModel: Robot = rtb.Robot(self.et_chain) self.info( f"Base_link: {TCOL.OKCYAN}{self.model.base_link.name}{TCOL.ENDC}\n" f"Chain: {self.joint_names}\n" f"End effector: {TCOL.OKCYAN}{self.end_link.name}{TCOL.ENDC}" ) # /\ # # / \ # # ^ Parameters ^ self.accumulated_joint_state: Dict[str, JState] = {} #: storing the whole state self.angles: NDArray = np.empty(len(self.joint_names), dtype=float) self.angles[:] = np.nan self.last_sent: NDArray = self.angles.copy() self.lastTimeIK = Time(0) self.ready = False self.joint_syncer = JointSyncerIk(self) self.joint_syncer._interpolation_delta = self.SYNCER_DELTA self.joint_syncer._on_target_delta = self.SYNCER_DELTA if self.SYNCER_DELTA ==0: self.interp_method = self.joint_syncer.unsafe else: self.interp_method = self.joint_syncer.lerp
[docs] def firstSpinCBK(self): return
[docs] def all_limits(self, et_chain: ETS, jobjL: List[Joint]): li = 0 for j in et_chain: if j.isjoint: jobj = jobjL[li] li += 1 self.error( f"{jobj.name}, {jobj.limit.lower}, {jobj.limit.upper}, {j._qlim}" ) return ETS(et_chain)
[docs] def compute_raw_ik( self, pose: Pose, start: NDArray, compute_budget: Optional[Time] = None, # type: ignore mvt_duration: Optional[Time] = None, # type: ignore ) -> Tuple[Optional[NDArray], bool]: computeBudget: Time deltaTime: Time if mvt_duration is None: deltaTime = Time(sec=1 / self.REFRESH_RATE) # type: ignore else: deltaTime = mvt_duration if compute_budget is None: computeBudget = Time(sec=1 / self.REFRESH_RATE) else: computeBudget = compute_budget del compute_budget, mvt_duration # ugly af finish_by: Time = self.now() + computeBudget motion: SE3 = SE3(pose.xyz) # type: ignore motion.A[:3, :3] = qt.as_rotation_matrix(pose.quat) # type: ignore if self.angles.shape[0] > 5: mask = np.array([1, 1, 1, 1, 1, 1], dtype=float) else: mask = np.array([1, 1, 1, 0, 0, 0], dtype=float) angles: NDArray = self.angles.copy() np.nan_to_num(x=angles, nan=0.0, copy=False) np.nan_to_num(x=start, nan=0.0, copy=False) # for trial in range(4): trial = -1 trialLimit = 20 bestSolution: Optional[NDArray] = None velMaybe: float = 1000000 validSolFound = False compBudgetExceeded = lambda: self.now() > finish_by # compBudgetExceeded = lambda: False while trial < trialLimit and not compBudgetExceeded(): trial += 1 startingPose = start.copy() if trial <= 0: i = 10 s = 1 tol=1e-7 elif trial == 1: i = 50 s = 1 tol=1e-6 else: i = 50 s = 5 tol=1e-5 stpose = np.empty((s, startingPose.shape[0]), float) stpose[:, :] = startingPose.reshape(1, -1) r = np.random.rand(stpose.shape[0], stpose.shape[1]) * 2 - 1 maxi = 1 / 10 mini = maxi / 100 r = r * np.linspace(mini, maxi, s, endpoint=True).reshape(-1, 1) startingPose = stpose + r # self.info(f"computing start\n" # f"{trial=}\n" # f"Tep={motion},\n" # f"q0={startingPose},\n" # f"mask={mask},\n" # f"ilimit={i},\n" # f"slimit={s},\n" # f"joint_limits={False},\n" # f"tol={tol},") ik_result = self.subModel.ik_LM( # ik_result = self.subModel.ik_NR( Tep=motion, q0=startingPose, mask=mask, ilimit=i, slimit=s, joint_limits=not self.IGNORE_LIM, # pinv=True, # pinv_damping=0.2, tol=tol, ) # self.pwarn(not self.IGNORE_LIM) solFound = ik_result[1] sol = np.array(ik_result[0], dtype=float) sol_im = 1 * np.exp(1j * sol) star = 1 * np.exp(1j * start) delt = sol_im / star real_delta = np.angle(delt) real_angles = start + real_delta # self.pinfo(sol) # self.pwarn(real_angles) for ind, a in enumerate(real_angles): l = self.joints_objects[ind].limit if l is None: continue lup = l.upper llo = l.lower # if limit exceeded we go back to the original solution if lup is not None and not self.IGNORE_LIM: if real_angles[ind] > lup: real_angles[ind] = sol[ind] if llo is not None and not self.IGNORE_LIM: if real_angles[ind] < llo: real_angles[ind] = sol[ind] # self.pwarn(real_angles) delta = real_angles - start dist = float(np.linalg.norm(delta, ord=np.inf)) # dist = float(np.linalg.norm(delta, ord=3)) velocity: float = dist / deltaTime.sec() if solFound: if abs(dist) < abs(self.SYNCER_DELTA) or self.SYNCER_DELTA <= 0: angles = real_angles validSolFound = True velMaybe = dist bestSolution = real_angles break isBetter = dist < velMaybe if isBetter: bestSolution = real_angles velMaybe = dist if compBudgetExceeded(): self.warn("IK slow, compute terminated") return bestSolution, validSolFound
[docs] def find_next_ik( self, pose: Pose, compute_budget: Optional[Time] = None, # type: ignore mvt_duration: Optional[Time] = None, # type: ignore ) -> NDArray: if not self.ready: self.warn(f"{TCOL.FAIL} No angle data available, assuming joints at 0.") arriveTime: Time = self.now() deltaTime: Time = arriveTime - self.lastTimeIK self.lastTimeIK = arriveTime ikIsRecent = deltaTime < self.RESET_LAST_SENT if ikIsRecent: start: NDArray = self.last_sent.copy() else: start: NDArray = self.angles.copy() assert start.shape == self.angles.shape bestSolution, validSolutionFound = self.compute_raw_ik( pose, start, compute_budget=compute_budget, # type: ignore mvt_duration=mvt_duration, # type: ignore ) if bestSolution is None: self.warn("""no IK found :C""") return start if not validSolutionFound: pass self.warn("no continuous IK found :C") return bestSolution
[docs] def ik_target(self, pose: Pose) -> None: """ recieves target from leg, converts to numpy, computes IK, sends angle results to joints Args: msg: target as Ros2 Vector3 """ pose = self._replace_nan(pose) pose.xyz /= 1000 angles = self.find_next_ik(pose) self._send_command(angles) return
def _replace_nan(self, pose: Pose) -> Pose: xyz, quat = pose.xyz, pose.quat fk = None if np.any(np.isnan(xyz)): if fk is None: fk = self.current_fk() xyz = fk.xyz if np.any(np.isnan(qt.as_float_array(quat))): if fk is None: fk = self.current_fk() quat = fk.quat new = Pose(time=pose.time, xyz=xyz, quat=quat) return new
[docs] def state_from_lvl1(self, states: List[JState]): di = {j.name: j for j in states} joint_of_interest = set(self.joint_names) & set(di.keys()) self.accumulated_joint_state.update( { name: impose_state( onto=self.accumulated_joint_state.get(name), fromm=di[name] ) for name in joint_of_interest } ) for name in joint_of_interest: ind = self.joint_names.index(name) self.angles[ind] = di[name].position missing = np.any(np.isnan(self.angles)) if not missing: fk = self.send_current_fk() self.joint_syncer.execute() if not self.ready: self.info( f"Forward Kinematics: {TCOL.BOLD}{TCOL.OKGREEN}FULLY Ready :){TCOL.ENDC}:\n{fk}" ) self.ready = True
def _verbose_timeout(self): return self.now() - self.startup_time < Time(sec=self.SENS_VERBOSE_TIMEOUT)
[docs] def verbose_check(self): """Checks that data is available, if not displays information to the user. Returns: True when the verbose doesn't need to be called anymore """ if self._verbose_timeout(): return False controlled_joint = set(self.joint_names) ready, missing = self.joint_syncer.ready(controlled_joint) if missing == self._verbose_last_missing: return False self._verbose_last_missing = missing if controlled_joint == missing: self.warn( f"{TCOL.ENDC}Forward Kinematics: {TCOL.FAIL}MISSING ALL :({TCOL.ENDC} {list_cyanize(missing)}{TCOL.ENDC}" ) # ) elif missing: self.warn( f"{TCOL.ENDC}Forward Kinematics: {TCOL.FAIL}MISSING{TCOL.ENDC} {list_cyanize(missing)}{TCOL.ENDC}, {TCOL.OKBLUE}available {list_cyanize(controlled_joint - missing)}{TCOL.ENDC}" ) # ) return ready
def _send_command(self, angles: NDArray): assert self.last_sent.shape == angles.shape assert angles.dtype in [float, np.float32] target = {name: angle for name, angle in zip(self.joint_names, angles)} try: self.interp_method(target) except AssertionError: self.warn("Joint syncer not ready.") self.joint_syncer.execute() return
[docs] def save_as_last(self, js: List[JState]): order = self.joint_names posdict: Dict[str, float] = only_position(js) posarr = _order_dict2arr(order, posdict) self.last_sent = posarr
[docs] def send_to_lvl1(self, states: List[JState]): self.save_as_last(states) for f in self.send_to_lvl1_callbacks: f(states)
[docs] def send_current_fk(self) -> Pose: fk = self.current_fk() self.send_to_lvl3(fk) return fk
[docs] def send_to_lvl3(self, pose: Pose): for f in self.send_to_lvl3_callbacks: f(pose)
[docs] def current_fk(self) -> Pose: fw_result: List[SE3] = self.subModel.fkine(self.angles) # type: ignore rot_matrix = np.array(fw_result[-1].R, dtype=float) tip_coord: NDArray = fw_result[-1].t * 1000 tip_quat: qt.quaternion = qt.from_rotation_matrix(rot_matrix) return Pose(time=self.now(), xyz=tip_coord, quat=tip_quat)
[docs] class JointSyncerIk(JointSyncer): def __init__( self, core: IKCore, interpolation_delta: float = np.deg2rad(10), on_target_delta: float = np.deg2rad(10), ) -> None: super().__init__(interpolation_delta, on_target_delta) self._core = core
[docs] def send_to_lvl1(self, states: List[JState]): self._core.send_to_lvl1(states)
@property def sensor(self) -> Dict[str, JState]: return self._core.accumulated_joint_state @property def FutureT(self) -> Type[Future]: return Future