"""
This node is responsible for recieving targets in the body reference frame, and send the
corresponding angles to the motors.
Author: Elian NEPPEL
Lab: SRL, Moonshot team
"""
from asyncio import Future
import matplotlib
matplotlib.use("Agg") # fix for when there is no display
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union
import numpy as np
import quaternion as qt
import roboticstoolbox as rtb
from numpy.typing import NDArray
from roboticstoolbox import ETS, Link, Robot
from roboticstoolbox.robot.ET import SE3
from roboticstoolbox.tools import Joint
from .rtb_fix.patch import patch
patch()
from ..api.joint_syncer import JointSyncer
from ..api.joint_syncer import _order_dict2arr, only_position
from .utils.joint_state import JState, impose_state
from .utils.pose import Pose
from .utils.printing import TCOL, list_cyanize
from .utils.robot_parsing import joint_by_joint_fk, load_set_urdf_raw, make_ee
from .utils.static_executor import FlexNode
from .utils.time import Time
float_formatter = "{:.2f}".format
np.set_printoptions(formatter={"float_kind": float_formatter})
[docs]
class IKCore(FlexNode):
#: duration after which verbose debug log is displayed for missing data
SENS_VERBOSE_TIMEOUT: int = 3
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.send_to_lvl1_callbacks: List[Callable[[List[JState]], Any]] = []
self.send_to_lvl3_callbacks: List[Callable[[Pose], Any]] = []
self.RESET_LAST_SENT: Time = Time(sec=0.5)
self._verbose_last_missing: Set[str] = set()
# V Parameters V
# \ / #
# \/ #
self.leg_num = self.ms_param["leg_number"]
self.spinner.alias = f"IK{self.leg_num}"
self.REFRESH_RATE = self.ms_param["mvmt_update_rate"]
self.IGNORE_LIM = self.ms_param["ignore_limits"]
self.MARGIN: float = self.ms_param["limit_margin"]
self.SPEED_MODE: bool = self.ms_param["speed_mode"]
self.ADD_JOINTS: List[str] = list(self.ms_param["add_joints"])
self.SYNCER_DELTA: float = self.ms_param["angle_syncer_delta"]
self.urdf_raw = self.ms_param["urdf"]
self.start_effector: str | None = self.ms_param["start_effector_name"]
end_effector: str = self.ms_param["end_effector_name"]
self.end_effector_name = make_ee(end_effector, self.leg_num)
(
self.model,
self.et_chain,
self.joint_names,
self.joints_objects,
self.last_link,
) = load_set_urdf_raw(
self.urdf_raw, self.end_effector_name, self.start_effector
)
self.et_chain: ETS
self.end_link: Link = self.last_link # type: ignore
# self.ETchain = self.all_limits(self.ETchain, self.joints_objects)
self.subModel: Robot = rtb.Robot(self.et_chain)
self.info(
f"Base_link: {TCOL.OKCYAN}{self.model.base_link.name}{TCOL.ENDC}\n"
f"Chain: {self.joint_names}\n"
f"End effector: {TCOL.OKCYAN}{self.end_link.name}{TCOL.ENDC}"
)
# /\ #
# / \ #
# ^ Parameters ^
self.accumulated_joint_state: Dict[str, JState] = {} #: storing the whole state
self.angles: NDArray = np.empty(len(self.joint_names), dtype=float)
self.angles[:] = np.nan
self.last_sent: NDArray = self.angles.copy()
self.lastTimeIK = Time(0)
self.ready = False
self.joint_syncer = JointSyncerIk(self)
self.joint_syncer._interpolation_delta = self.SYNCER_DELTA
self.joint_syncer._on_target_delta = self.SYNCER_DELTA
if self.SYNCER_DELTA ==0:
self.interp_method = self.joint_syncer.unsafe
else:
self.interp_method = self.joint_syncer.lerp
[docs]
def firstSpinCBK(self):
return
[docs]
def all_limits(self, et_chain: ETS, jobjL: List[Joint]):
li = 0
for j in et_chain:
if j.isjoint:
jobj = jobjL[li]
li += 1
self.error(
f"{jobj.name}, {jobj.limit.lower}, {jobj.limit.upper}, {j._qlim}"
)
return ETS(et_chain)
[docs]
def compute_raw_ik(
self,
pose: Pose,
start: NDArray,
compute_budget: Optional[Time] = None, # type: ignore
mvt_duration: Optional[Time] = None, # type: ignore
) -> Tuple[Optional[NDArray], bool]:
computeBudget: Time
deltaTime: Time
if mvt_duration is None:
deltaTime = Time(sec=1 / self.REFRESH_RATE) # type: ignore
else:
deltaTime = mvt_duration
if compute_budget is None:
computeBudget = Time(sec=1 / self.REFRESH_RATE)
else:
computeBudget = compute_budget
del compute_budget, mvt_duration # ugly af
finish_by: Time = self.now() + computeBudget
motion: SE3 = SE3(pose.xyz) # type: ignore
motion.A[:3, :3] = qt.as_rotation_matrix(pose.quat) # type: ignore
if self.angles.shape[0] > 5:
mask = np.array([1, 1, 1, 1, 1, 1], dtype=float)
else:
mask = np.array([1, 1, 1, 0, 0, 0], dtype=float)
angles: NDArray = self.angles.copy()
np.nan_to_num(x=angles, nan=0.0, copy=False)
np.nan_to_num(x=start, nan=0.0, copy=False)
# for trial in range(4):
trial = -1
trialLimit = 20
bestSolution: Optional[NDArray] = None
velMaybe: float = 1000000
validSolFound = False
compBudgetExceeded = lambda: self.now() > finish_by
# compBudgetExceeded = lambda: False
while trial < trialLimit and not compBudgetExceeded():
trial += 1
startingPose = start.copy()
if trial <= 0:
i = 10
s = 1
tol=1e-7
elif trial == 1:
i = 50
s = 1
tol=1e-6
else:
i = 50
s = 5
tol=1e-5
stpose = np.empty((s, startingPose.shape[0]), float)
stpose[:, :] = startingPose.reshape(1, -1)
r = np.random.rand(stpose.shape[0], stpose.shape[1]) * 2 - 1
maxi = 1 / 10
mini = maxi / 100
r = r * np.linspace(mini, maxi, s, endpoint=True).reshape(-1, 1)
startingPose = stpose + r
# self.info(f"computing start\n"
# f"{trial=}\n"
# f"Tep={motion},\n"
# f"q0={startingPose},\n"
# f"mask={mask},\n"
# f"ilimit={i},\n"
# f"slimit={s},\n"
# f"joint_limits={False},\n"
# f"tol={tol},")
ik_result = self.subModel.ik_LM(
# ik_result = self.subModel.ik_NR(
Tep=motion,
q0=startingPose,
mask=mask,
ilimit=i,
slimit=s,
joint_limits=not self.IGNORE_LIM,
# pinv=True,
# pinv_damping=0.2,
tol=tol,
)
# self.pwarn(not self.IGNORE_LIM)
solFound = ik_result[1]
sol = np.array(ik_result[0], dtype=float)
sol_im = 1 * np.exp(1j * sol)
star = 1 * np.exp(1j * start)
delt = sol_im / star
real_delta = np.angle(delt)
real_angles = start + real_delta
# self.pinfo(sol)
# self.pwarn(real_angles)
for ind, a in enumerate(real_angles):
l = self.joints_objects[ind].limit
if l is None:
continue
lup = l.upper
llo = l.lower
# if limit exceeded we go back to the original solution
if lup is not None and not self.IGNORE_LIM:
if real_angles[ind] > lup:
real_angles[ind] = sol[ind]
if llo is not None and not self.IGNORE_LIM:
if real_angles[ind] < llo:
real_angles[ind] = sol[ind]
# self.pwarn(real_angles)
delta = real_angles - start
dist = float(np.linalg.norm(delta, ord=np.inf))
# dist = float(np.linalg.norm(delta, ord=3))
velocity: float = dist / deltaTime.sec()
if solFound:
if abs(dist) < abs(self.SYNCER_DELTA) or self.SYNCER_DELTA <= 0:
angles = real_angles
validSolFound = True
velMaybe = dist
bestSolution = real_angles
break
isBetter = dist < velMaybe
if isBetter:
bestSolution = real_angles
velMaybe = dist
if compBudgetExceeded():
self.warn("IK slow, compute terminated")
return bestSolution, validSolFound
[docs]
def find_next_ik(
self,
pose: Pose,
compute_budget: Optional[Time] = None, # type: ignore
mvt_duration: Optional[Time] = None, # type: ignore
) -> NDArray:
if not self.ready:
self.warn(f"{TCOL.FAIL} No angle data available, assuming joints at 0.")
arriveTime: Time = self.now()
deltaTime: Time = arriveTime - self.lastTimeIK
self.lastTimeIK = arriveTime
ikIsRecent = deltaTime < self.RESET_LAST_SENT
if ikIsRecent:
start: NDArray = self.last_sent.copy()
else:
start: NDArray = self.angles.copy()
assert start.shape == self.angles.shape
bestSolution, validSolutionFound = self.compute_raw_ik(
pose,
start,
compute_budget=compute_budget, # type: ignore
mvt_duration=mvt_duration, # type: ignore
)
if bestSolution is None:
self.warn("""no IK found :C""")
return start
if not validSolutionFound:
pass
self.warn("no continuous IK found :C")
return bestSolution
[docs]
def ik_target(self, pose: Pose) -> None:
"""
recieves target from leg, converts to numpy, computes IK, sends angle
results to joints
Args:
msg: target as Ros2 Vector3
"""
pose = self._replace_nan(pose)
pose.xyz /= 1000
angles = self.find_next_ik(pose)
self._send_command(angles)
return
def _replace_nan(self, pose: Pose) -> Pose:
xyz, quat = pose.xyz, pose.quat
fk = None
if np.any(np.isnan(xyz)):
if fk is None:
fk = self.current_fk()
xyz = fk.xyz
if np.any(np.isnan(qt.as_float_array(quat))):
if fk is None:
fk = self.current_fk()
quat = fk.quat
new = Pose(time=pose.time, xyz=xyz, quat=quat)
return new
[docs]
def state_from_lvl1(self, states: List[JState]):
di = {j.name: j for j in states}
joint_of_interest = set(self.joint_names) & set(di.keys())
self.accumulated_joint_state.update(
{
name: impose_state(
onto=self.accumulated_joint_state.get(name), fromm=di[name]
)
for name in joint_of_interest
}
)
for name in joint_of_interest:
ind = self.joint_names.index(name)
self.angles[ind] = di[name].position
missing = np.any(np.isnan(self.angles))
if not missing:
fk = self.send_current_fk()
self.joint_syncer.execute()
if not self.ready:
self.info(
f"Forward Kinematics: {TCOL.BOLD}{TCOL.OKGREEN}FULLY Ready :){TCOL.ENDC}:\n{fk}"
)
self.ready = True
def _verbose_timeout(self):
return self.now() - self.startup_time < Time(sec=self.SENS_VERBOSE_TIMEOUT)
[docs]
def verbose_check(self):
"""Checks that data is available, if not displays information to the user.
Returns:
True when the verbose doesn't need to be called anymore
"""
if self._verbose_timeout():
return False
controlled_joint = set(self.joint_names)
ready, missing = self.joint_syncer.ready(controlled_joint)
if missing == self._verbose_last_missing:
return False
self._verbose_last_missing = missing
if controlled_joint == missing:
self.warn(
f"{TCOL.ENDC}Forward Kinematics: {TCOL.FAIL}MISSING ALL :({TCOL.ENDC} {list_cyanize(missing)}{TCOL.ENDC}"
) # )
elif missing:
self.warn(
f"{TCOL.ENDC}Forward Kinematics: {TCOL.FAIL}MISSING{TCOL.ENDC} {list_cyanize(missing)}{TCOL.ENDC}, {TCOL.OKBLUE}available {list_cyanize(controlled_joint - missing)}{TCOL.ENDC}"
) # )
return ready
def _send_command(self, angles: NDArray):
assert self.last_sent.shape == angles.shape
assert angles.dtype in [float, np.float32]
target = {name: angle for name, angle in zip(self.joint_names, angles)}
try:
self.interp_method(target)
except AssertionError:
self.warn("Joint syncer not ready.")
self.joint_syncer.execute()
return
[docs]
def save_as_last(self, js: List[JState]):
order = self.joint_names
posdict: Dict[str, float] = only_position(js)
posarr = _order_dict2arr(order, posdict)
self.last_sent = posarr
[docs]
def send_to_lvl1(self, states: List[JState]):
self.save_as_last(states)
for f in self.send_to_lvl1_callbacks:
f(states)
[docs]
def send_current_fk(self) -> Pose:
fk = self.current_fk()
self.send_to_lvl3(fk)
return fk
[docs]
def send_to_lvl3(self, pose: Pose):
for f in self.send_to_lvl3_callbacks:
f(pose)
[docs]
def current_fk(self) -> Pose:
fw_result: List[SE3] = self.subModel.fkine(self.angles) # type: ignore
rot_matrix = np.array(fw_result[-1].R, dtype=float)
tip_coord: NDArray = fw_result[-1].t * 1000
tip_quat: qt.quaternion = qt.from_rotation_matrix(rot_matrix)
return Pose(time=self.now(), xyz=tip_coord, quat=tip_quat)
[docs]
class JointSyncerIk(JointSyncer):
def __init__(
self,
core: IKCore,
interpolation_delta: float = np.deg2rad(10),
on_target_delta: float = np.deg2rad(10),
) -> None:
super().__init__(interpolation_delta, on_target_delta)
self._core = core
[docs]
def send_to_lvl1(self, states: List[JState]):
self._core.send_to_lvl1(states)
@property
def sensor(self) -> Dict[str, JState]:
return self._core.accumulated_joint_state
@property
def FutureT(self) -> Type[Future]:
return Future