"""
Node and its object of level 1.
"""
from typing import Callable, Dict, Final, Iterable, List, Optional, Set, Tuple, Union
import numpy as np
from nptyping import NDArray
from roboticstoolbox.tools.urdf.urdf import Joint as RTBJoint
from motion_stack.api.injection.remapper import StateRemapper
from .utils.joint_state import JState, impose_state, jattr, jdata, js_changed, jstamp
from .utils.printing import TCOL, list_cyanize
from .utils.robot_parsing import get_limit, load_set_urdf, load_set_urdf_raw, make_ee
from .utils.static_executor import FlexNode
from .utils.time import NANOSEC, Time
[docs]
class JointHandler:
"""This handles a single joint.
The main purpose is to update stateSensor and stateCommand. As well as getting the
newest values for those (in order to not continuously publish unchanging data).
Args:
name: name of the joint (in the URDF)
parent_node: Parent object handling several joints and messages
joint_object: raw joint object from rtb, extracted from the URDF
IGNORE_LIM: If true, joint limits are ignored
MARGIN: Adds a margin to the joints limits
"""
_name: str #: name of the joint (in the URDF)
_parent: "JointCore" #: Parent object handling several joints and messages
_joint_object: RTBJoint
_upper: float = np.inf
_lower: float = -np.inf
_ignore_limits: bool = False
_command: JState #: current command state
_fresh_command: JState #: fresh command state (old data is replaced by None)
_sensor: JState #: current sensor state
_fresh_sensor: JState #: fresh sensor state (old data replaced by None)
_failed_init_advertized: bool = False
_init_advertized: bool = False
#: tolerance for two state to be identical. Time is also considered,
#: so 2 states far from each other in time will be considered different
#: and trigger an update
TOL_NO_CHANGE: Final[JState] = JState(
name="",
time=Time(sec=1),
position=np.deg2rad(0.1),
velocity=np.deg2rad(0.01),
effort=np.deg2rad(0.001),
)
#: if true enable a PID for speed control. Will be deprecated in favor of an injection
_smode: bool
PID_P = 3 #: P gain of the PID for speed mode. TO BE DEPRECATED
PID_D = 0.1 #: D gain of the PID for speed mode. TO BE DEPRECATED
PID_LATE = 0.0 #: Target will be reached late for smoother motion. TO BE DEPRECATED
PID_CLOSE_ENOUGH = np.deg2rad(0.01) #: TO BE DEPRECATED
def __init__(
self,
name: str,
parent_node: "JointCore",
joint_object: RTBJoint,
IGNORE_LIM: bool = False,
MARGIN: float = 0.0,
):
self._name = name
self._parent = parent_node
self._joint_object = joint_object
self._smode = self._parent.SPEED_MODE #: to place in an injection
self.IGNORE_LIM = IGNORE_LIM
self.MARGIN = MARGIN
self._load_limit()
self._command = JState(name=self._name)
self._fresh_command = self._command
self._sensor = JState(name=self._name)
self._fresh_sensor = self._sensor
self._critical_check()
@property
def command(self) -> JState:
"""Current command state"""
return self._command
@property
def sensor(self) -> JState:
"""current sendor state"""
return self._sensor
@property
def name(self) -> str:
"""Name of the joint"""
return self._name
def _load_limit(self):
"""Loads the limit from the (urdf) joint object
Args:
ignore: if limits should be ignored
"""
if self._ignore_limits:
self._lower: float = -np.inf
self._upper: float = np.inf
return
self._lower, self._upper = get_limit(self._joint_object)
def _critical_check(self):
"""Raises error is clearly wrong data"""
assert self._name == self._joint_object.name
assert self._lower <= self._upper
@property
def no_limit(self) -> bool:
"""
True if the joint has not limits
"""
return (self._lower, self._upper) == (-np.inf, np.inf)
@property
def command_ready(self) -> bool:
"""
True if no commands have been received
"""
return self._command.is_initialized
@property
def sensor_ready(self) -> bool:
"""
True if no sensor data have been received
"""
return self._sensor.is_initialized
def _checkAngle(self, angle: Optional[float]) -> bool:
"""True is angle is valid or None"""
if self.IGNORE_LIM or angle is None:
return True
return self._lower <= angle <= self._upper
def _activeAngleCheckCBK(self):
"""check the sensor angle for validity, changes the speed if invalid"""
no_problem = self._checkAngle(self._sensor.position)
if no_problem:
return
stop = self._sensor.copy()
stop.time = self._parent.now()
stop.velocity = 0
stop.effort = 0
self.set_js_command(stop)
def _applyAngleLimit(self, angle: float) -> Tuple[float, bool]:
"""Clamps the angle between the joints limits"""
if self.IGNORE_LIM:
return angle, True
out = np.clip(angle, a_min=self._lower, a_max=self._upper)
return out, out == angle
def _process_command(self, js: JState) -> JState:
"""Processes incomming jstate in view of storing it as the command.
it applies angle limits, checks, speed ...
Args:
js: incomming unprocessed JState
Returns:
Processed JState, ready to be used
"""
js = js.copy()
if js.time is not None:
js.time = js.time
else:
js.time = self._parent.now()
if js.position is not None:
js.position = self._process_angle_command(js.position)
if js.velocity is not None:
v = self._process_velocity_command(js.velocity)
emergency = v is None
if emergency and self._sensor.position is not None:
js.position = self._process_angle_command(self._sensor.position)
if emergency:
v = 0
js.velocity = v
if js.effort is not None:
js.effort = self._process_effort_command(js.effort)
return js
[docs]
def set_js_command(self, js: JState):
"""Updates the stateCommand to a new js."""
assert js.name == self._name
js = self._process_command(js)
self._command = impose_state(self._command, js)
self._fresh_command = impose_state(self._fresh_command, js)
[docs]
def is_new_jssensor(self, js: JState):
"""True if js is different enough from the last received.
Also true if stateSensor is more the TOL_NO_CHANGE.time old relative to the new
"""
d = self.TOL_NO_CHANGE.copy()
if self._sensor.time is not None and self.TOL_NO_CHANGE.time is not None:
# black magic to keep publishing in sync when no changes
# We basically refresh every t=N*dt, and not dt after the previous
ts = self._sensor.time
dt = self.TOL_NO_CHANGE.time
if dt.nano() == 0:
return True
d.time = Time(dt - ts % dt)
something_changed = js_changed(js, self._sensor, delta=d)
return something_changed
[docs]
def set_js_sensor(self, js: JState):
"""Updates the stateSensor to a new js."""
assert js.name == self._name
if not self.is_new_jssensor(js):
return
self._sensor = js # no processing for sensor
self._fresh_sensor = impose_state(self._fresh_sensor, js)
# if self.name == "joint1":
# print(f"{js=}\n{self._sensor=}\n{self._fresh_sensor=}")
def _process_angle_command(self, angle: float) -> float:
"""This runs on new js before updating stateCommand"""
angle_in = angle
angle, isValid = self._applyAngleLimit(angle)
if not isValid:
self._parent.warn(
f"{TCOL.OKCYAN}{self._name}{TCOL.WARNING} clipped to limit. "
f"{angle_in:.2f} -> {angle:.2f}"
)
return angle
[docs]
def set_angle_cmd(self, angle: float, time: Optional[Time] = None):
"""Updates stateCommand by providing only an angle.
should be avoided as the timestamp will be set to now.
"""
assert isinstance(angle, float)
if time is None:
time = self._parent.now()
new_js = JState(self._name)
new_js.position = angle
new_js.time = self._parent.now()
self.set_js_command(new_js)
def _process_velocity_command(self, speed: float) -> Optional[float]:
"""This runs on new js before updating stateCommand"""
if self._sensor.position is None or self.IGNORE_LIM:
goingLow = False
goingHi = False
else:
goingLow = self._sensor.position <= self._lower and speed <= 0
goingHi = self._sensor.position >= self._upper and speed >= 0
mustStop = goingLow or goingHi
if mustStop:
return None
return speed
[docs]
def set_speed_cmd(self, speed: float, stop_other_commands: bool = False):
"""Updates stateCommand by providing only an speed.
should be avoided as the timestamp will be set to now.
"""
if isinstance(speed, int):
speed = float(speed)
assert isinstance(speed, float)
new_js = JState(self._name)
new_js.velocity = speed
new_js.time = self._parent.now()
if stop_other_commands:
self._command.position = None
self._command.effort = None
else:
self.set_js_command(new_js)
def _process_effort_command(self, eff: float) -> float:
"""This runs on new js before updating stateCommand"""
return eff
[docs]
def set_effort_cmd(self, effort: float):
"""Updates stateCommand by providing only an effort.
should be avoided as the timestamp will be set to now.
"""
new_js = JState(self._name)
new_js.effort = effort
new_js.time = self._parent.now()
self.set_js_command(new_js)
def _angle_feedback(self) -> None:
"""PID updating the speed command based on last stateSensor"""
if not self._smode:
return
if self._command.position is None or self._sensor.position is None:
return
delta = self._command.position - self._sensor.position
small_angle = abs(delta) < self.PID_CLOSE_ENOUGH
if small_angle:
self.set_speed_cmd(0)
return
# print(f"{self._command.position} - {self._sensor.position}")
if self._command.velocity is None:
vel_comm = 0
else:
vel_comm = self._command.velocity
if self._sensor.velocity is None:
vel_sens = 0
else:
vel_sens = self._sensor.velocity
vel = vel_comm - vel_sens
speedPID = delta * self.PID_P + vel * self.PID_D
if self._command.time is None or self._sensor.time is None:
self.set_speed_cmd(speedPID)
return
period = Time(sec=1 / self._parent.MVMT_UPDATE_RATE + self.PID_LATE)
arrivalTime = self._command.time + period
# time left to reach the target
timeLeft: Time = arrivalTime - self._sensor.time
# if less than 5% of the time left to reach, we will go slower and not freak out
# with infinite speed
timeLeftSafe = Time(nano=max(0.05 * period.nano(), timeLeft.nano()))
perfectSpeed = delta / timeLeftSafe.sec()
pid_is_slower = abs(speedPID) < abs(perfectSpeed)
# pick the slower of the two methodes.
# when far away we move at constant speed to reach the destination on the next
# command. If close, or when the PID wants to slow down, the PID is used
speed = speedPID if pid_is_slower else perfectSpeed
# if self._parent.leg_num == 1:
# print(f"{self.name}: {abs(speed)=:.4f}")
self.set_speed_cmd(speed)
return
[docs]
def get_fresh_sensor(self, reset: bool = True) -> JState:
"""returns sensor data that is newer than the last time it was called.
if the sensor data didn't changed enough to trigger a refresh, this will
be full of None. If a refresh occured, the None will be replaced by the non-None
values in the new sensor data.
example: if you stop sending speed sensor data after sending a bunch of speeds.
This speed will switch to None, it will not continue to be the last received
speed.
This last received speed is still available in stateSensor.
"""
out = self._fresh_sensor.copy()
if reset:
self._fresh_sensor = JState(name=self._name)
return out
[docs]
def get_fresh_command(self, reset: bool = True) -> JState:
"""returns command data that is newer than the last time it was called.
full of None is not newer"""
self._angle_feedback()
out = self._fresh_command.copy()
if reset:
self._fresh_command = JState(name=self._name)
return out
[docs]
class JointCore(FlexNode):
"""Lvl1"""
#: Remapping around any joint state communication of lvl0. Overwritable
lvl0_remap: StateRemapper
#: Remapping around any joint state communication of lvl2. Overwritable
lvl2_remap: StateRemapper
#: leg number identifier, deduced from the parameters
leg_num: int
send_to_lvl0_callbacks: List[Callable[[List[JState]], None]] = []
send_to_lvl2_callbacks: List[Callable[[List[JState]], None]] = []
#: duration after which joints with no sensor data are displayed (warning)
SENS_VERBOSE_TIMEOUT: int = 1
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.lvl0_remap = StateRemapper() # empty
self.lvl2_remap = StateRemapper() # empty
# V Params V
# \ / #
# \/ #
self.leg_num = self.ms_param["leg_number"]
self.spinner.alias = f"J{self.leg_num}"
self.MOVEMENT_TIME = self.ms_param["std_movement_time"]
self.CONTROL_RATE = self.ms_param["control_rate"]
self.MVMT_UPDATE_RATE = self.ms_param["mvmt_update_rate"]
self.IGNORE_LIM = self.ms_param["ignore_limits"]
self.MARGIN: float = self.ms_param["limit_margin"]
self.SPEED_MODE: bool = self.ms_param["speed_mode"]
self.ADD_JOINTS: List[str] = list(self.ms_param["add_joints"])
self.urdf_raw = self.ms_param["urdf"]
self.start_effector: str | None = self.ms_param["start_effector_name"]
end_effector: str = self.ms_param["end_effector_name"]
cleanup = set(self.ADD_JOINTS)
cleanup -= {""}
self.ADD_JOINTS = list(cleanup)
self.end_effector_name = make_ee(end_effector, self.leg_num)
# /\ #
# / \ #
# ^ Params ^
# self.end_effector_name = None
# self.start_effector = None
(
self.model,
self.ETchain,
self.joint_names,
self.joints_objects,
self.last_link,
) = load_set_urdf_raw(self.urdf_raw, self.end_effector_name, self.start_effector)
# if end_effector == "ALL":
# self.end_effector_name = self.leg_num
# try: # kill me
# if isinstance(self.end_effector_name, str) and isinstance(
# self.start_effector, str
# ):
# ( # don't want to see this
# model2,
# ETchain2,
# joint_names2,
# joints_objects2,
# last_link2,
# ) = load_set_urdf_raw(
# self.urdf_raw, self.start_effector, self.end_effector_name
# )
# if len(joint_names2) > len(self.joint_names) and len(
# joints_objects2
# ) > len(self.joints_objects):
# joint_names2.reverse()
# self.joint_names = joint_names2
# joints_objects2.reverse()
# self.joints_objects = joints_objects2
# except:
# self.info(f"link tree could not be reversed")
# self.baselinkName = self.model.base_link.name # base of the whole model
if self.start_effector is None:
self.baselinkName = self.model.base_link.name
else:
self.baselinkName = self.start_effector
if self.baselinkName != self.model.base_link.name:
self.info(
f"base_link forced to `{self.baselinkName}` "
f"instead of the tf root `{self.model.base_link.name}`, "
f"this can render part of the tf tree missing, or worse"
)
#
# if not self.joint_names:
# self.dont_handle_body = True
# else:
# self.dont_handle_body = False
self.joint_names += self.ADD_JOINTS
self.joints_objects += [
RTBJoint(
joint_type="continuous",
parent=None,
child=None,
name=jn,
# limit=[0, 0],
)
for jn in self.ADD_JOINTS
]
# self.pinfo(f"Joints controled: {TCOL.OKCYAN}{self.joint_names}{TCOL.ENDC}")
ee = self.last_link.name if self.last_link is not None else "all joints"
self.info(
f"Using base_link: {TCOL.OKCYAN}{self.baselinkName}{TCOL.ENDC}"
f", to ee: {TCOL.OKCYAN}{ee}{TCOL.ENDC}"
)
self.jointHandlerDic: Dict[str, JointHandler] = {}
limits_undefined: List[str] = []
# self.pwarn([j.name for j in self.joints_objects])
# self.pwarn(self.joint_names)
for index, name in enumerate(self.joint_names):
jObj = None
for j in self.joints_objects:
if j.name == name:
jObj = j
break
assert jObj is not None
handler = JointHandler(
name, self, jObj, MARGIN=self.MARGIN, IGNORE_LIM=self.IGNORE_LIM
)
if handler.no_limit:
limits_undefined.append(jObj.name)
self.jointHandlerDic[name] = handler
if limits_undefined:
self.info(
f"{TCOL.WARNING}Undefined limits{TCOL.ENDC} "
f"in urdf for joint {limits_undefined}"
)
else:
self.info(f"{TCOL.OKBLUE}All URDF limits defined{TCOL.ENDC} ")
[docs]
def send_to_lvl0(self, states: List[JState]):
"""Sends states to lvl0 (commands for motors).
This function is executed every time data needs to be sent down.
.. Important::
Change/overload this method with what you need.
Or put what you want to execute in self.send_to_lvl0_callbacks
"""
# self.info(f"sending {states}")
for f in self.send_to_lvl0_callbacks:
# self.info(f"sending through {f.__name__}")
f(states)
[docs]
def send_to_lvl2(self, states: List[JState]):
"""Sends states to lvl2 (states for ik).
This function is executed every time data needs to be sent up.
.. Important::
Change/overload this method with what you need.
Or put what you want to execute in self.send_to_lvl0_callbacks
"""
# self.info(f"sending {states}")
for f in self.send_to_lvl2_callbacks:
# self.info(f"sending through {f.__name__}")
f(states)
[docs]
def coming_from_lvl2(self, states: List[JState]):
"""Processes incomming commands from lvl2 ik.
Call this function after processing the data into a List[JState]
.. Caution::
Overloading this is not advised, but if you do, always do
super().coming_from_lvl0(states) before your code.
Unless you know what you are doing
"""
# self.info(f"received {states}")
stamp = None
self.lvl2_remap.unmap(states)
for s in states:
if s.time is None:
stamp = self.now() if stamp is None else stamp
s.time = stamp
self._push_commands(states)
[docs]
def coming_from_lvl0(self, states: List[JState]):
"""Processes incomming sensor states from lvl0 motors.
Call this function after processing the data into a List[JState]
.. Caution::
Overloading this is not advised, but if you do, always do
super().coming_from_lvl0(states) before your code.
Unless you know what you are doing
"""
stamp = None
self.lvl0_remap.unmap(states)
for s in states:
if s.time is None:
stamp = self.now() if stamp is None else stamp
s.time = stamp
# if self.leg_num ==1:
# self.warn(states)
self._push_sensors(states)
[docs]
def send_sensor_up(self):
"""pulls and resets fresh sensor data, applies remapping, then sends it to lvl2"""
states = self._pull_sensors()
self.lvl2_remap.map(states)
self.send_to_lvl2(states)
[docs]
def send_command_down(self):
"""pulls and resets fresh command data, applies remapping, then sends it to lvl0"""
states = self._pull_commands()
self.lvl0_remap.map(states)
self.send_to_lvl0(states)
@property
def _sensors_missing(self) -> Set[str]:
return {
name for name, jobj in self.jointHandlerDic.items() if not jobj.sensor_ready
}
@property
def _sensors_ready(self) -> Set[str]:
return self._all_joint_names - self._sensors_missing
@property
def _all_joint_names(self) -> Set[str]:
return set(self.jointHandlerDic.keys())
def _advertize_success(self, names: Iterable[str]):
if not names:
return
self.info(
f"{TCOL.OKGREEN}Angle recieved{TCOL.ENDC} on " f"joints {list_cyanize(names)}"
)
if not self._sensors_missing:
self.info(f"{TCOL.OKGREEN}Angle recieved on ALL joints :){TCOL.ENDC}")
for n in names:
jobj = self.jointHandlerDic[n]
jobj._failed_init_advertized = True
jobj._init_advertized = True
def _advertize_failure(self, names: Iterable[str]):
if not names:
return
self.warn(
f"No angle readings yet on {list_cyanize(names)}. "
f"Might not be published. :(" # )
)
if not self._sensors_missing:
self.info(f"{TCOL.OKGREEN}Angle recieved on ALL joints :){TCOL.ENDC}")
for n in names:
jobj = self.jointHandlerDic[n]
jobj._failed_init_advertized = True
[docs]
def sensor_check_verbose(self) -> bool:
"""Checks that all joints are receiving data.
After TIMEOUT, if not, warns the user.
Returns:
True if all joints have angle data
"""
less_than_timeout = self.now() - self.startup_time < Time(
sec=self.SENS_VERBOSE_TIMEOUT
)
defined = self._sensors_ready
undefined = self._sensors_missing
fail_done = {
name
for name, jobj in self.jointHandlerDic.items()
if jobj._failed_init_advertized
}
success_done = {
name for name, jobj in self.jointHandlerDic.items() if jobj._init_advertized
}
succes_to_be_advertized = defined - success_done
failure_to_be_advertized = undefined - fail_done
all_are_ready = not bool(undefined)
if less_than_timeout and not all_are_ready:
return False
self._advertize_success(succes_to_be_advertized)
self._advertize_failure(failure_to_be_advertized)
return all_are_ready
[docs]
def send_empty_command_to_lvl0(self):
"""Sends a command to lvl0 with no data.
Usefull to initialize lvl0 by giving only the joint names."""
js: List[JState] = [JState(name=n) for n in self._all_joint_names]
self.lvl0_remap.map(js)
self.send_to_lvl0(js)
def _push_commands(self, states: List[JState]) -> None:
for js in states:
if js.name is None:
continue
joint = self.jointHandlerDic.get(js.name)
if joint is None:
continue
joint.set_js_command(js)
def _push_sensors(self, states: Iterable[JState]) -> None:
"""Sets sensor state on several joints"""
for js in states:
if js.name is None:
continue
handler = self.jointHandlerDic.get(js.name)
if handler is None:
continue
handler.set_js_sensor(js)
def _pull_sensors(self, reset=True) -> List[JState]:
"""Gets fresh sensor state for all joints and resets it"""
allStates: List[JState] = []
for handler in self.jointHandlerDic.values():
state: JState = handler.get_fresh_sensor(reset)
allEmpty: bool = (
(state.velocity is None)
and (state.position is None)
and (state.effort is None)
)
if not allEmpty:
allStates.append(state)
return allStates
def _pull_commands(self) -> List[JState]:
"""Gets fresh commands state for all joints and resets it"""
allStates: List[JState] = []
for jointMiniNode in self.jointHandlerDic.values():
state: JState = jointMiniNode.get_fresh_command()
allEmpty: bool = (
(state.velocity is None)
and (state.position is None)
and (state.effort is None)
)
if allEmpty:
continue
allStates.append(state)
return allStates
[docs]
def all_go_zero(self):
"""Sends command of angle=0 to all joints"""
states = [JState(name=n, position=0) for n in self._all_joint_names]
self.coming_from_lvl2(states)