API
I encourage you to dive into the source code and customize it to fit your robot’s unique needs. By importing the motion stack Python API into your own package and nodes, you keep customizations separate from the core motion stack while adapting it to each of your robot or robot module.
In this section, I’ll walk you through an example: creating a package to launch the Moonbot Zero with a different architecture and modifying the behavior of the nodes.
Make your package
Note
Source ros2 before all those commands.
Go in your workspace’s source:
cd ~/Motion-Stack/src/
Create a package with a node named lvl1:
ros2 pkg create --build-type ament_python --node-name lvl1 moonbot_zero
Open src/moonbot_zero/setup.py
and change it like below. This will make available in the share sirectory:
All your .launch.py files in
launch/
, so ros2 can find them.All your meshes/ and urdf/ available
1from setuptools import find_packages, setup
2from glob import glob # add this line
3
4package_name = 'moonbot_zero'
5
6setup(
7 name=package_name,
8 version='0.0.0',
9 packages=find_packages(exclude=['test']),
10 data_files=[
11 (f"share/{package_name}/launch", glob("launch/*.py")),
12 ('share/ament_index/resource_index/packages',
13 ['resource/' + package_name]),
14 ('share/' + package_name, ['package.xml']),
15 ...
Create your own launcher in launch/
of your new package:
cd ~/Motion-Stack/src/moonbot_zero
mkdir launch
cd launch
touch myrobot.launch.py
Note
For the provided executable to launch your new launcher, change ~/Motion-Stack/launch_stack.bash
like so:
...
ros2 launch moonbot_zero myrobot.launch.py MS_up_to_level:=4
You can then launch and see your changes with bash launch_stack.bash
:
Using your URDF
Making a URDF available from your custom package
In a ros package (here named moonbot_zero), create a urdf/
and meshes/
directories, then place you urdfs and meshes inside.
cd ~/Motion-Stack/src/moonbot_zero
mkdir meshes
mkdir urdf
Make those directories available in the package shared directory by changing the setup.py
1...
2
3setup(
4 name=package_name,
5 version='0.0.0',
6 packages=find_packages(exclude=['test']),
7 data_files=[
8 (f"share/{package_name}/launch", glob("launch/*.py")),
9 (f"share/{package_name}/urdf", glob("urdf/*", recursive=True)), # (this)
10 (f"share/{package_name}/meshes", glob("meshes/*", recursive=True)), # (this)
11 ]
12 )
Make sure to adjust the paths of the urdf. <mesh filename="{SOMETHING}/base_link.stl" />
should be <mesh filename="package://moonbot_zero/meshes/base_link.stl" />
(where moonbot_zero is the name of the package).
Loading your URDF
Using the launch API in the next section, you can load a URDF by providing the package name and path. Assuming the package is moonbot_zero and the URDF is inside urdf/moonbot_zero.xacro
:
urdf_path=xacro_path_from_pkg(
package_name="moonbot_zero",
xacro_path="urdf/moonbot_zero.xacro",
)
Launch API
To streamline the creation of numerous nodes, the motion_stack.api.launch
provides a python launch API – essentially wrapping around ROS2’s launch system. The class api.launch.builder.LevelBuilder
creates the nodes to be launched and its ultimate method api.launch.builder.LevelBuilder.make_description()
returns the launch description used by ROS2.
Warming up
Edit your myrobot.launch.py
and let us start with the default launch provided by the motion stack:
from motion_stack.api.launch.builder import (
LevelBuilder,
xacro_path_from_pkg,
)
ROBOT_NAME = "moonbot_7"
LEGS_DIC = {
1: "end1",
2: "end2",
3: "end3",
4: "end4",
}
lvl_builder = LevelBuilder(
urdf_path=xacro_path_from_pkg(
package_name="moonbot_zero", xacro_path="urdf/moonbot_zero.xacro"
),
leg_dict=LEGS_DIC,
)
def generate_launch_description():
return lvl_builder.make_description()
Changing params
...
new_params = {
"std_movement_time": 10.0,
}
lvl_builder = LevelBuilder(
urdf_path=xacro_path_from_pkg(
package_name="moonbot_zero", xacro_path="urdf/moonbot_zero.xacro"
),
leg_dict=LEGS_DIC,
params_overwrite=new_params,
)
...
After overwriting the std_movement_time
parameter with 10 by passing it to the LevelBuilder
, movements are very slow:
ros2 service call /leg1/shift motion_stack_msgs/srv/TFService "{tf: {translation: {x: -100, y: 0, z: -100}, rotation: {x: 0.0, y: 0.0, z: 0.0, w: 1.0}}}"
Changing end effector and leg numbers
...
LEGS_DIC = {
1: "end2",
2: "end1",
3: "end3",
40: "end4",
}
...
After changing the LEGS_DIC
dictionary specifying which end effector correspond to each leg and passing it to LevelBuilder
, leg2 is now the one at the front.
ros2 service call /leg2/shift motion_stack_msgs/srv/TFService "{tf: {translation: {x: -100, y: 0, z: -100}, rotation: {x: 0.0, y: 0.0, z: 0.0, w: 1.0}}}"
And leg 40 is the one on the right:
ros2 service call /leg40/shift motion_stack_msgs/srv/TFService "{tf: {translation: {x: 20, y: 50, z: -50}, rotation: {x: 0.0, y: 0.0, z: 0.0, w: 1.0}}}"
Note
Revert this back when you are done, otherwise you might get confused going further.
LEGS_DIC = {1: "end1", 2: "end2", 3: "end3", 4: "end4"}
Overloading to have a single robot_state_publisher
Looking at the default launching behavior, each leg has it own state publisher. This has limited usefulness for our Moobot Zero because this robot makes use of one centralized computer and not one computer per leg.
Let’s change api.launch.LevelBuilder.state_publisher_lvl1()
to centralize the state publishers in global namespace. Comparing below with the original source code, not much changed aside from one loop and a remapping.
...
from typing import Any, Dict, List, Mapping, Union
from launch_ros.actions import Node
from launch_ros.parameter_descriptions import ParameterValue
from motion_stack.api.launch.builder import LevelBuilder, xacro_path_from_pkg
from launch.substitutions import Command
class MyLevelBuilder(LevelBuilder):
def state_publisher_lvl1(self) -> List[Node]:
compiled_xacro = Command([f"xacro ", self.xacro_path])
node_list = []
leg_namespaces = [f"leg{param['leg_number']}" for param in self.lvl1_params()]
all_joint_read_topics = [f"{ns}/joint_read" for ns in leg_namespaces]
node_list.append(
Node(
package=self.MS_PACKAGE,
executable="lazy_joint_state_publisher",
name="lazy_joint_state_publisher",
# namespace=ns,
arguments=["--ros-args", "--log-level", "warn"],
parameters=[
{
"source_list": all_joint_read_topics,
"publish_default_positions": True,
}
],
remappings=[
# (intside node, outside node),
("joint_states", "continuous_joint_read"),
],
),
)
node_list.append(
Node(
package="robot_state_publisher",
executable="robot_state_publisher",
name="robot_state_publisher",
# namespace=ns,
arguments=["--ros-args", "--log-level", "warn"],
parameters=[
{
"robot_description": ParameterValue(
compiled_xacro, value_type=str
),
}
],
remappings=[
# (intside node, outside node),
("joint_states", "continuous_joint_read"),
],
),
)
return node_list
...
We created a new class MyLevelBuilder
that inherits the behavior of LevelBuilder
and overwrites the one method state_publisher_lvl1
. Now, when self.state_publisher_lvl1
is called, one joint_state_publisher
and robot_state_publisher
is created in the global namespace listening to the list of topics [leg1/joint_read, leg2/joint_read, ...]
.
Note
lazy_joint_state_publisher
is used, it is slightly different from the default joint_state_publisher
. See motion_stack.ros2.utils.lazy_joint_state_publisher.LazyJointStatePublisher
Remapping
Notice in the previous example, “joint_states” topic is used differently by several nodes. They need to be remapped onto other name to avoid conflicts:
...
remappings=[
# (intside node, outside node),
("joint_states", "continuous_joint_read"),
],
...
Note
Remapping and namespaces are the main way to avoid conflicts when building your modular system.
Automating modularity
Using python you can change the behavior of your launcher depending on where it is launch (on the robot brain, on leg #1, on leg #2, on any PC, on ground station, …). There is no one good way to do it, so I will explain my method with a very basic example:
I define environment variables in the OS of the computer, then launch different nodes base on that. Again, overwrite api.launch.LevelBuilder.state_publisher_lvl1()
to add such functionalities.
class MyLevelBuilder(LevelBuilder):
def __init__(
self,
urdf_path: str,
leg_dict: Mapping[int, Union[str, int]],
params_overwrite: Dict[str, Any] = dict(),
urdf: Union[None, str, Command] = None,
):
# gets the "COMPUTER_ID" environement variable
self.COMPUTER_ID = os.environ.get("COMPUTER_ID")
if self.COMPUTER_ID in ["leg1", "leg2", "leg3", "leg4"]:
# if running on one of the leg computer
# we only start the assiciated leg/end-effector
leg_number = int(self.COMPUTER_ID[-1])
end_effector: Union[str, int, None] = leg_dict.get(leg_number)
if end_effector is None:
raise Exception("leg number has no entry in leg_dict")
reduced_leg_dict = {leg_number: end_effector}
leg_dict = reduced_leg_dict
super().__init__(urdf_path, leg_dict, params_overwrite, urdf)
def make_levels(self) -> List[List[Node]]:
if self.COMPUTER_ID in ["leg1", "leg2", "leg3", "leg4"]:
# if running on one of the leg computer
# we only start lvl1
return [self.lvl1()]
if self.COMPUTER_ID == "robot_brain":
# if running on the main robot computer
# we start lvl2-3-4
return [self.lvl2(), self.lvl3(), self.lvl4()]
if self.COMPUTER_ID == "ground_station":
# if running on the ground station
# we start only lvl5
return [self.lvl5()]
# if none of the previous cases, the default behavior runs everything
return super().make_levels()
Note
This is not part of the tutorial, you do not need to make this work.
Loading you own node
In the next section we will replace the default motion stack lvl1 node motion_stack.ros2.default_node.lvl1.DefaultLvl1
with our own modified node, from our package. We will make the launch API load our node instead of the default.
In your launcher overload LevelBuilder.get_node_lvl1()
with:
class MyLevelBuilder(LevelBuilder):
def get_node_lvl1(self, params: Dict[str, Any]) -> Node:
ns = f"leg{params['leg_number']}"
return Node(
package="moonbot_zero",
namespace=ns,
executable="lvl1",
name=f"lvl1",
arguments=["--ros-args", "--log-level", "info"],
emulate_tty=True,
output="screen",
parameters=[params],
remappings=self.remaplvl1,
)
Lvl1 specialization API
The Motion Stack low level python code is designed such that you can easily overload relevant part of the code and use it like an API in which you inject your code.
Note
After completing the previous step “Loading you own node”, modify your node src/moonbot_zero/moonbot_zero/lvl1.py
.
Overwriting
By importing the motion stack default node of lvl1 motion_stack.ros2.default_node.lvl1
, you can overwrite parts of it with the code you need.
- The following example python file:
Overwrite
lvl1.DefaultLvl1
.__init__()
to add a timer and publisherMakes a new callback for the timer, moving each joint in a sinusoidal motion (this emulates a subscriber or something receiving data).
Overwrites
DefaultLvl1.publish_to_lvl0()
, it now also publishes every command on a string topicdisplay_angle_command
.
1"""Overloaded JointNode for Moonbot zero."""
2
3from typing import Iterable, List
4
5import numpy as np
6from motion_stack.api.injection.remapper import Shaper, StateRemapper
7from motion_stack.api.ros2.offsetter import setup_lvl0_offsetter
8from motion_stack.api.ros2.state_to_topic import StatesToTopic
9from motion_stack.core.utils.joint_state import JState
10from motion_stack.ros2.default_node.lvl1 import DefaultLvl1
11from motion_stack.ros2.utils.conversion import ros_to_time
12from motion_stack.ros2.utils.executor import error_catcher
13from rclpy.node import Timer
14from std_msgs.msg import String
15
16
17class ZeroLvl1(DefaultLvl1):
18 def __init__(self):
19 super().__init__() # default node intialization
20 # our new code
21 self.stringPUB = self.create_publisher(String, "display_angle_command", 10)
22 self.sinusiodTMR: Timer = self.create_timer(0.1, self.send_sinusoid)
23 self.start_time = self.get_clock().now()
24
25
26 @error_catcher
27 def send_sinusoid(self):
28 """Sends a sinusoidal command on every joint based on the clock.
29 Callback of the self.sinTMR timer."""
30 # sinusoid creation
31 now = self.get_clock().now()
32 since_start = ros_to_time(now - self.start_time)
33 PERIOD = 10
34 AMPLITUDE = 0.1
35 sinusoid = np.sin(since_start.sec() * np.pi / PERIOD) * AMPLITUDE
36
37 # joint states data creation
38 state_list: List[JState] = []
39 for name in self.core.jointHandlerDic.keys():
40 state = JState(name=name, time=ros_to_time(now), position=sinusoid)
41 state_list.append(state)
42
43 # call of the function handling incomming joint commands from lvl2
44 self.core.coming_from_lvl2(state_list)
45
46 def publish_to_lvl0(self, states: List[JState]):
47 """Executes our custom code every time state data (commands) needs to be sent down to lvl0."""
48 super().publish_to_lvl0(states) # executes default behavior
49
50 # prepares string
51 str_to_send: List[str] = [f"leg {self.core.leg_num}"]
52 for joint_state in states:
53 if joint_state.position is None:
54 continue # skips empty states with no angles
55 str_to_send.append(
56 f"lvl1 -> lvl0: {joint_state.name} "
57 f"| {np.rad2deg(joint_state.position):.1f}"
58 )
59
60 # publishes string
61 if str_to_send[1:]:
62 self.stringPUB.publish(String(data="\n".join(str_to_send)))
63
64
65def main(args=None):
66 ZeroLvl1.spin()
67
68
69if __name__ == "__main__":
70 main()
You can now listen to the motor commands of leg1 using:
ros2 topic echo /leg1/display_angle_command
data: 'leg 1
lvl1 -> lvl0: joint1_1 | 5.7
lvl1 -> lvl0: joint1_2 | 5.7
lvl1 -> lvl0: joint1_3 | 5.7'
Using the API and overloading like this, you can easily add functionalities to the motion stack without creating a new whole node, and with minimal knowledge of ros2. You can:
Change where the data is sent and how it is formatted (like we did with the string topic).
Change where the data comes from and its format (like we did with the timer, you can replace it with a subscriber).
Injection
Injection consists in instantiating an object that adds functionalities to a parent object.
Right now a few ready to use injections are available in motion_stack.api.ros2
(their non-ros dependent and general injections are in motion_stack.api.injection
).
motion_stack.api.injection.remapper
: Remaps states names, and applies shaping functions to the state data. With this you can apply offsets, gains and more. (does not require ros)
motion_stack.api.ros2.offsetter
: Adds angle offsets to the motor output of lvl1 at runtime (and a little bit more)
motion_stack.api.ros2.state_to_topic
: Publishes on individual Float64 topics instead of a JointStates topic.
Let’s use all 3:
1"""Overloaded JointNode for Moonbot zero."""
2
3from typing import Iterable, List
4
5import numpy as np
6from motion_stack.api.injection.remapper import Shaper, StateRemapper
7from motion_stack.api.ros2.offsetter import setup_lvl0_offsetter
8from motion_stack.api.ros2.state_to_topic import StatesToTopic
9from motion_stack.core.utils.joint_state import JState
10from motion_stack.ros2.default_node.lvl1 import DefaultLvl1
11from motion_stack.ros2.utils.conversion import ros_to_time
12from motion_stack.ros2.utils.executor import error_catcher
13from rclpy.node import Timer
14from std_msgs.msg import String
15
16
17class ZeroLvl1(DefaultLvl1):
18 def __init__(self):
19 super().__init__() # default node intialization
20 # our new code
21 self.stringPUB = self.create_publisher(String, "display_angle_command", 10)
22 self.sinusiodTMR: Timer = self.create_timer(0.1, self.send_sinusoid)
23 self.start_time = self.get_clock().now()
24
25 # Replaces default empty remap by our own
26 self.core.lvl0_remap = StateRemapper(
27 name_map={
28 "joint1_1": "my_new_joint" # changes the name of "joint1_1"
29 },
30 state_map={
31 "joint1_1": Shaper(position=lambda x: x * 2) # multiplies command by 2
32 },
33 unstate_map={
34 "joint1_1": Shaper(position=lambda x: x / 2) # divides sensor by 2
35 },
36 )
37
38 setup_lvl0_offsetter(self) # Enables the offsetter
39
40 # Enables publishing on individual float topics
41 StatesToTopic.setup_lvl0_command(self)
42
43 @error_catcher
44 def send_sinusoid(self):
45 """Sends a sinusoidal command on every joint based on the clock.
46 Callback of the self.sinTMR timer."""
47 # sinusoid creation
48 now = self.get_clock().now()
49 since_start = ros_to_time(now - self.start_time)
50 PERIOD = 10
51 AMPLITUDE = 0.1
52 sinusoid = np.sin(since_start.sec() * np.pi / PERIOD) * AMPLITUDE
53
54 # joint states data creation
55 state_list: List[JState] = []
56 for name in self.core.jointHandlerDic.keys():
57 state = JState(name=name, time=ros_to_time(now), position=sinusoid)
58 state_list.append(state)
59
60 # call of the function handling incomming joint commands from lvl2
61 self.core.coming_from_lvl2(state_list)
62
63 def publish_to_lvl0(self, states: List[JState]):
64 """Executes our custom code every time state data (commands) needs to be sent down to lvl0."""
65 super().publish_to_lvl0(states) # executes default behavior
66
67 # prepares string
68 str_to_send: List[str] = [f"leg {self.core.leg_num}"]
69 for joint_state in states:
70 if joint_state.position is None:
71 continue # skips empty states with no angles
72 str_to_send.append(
73 f"lvl1 -> lvl0: {joint_state.name} "
74 f"| {np.rad2deg(joint_state.position):.1f}"
75 )
76
77 # publishes string
78 if str_to_send[1:]:
79 self.stringPUB.publish(String(data="\n".join(str_to_send)))
80
81
82def main(args=None):
83 ZeroLvl1.spin()
84
85
86if __name__ == "__main__":
87 main()
Running ros2 topic echo /leg1/display_angle_command
you’ll see that joint1-1
is now my-new-joint
, and its value has been multiplied by 2.
ros2 topic echo /leg1/display_angle_command
data: 'leg 1
lvl1 -> lvl0: my_new_joint | 11.4
lvl1 -> lvl0: joint1_2 | 5.7
lvl1 -> lvl0: joint1_3 | 5.7'
Running ros2 topic list | grep .*/driver
you’ll see that topics have been created, publishing the positions of the joints.
ros2 topic list | grep .*/driver
/leg1/driver/joint1_2/position
/leg1/driver/joint1_3/position
/leg1/driver/my_new_joint/position
/leg2/driver/joint2_1/position
/leg2/driver/joint2_2/position
/leg2/driver/joint2_3/position
/leg3/driver/joint3_1/position
/leg3/driver/joint3_2/position
/leg3/driver/joint3_3/position
/leg4/driver/joint4_1/position
/leg4/driver/joint4_2/position
/leg4/driver/joint4_3/position
Running the code below, will add 1 radian to the output of joint1-2 (not in rviz, only on the lvl0 motor command output).
ros2 service call /leg1/set_offset motion_stack_msgs/srv/SendJointState "{js: {name: [joint1_2], position: [1], velocity: [], effort: []}}"
data: 'leg 1
lvl1 -> lvl0: my_new_joint | -1.4
lvl1 -> lvl0: joint1_2 | -58.0
lvl1 -> lvl0: joint1_3 | -0.7'
High level API
Warning
This tutorial section is not finished, the in-code documentation is however available: motion_stack.api.ros2
High level APIs are available and meant to be used in your own ROS2 nodes. The API simplifies things, however you can also directly send messages onto the available ROS2 topics.
Joint API –
api.ros2.joint_api
: ROS2 API to send/receive joint command/state to lvl1 and synchronize multiple joints.IK API –
api.ros2.ik_api
: ROS2 API to send/receive end-effector IK command / FK state to lvl2 and synchronize multiple limbs.

An example node using the high level API, doing some movements using the moonbot zero is available in src/moonbot_zero_tuto/moonbot_zero_tuto/high_level.py
. This node is specific to moonbot zero, however the apis used are not. Please take inspiration from it.
Launch the motion stack, Rviz and the tutorial node with the moonbot zero:
bash launch_stack.bash
bash launch_simu_rviz.bash # (separate terminal)
ros2 run moonbot_zero_tuto high_level # (separate terminal)
1"""This gives example of a high level node using the motion stack API
2
3Warning:
4 To make this example as easy as possible, async/await is heavily used.
5 This is unusual, you do not need and even, should not use async/await with Ros2.
6 The motion stack uses generic Future and callback, async/await style
7 is not required for the motion stack.
8
9 In this example every time ``await``, is used (on a ros2 Future, or python awaitable),
10 the code pauses until the awaitable finishes, however it does not block the ros2 executor.
11 Basically, this ``await`` sleeps/waits without blocking ros2 operations
12 (incomming/outgoing messages).
13
14 async/await is easier to read, however much more reliable and performant code is
15 possible using ros2 future+callback and especially timers.
16
17"""
18
19from typing import Coroutine
20
21import numpy as np
22from rclpy.node import List, Node
23
24pass
25import motion_stack.ros2.ros2_asyncio.ros2_asyncio as rao
26from motion_stack.api.ik_syncer import XyzQuat
27from motion_stack.api.ros2.ik_api import IkHandler, IkSyncerRos
28from motion_stack.api.ros2.joint_api import JointHandler, JointSyncerRos
29from motion_stack.core.utils.math import patch_numpy_display_light, qt
30from motion_stack.core.utils.pose import Pose
31from motion_stack.ros2.utils.conversion import ros_now
32from motion_stack.ros2.utils.executor import error_catcher, my_main
33
34# lighter numpy display
35patch_numpy_display_light()
36
37
38x = 400
39z = -100
40DEFAULT_STANCE = np.array(
41 [
42 [x, 0, z],
43 [0, x, z],
44 [-x, 0, z],
45 [0, -x, z],
46 ],
47 dtype=float,
48)
49
50
51class TutoNode(Node):
52
53 #: list of limbs number that are controlled
54 LIMBS = [1, 2, 3, 4]
55
56 def __init__(self) -> None:
57 super().__init__("test_node")
58
59 self.create_timer(1 / 30, self.exec_loop) # regular execution
60 self.startTMR = self.create_timer(0.1, self.startup) # executed once
61
62 # API objects:
63
64 # Handles ros2 joints lvl1 (subscribers, publishers and more)
65 self.joint_handlers = [JointHandler(self, l) for l in self.LIMBS]
66 # Syncronises several joints
67 self.joint_syncer = JointSyncerRos(self.joint_handlers)
68
69 # Handles ros2 ik lvl2
70 self.ik_handlers = [IkHandler(self, l) for l in self.LIMBS]
71 # Syncronises several IK
72 self.ik_syncer = IkSyncerRos(
73 self.ik_handlers,
74 interpolation_delta=XyzQuat(20, np.inf),
75 on_target_delta=XyzQuat(2, np.inf),
76 )
77
78 self.get_logger().info("init done")
79
80 @error_catcher
81 async def main(self):
82 # wait for all syncers to be ready
83 await self.joints_ready()
84 await self.ik_ready()
85
86 # send to all angle at 0.0
87 await self.angles_to_zero()
88 # send to default stance
89 await self.stance()
90
91 # move end effector in a square (circle with 4 samples)
92 await self.ik_circle(4)
93 await self.stance()
94
95 # move end effector in a circle
96 await self.ik_circle(100)
97 await self.stance()
98
99 # increase the value of on_target_delta. Each point of the trajectory will be considered done faster, hence decreasing precision, but executing faster.
100 self.ik_syncer = IkSyncerRos(
101 self.ik_handlers,
102 interpolation_delta=XyzQuat(20, np.inf),
103 on_target_delta=XyzQuat(20, np.inf),
104 )
105 await self.ik_circle(100)
106 await self.ik_circle(100)
107 await self.ik_circle(100)
108 await self.ik_circle(100)
109 await self.stance()
110 print("finished")
111
112 async def joints_ready(self):
113 """Returns once all joints are ready"""
114 ready_tasks = [jh.ready for jh in self.joint_handlers]
115 try:
116 print("Waiting for joints.")
117 fused_task = rao.gather(self, *ready_tasks)
118 await rao.wait_for(self, fused_task, timeout_sec=100)
119 print(f"Joints ready.")
120 strlist = "\n".join(
121 [f"limb {jh.limb_number}: {jh.tracked}" for jh in self.joint_handlers]
122 )
123 print(f"Joints are:\n{strlist}")
124 return
125 except TimeoutError:
126 raise TimeoutError("Joint data unavailable after 100 sec")
127
128 async def ik_ready(self):
129 """Returns once all ik are ready"""
130 ready_tasks = [ih.ready for ih in self.ik_handlers]
131 try:
132 print("Waiting for ik.")
133 fused_task = rao.gather(self, *ready_tasks)
134 await rao.wait_for(self, fused_task, timeout_sec=100)
135 print(f"Ik ready.")
136 strlist = "\n".join(
137 [f"limb {ih.limb_number}: {ih.ee_pose}" for ih in self.ik_handlers]
138 )
139 print(f"EE poses are:\n{strlist}")
140 return
141 except TimeoutError:
142 raise TimeoutError("Ik data unavailable after 100 sec")
143
144 def angles_to_zero(self) -> Coroutine:
145 """sends all joints to 0.0"""
146 target = {}
147 for jh in self.joint_handlers:
148 target.update({jname: 0.0 for jname in jh.tracked})
149
150 task = self.joint_syncer.asap(target)
151 return rao.wait_for(self, task, timeout_sec=100)
152
153 async def ik_circle(self, samples: int = 20):
154 """Executes a flat cricle trajectory.
155
156 Args:
157 samples: number of sample points making the circle trajectory.
158 """
159 s = samples
160 s += 1
161 radius = 70
162 ang = np.linspace(0, 2 * np.pi, s)
163 yz = radius * np.exp(1j * ang)
164 trajectory = np.zeros((s, 3), dtype=float)
165 trajectory[:, 0] = yz.real
166 trajectory[:, 1] = yz.imag
167
168 # last_target = self.ik_syncer._previous_point(set(self.LIMBS))
169 # start_poses = [last_target[h] for h in self.LIMBS]
170 for ind in range(trajectory.shape[0]):
171 target = {
172 handler.limb_number: Pose(
173 time=ros_now(self),
174 xyz=DEFAULT_STANCE[handler.limb_number - 1, :] + trajectory[ind, :],
175 quat=qt.one,
176 )
177 for handler in self.ik_handlers
178 # for handler, start in zip(self.ik_handlers, start_poses)
179 }
180 task = self.ik_syncer.lerp(target)
181 await rao.wait_for(self, task, timeout_sec=100)
182
183 def stance(self) -> Coroutine:
184 """Goes to the default moonbot zero stance using IK"""
185 xyz_targets = DEFAULT_STANCE
186 target = {
187 leg_num: Pose(
188 time=ros_now(self),
189 xyz=xyz_targets[leg_num - 1, :],
190 quat=qt.one,
191 )
192 for leg_num in self.LIMBS
193 }
194 return rao.wait_for(self, self.ik_syncer.lerp(target), timeout_sec=100)
195
196 @error_catcher
197 def startup(self):
198 """Execute once at startup"""
199 # Ros2 will executor will handle main()
200 rao.ensure_future(self, self.main())
201
202 # destroys timer
203 self.destroy_timer(self.startTMR)
204 print("Startup done.")
205
206 @error_catcher
207 def exec_loop(self):
208 """Regularly executes the syncers"""
209 self.joint_syncer.execute()
210 self.ik_syncer.execute()
211
212
213def main(*args):
214 my_main(TutoNode)