"""
This node is responsible for recieving targets in the body reference frame, and send the
corresponding angles to the motors.
Author: Elian NEPPEL
Lab: SRL, Moonshot team
"""
from asyncio import Future
import matplotlib
from motion_stack.api.joint_syncer import JointSyncer
matplotlib.use("Agg") # fix for when there is no display
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
import numpy as np
import quaternion as qt
import roboticstoolbox as rtb
from geometry_msgs.msg import Transform, Vector3
from numpy.typing import NDArray
from roboticstoolbox import ET, ETS, Link, Robot
from roboticstoolbox.robot.ET import SE3
from roboticstoolbox.tools import Joint
from .utils.joint_state import JState, impose_state, jattr, jdata, js_changed, jstamp
from .utils.pose import Pose
from .utils.printing import TCOL, list_cyanize
from .utils.robot_parsing import (
get_limit,
joint_by_joint_fk,
load_set_urdf,
load_set_urdf_raw,
make_ee,
)
from .utils.static_executor import FlexNode
from .utils.time import NANOSEC, Time
float_formatter = "{:.2f}".format
np.set_printoptions(formatter={"float_kind": float_formatter})
# IK_MAX_VEL = 0.003 # changes depending on the refresh rate idk why. This is bad
IK_MAX_VEL = (
10 # changes depending on the refresh rate and dimensions idk why. This is bad
)
[docs]
class IKCore(FlexNode):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.send_to_lvl1_callbacks: List[Callable[[List[JState]], Any]] = []
self.send_to_lvl3_callbacks: List[Callable[[Pose], Any]] = []
self.RESET_LAST_SENT: Time = Time(sec=0.5)
# V Parameters V
# \ / #
# \/ #
self.leg_num = self.ms_param["leg_number"]
self.spinner.alias = f"IK{self.leg_num}"
self.REFRESH_RATE = self.ms_param["mvmt_update_rate"]
self.IGNORE_LIM = self.ms_param["ignore_limits"]
self.MARGIN: float = self.ms_param["limit_margin"]
self.SPEED_MODE: bool = self.ms_param["speed_mode"]
self.ADD_JOINTS: List[str] = list(self.ms_param["add_joints"])
self.urdf_raw = self.ms_param["urdf"]
self.start_effector: str | None = self.ms_param["start_effector_name"]
end_effector: str = self.ms_param["end_effector_name"]
self.end_effector_name = make_ee(end_effector, self.leg_num)
(
self.model,
self.et_chain,
self.joint_names,
self.joints_objects,
self.last_link,
) = load_set_urdf_raw(
self.urdf_raw, self.end_effector_name, self.start_effector
)
# try:
# if isinstance(self.end_effector_name, str) and isinstance(
# self.start_effector, str
# ):
# ( # don't want to see this
# model2,
# ETchain2,
# joint_names2,
# joints_objects2,
# last_link2,
# ) = load_set_urdf_raw(
# self.urdf_raw, self.start_effector, self.end_effector_name
# )
# if len(joint_names2) > len(self.joint_names) and len(
# joints_objects2
# ) > len(self.joints_objects):
# joint_names2.reverse()
# self.joint_names = joint_names2
# joints_objects2.reverse()
# self.joints_objects = joints_objects2
# except:
# self.info(f"link tree could not be reversed")
# self.pinfo(self.model)
self.et_chain: ETS
# self.pwarn(len(self.joints_objects))
# self.ETchain = ETS(self.ETchain.compile())
self.end_link: Link = self.last_link # type: ignore
self.info(f"Kinematic chain is:\n{self.et_chain}")
jbj_fk = joint_by_joint_fk(self.et_chain, self.joint_names)
self.info(f"Joint by joint forward kinematics:\n{jbj_fk}")
# self.ETchain = self.all_limits(self.ETchain, self.joints_objects)
self.subModel: Robot = rtb.Robot(self.et_chain)
self.info(
f"Using base_link: {TCOL.OKCYAN}{self.model.base_link.name}{TCOL.ENDC}"
f", to ee: {TCOL.OKCYAN}{self.end_link.name}{TCOL.ENDC}"
)
# /\ #
# / \ #
# ^ Parameters ^
self.stated: Dict[str, JState] = {} #: recent addition storing the whole state
self.angles: NDArray = np.empty(len(self.joint_names), dtype=float)
self.angles[:] = np.nan
self.last_sent: NDArray = self.angles.copy()
self.lastTimeIK = Time(0)
self.ready = False
self.joint_syncer = JointSyncerIk(self)
[docs]
def firstSpinCBK(self):
return
[docs]
def all_limits(self, et_chain: ETS, jobjL: List[Joint]):
li = 0
for j in et_chain:
if j.isjoint:
jobj = jobjL[li]
li += 1
self.error(
f"{jobj.name}, {jobj.limit.lower}, {jobj.limit.upper}, {j._qlim}"
)
return ETS(et_chain)
[docs]
def compute_raw_ik(
self,
pose: Pose,
start: NDArray,
compute_budget: Optional[Time] = None, # type: ignore
mvt_duration: Optional[Time] = None, # type: ignore
) -> Tuple[Optional[NDArray], bool]:
computeBudget: Time
deltaTime: Time
if mvt_duration is None:
deltaTime = Time(sec=1 / self.REFRESH_RATE) # type: ignore
else:
deltaTime = mvt_duration
if compute_budget is None:
computeBudget = Time(sec=1 / self.REFRESH_RATE)
else:
computeBudget = compute_budget
del compute_budget, mvt_duration # ugly af
finish_by: Time = self.now() + computeBudget
motion: SE3 = SE3(pose.xyz) # type: ignore
motion.A[:3, :3] = qt.as_rotation_matrix(pose.quat) # type: ignore
if self.angles.shape[0] > 5:
mask = np.array([1, 1, 1, 1, 1, 1], dtype=float)
else:
mask = np.array([1, 1, 1, 0, 0, 0], dtype=float)
angles: NDArray = self.angles.copy()
np.nan_to_num(x=angles, nan=0.0, copy=False)
np.nan_to_num(x=start, nan=0.0, copy=False)
# self.pinfo(f"start: {start}")
# for trial in range(4):
trial = -1
trialLimit = 20
bestSolution: Optional[NDArray] = None
velMaybe: float = 1000000
validSolFound = False
compBudgetExceeded = lambda: self.now() > finish_by
# compBudgetExceeded = lambda: False
while trial < trialLimit and not compBudgetExceeded():
# self.pinfo(f"trial {trial}")
trial += 1
startingPose = start.copy()
if trial == 0:
i = 10
s = 1
if trial == 1:
i = 50
s = 1
else:
i = 50
s = 1_000
# s = 100
stpose = np.empty((s, startingPose.shape[0]), float)
stpose[:, :] = startingPose.reshape(1, -1)
r = np.random.rand(stpose.shape[0], stpose.shape[1])
r = r * 2 - 1
maxi = 1 / 100
mini = maxi / 100
r = r * np.linspace(mini, maxi, s, endpoint=True).reshape(-1, 1)
startingPose = stpose + r
# self.pwarn(startingPose)
ik_result = self.subModel.ik_LM(
# ik_result = self.subModel.ik_NR(
Tep=motion,
q0=startingPose,
mask=mask,
ilimit=i,
slimit=s,
joint_limits=not self.IGNORE_LIM,
# pinv=True,
# pinv_damping=0.2,
tol=1e-6,
)
# self.pwarn(not self.IGNORE_LIM)
solFound = ik_result[1]
sol = np.array(ik_result[0], dtype=float)
sol_im = 1 * np.exp(1j * sol)
star = 1 * np.exp(1j * start)
delt = sol_im / star
real_delta = np.angle(delt)
real_angles = start + real_delta
# self.pinfo(sol)
# self.pwarn(real_angles)
for ind, a in enumerate(real_angles):
l = self.joints_objects[ind].limit
if l is None:
continue
lup = l.upper
llo = l.lower
# if limit exceeded we go back to the original solution
if lup is not None and not self.IGNORE_LIM:
if real_angles[ind] > lup:
real_angles[ind] = sol[ind]
if llo is not None and not self.IGNORE_LIM:
if real_angles[ind] < llo:
real_angles[ind] = sol[ind]
# self.pwarn(real_angles)
delta = real_angles - start
# dist = float(np.linalg.norm(delta, ord=np.inf))
dist = float(np.linalg.norm(delta, ord=3))
velocity: float = dist / deltaTime.sec()
if solFound:
if abs(velocity) < abs(IK_MAX_VEL):
angles = real_angles
validSolFound = True
velMaybe = velocity
bestSolution = real_angles
break
isBetter = velocity < velMaybe
if isBetter:
bestSolution = real_angles
velMaybe = velocity
if compBudgetExceeded():
self.warn("IK slow, compute terminated")
return bestSolution, validSolFound
[docs]
def find_next_ik(
self,
pose: Pose,
compute_budget: Optional[Time] = None, # type: ignore
mvt_duration: Optional[Time] = None, # type: ignore
) -> NDArray:
if not self.ready:
self.warn(f"{TCOL.FAIL} No angle data available, assuming joints at 0.")
arriveTime: Time = self.now()
deltaTime: Time = arriveTime - self.lastTimeIK
self.lastTimeIK = arriveTime
ikIsRecent = deltaTime < self.RESET_LAST_SENT
if ikIsRecent:
start: NDArray = self.last_sent.copy()
else:
start: NDArray = self.angles.copy()
assert start.shape == self.angles.shape
bestSolution, validSolutionFound = self.compute_raw_ik(
pose,
start,
compute_budget=compute_budget, # type: ignore
mvt_duration=mvt_duration, # type: ignore
)
if bestSolution is None:
self.warn("""no IK found :C""")
return start
if not validSolutionFound:
pass
self.warn("no continuous IK found :C")
return bestSolution
[docs]
def ik_target(self, pose: Pose) -> None:
"""
recieves target from leg, converts to numpy, computes IK, sends angle
results to joints
Args:
msg: target as Ros2 Vector3
"""
pose = self._replace_nan(pose)
pose.xyz /= 1000
angles = self.find_next_ik(pose)
self._send_command(angles)
return
def _replace_nan(self, pose: Pose) -> Pose:
xyz, quat = pose.xyz, pose.quat
fk = None
if np.any(np.isnan(xyz)):
if fk is None:
fk = self.current_fk()
xyz = fk.xyz
if np.any(np.isnan(qt.as_float_array(quat))):
if fk is None:
fk = self.current_fk()
quat = fk.quat
new = Pose(time=pose.time, xyz=xyz, quat=quat)
return new
[docs]
def state_from_lvl1(self, states: List[JState]):
di = {j.name: j for j in states}
joint_of_interest = set(self.joint_names) & set(di.keys())
self.stated.update(
{
name: impose_state(onto=self.stated.get(name), fromm=di[name])
for name in joint_of_interest
}
)
for name in joint_of_interest:
ind = self.joint_names.index(name)
self.angles[ind] = di[name].position
missing = np.any(np.isnan(self.angles))
if not missing:
fk = self.send_current_fk()
if not self.ready:
self.info(f"{TCOL.OKGREEN}Forward Kinematics ready :){TCOL.ENDC}: {fk}")
self.ready = True
else:
self.joint_syncer.execute()
def _send_command(self, angles: NDArray):
assert self.last_sent.shape == angles.shape
assert angles.dtype in [float, np.float32]
self.last_sent: NDArray = angles.copy()
target = {name: angle for name, angle in zip(self.joint_names, angles)}
try:
# self.joint_syncer.lerp(target)
self.joint_syncer.unsafe(target)
except AssertionError:
self.warn("Joint syncer not ready.")
self.joint_syncer.execute()
return
[docs]
def send_to_lvl1(self, states: List[JState]):
for f in self.send_to_lvl1_callbacks:
f(states)
[docs]
def send_current_fk(self) -> Pose:
fk = self.current_fk()
self.send_to_lvl3(fk)
return fk
[docs]
def send_to_lvl3(self, pose: Pose):
for f in self.send_to_lvl3_callbacks:
f(pose)
[docs]
def current_fk(self) -> Pose:
fw_result: List[SE3] = self.subModel.fkine(self.angles) # type: ignore
rot_matrix = np.array(fw_result[-1].R, dtype=float)
tip_coord: NDArray = fw_result[-1].t * 1000
tip_quat: qt.quaternion = qt.from_rotation_matrix(rot_matrix)
return Pose(time=self.now(), xyz=tip_coord, quat=tip_quat)
[docs]
class JointSyncerIk(JointSyncer):
def __init__(
self,
core: IKCore,
interpolation_delta: float = np.deg2rad(10),
on_target_delta: float = np.deg2rad(10),
) -> None:
super().__init__(interpolation_delta, on_target_delta)
self._core = core
[docs]
def send_to_lvl1(self, states: List[JState]):
self._core.send_to_lvl1(states)
@property
def sensor(self) -> Dict[str, JState]:
return self._core.stated
@property
def FutureT(self) -> Type[Future]:
return Future