#!/usr/bin/env python3
import re
import threading
import time
from collections import deque
from os import environ
from typing import Final, List, Optional, Set, Tuple
import numpy as np
import quaternion as qt
import rclpy
from keyboard_msgs.msg import Key
from motion_stack.api.ros2.ik_api import IkHandler, IkSyncerRos
from motion_stack.api.ros2.joint_api import JointHandler, JointSyncerRos
from motion_stack.core.utils.math import patch_numpy_display_light
from motion_stack.core.utils.pose import Flo3, Pose, VelPose, XyzQuat
from motion_stack.ros2.utils.conversion import ros_now
from motion_stack.ros2.utils.executor import error_catcher
from rclpy.duration import Duration
from sensor_msgs.msg import Joy
from .operator_utils import (
BUTT_INTS,
ButtonName,
InputMap,
JoyBits,
JoyState,
any_pressed,
bits2name,
connect_mapping,
delta_time,
msg_to_JoyBits,
rel_to_base_link,
rel_vel_to_base_link,
)
patch_numpy_display_light()
ALIAS = "operator_node"
# type def V
ANY: Final[str] = "ANY"
ALWAYS: Final[str] = "ALWAYS"
LimbNumber = int
JointName = str
[docs]
class OperatorNode(rclpy.node.Node):
def __init__(self) -> None:
"""
Construct and configure the OperatorNode, which provides a text-based
UI for interactively driving legs, joints, wheels and IK.
What this does:
1. Internal data structures
- `current_legs`: set of discovered leg IDs (initially empty).
- `joint_handlers`, `ik_handlers`, `wheel_handlers`: lists of handler objects
created once legs are discovered.
- `joint_syncer`, `ik_syncer`, `wheel_syncer`: syncers that will
translate high-level commands into ROS2 messages.
- `selected_legs`: list of leg IDs the user has chosen to control.
- `selected_joints`, `selected_joints_inv`, `selected_wheel_joints`,
`selected_wheel_joints_inv`: sets tracking which individual joints
(and inverted joints) are active.
- `joint_speed`, `wheel_speed`: default speed settings applied
when driving joints or wheels.
- Logging deque (`log_messages`) to keep the last few status entries.
- Joystick state buffers (`prev_joy_state`, `joy_state`) for edge detection.
2. ROS2 timers
- A 1 Hz timer bound to `_discover_legs`: scans for new `/legN/joint_alive`
services.
- A ~30 Hz timer bound to `loop()`: calls any active syncers’ `execute()`
method to send out drive commands.
3. Subscriptions
- `/keydown` and `/keyup` on the configured input namespace, so key presses
drive the menus and motion functions.
- `/joy` topic to drive IK with an actual gamepad, diff-detecting button
presses/releases and analog sticks.
4. Input mappings
- Builds `main_map` of global keys (e.g. `<Esc>` returns to main menu).
- Prepares `sub_map` placeholder, then immediately calls `enter_main_menu()`
to switch into the top-level menu and install those sub-bindings.
5. TUI integration
- After returning from `__init__`, `main()` will spin ROS in a worker
thread, then hand `self` off to `urwid_main(self)` to drive the text UI.
In short, all the general Motion-Stack specific things for leg discovery,
handler/syncer wiring, key/joystick input, and a dynamic interface is set up here;
you only need to plug in your robot-specific functions or overload the existing ones.
"""
super().__init__(ALIAS)
# Legs and their handlers and syncers
self.current_legs: Set[int] = set()
self.joint_handlers: List[JointHandler] = []
self.joint_syncer: Optional[JointSyncerRos] = None
self.ik_handlers: List[IkHandler] = []
self.ik_syncer: Optional[IkSyncerRos] = None
self.wheel_handlers: List[JointHandler] = []
self.wheel_syncer: Optional[JointSyncerRos] = None
self.key_downSUB = self.create_subscription(
Key, f"keydown", self.key_downSUBCBK, 10
)
self.key_upSUB = self.create_subscription(Key, f"keyup", self.key_upSUBCBK, 10)
self.joySUB = self.create_subscription(Joy, f"joy", self.joySUBCBK, 10)
self.main_map: Final[InputMap] = self.create_main_map()
self.sub_map: InputMap
self.enter_main_menu()
# select legs and joints (normal, inverted)
self.selected_legs: List[LimbNumber] = []
self.selected_ik_legs: List[LimbNumber] = []
self.selected_joints: Set[Tuple[LimbNumber, JointName]] = set()
self.selected_joints_inv: Set[Tuple[LimbNumber, JointName]] = set()
self.selected_wheel_joints: Set[Tuple[LimbNumber, JointName]] = set()
self.selected_wheel_joints_inv: Set[Tuple[LimbNumber, JointName]] = set()
self.joints_prev = []
self.ik_legs_prev = []
self.wheels_prev = []
self.leg_buffer: str = ""
self.joint_leg_buffer: str = ""
self.active_joint_leg: Optional[LimbNumber] = None
self.declare_parameter("joint_speed", 0.15)
self.declare_parameter("wheel_speed", 0.2)
self.declare_parameter("translation_speed", 70.0)
self.declare_parameter("rotation_speed", np.deg2rad(7))
self.joint_speed = self.get_parameter("joint_speed").value
self.wheel_speed = self.get_parameter("wheel_speed").value
self.translation_speed = self.get_parameter("translation_speed").value
self.rotation_speed = self.get_parameter("rotation_speed").value
# self.create_timer(5.0, self._discover_legs)
self.execTMR = self.create_timer(1 / 30.0, self.loop)
self.ikTMR = self.create_timer(0.1, self.move_ik)
self.ikTMR.cancel()
self.log_messages: deque[str] = deque(maxlen=3)
self.prev_joy_state = JoyState()
self.joy_state = JoyState()
self.ee_mode = True
self.ik_key_states = {
Key.KEY_UP: False,
Key.KEY_DOWN: False,
Key.KEY_LEFT: False,
Key.KEY_I: False,
Key.KEY_K: False,
Key.KEY_RIGHT: False,
Key.KEY_W: False,
Key.KEY_S: False,
Key.KEY_A: False,
Key.KEY_D: False,
Key.KEY_Q: False,
Key.KEY_E: False,
}
self._discover_legs()
@error_catcher
def _discover_legs(self):
"""
Every second: scan ROS2 services for /legN/joint_alive, detect newly appeared legs,
rebuild JointHandler and IkHandler lists, and hook their `.ready` futures so that
when their first messages arrive we’ll build the syncers. If legs are removed,
clear the related selected_* states.
"""
# 1) Remember the previous set of legs, so we can detect removals:
prev_legs = set(self.current_legs)
# 2) Scan ROS‐service list to find current legs again:
svc_list = self.get_service_names_and_types()
found: Set[int] = set()
for name, types in svc_list:
if name.endswith("/joint_alive") and "std_srvs/srv/Empty" in types:
m = re.match(r"^/leg(\d+)/joint_alive$", name)
if m:
found.add(int(m.group(1)))
# 3) Figure out which legs are brand‐new vs which have vanished:
new_legs = found - prev_legs
removed_legs = prev_legs - found
# 4) Update our stored current_legs set:
self.current_legs = found
# 5) If any legs went away, scrub them out of all “selected_…” sets:
if removed_legs:
# Remove from selected_legs list:
self.selected_legs = [
l for l in self.selected_legs if l not in removed_legs
]
self.selected_ik_legs = [
l for l in self.selected_ik_legs if l not in removed_legs
]
# Drop any (leg, joint_name) tuples in selected_joints / inv that belonged to removed legs:
self.selected_joints = {
(leg, jn)
for (leg, jn) in self.selected_joints
if leg not in removed_legs
}
self.selected_joints_inv = {
(leg, jn)
for (leg, jn) in self.selected_joints_inv
if leg not in removed_legs
}
# Likewise for the wheel‐joint selections:
self.selected_wheel_joints = {
(leg, jn)
for (leg, jn) in self.selected_wheel_joints
if leg not in removed_legs
}
self.selected_wheel_joints_inv = {
(leg, jn)
for (leg, jn) in self.selected_wheel_joints_inv
if leg not in removed_legs
}
self.add_log(
"I", f"Removed legs {sorted(removed_legs)} → cleared their selections"
)
# 6) If any brand‐new legs appeared, create new Joint/IK handlers for all current legs:
if new_legs:
self.add_log("I", f"New legs discovered: {sorted(new_legs)}")
self.joint_handlers = [
JointHandler(self, l) for l in sorted(self.current_legs)
]
self.ik_handlers = [IkHandler(self, l) for l in sorted(self.current_legs)]
def _rebuild_joint_syncer(self, _):
"""
Callback to (re)build the JointSyncerRos whenever any JointHandler
becomes ready or whenever the selected legs change.
Gathers all ready JointHandler for your selected legs, tears down
the old syncer (if present), and creates a fresh JointSyncerRos.
"""
ready = [
jh
for jh in self.joint_handlers
if jh.ready.done() and jh.limb_number in self.selected_legs
]
if not ready:
return
# clear old if it exists
if self.joint_syncer is not None:
self.joint_syncer.clear()
self.joint_syncer.last_future.cancel()
self.joint_syncer = JointSyncerRos(ready, interpolation_delta=np.deg2rad(20))
def _rebuild_wheel_syncer(self, _):
"""
Callback to (re)build the Wheel (Joint)SyncerRos whenever any JointHandler
becomes ready or whenever the selected legs change.
Operates identically to _rebuild_joint_syncer but uses a different
interpolation_delta tuned for wheels.
"""
ready = [
jh
for jh in self.joint_handlers
if jh.ready.done() and jh.limb_number in self.selected_legs
]
if not ready:
return
if self.wheel_syncer is not None:
self.wheel_syncer.clear()
self.wheel_syncer.last_future.cancel()
self.wheel_syncer = JointSyncerRos(ready, interpolation_delta=np.deg2rad(30))
def _rebuild_ik_syncer(self, _):
"""
Callback to (re)build the IkSyncerRos whenever any IkHandler
becomes ready or whenever the selected legs change.
Cancels the old syncer (if present) and creates a new one, passing
in the on_target_delta tuning for smooth IK interpolation.
"""
ready = [
ih
for ih in self.ik_handlers
if ih.ready.done() and ih.limb_number in self.selected_legs
]
if not ready:
return
if self.ik_syncer is not None:
self.ik_syncer.clear()
self.ik_syncer.last_future.cancel()
self.ik_syncer = IkSyncerRos(
ready, on_target_delta=XyzQuat(40, np.deg2rad(0.001))
)
[docs]
def select_leg(self, leg_inds: Optional[List[LimbNumber]]):
"""
Pick which legs to control (None ⇒ all). Filters out legs that don’t exist
and logs warnings for any missing.
"""
if not self.current_legs:
self.add_log("W", "Cannot select: no legs yet")
return
if leg_inds is None:
# select every discovered leg
self.selected_legs = sorted(self.current_legs)
else:
# only keep those that actually exist
self.selected_legs = [l for l in leg_inds if l in self.current_legs]
missing = set(leg_inds) - set(self.selected_legs)
for bad in sorted(missing):
self.add_log("W", f"Leg {bad} does not exist")
self.add_log("I", f"Selected leg(s): {self.selected_legs}")
# Remove any joint‐selections whose leg is not in selected_legs
self.selected_joints = {
(leg, jn) for (leg, jn) in self.selected_joints if leg in self.selected_legs
}
self.selected_joints_inv = {
(leg, jn)
for (leg, jn) in self.selected_joints_inv
if leg in self.selected_legs
}
# Same for wheel‐joint selections
self.selected_wheel_joints = {
(leg, jn)
for (leg, jn) in self.selected_wheel_joints
if leg in self.selected_legs
}
self.selected_wheel_joints_inv = {
(leg, jn)
for (leg, jn) in self.selected_wheel_joints_inv
if leg in self.selected_legs
}
for jh in self.joint_handlers:
jh.ready.add_done_callback(self._rebuild_joint_syncer)
jh.ready.add_done_callback(self._rebuild_wheel_syncer)
for ih in self.ik_handlers:
ih.ready.add_done_callback(self._rebuild_ik_syncer)
[docs]
def no_no_leg(self):
"""
Ensure at least one leg is selected; if none are, auto-select all discovered legs.
"""
if self.selected_legs is None:
self.select_leg(None)
if not self.selected_legs:
self.select_leg(None)
[docs]
def enter_main_menu(self):
"""
Switch the TUI into 'main' mode and install top-level key bindings
for entering each sub-menu.
"""
self.current_mode = "main"
self.sub_map = {
(Key.KEY_L, ANY): [self.enter_leg_mode],
(Key.KEY_J, ANY): [self.enter_joint_mode],
(Key.KEY_W, ANY): [self.enter_wheel_mode],
(Key.KEY_I, ANY): [self.enter_ik_mode],
("o", ANY): [self.enter_ik_mode],
("x", ANY): [self.enter_joint_mode],
("s", ANY): [self.enter_leg_mode],
("t", ANY): [self.enter_wheel_mode],
}
[docs]
def enter_leg_mode(self):
"""
Switch into 'leg_select' mode; map number keys (including multi-digit entry),
Backspace, Enter, and 'L'/Down arrow to picking legs.
"""
self._discover_legs()
self.current_mode = "leg_select"
self.leg_buffer = ""
submap: InputMap = {
(Key.KEY_DOWN, ANY): [lambda: self.select_leg(None)],
(Key.KEY_L, ANY): [lambda: self.select_leg(None)],
}
one2nine_keys = [
(1, Key.KEY_1),
(2, Key.KEY_2),
(3, Key.KEY_3),
(4, Key.KEY_4),
(5, Key.KEY_5),
(6, Key.KEY_6),
(7, Key.KEY_7),
(8, Key.KEY_8),
(9, Key.KEY_9),
]
for n, keyb in one2nine_keys:
submap.setdefault((keyb, ANY), []).append(lambda n=n: self.select_leg([n]))
digit_keys = [
(Key.KEY_0, "0"),
(Key.KEY_1, "1"),
(Key.KEY_2, "2"),
(Key.KEY_3, "3"),
(Key.KEY_4, "4"),
(Key.KEY_5, "5"),
(Key.KEY_6, "6"),
(Key.KEY_7, "7"),
(Key.KEY_8, "8"),
(Key.KEY_9, "9"),
]
for keyb, digit in digit_keys:
submap.setdefault((keyb, ANY), []).append(
lambda digit=digit: self._append_leg_digit(digit)
)
submap.setdefault((Key.KEY_BACKSPACE, ANY), []).append(
self._backspace_leg_digit
)
submap.setdefault((Key.KEY_RETURN, ANY), []).append(self._confirm_leg_buffer)
self.sub_map = submap
[docs]
def enter_joint_mode(self):
"""
Switch into 'joint_select' mode; selection of joints for Joint Syncer.
Map W/S for joint speed, O/L/P for wheel commands, and 0 to zero selected joints.
Clears the Joint Syncer if it exists.
"""
self.current_mode = "joint_select"
self.no_no_leg()
self.joint_leg_buffer = ""
self.active_joint_leg = None
if self.joint_syncer is not None:
self.joint_syncer.clear()
submap: InputMap = {
(Key.KEY_W, ANY): [lambda: self.move_joints(self.joint_speed)],
(Key.KEY_S, ANY): [lambda: self.move_joints(-self.joint_speed)],
(Key.KEY_O, ANY): [lambda: self.move_wheels(self.wheel_speed)],
(Key.KEY_L, ANY): [lambda: self.move_wheels(-self.wheel_speed)],
(Key.KEY_P, ANY): [lambda: self.move_wheels(0.0)],
(Key.KEY_Z, ANY): [self.move_zero],
(Key.KEY_C, ANY): [
self.selected_joints.clear,
self.selected_joints_inv.clear,
],
}
for keyb, digit in [
(Key.KEY_0, "0"),
(Key.KEY_1, "1"),
(Key.KEY_2, "2"),
(Key.KEY_3, "3"),
(Key.KEY_4, "4"),
(Key.KEY_5, "5"),
(Key.KEY_6, "6"),
(Key.KEY_7, "7"),
(Key.KEY_8, "8"),
(Key.KEY_9, "9"),
]:
submap.setdefault((keyb, ANY), []).append(
lambda d=digit: self._append_joint_leg_digit(d)
)
# backspace/enter for that buffer
submap.setdefault((Key.KEY_BACKSPACE, ANY), []).append(
self._backspace_joint_leg_digit
)
submap.setdefault((Key.KEY_RETURN, ANY), []).append(
self._confirm_joint_leg_buffer
)
# once a leg is active, 1–9 toggle that leg’s joint #1–#9
for idx, keyb in [
(1, Key.KEY_1),
(2, Key.KEY_2),
(3, Key.KEY_3),
(4, Key.KEY_4),
(5, Key.KEY_5),
(6, Key.KEY_6),
(7, Key.KEY_7),
(8, Key.KEY_8),
(9, Key.KEY_9),
]:
submap.setdefault((keyb, ANY), []).append(
lambda i=idx: self._toggle_joint_for_active_leg(i)
)
self.sub_map = submap
[docs]
def enter_wheel_mode(self):
"""
Switch into 'wheel_select' mode; similar to joint mode but selected joints
are controlled by the Wheel Syncer.
Clears the Wheel Syncer if it exists.
"""
self.current_mode = "wheel_select"
self.no_no_leg()
if self.wheel_syncer is not None:
self.wheel_syncer.clear()
submap: InputMap = {
(Key.KEY_W, ANY): [lambda: self.move_joints(self.joint_speed)],
(Key.KEY_S, ANY): [lambda: self.move_joints(-self.joint_speed)],
(Key.KEY_O, ANY): [lambda: self.move_wheels(self.wheel_speed)],
(Key.KEY_L, ANY): [lambda: self.move_wheels(-self.wheel_speed)],
(Key.KEY_P, ANY): [lambda: self.move_wheels(0.0)],
(Key.KEY_C, ANY): [
self.selected_wheel_joints.clear,
self.selected_wheel_joints_inv.clear,
],
}
self.sub_map = submap
[docs]
def enter_ik_mode(self):
"""
Switch into 'ik_select' mode; map joystick axes/buttons to starting
the IK timer and toggling end‐effector vs base‐link reference.
Clears the IK Syncer if it exists.
"""
self._discover_legs()
self.current_mode = "ik_select"
self.no_no_leg()
if self.ik_syncer is not None:
self.ik_syncer.clear()
self.sub_map = {
("stickL", ANY): [self.start_ik_timer],
("stickR", ANY): [self.start_ik_timer],
("R2", ANY): [self.start_ik_timer],
("L2", ANY): [self.start_ik_timer],
("R1", ANY): [self.start_ik_timer],
("L1", ANY): [self.start_ik_timer],
("L1", ANY): [self.start_ik_timer],
("x", ANY): [lambda: self.switch_ik_mode(False)],
("o", ANY): [lambda: self.switch_ik_mode(True)],
(Key.KEY_O, ANY): [lambda: self.move_wheels(self.wheel_speed)],
(Key.KEY_L, ANY): [lambda: self.move_wheels(-self.wheel_speed)],
(Key.KEY_P, ANY): [lambda: self.move_wheels(0.0)],
(Key.KEY_M, ANY): [self.switch_ik_mode],
}
[docs]
def move_joints(self, speed: float):
"""
If any joints are selected, send constant-speed commands to the
JointSyncerRos using 'speed_safe', flipping sign for inverted joints.
Clears the Joint Syncer if the selected joints are changed.
"""
if not self.joint_syncer:
return
selected_jnames = sorted(jn for (_, jn) in self.selected_joints)
selected_jnames_inv = sorted(jn for (_, jn) in self.selected_joints_inv)
if not selected_jnames and not selected_jnames_inv:
return
if (selected_jnames + selected_jnames_inv) != self.joints_prev:
self.joint_syncer.clear()
start_time = ros_now(self)
def delta_time():
nonlocal start_time
now = ros_now(self)
out = (now - start_time).sec()
start_time = now
return out
target = {jn: speed for jn in selected_jnames}
target.update({jn: -speed for jn in selected_jnames_inv})
# self.add_log("I", f"{target}")
self.joint_syncer.speed_safe(target, delta_time)
self.joints_prev = selected_jnames + selected_jnames_inv
[docs]
def move_wheels(self, v: float, omega: float = 0.0):
"""
Drive the wheel syncer: linear velocity v plus optional omega for
differential steering; cancel when both are zero.
Clears the Wheel Syncer if the selected joints are changed.
"""
if self.wheel_syncer is None:
return
if v == 0.0 and omega == 0.0:
self.wheel_syncer.last_future.cancel()
return
wheel_jnames = sorted(jn for (_, jn) in self.selected_wheel_joints)
wheel_jnames_inv = sorted(jn for (_, jn) in self.selected_wheel_joints_inv)
if not wheel_jnames:
return
if (wheel_jnames + wheel_jnames_inv) != self.wheels_prev:
self.wheel_syncer.clear()
target = {jname: (v + omega) for jname in wheel_jnames}
target.update({jn: (-v + omega) for jn in wheel_jnames_inv})
# self.add_log("I", f"{target}")
start_time = ros_now(self)
def delta_time():
nonlocal start_time
now = ros_now(self)
out = (now - start_time).sec()
start_time = now
return out
self.wheel_syncer.speed_safe(target, delta_time)
self.wheels_prev = wheel_jnames + wheel_jnames_inv
[docs]
def move_ik(self):
"""
Fired by a 0.1 s timer when any relevant joystick input is active:
compute Δ-pose from sticks/triggers, assemble per-leg targets
(relative or absolute), and send to the IK syncer. Stops when sticks go idle.
Clears the IK Syncer if the selected legs are changed.
"""
if self.ik_syncer is None:
return
ik_by_leg = {ih.limb_number: ih for ih in self.ik_handlers}
ik_ready_legs = [
leg
for leg in self.selected_ik_legs
if (ih := ik_by_leg.get(leg)) is not None and ih.ready.done()
]
if not ik_ready_legs:
return
if ik_ready_legs != self.ik_legs_prev:
self.ik_syncer.clear()
bits = self.joy_state.bits
sticks_active = any_pressed(
bits,
[
"stickR",
"stickL",
"R2",
"L2",
"R1",
"L1",
],
)
if not sticks_active:
self.ik_syncer.last_future.cancel()
self.ikTMR.cancel()
return
speed_xyz = self.translation_speed # mm/s
speed_quat = self.rotation_speed # rad/s
delta_xyz = speed_xyz * self.ikTMR.timer_period_ns / 1e9
delta_quat = speed_quat * self.ikTMR.timer_period_ns / 1e9
xyz_input = np.empty((3,), dtype=float)
xyz_input[[0, 1]] = self.joy_state.stickL # left stick to move
xyz_input[2] = -self.joy_state.R2 + self.joy_state.L2 # deep triggers to move Z
r1 = (bits & BUTT_INTS["R1"]) != 0
l1 = (bits & BUTT_INTS["L1"]) != 0
x_rot = qt.from_rotation_vector([-l1 + r1, 0, 0])
y_rot = qt.from_rotation_vector([0, self.joy_state.stickR[0], 0])
z_rot = qt.from_rotation_vector([0, 0, self.joy_state.stickR[1]])
rot = (z_rot * y_rot * x_rot) ** delta_quat
target_pose_stick = Pose(ros_now(self), xyz_input * delta_xyz, rot)
target_stick = {leg: target_pose_stick for leg in ik_ready_legs}
target_rel_to_ee = self.ik_syncer.abs_from_rel(target_stick)
target_rel_to_base = rel_to_base_link(self.ik_syncer, target_stick)
if self.ee_mode is True:
target = target_rel_to_ee
else:
target = target_rel_to_base
# self.add_log("I", f"{target_rel_to_ee}")
self.ik_syncer.lerp_toward(target)
self.ik_legs_prev = ik_ready_legs
[docs]
def move_ik_keyboard(
self, lin: Flo3 = [0.0, 0.0, 0.0], rvec: Flo3 = [0.0, 0.0, 0.0]
):
"""
Send a small IK velocity update from keyboard input.
Args:
lin: 3-vector [vx, vy, vz] (mm/s) for linear velocity.
rvec: 3-vector [wx, wy, wz] (rad/s) for angular velocity.
What it does:
1. Gathers all ready IK legs in `self.selected_ik_legs`.
2. If that set changed, clears the previous syncer to avoid stale targets.
3. Computes Δtime since last tick.
4. Wraps (lin, rvec) into a VelPose stamped now (and transforms
into base_link frame if not end-effector mode).
5. Calls `self.ik_syncer.speed_safe(...)` with the per-leg velocities and Δtime.
Called on each IK-related key press or release.
"""
if self.ik_syncer is None:
return
ik_by_leg = {ih.limb_number: ih for ih in self.ik_handlers}
ik_ready_legs = [
leg
for leg in self.selected_ik_legs
if (ih := ik_by_leg.get(leg)) is not None and ih.ready.done()
]
if not ik_ready_legs:
return
if ik_ready_legs != self.ik_legs_prev:
self.ik_syncer.clear()
dt = delta_time(self, self.execTMR.timer_period_ns / 1e9)
lin = np.asarray(lin, dtype=float)
rvec = np.asarray(rvec, dtype=float)
delta_velpose = VelPose(ros_now(self), lin, rvec)
target_rel_to_ee = {leg: delta_velpose for leg in ik_ready_legs}
if self.ee_mode:
target = target_rel_to_ee
else:
target = rel_vel_to_base_link(self.ik_syncer, target_rel_to_ee)
# self.add_log("I", f"{target}")
self.ik_syncer.speed_safe(target, dt)
self.ik_legs_prev = ik_ready_legs
def _update_ik_from_keys(self):
"""
Read the current key‐hold flags in self.ik_key_states, compute the
net translation (lin) and rotation vector (rvec), and then either:
• call move_ik_keyboard(lin, rvec) to issue a new IK velocity command, or
• cancel any ongoing IK velocity if no relevant keys are down.
Ensures you can combine multiple arrow/WSAD/QE presses simultaneously.
"""
# build net translation
lin = np.zeros(3, dtype=float)
if self.ik_key_states[Key.KEY_UP]:
lin[0] += self.translation_speed
elif self.ik_key_states[Key.KEY_DOWN]:
lin[0] -= self.translation_speed
if self.ik_key_states[Key.KEY_LEFT]:
lin[1] -= self.translation_speed
elif self.ik_key_states[Key.KEY_RIGHT]:
lin[1] += self.translation_speed
if self.ik_key_states[Key.KEY_I]:
lin[2] += self.translation_speed
elif self.ik_key_states[Key.KEY_K]:
lin[2] -= self.translation_speed
# build net rotation vector
rvec = np.zeros(3, dtype=float)
if self.ik_key_states[Key.KEY_Q]:
rvec[0] -= self.rotation_speed
elif self.ik_key_states[Key.KEY_E]:
rvec[0] += self.rotation_speed
if self.ik_key_states[Key.KEY_W]:
rvec[1] -= self.rotation_speed
elif self.ik_key_states[Key.KEY_S]:
rvec[1] += self.rotation_speed
if self.ik_key_states[Key.KEY_A]:
rvec[2] -= self.rotation_speed
elif self.ik_key_states[Key.KEY_D]:
rvec[2] += self.rotation_speed
if np.any(lin) or np.any(rvec):
self.move_ik_keyboard(lin=lin, rvec=rvec)
else:
if self.ik_syncer is not None:
self.ik_syncer.last_future.cancel()
[docs]
def move_zero(self):
"""
Send a one-off lerp command to drive all selected joints to zero position.
Clears the Joint Syncer if the selected joints are changed.
"""
if self.joint_syncer is None:
return
selected_jnames = sorted(jn for (_, jn) in self.selected_joints)
selected_jnames_inv = sorted(jn for (_, jn) in self.selected_joints_inv)
if not selected_jnames and not selected_jnames_inv:
return
if (selected_jnames + selected_jnames_inv) != self.joints_prev:
self.joint_syncer.clear()
self.add_log("I", "Sending the selected joints to zero position.")
target = {}
target.update({jname: 0.0 for jname in selected_jnames + selected_jnames_inv})
zero_future = self.joint_syncer.lerp(target)
return zero_future
[docs]
def switch_ik_mode(self, val: Optional[bool] = None):
"""
Toggle or set whether IK deltas are interpreted relative to the end
effector (True) or to the base link (False).
"""
if val is None:
self.ee_mode = not self.ee_mode
else:
self.ee_mode = val
self.add_log("I", f"IK mode relative to end effector: {self.ee_mode}")
[docs]
def start_ik_timer(self):
"""
Helper to (re)start the periodic move_ik timer, ensuring a fresh IK
stream if control resumes after a pause.
"""
if self.ik_syncer is None:
return
if self.ikTMR.is_canceled():
elapsed = Duration(nanoseconds=self.ikTMR.time_since_last_call())
if elapsed > Duration(seconds=5):
self.ik_syncer.clear()
self.ikTMR.reset()
self.ikTMR.callback()
# ───────────────────────────── Keyboard ─────────────────────────────
[docs]
@error_catcher
def key_downSUBCBK(self, msg: Key):
"""
A callback that connects the pressed key from the keyboard upon arrival of the msg.
"""
code = msg.code
mod = msg.modifiers & ~(Key.MODIFIER_NUM | Key.MODIFIER_CAPS)
connect_mapping(self.main_map, (code, mod))
connect_mapping(self.sub_map, (code, mod))
if self.current_mode == "ik_select" and code in self.ik_key_states:
self.ik_key_states[code] = True
self._update_ik_from_keys()
[docs]
@error_catcher
def key_upSUBCBK(self, msg: Key):
"""
A callback that connects the realesed key from the keyboard upon arrival of the msg.
"""
code = msg.code
mod = msg.modifiers & ~(Key.MODIFIER_NUM | Key.MODIFIER_CAPS)
if self.current_mode == "ik_select" and code in self.ik_key_states:
self.ik_key_states[code] = False
self._update_ik_from_keys()
else:
self.stop_all_joints()
# ────────────────────────────────────────────────────────────────────
# ───────────────────────────── JoyStick ─────────────────────────────
[docs]
def joy_pressed(self, button_name: ButtonName):
"""Executes for each button that is pressed. Like a callback.
Args:
bits: Should only have one single bit set to 1, for 1 single button
"""
dic_key = (button_name, self.joy_state.bits)
connect_mapping(self.main_map, dic_key)
connect_mapping(self.sub_map, dic_key)
[docs]
def joy_released(self, button_name: ButtonName):
"""Executes for each button that is released. Like a callback.
Args:
bits: Should only have one single bit set to 1, for 1 single button
"""
dic_key = (button_name, self.joy_state.bits)
self.stop_all_joints()
[docs]
@error_catcher
def joySUBCBK(self, msg: Joy):
"""Processes incomming joy messages.
Converts and stores the received state in self.joy_state .
executes self.joy_pressed and self.joy_released for each button that changed state
Args:
msg: Ros2 Joy message type
"""
self.prev_joy_state = self.joy_state
self.joy_state = msg_to_JoyBits(msg)
button_downed: JoyBits = ~self.prev_joy_state.bits & self.joy_state.bits
downed_names: List[ButtonName] = bits2name(button_downed)
for name in downed_names:
self.joy_pressed(name)
button_upped = self.prev_joy_state.bits & ~self.joy_state.bits
upped_names: List[ButtonName] = bits2name(button_upped)
for name in upped_names:
self.joy_released(name)
return
# ────────────────────────────────────────────────────────────────────
[docs]
def stop_all_joints(self):
"""
Utility to clear & cancel both joint and IK syncers’ last futures.
"""
if self.joint_syncer:
self.joint_syncer.last_future.cancel()
if self.ik_syncer:
self.ik_syncer.last_future.cancel()
[docs]
def add_log(self, level: str, msg: str):
"""
Append a timestamped log entry into the rolling message buffer
for display in the TUI footer.
"""
ts = time.strftime("%H:%M:%S")
self.log_messages.append(f"{ts} [{level}] {msg}")
def _append_leg_digit(self, d: str):
"""Add one digit to the current leg‐ID buffer."""
self.leg_buffer += d
self.add_log("I", f"Leg buffer: {self.leg_buffer}")
def _backspace_leg_digit(self):
"""Erase the last digit from the leg buffer."""
self.leg_buffer = self.leg_buffer[:-1]
self.add_log("I", f"Leg buffer: {self.leg_buffer}")
def _confirm_leg_buffer(self):
"""Parse and select the buffered leg number, then clear it."""
if not self.leg_buffer:
return
try:
num = int(self.leg_buffer)
self.select_leg([num])
except ValueError:
self.add_log("W", f"Invalid leg number: {self.leg_buffer}")
finally:
self.leg_buffer = ""
def _append_joint_leg_digit(self, d: str):
"""
Add a digit to the leg‐ID buffer if no active leg selected.
Once a leg is active, digit‐presses go straight to joint toggles.
"""
if self.active_joint_leg is None:
self.joint_leg_buffer += d
self.add_log("I", f"Joint-leg buf: {self.joint_leg_buffer}")
def _backspace_joint_leg_digit(self):
"""
If we have an active leg, clear it (so you can type a new one).
Otherwise erase the last digit from the buffer.
"""
if self.active_joint_leg is not None:
self.add_log("I", f"Cleared active leg {self.active_joint_leg}")
self.active_joint_leg = None
else:
self.joint_leg_buffer = self.joint_leg_buffer[:-1]
self.add_log("I", f"Joint-leg buf: {self.joint_leg_buffer}")
def _confirm_joint_leg_buffer(self):
if not self.joint_leg_buffer:
return
try:
leg = int(self.joint_leg_buffer)
except ValueError:
self.add_log("W", f"Bad leg ID: {self.joint_leg_buffer}")
else:
if leg in self.current_legs:
self.active_joint_leg = leg
self.add_log("I", f"Active joint leg ▶ {leg}")
else:
self.add_log("W", f"Leg {leg} not found")
finally:
self.joint_leg_buffer = ""
def _toggle_joint_for_active_leg(self, idx: int):
"""
Toggle joint #idx (1-based) on self.active_joint_leg, direct only.
"""
leg = self.active_joint_leg
if leg is None:
self.add_log("W", "No active leg (type its ID + Enter)")
return
jh = next(
(h for h in self.joint_handlers if h.limb_number == leg and h.ready.done()),
None,
)
if not jh:
self.add_log("W", f"Leg {leg} not ready")
return
joints = sorted(jh.tracked)
if idx - 1 >= len(joints):
self.add_log("W", f"Leg {leg} has no joint {idx}")
return
jn = joints[idx - 1]
key = (leg, jn)
# skip if wheel owns it
if key in self.selected_wheel_joints or key in self.selected_wheel_joints_inv:
self.add_log("W", f"This joint is being used in the wheel mode.")
return
# toggle direct selection only
if key in self.selected_joints:
self.selected_joints.remove(key)
else:
self.selected_joints.add(key)
self.add_log("I", f"Joints ▶ {sorted(self.selected_joints)}")
[docs]
def recover_all(self):
self.add_log("W", "REPLACE THIS FUNCTION WITH YOUR RECOVER STRATEGY.")
[docs]
def recover(self):
self.add_log("W", "REPLACE THIS FUNCTION WITH YOUR RECOVER STRATEGY.")
for leg in self.selected_legs:
return
[docs]
def halt_all(self):
self.add_log("W", "REPLACE THIS FUNCTION WITH YOUR HALTING STRATEGY.")
[docs]
def halt(self):
self.add_log("W", "REPLACE THIS FUNCTION WITH YOUR HALTING STRATEGY.")
for leg in self.selected_legs:
return
[docs]
def create_main_map(self) -> InputMap:
"""
Build and return the global key map (Escape ⇒ main menu, Space/Return ⇒ stubs, etc.).
These keybinds work always.
"""
return {
(Key.KEY_R, ANY): [self.recover],
(Key.KEY_R, Key.MODIFIER_LSHIFT): [self.recover_all],
(Key.KEY_SPACE, ANY): [self.halt],
(Key.KEY_SPACE, Key.MODIFIER_LSHIFT): [self.halt_all],
(Key.KEY_ESCAPE, ANY): [self.clear_screen, self.enter_main_menu],
("option", ANY): [self.enter_main_menu],
("PS", ANY): [self.halt_all],
}
[docs]
@error_catcher
def loop(self):
"""
Runs at ~30 Hz: execute whichever of the three syncers (joint, IK, wheel)
currently exists.
"""
if self.joint_syncer is not None:
self.joint_syncer.execute()
if self.ik_syncer is not None:
self.ik_syncer.execute()
if self.wheel_syncer is not None:
self.wheel_syncer.execute()
[docs]
def clear_screen(self):
if not hasattr(self, "clear_screen_callback"):
return
self.clear_screen_callback()
[docs]
def main():
rclpy.init()
node = OperatorNode()
threading.Thread(target=rclpy.spin, args=(node,), daemon=True).start()
from .operator_tui import OperatorTUI
tui = OperatorTUI(node)
node.clear_screen_callback = tui.clear_screen
tui.run()
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()