import copy
import matplotlib
matplotlib.use("Agg") # fix for when there is no display
# from pytest import ExitCode
from typing import Dict, Final, Iterable, List, Optional, Tuple
import numpy as np
import quaternion as qt
import tf2_ros
from geometry_msgs.msg import Transform, TransformStamped
from motion_stack_msgs.srv import ReturnJointState, SendJointState
from numpy.typing import NDArray
from rclpy.node import MutuallyExclusiveCallbackGroup, Publisher, Service, Timer, Union
from rclpy.time import Duration, Time
from scipy.spatial import geometric_slerp
from sensor_msgs.msg import JointState
from std_msgs.msg import Float64
from std_srvs.srv import Empty
from std_srvs.srv import Empty as EmptySrv
from easy_robot_control.EliaNode import (
EliaNode,
Joint,
bcolors,
error_catcher,
list_cyanize,
loadAndSet_URDF,
myMain,
np2tf,
replace_incompatible_char_ros2,
rosTime2Float,
tf2np,
)
from easy_robot_control.utils.joint_state_util import (
JState,
impose_state,
js_changed,
js_copy,
js_from_ros,
stateOrderinator3000,
)
from easy_robot_control.utils.state_remaper import StateRemapper, empty_remapper
P_GAIN = 3.5
D_GAIN = 0.00005 # can be improved
INIT_AT_ZERO = False # dangerous
CLOSE_ENOUGH = np.deg2rad(0.25)
LATE = 0.3
EXIT_CODE_TEST = {
0: "OK",
1: "TESTS_FAILED",
2: "INTERRUPTED",
3: "INTERNAL_ERROR",
4: "USAGE_ERROR",
5: "NO_TESTS_COLLECTED",
}
ECO_MODE_PERIOD: float = 0.5 # seconds
TOL_NO_CHANGE: Final[JState] = JState(
name="",
time=Time(seconds=ECO_MODE_PERIOD),
position=np.deg2rad(0.1),
velocity=np.deg2rad(0.1),
effort=np.deg2rad(0.1),
)
[docs]
class JointHandler:
"""This handles a single joint.
The main purpose is to update stateSensor and stateCommand. As well as getting the
newest values for those (in order to not continuously publish unchanging data).
"""
def __init__(
self,
name: str,
parent_node: "JointNode",
joint_object: Joint,
IGNORE_LIM: bool = False,
MARGIN: float = 0.0,
):
self.name = name
self._corrected_name = replace_incompatible_char_ros2(name)
self.parent = parent_node
self.joint_object = joint_object
self._smode = self.parent.SPEED_MODE
self.IGNORE_LIM = IGNORE_LIM
self.MARGIN = MARGIN
self.offset: float = 0.0
self.lower: float = -np.inf
self.upper: float = np.inf
self.load_limit(self.IGNORE_LIM)
assert self.name == self.joint_object.name
self.stateCommand = JState(name=self.name)
self.fresh_command = self.stateCommand
self.stateSensor = JState(name=self.name)
self.fresh_sensor = self.stateSensor
self._activeAngleCheckTMR = self.parent.create_timer(
timer_period_sec=0.2, callback=self._activeAngleCheckCBK
)
self._ang_speakupTMR: Optional[Timer] = None
[docs]
def load_limit(self, ignore: bool, jobj: Optional[Joint] = None):
"""Loads the limit from the (urdf) joint object
Args:
ignore: if limits should be ignored
"""
if jobj is None:
jobj = self.joint_object
if not ignore:
try:
self.lower: float = jobj.limit.lower + self.MARGIN
self.upper: float = jobj.limit.upper - self.MARGIN
except AttributeError:
self.IGNORE_LIM = True
self.lower: float = -np.inf
self.upper: float = np.inf
else:
self.lower: float = -np.inf
self.upper: float = np.inf
assert self.lower <= self.upper
@property
def limit_rejected(self) -> bool:
"""
Returns: if the limit was rejected when loading (often when not defined in urdf)
"""
return self.parent.IGNORE_LIM != self.IGNORE_LIM
[docs]
def speakup_when_angle(self):
"""start a verbose check every seconds for new angles"""
if self._ang_speakupTMR is None:
self._ang_speakupTMR = self.parent.create_timer(1, self._ang_speakupTMRCBK)
if self._ang_speakupTMR.is_canceled():
self._ang_speakupTMR.reset()
self._ang_speakupTMRCBK()
def _ang_speakupTMRCBK(self):
"""if angle is defined, prints happy meseage and cancel the timer"""
if self.stateSensor.position is not None:
self.parent.pinfo(
f"{bcolors.OKGREEN}Angle received on {self.name}{bcolors.ENDC}"
)
if self._ang_speakupTMR is not None:
self.parent.destroy_timer(self._ang_speakupTMR)
[docs]
def checkAngle(self, angle: Optional[float]) -> bool:
"""True is angle is valid or None"""
if self.IGNORE_LIM or angle is None:
return True
return self.lower <= angle <= self.upper
@error_catcher
def _activeAngleCheckCBK(self):
"""check the sensor angle for validity, changes the speed if invalid"""
if self.checkAngle(self.stateSensor.position):
return
self._update_speed_cmd(None)
[docs]
def applyAngleLimit(self, angle: float) -> Tuple[float, bool]:
"""Clamps the angle between the joints limits"""
if self.IGNORE_LIM:
return angle, True
out = np.clip(angle, a_min=self.lower, a_max=self.upper)
return out, out == angle
[docs]
def resetAnglesAtZero(self):
self._update_angle_cmd(0)
def _process_command(self, js: JState) -> JState:
"""Processes incomming jstate in view of storing it as the command.
it applies angle limits, checks, speed ...
Args:
js: incomming unprocessed JState
Returns:
Processed JState, ready to be used
"""
js = js_copy(js)
if js.time is not None:
js.time = js.time
else:
js.time = self.parent.getNow()
if js.position is not None:
js.position = self.process_angle_command(js.position)
if js.velocity is not None:
v = self.process_velocity_command(js.velocity)
emergency = v is None
if emergency and self.stateSensor.position is not None:
js.position = self.process_angle_command(self.stateSensor.position)
if emergency:
v = 0
js.velocity = v
if js.effort is not None:
js.effort = self.process_effort_command(js.effort)
return js
[docs]
def update_js_command(self, js: JState):
"""Updates the stateCommand to a new js."""
assert js.name == self.name
js = self._process_command(js)
self.stateCommand = impose_state(self.stateCommand, js)
self.fresh_command = impose_state(self.fresh_command, js)
[docs]
def is_new_jssensor(self, js: JState):
"""True if js is different enough from the last received.
Also true if stateSensor is more the TOL_NO_CHANGE.time old relative to the new
"""
d = js_copy(TOL_NO_CHANGE)
if self.stateSensor.time is not None and TOL_NO_CHANGE.time is not None:
# black magic to keep publishing in sync when no changes
# We basically refresh every t=N*dt, and not dt after the previous
ts = self.stateSensor.time.nanoseconds
dt = TOL_NO_CHANGE.time.nanoseconds
d.time = Time(nanoseconds=dt - ts % dt)
something_changed = js_changed(js, self.stateSensor, delta=d)
return something_changed
[docs]
def setJSSensor(self, js: JState):
"""Updates the stateSensor to a new js."""
assert js.name == self.name
if not self.is_new_jssensor(js):
return
self.stateSensor = js # no processing for sensor
self.fresh_sensor = impose_state(self.fresh_sensor, js)
self.sensor_updated = True
[docs]
def process_angle_command(self, angle: float) -> float:
"""This runs on new js before updating stateCommand"""
angle_in = angle
angle, isValid = self.applyAngleLimit(angle)
if not isValid:
self.parent.pwarn(
f"{bcolors.OKCYAN}{self.name}{bcolors.WARNING} clipped to limit. "
f"{angle_in:.2f} -> {angle:.2f}"
)
return angle + self.offset
@error_catcher
def _update_angle_cmd(self, msg: Union[Float64, float]):
"""Updates stateCommand by providing only an angle.
should be avoided as the timestamp will be set to now.
"""
if isinstance(msg, Float64):
angle = msg.data
else:
angle = msg
assert isinstance(angle, float)
new_js = JState(self.name)
new_js.position = angle
new_js.time = self.parent.getNow()
self.update_js_command(new_js)
[docs]
def process_velocity_command(self, speed: float) -> Optional[float]:
"""This runs on new js before updating stateCommand"""
if self.stateSensor.position is None or self.IGNORE_LIM:
goingLow = False
goingHi = False
else:
goingLow = self.stateSensor.position <= self.lower and speed <= 0
goingHi = self.stateSensor.position >= self.upper and speed >= 0
mustStop = goingLow or goingHi
if mustStop:
return None
return speed
@error_catcher
def _update_speed_cmd(self, msg: Union[Float64, float, None]):
"""Updates stateCommand by providing only an speed.
should be avoided as the timestamp will be set to now.
"""
speed: float
if isinstance(msg, Float64):
speed = msg.data
self.stateCommand.position = None
elif msg is None:
last_vel = self.stateCommand.velocity
if last_vel is None:
speed = 0.0
else:
speed = float(last_vel)
else:
speed = float(msg)
assert isinstance(speed, float)
new_js = JState(self.name)
new_js.velocity = speed
new_js.time = self.parent.getNow()
self.update_js_command(new_js)
[docs]
def process_effort_command(self, eff: float) -> float:
"""This runs on new js before updating stateCommand"""
return eff
[docs]
@error_catcher
def set_effortCBK(self, msg: Union[Float64, float]):
"""Updates stateCommand by providing only an effort.
should be avoided as the timestamp will be set to now.
"""
if isinstance(msg, Float64):
effort = msg.data
else:
effort = msg
new_js = JState(self.name)
new_js.effort = effort
new_js.time = self.parent.getNow()
self.update_js_command(new_js)
def _angle_feedback(self) -> None:
"""PID updating the speed command based on last stateSensor"""
if not self._smode:
return
if self.stateCommand.position is None or self.stateSensor.position is None:
return
delta = self.stateCommand.position - self.stateSensor.position
small_angle = abs(delta) < CLOSE_ENOUGH
if small_angle:
self._update_speed_cmd(0)
return
if self.stateSensor.velocity is None:
vel = 0
else:
vel = self.stateSensor.velocity
speedPID = delta * P_GAIN - vel * D_GAIN
if self.stateCommand.time is None or self.stateSensor.time is None:
self._update_speed_cmd(speedPID)
return
period = 1 / self.parent.MVMT_UPDATE_RATE + LATE
arrivalTime = self.stateCommand.time + Duration(
seconds=period # type:ignore
)
# time left to reach the target
timeLeft: Duration = arrivalTime - self.stateSensor.time
# if less than 5% of the time left to reach, we will go slower and not freak out
# with infinite speed
timeLeftSafe = max(0.05 * period, rosTime2Float(timeLeft))
perfectSpeed = delta / timeLeftSafe
pid_is_slower = abs(speedPID) < abs(perfectSpeed)
# pick the slower of the two methodes.
# when far away we move at constant speed to reach the destination on the next
# command. If close, or when the PID wants to slow down, the PID is used
speed = speedPID if pid_is_slower else perfectSpeed
self._update_speed_cmd(speed)
return
[docs]
def get_fresh_sensor(self, reset: bool = True) -> JState:
"""returns sensor data that is newer than the last time it was called.
if the sensor data didn't changed enough to trigger a refresh, this will
be full of None. If a refresh occured, the None will be replaced by the non-None
values in the new sensor data.
example: if you stop sending speed sensor data after sending a bunch of speeds.
This speed will switch to None, it will not continue to be the last received
speed.
This last received speed is still available in stateSensor.
"""
out = self.fresh_sensor.copy()
if out.position is not None:
out.position -= self.offset
if reset:
self.fresh_sensor = JState(name=self.name)
return out
[docs]
def get_freshCommand(self, reset: bool = True) -> JState:
"""returns command data that is newer than the last time it was called.
full of None is not newer"""
self._angle_feedback()
out = self.fresh_command.copy()
if reset:
self.fresh_command = JState(name=self.name)
return out
[docs]
class JointNode(EliaNode):
"""Lvl1
"""
#: Remapping around any joint state communication of lvl0
lvl0_remap: StateRemapper
#: Remapping around any joint state communication of lvl2
lvl2_remap: StateRemapper
def __init__(self):
# rclpy.init()
self.lvl0_remap: StateRemapper = empty_remapper
self.lvl2_remap: StateRemapper = empty_remapper
super().__init__("joint") # type: ignore
self.NAMESPACE = self.get_namespace()
self.declare_parameter("leg_number", 0)
self.leg_num = (
self.get_parameter("leg_number").get_parameter_value().integer_value
)
self.Alias = f"J{self.leg_num}"
self.current_body_xyz: NDArray = np.array([0, 0, 0], dtype=float)
self.current_body_quat: qt.quaternion = qt.one
self.body_xyz_queue = np.zeros((0, 3), dtype=float)
self.body_quat_queue = qt.from_float_array(np.zeros((0, 4), dtype=float))
self.wait_for_lower_level(["rviz_interface_alive"], all_requiered=False)
self.pinfo(f"""{bcolors.OKBLUE}Interface connected to motors :){bcolors.ENDC}""")
# V Params V
# \ / #
# \/ #
self.declare_parameter("std_movement_time", float(0.5))
self.MOVEMENT_TIME = (
self.get_parameter("std_movement_time").get_parameter_value().double_value
)
self.declare_parameter("frame_prefix", "")
self.FRAME_PREFIX = (
self.get_parameter("frame_prefix").get_parameter_value().string_value
)
self.declare_parameter("control_rate", float(30))
self.CONTROL_RATE = (
self.get_parameter("control_rate").get_parameter_value().double_value
)
self.declare_parameter("mvmt_update_rate", float(30))
self.MVMT_UPDATE_RATE = (
self.get_parameter("mvmt_update_rate").get_parameter_value().double_value
)
self.declare_parameter("ignore_limits", False)
self.IGNORE_LIM = (
self.get_parameter("ignore_limits").get_parameter_value().bool_value
)
self.declare_parameter("limit_margin", float(0))
self.MARGIN: float = (
self.get_parameter("limit_margin").get_parameter_value().double_value
)
self.declare_parameter("speed_mode", False)
self.SPEED_MODE: bool = (
self.get_parameter("speed_mode").get_parameter_value().bool_value
)
self.declare_parameter("add_joints", [""])
self.ADD_JOINTS: List[str] = list(
self.get_parameter("add_joints").get_parameter_value().string_array_value
)
cleanup = set(self.ADD_JOINTS)
cleanup -= {""}
self.ADD_JOINTS = list(cleanup)
# self.SPEED_MODE: bool = True
# self.pwarn(self.SPEED_MODE)
self.declare_parameter("start_coord", [0.0, 0.0, 0.0])
self.START_COORD = np.array(
self.get_parameter("start_coord").get_parameter_value().double_array_value,
dtype=float,
)
if np.isnan(self.START_COORD).any():
self.current_body_xyz: NDArray = np.array([0, 0, 0], dtype=float)
self.dont_handle_body = True
else:
self.current_body_xyz: NDArray = self.START_COORD
self.dont_handle_body = False
self.declare_parameter("mirror_angles", False)
self.MIRROR_ANGLES: bool = (
self.get_parameter("mirror_angles").get_parameter_value().bool_value
) # DEBBUG: sends back received angles immediatly
self.declare_parameter("urdf_path", str())
self.urdf_path = (
self.get_parameter("urdf_path").get_parameter_value().string_value
)
self.declare_parameter("start_effector_name", str(f""))
self.start_effector: str | None = (
self.get_parameter("start_effector_name").get_parameter_value().string_value
)
if self.start_effector == "":
self.start_effector = None
self.declare_parameter("end_effector_name", str(self.leg_num))
end_effector: str = (
self.get_parameter("end_effector_name").get_parameter_value().string_value
)
self.end_effector_name: Union[str, int]
if end_effector.isdigit():
self.end_effector_name = int(end_effector)
else:
if end_effector == "ALL":
self.end_effector_name = None
elif end_effector == "":
self.end_effector_name = self.leg_num
else:
self.end_effector_name = end_effector
# /\ #
# / \ #
# ^ Params ^
self.pinfo(f"chain: {self.start_effector} -> {self.end_effector_name}")
# self.perror(f"{self.start_effector==self.end_effector_name}")
# self.end_effector_name = None
# self.start_effector = None
(
self.model,
self.ETchain,
self.joint_names,
self.joints_objects,
self.last_link,
) = loadAndSet_URDF(self.urdf_path, self.end_effector_name, self.start_effector)
# if end_effector == "ALL":
# self.end_effector_name = self.leg_num
try: # kill me
if isinstance(self.end_effector_name, str) and isinstance(
self.start_effector, str
):
( # don't want to see this
model2,
ETchain2,
joint_names2,
joints_objects2,
last_link2,
) = loadAndSet_URDF(
self.urdf_path, self.start_effector, self.end_effector_name
)
if len(joint_names2) > len(self.joint_names) and len(
joints_objects2
) > len(self.joints_objects):
joint_names2.reverse()
self.joint_names = joint_names2
joints_objects2.reverse()
self.joints_objects = joints_objects2
except:
self.pinfo(f"link tree could not be reversed")
# self.baselinkName = self.model.base_link.name # base of the whole model
if self.start_effector is None:
self.baselinkName = self.model.base_link.name
else:
self.baselinkName = self.start_effector
if self.baselinkName != self.model.base_link.name:
self.pinfo(
f"base_link forced to `{self.baselinkName}` "
f"instead of the tf root `{self.model.base_link.name}`, "
f"this can render part of the tf tree missing, or worse"
)
#
# if not self.joint_names:
# self.dont_handle_body = True
# else:
# self.dont_handle_body = False
self.joint_names += self.ADD_JOINTS
self.joints_objects += [
Joint(
joint_type="continuous",
parent=None,
child=None,
name=jn,
# limit=[0, 0],
)
for jn in self.ADD_JOINTS
]
# self.pinfo(f"Joints controled: {bcolors.OKCYAN}{self.joint_names}{bcolors.ENDC}")
ee = self.last_link.name if self.last_link is not None else "all joints"
self.pinfo(
f"Using base_link: {bcolors.OKCYAN}{self.baselinkName}{bcolors.ENDC}"
f", to ee: {bcolors.OKCYAN}{ee}{bcolors.ENDC}"
)
self.cbk_legs = MutuallyExclusiveCallbackGroup()
self.jointHandlerDic: Dict[str, JointHandler] = {}
limits_undefined: List[str] = []
# self.pwarn([j.name for j in self.joints_objects])
# self.pwarn(self.joint_names)
for index, name in enumerate(self.joint_names):
jObj = None
for j in self.joints_objects:
if j.name == name:
jObj = j
break
assert jObj is not None
handler = JointHandler(
name, self, jObj, MARGIN=self.MARGIN, IGNORE_LIM=self.IGNORE_LIM
)
if handler.limit_rejected:
limits_undefined.append(jObj.name)
self.jointHandlerDic[name] = handler
if limits_undefined:
self.pinfo(
f"{bcolors.WARNING}Undefined limits{bcolors.ENDC} "
f"in urdf for joint {limits_undefined}"
)
else:
self.pinfo(f"{bcolors.OKBLUE}All URDF limits defined{bcolors.ENDC} ")
# V Subscriber V
# \ / #
# \/ #
self.js_from_lvl2SUB = self.create_subscription(
JointState,
"joint_set",
self.js_from_lvl2,
10,
callback_group=self.cbk_legs,
)
self.js_from_lvl0SUB = self.create_subscription(
JointState,
"joint_states",
self.js_from_lvl0,
10,
)
self.body_pose_sub = self.create_subscription(
Transform,
"robot_body",
self.robot_body_pose_cbk,
10,
callback_group=self.cbk_legs,
)
self.smooth_body_pose_sub = self.create_subscription(
Transform,
"smooth_body_rviz",
self.smooth_body_trans,
10,
callback_group=MutuallyExclusiveCallbackGroup(),
)
if self.dont_handle_body:
self.destroy_subscription(self.smooth_body_pose_sub)
self.destroy_subscription(self.body_pose_sub)
# /\ #
# / \ #
# ^ Subscriber ^
# V Publisher V
# \ / #
# \/ #
self.jsPUB_to_lvl2 = self.create_publisher(
JointState,
"joint_read",
10,
callback_group=self.cbk_legs,
)
self.jsPUB_to_lvl0 = self.create_publisher(JointState, "joint_commands", 10)
# self.body_pose_pub = self.create_publisher(
# TFMessage,
# '/BODY', 10)
self.tf_broadcaster = tf2_ros.TransformBroadcaster(self)
self.rviz_spyPUB = self.create_publisher(JointState, "rviz_spy", 10)
# /\ #
# / \ #
# ^ Publisher ^
# V Service V
# \ / #
# \/ #
self.iAmAlive: Optional[Service] = None
self.go_zero_all: Service = self.create_service(
EmptySrv, "go_zero_all", self.go_zero_allCBK
)
self.adveriserSVR: Service = self.create_service(
ReturnJointState, "advertise_joints", self.advertiserSRVCBK
)
# /\ #
# / \ #
# ^ Service ^
# V Timer V
# \ / #
# \/ #
self._send_commandTMR = self.create_timer(
1 / self.MVMT_UPDATE_RATE, self._send_commandTMRCBK
)
self._send_readTMR = self.create_timer(
1 / self.MVMT_UPDATE_RATE, self._send_sensorTMRCBK
)
self.__bodyTMR = self.create_timer(1 / self.CONTROL_RATE, self.__bodyTMRCBK)
self.firstSpin: Timer = self.create_timer(1 / 100, self.firstSpinCBK)
self.angle_read_checkTMR = self.create_timer(0.05, self.angle_read_checkTMRCBK)
self.angle_read_checkTMR.cancel()
# /\ #
# / \ #
# ^ Timer ^
[docs]
def send_to_lvl0(self, states: List[JState]):
"""Sends states to lvl0 (commands for motors).
This function is executed every time data needs to be sent down.
Change/overload this method with what you need"""
stamp = self.getNow().to_msg()
msgs = stateOrderinator3000(states)
for msg in msgs:
msg.header.stamp = stamp
self.jsPUB_to_lvl0.publish(msg)
[docs]
def send_to_lvl2(self, states: List[JState]):
"""Sends states to lvl2 (states for ik).
This function is executed every time data needs to be sent up.
Change/overload this method with what you need"""
stamp = self.getNow().to_msg()
msgs = stateOrderinator3000(states)
for msg in msgs:
msg.header.stamp = stamp
self.jsPUB_to_lvl2.publish(msg)
[docs]
@error_catcher
def js_from_lvl0(self, msg: JointState):
"""Callback when a JointState arrives from the lvl0 (states from motor).
Converts it into a list of states, then hands it to the general function
"""
if msg.header.stamp is None:
msg.header.stamp = self.getNow().to_msg()
states = js_from_ros(msg)
self.coming_from_lvl0(states)
[docs]
@error_catcher
def js_from_lvl2(self, msg: JointState):
"""Callback when a JointState arrives from the lvl2 (commands from ik).
Converts it into a list of states, then hands it to the general function
"""
if msg.header.stamp is None:
msg.header.stamp = self.getNow().to_msg()
states = js_from_ros(msg)
self.coming_from_lvl2(states)
[docs]
def coming_from_lvl2(self, states: List[JState]):
"""Processes incomming commands from lvl2 ik.
Call this function after processing the ros message"""
stamp = None
self.lvl2_remap.unmap(states)
for s in states:
if s.time is None:
stamp = self.getNow() if stamp is None else stamp
s.time = stamp
self._push_commands(states)
[docs]
def coming_from_lvl0(self, states: List[JState]):
"""Processes incomming sensor states from lvl0 motors.
Call this function after processing the ros message.
Always do super().coming_from_lvl0(states) before your code,
Unless you know what you are doing"""
stamp = None
self.lvl0_remap.unmap(states)
for s in states:
if s.time is None:
stamp = self.getNow() if stamp is None else stamp
s.time = stamp
self._push_sensors(states)
@error_catcher
def _send_sensorTMRCBK(self):
"""pulls and resets fresh sensor data, then sends it to lvl2"""
states = self._pull_sensors()
self.lvl2_remap.map(states)
self.send_to_lvl2(states)
@error_catcher
def _send_commandTMRCBK(self):
"""pulls and resets fresh command data, then sends it to lvl0"""
states = self._pull_commands()
self.lvl0_remap.map(states)
self.send_to_lvl0(states)
[docs]
def advertiserSRVCBK(
self, req: ReturnJointState.Request, res: ReturnJointState.Response
) -> ReturnJointState.Response:
"""Sends an JointState mainly to advertise the names of the joints"""
names: List[str] = [h.stateSensor.name for h in self.jointHandlerDic.values()]
none2nan = lambda x: x if x is not None else np.nan
res.js = JointState(
name=names,
position=[
none2nan(h.stateSensor.position) for h in self.jointHandlerDic.values()
],
velocity=[
none2nan(h.stateSensor.velocity) for h in self.jointHandlerDic.values()
],
effort=[
none2nan(h.stateSensor.effort) for h in self.jointHandlerDic.values()
],
)
res.js.header.stamp = self.getNow().to_msg()
return res
[docs]
def defined_undefined(self) -> Tuple[List[str], List[str]]:
"""Return joints with and without poistion data received yet
Returns:
Tuple(List[joint names that did not receive any data],
List[joint names that have data])
"""
undefined: List[str] = []
defined: List[str] = []
for name, jobj in self.jointHandlerDic.items():
if jobj.stateSensor.position is None:
undefined.append(name)
else:
defined.append(name)
return undefined, defined
[docs]
@error_catcher
def angle_read_checkTMRCBK(self):
"""Checks that all joints are receiving data.
After 1s, if not warns the user, and starts the verbose check on the joint handler.
"""
less_than_1s = self.getNow() - self.node_start < Duration(seconds=1)
expired = not less_than_1s
undefined, defined = self.defined_undefined()
i = f"{bcolors.OKGREEN}all{bcolors.ENDC}"
if undefined:
if less_than_1s:
return # waits 1 seconds before printing if there are missing angles
for jobj in [self.jointHandlerDic[name] for name in undefined]:
jobj.speakup_when_angle()
self.pwarn(
f"No angle readings yet on {list_cyanize(undefined)}. "
f"Might not be published."
)
i = "some"
if defined:
self.pinfo(
f"{bcolors.OKGREEN}Angle recieved{bcolors.ENDC} on {i} "
f"joints {list_cyanize(defined)}"
)
if expired or (not undefined):
self.destroy_timer(self.angle_read_checkTMR)
[docs]
@error_catcher
def firstSpinCBK(self):
self.iAmAlive = self.create_service(Empty, "joint_alive", lambda i, o: o)
self.destroy_timer(self.firstSpin)
self.node_start: Time = self.getNow()
# send empty command to initialize (notabily Rviz interface)
empty = JointState(name=self.joint_names)
empty.header.stamp = self.getNow().to_msg()
self.jsPUB_to_lvl0.publish(empty)
# we should not start at zero when using real robot
if INIT_AT_ZERO:
for jointMiniNode in self.jointHandlerDic.values():
jointMiniNode.resetAnglesAtZero()
self.angle_read_checkTMR.reset()
def _push_commands(self, states: List[JState]) -> None:
for js in states:
if js.name is None:
continue
handler = self.jointHandlerDic.get(js.name)
if handler is None:
continue
handler.update_js_command(js)
def _push_sensors(self, states: Iterable[JState]) -> None:
for js in states:
if js.name is None:
continue
handler = self.jointHandlerDic.get(js.name)
if handler is None:
continue
handler.setJSSensor(js)
def _pull_sensors(self, reset=True) -> List[JState]:
allStates: List[JState] = []
for handler in self.jointHandlerDic.values():
state: JState = handler.get_fresh_sensor(reset)
allEmpty: bool = (
(state.velocity is None)
and (state.position is None)
and (state.effort is None)
)
if not allEmpty:
allStates.append(state)
return allStates
def _pull_commands(self) -> List[JState]:
allStates: List[JState] = []
for jointMiniNode in self.jointHandlerDic.values():
state: JState = jointMiniNode.get_freshCommand()
allEmpty: bool = (
(state.velocity is None)
and (state.position is None)
and (state.effort is None)
)
if allEmpty:
continue
allStates.append(state)
return allStates
def __bodyTMRCBK(self, time_stamp: Optional[Time] = None):
if time_stamp is None:
now: Time = self.get_clock().now()
else:
now: Time = time_stamp
time_now_stamp = now.to_msg()
self.__pop_and_load_body()
xyz = self.current_body_xyz.copy()
rot = self.current_body_quat.copy()
msgTF = np2tf(xyz, rot)
body_transform = TransformStamped()
body_transform.header.stamp = time_now_stamp
body_transform.header.frame_id = "world"
body_transform.child_frame_id = f"{self.FRAME_PREFIX}{self.baselinkName}"
body_transform.transform = msgTF
if not self.dont_handle_body:
self.tf_broadcaster.sendTransform(body_transform)
return
def __pop_and_load_body(self):
empty = self.body_xyz_queue.shape[0] <= 0
if not empty:
self.current_body_xyz = self.body_xyz_queue[0, :]
self.current_body_quat = self.body_quat_queue[0]
self.body_xyz_queue = np.delete(self.body_xyz_queue, 0, axis=0)
self.body_quat_queue = np.delete(self.body_quat_queue, 0, axis=0)
[docs]
@error_catcher
def robot_body_pose_cbk(self, msg: Transform):
tra, quat = tf2np(msg)
self.current_body_xyz = tra
self.current_body_quat = quat
[docs]
def smoother(self, x: NDArray) -> NDArray:
"""smoothes the interval [0, 1] to have a soft start and end
(derivative is zero)
"""
# x = (1 - np.cos(x * np.pi)) / 2
# x = (1 - np.cos(x * np.pi)) / 2
return x
[docs]
@error_catcher
def smooth_body_trans(self, request: Transform):
tra, quat = self.tf2np(request)
final_coord = self.current_body_xyz + tra / 1000
final_quat = self.current_body_quat * quat
samples = int(self.MOVEMENT_TIME * self.CONTROL_RATE)
start_coord = self.current_body_xyz.copy()
start_quat = self.current_body_quat.copy()
x = np.linspace(0, 1, num=samples) # x: [0->1]
x = self.smoother(x)
quaternion_interpolation = geometric_slerp(
start=qt.as_float_array(start_quat), end=qt.as_float_array(final_quat), t=x
)
quaternion_interpolation = qt.as_quat_array(quaternion_interpolation)
x = np.tile(x, (3, 1)).transpose()
coord_interpolation = final_coord * x + start_coord * (1 - x)
self.body_xyz_queue = coord_interpolation
self.body_quat_queue = quaternion_interpolation
return
[docs]
@error_catcher
def go_zero_allCBK(self, req: EmptySrv.Request, resp: EmptySrv.Response):
for j in self.jointHandlerDic.values():
j.resetAnglesAtZero()
return resp
[docs]
def main(args=None):
myMain(JointNode)
if __name__ == "__main__":
main()