Source code for easy_robot_control.injection.offsetter

from copy import deepcopy
from typing import Dict, List, Optional, Set, Tuple

import numpy as np
from motion_stack_msgs.srv import SendJointState
from rclpy.node import Service

from easy_robot_control.EliaNode import error_catcher, list_cyanize
from easy_robot_control.joint_state_interface import JointHandler, JointNode
from easy_robot_control.utils.csv import csv_to_dict, update_csv
from easy_robot_control.utils.joint_state_util import JState, js_from_ros
from easy_robot_control.utils.state_remaper import StateRemapper, insert_angle_offset


[docs] class OffsetterLvl0: """Position offseter for lvl0, to be injected into a JointNode. Usefull if your URDF and robot are not aligned. Features - Inject this into any JointNode - Apply an angle offset to any joint of the lvl0 input/output. - Use a service to apply the offset at runtime - (Optional) Load offsets from a csv on disk. - (Optional) Save current angles multiplied by -1 every 3s in a csv on disk.\ This saved angle can tell you the last shutdown position of the robot, if you need to recover the offsets from it. Note: - You should inject this object into a JointNode at the end of initialization. - You do not need to call any of those function, just inject this object. Args: parent: angle_path: if None, does not save the current angles on disk offset_path: if None, does not save or load offsets from disk Examples: - Injecting in a JointNode:: class Example(JointNode): def __init__(self): super().__init__() self.offsetter = OffsetterLvl0( self, angle_path=ANGLE_PATH, offset_path=OFFSET_PATH ) - Service call:: ros2 service call /leg1/set_offset motion_stack_msgs/srv/SendJointState "{js: {name: [joint1-2], position: [1], velocity: [], effort: []}}" """ def __init__( self, parent: JointNode, angle_path: Optional[str] = None, offset_path: Optional[str] = None, ) -> None: self._offsets: Dict[str, float] = {} self.parent = parent self._lvl0_save = deepcopy(self.parent.lvl0_remap) self.angle_path = angle_path self.offset_path = offset_path self.load_offset() self.save_current_angleTMR = self.parent.create_timer( 3, self.save_angle_as_offset ) self.set_offsetSRV: Service = self.parent.create_service( SendJointState, "set_offset", self.__set_offsetSRVCBK ) self.update_mapper()
[docs] def update_mapper( self, mapper_in: Optional[StateRemapper] = None, mapper_out: Optional[StateRemapper] = None, ) -> None: """Applies the offsets to a StateRemapper. Args: mapper_in: original map to which offset should be added mapper_out: affected subshaper of this map will change """ if mapper_in is None: mapper_in = self._lvl0_save if mapper_out is None: mapper_out = self.parent.lvl0_remap insert_angle_offset(mapper_in, mapper_out, self._offsets)
[docs] def update_and_save_offset(self, js_offset: List[JState]) -> Tuple[bool, str]: """Held offset values will be updated then saved on disk. Note: Preferably use this to not lose the offset in case of restart Args: js_offset: list of offsets Returns: True if all offsets have a joint to be applied to String for user debugging """ ret = self.update_offset(js_offset) self.save_current_offset() return ret
[docs] def save_angle_as_offset(self, handlers: Optional[Dict[str, JointHandler]] = None): """Saves current position as the offset to recover to incase of powerloss. Note: - Saved in self.angle_path - To use those saves as offsets, replace the file `<self.offset_path>` with `<self.angle_path>` """ if self.angle_path is None: return if handlers is None: handlers = self.parent.jointHandlerDic for name, jobj in handlers.items(): if jobj.stateSensor.position is None: continue update_csv(self.angle_path, name, -jobj.stateSensor.position)
[docs] def load_offset(self): """Loads offset from offset csv. Skips unknown joints.""" if self.offset_path is None: return handlers = self.parent.jointHandlerDic off = csv_to_dict(self.offset_path) if off is None: self.parent.pwarn(f"Cannot load offsets from {self.offset_path}") return known_joints = handlers.keys() if handlers is not None else off.keys() names = set(known_joints) & set(off.keys()) unknown_names: Set[str] = set(known_joints) - names for name, val in off.items(): if np.isnan(val): self._offsets[name] = 0.0 else: self._offsets[name] = off[name] if unknown_names: self.parent.pwarn(f"Joints not in offset csv: {list_cyanize(unknown_names)}")
[docs] def save_current_offset(self, to_save: Optional[Dict[str, float]] = None): """DO NOT DO THIS AUTOMATICALLY, IT COULD BE DESTRUCTIVE OF VALUABLE INFO. overwrites the offset csv with the currently running offsets""" if self.offset_path is None: return if to_save is None: to_save = self._offsets user_disp = "" old_d = csv_to_dict(self.offset_path) if old_d is None: old_d = {} for name, off_val in to_save.items(): old = old_d.get(name) if old is None: old = 0.0 if np.isclose(old, off_val): continue update_csv(self.offset_path, name, off_val) user_disp += f"{name}: {old:.4f}->{off_val:.4f}\n" self.parent.pinfo(f"{self.offset_path} updated \n{user_disp}")
[docs] def update_offset(self, js_offset: List[JState]) -> Tuple[bool, str]: """Updates offset in memory Args: js_offset: list of offsets Returns: True if all offsets have a joint to be applied to String for user debugging """ handler = self.parent.jointHandlerDic new: Dict[str, float] = { js.name: js.position for js in js_offset if js.position is not None } known = set(handler.keys()) & set(new.keys()) unchanged = set(handler.keys()) - set(new.keys()) unused = set(new.keys()) - set(handler.keys()) for n, val in new.items(): if not n in self._offsets.keys(): self._offsets[n] = 0 self._offsets[n] += val outstr = f"Updated: {known}. " if unused: return False, outstr + f"Joints unknown: {unused}." return True, outstr
@error_catcher def __set_offsetSRVCBK( self, req: SendJointState.Request, res: SendJointState.Response ) -> SendJointState.Response: """service to set the offsets of several joints and save""" if len(req.js.name) < 1: return res states: List[JState] = js_from_ros(req.js) # type: ignore res.success, res.message = self.update_and_save_offset(states) self.update_mapper() return res