Source code for motion_stack.api.injection.offsetter

from copy import deepcopy
from typing import Dict, List, Optional, Set, Tuple

from motion_stack.core.utils.printing import list_cyanize
import numpy as np

from motion_stack.api.injection.remapper import StateRemapper, insert_angle_offset
from motion_stack.core.lvl1_joint import JointCore, JointHandler
from motion_stack.core.utils.joint_state import JState

from motion_stack.core.utils.csv import csv_to_dict, update_csv


[docs] class OffsetterLvl0: r""" Position offseter for lvl0. Usefull if your URDF and robot are not aligned. Features - Simply provide the JointCore on which to apply the offsets - Apply an angle offset to any joint of the lvl0 input/output. - Apply the offset at runtime - (Optional) Load offsets from a csv on disk. - (Optional) Save current angles multiplied by -1 in a csv on disk. This saved angle can tell you the last shutdown position of the robot, if you need to recover the offsets from it. Note: - You should provide this object an initialized JointCore. - You need to call by yourself: - \ :py:meth:`.OffsetterLvl0.apply_offset` - \ :py:meth:`.OffsetterLvl0.save_angle_as_offset` - \ :py:meth:`.OffsetterLvl0.save_current_offset` - \ :py:meth:`.OffsetterLvl0.load_offset` is called on object initialization. - See the ros2 wrapper if you running the core through ros2. Args: core: Initialized JointCore angle_recovery_path: if None, does not save the current angles on disk offset_path: if None, does not save or load offsets from disk """ def __init__( self, core: JointCore, angle_recovery_path: Optional[str] = None, offset_path: Optional[str] = None, ) -> None: """ """ self._offsets: Dict[str, float] = {} self._core = core self._lvl0_save = deepcopy(self._core.lvl0_remap) self._angle_path = angle_recovery_path self._offset_path = offset_path self.load_offset() self._update_mapper() @property def offsets(self) -> Dict[str, float]: """Offets being used""" return self._offsets def _update_mapper( self, mapper_in: Optional[StateRemapper] = None, mapper_out: Optional[StateRemapper] = None, ) -> None: """Applies the offsets to a StateRemapper. Args: mapper_in: original map to which offset should be added mapper_out: affected subshaper of this map will change """ if mapper_in is None: mapper_in = self._lvl0_save if mapper_out is None: mapper_out = self._core.lvl0_remap insert_angle_offset(mapper_in, mapper_out, self._offsets)
[docs] def apply_offset(self, js_offset: Optional[List[JState]]) -> Tuple[bool, str]: """Offset values will be replaced by new ones then saved on disk. Note: Preferably use this to not lose the offset in case of restart Args: js_offset: list of offsets Returns: True if all offsets have a joint to be applied to String for user debugging """ if js_offset is None: js_offset = [JState(name=n, position=p) for n, p in self._offsets.items()] ret = self._update_offset(js_offset) self._update_mapper() self.save_current_offset() return ret
[docs] def save_angle_recovery(self, handlers: Optional[Dict[str, JointHandler]] = None): """Saves current position as the offset to recover to incase of powerloss. Note: - Saved in self.angle_path - To use those saves as offsets, replace the file `<self.offset_path>` with `<self.angle_path>` """ if self._angle_path is None: return if handlers is None: handlers = self._core.jointHandlerDic for name, jobj in handlers.items(): if jobj.sensor.position is None: continue update_csv(self._angle_path, name, -jobj.sensor.position)
[docs] def load_offset(self): """Loads offset from offset csv. Skips unknown joints.""" if self._offset_path is None: return handlers = self._core.jointHandlerDic off = csv_to_dict(self._offset_path) if off is None: self._core.warn(f"Cannot load offsets from {self._offset_path}") return known_joints = handlers.keys() if handlers is not None else off.keys() names = set(known_joints) & set(off.keys()) unknown_names: Set[str] = set(known_joints) - names for name, val in off.items(): if np.isnan(val): self._offsets[name] = 0.0 else: self._offsets[name] = off[name] if unknown_names: self._core.warn(f"Joints not in offset csv: {list_cyanize(unknown_names)}")
[docs] def save_current_offset(self, to_save: Optional[Dict[str, float]] = None): """DO NOT DO THIS AUTOMATICALLY, IT COULD BE DESTRUCTIVE OF VALUABLE INFO. overwrites the offset csv with the currently running offsets""" if self._offset_path is None: return if to_save is None: to_save = self._offsets user_disp = "" old_d = csv_to_dict(self._offset_path) if old_d is None: old_d = {} for name, off_val in to_save.items(): old = old_d.get(name) if old is None: old = 0.0 if np.isclose(old, off_val): continue update_csv(self._offset_path, name, off_val) user_disp += f"{name}: {old:.4f}->{off_val:.4f}\n" self._core.info(f"{self._offset_path} updated \n{user_disp}")
def _update_offset(self, js_offset: List[JState]) -> Tuple[bool, str]: """Updates offset in memory Args: js_offset: list of offsets Returns: True if all offsets have a joint to be applied to String for user debugging """ handler = self._core.jointHandlerDic new: Dict[str, float] = { js.name: js.position for js in js_offset if js.position is not None } known = set(handler.keys()) & set(new.keys()) unchanged = set(handler.keys()) - set(new.keys()) unused = set(new.keys()) - set(handler.keys()) for n, val in new.items(): if not n in self._offsets.keys(): self._offsets[n] = 0 self._offsets[n] += val outstr = f"Updated: {known}. " if unused: return False, outstr + f"Joints unknown: {unused}." return True, outstr