"""
API to generate launch files
.. _level-builder-label:
"""
import sys
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Mapping, Optional, TypeVar, Union
import numpy as np
from launch_ros.actions import Node
from launch_ros.parameter_descriptions import ParameterValue
from easy_robot_control.launch.default_params import (
RVIZ_SIMU_REMAP,
default_params,
enforce_params_type,
get_xacro_path,
)
from launch.launch_description import LaunchDescription
from launch.substitutions import Command
T = TypeVar("T")
[docs]
class LevelBuilder:
"""Builds a launcher for the motion stack generating your nodes
Note:
This class is meant to be overloaded and changed for your robot.
Refere to :ref:`launch-api-label`
Args:
robot_name: Name of your robot URDF
leg_dict: Dictionary linking leg number to end effector name.\
This informs the API of the number of legs and nodes to launch.
params_overwrite: Will overwrite the default parameters
Example::
from easy_robot_control.launch.builder import LevelBuilder
ROBOT_NAME = "moonbot_7" # name of the xacro to load
LEGS_DIC = {
1: "end1",
2: "end2",
3: "end3",
4: "end4",
}
lvl_builder = LevelBuilder(robot_name=ROBOT_NAME, leg_dict=LEGS_DIC)
def generate_launch_description():
return lvl_builder.make_description()
"""
def __init__(
self,
robot_name: str,
leg_dict: Mapping[int, Union[str, int]],
params_overwrite: Dict[str, Any] = dict(),
):
""" """
self.name = robot_name
self.xacro_path = self.get_xacro_path()
self.params_overwrite = deepcopy(params_overwrite)
self.ms_package = "easy_robot_control"
self.legs_dict = leg_dict
self.all_param = default_params.copy()
self.generate_global_params()
self.process_CLI_args()
enforce_params_type(self.all_param)
[docs]
def make_description(
self, levels: Optional[List[List[Node]]] = None
) -> LaunchDescription:
"""Return the launch description for ros2
Example::
def generate_launch_description():
return lvl_builder.make_description()
Args:
levels:
list of levels, levels being a list of nodes to be launched
Returns:
launch description to launch all the nodes
"""
if levels is None:
levels = self.make_levels()
return LaunchDescription(
[x for xs in levels for x in xs], # flattens the list
)
[docs]
def process_CLI_args(self):
self.down_from: int = int(get_cli_argument("MS_down_from_level", 1))
self.up_to: int = int(get_cli_argument("MS_up_to_level", 4))
# True by default, become false only if a keyword in this list is used
self.USE_SIMU: bool = get_cli_argument("MS_simu_mode", "True").lower() not in [
"false",
"n",
"no",
"0",
]
self.remaplvl1 = []
if self.USE_SIMU:
self.remaplvl1 += RVIZ_SIMU_REMAP
[docs]
def lvl_to_launch(self) -> List[int]:
"""
Returns:
List of int corresponding to the motion stack levels to start
"""
return list(range(self.down_from, self.up_to + 1))
[docs]
def get_xacro_path(self) -> str:
"""
Returns:
Path to the robot's xacro file
"""
p = get_xacro_path(self.name)
return p
[docs]
def generate_global_params(self):
"""Generates parameters shared by all nodes.
Based on the default params and overwrites.
.. Note:
stores it in self.all_param
"""
self.all_param = default_params
overwrite_default = {
"robot_name": self.name,
"urdf_path": self.xacro_path,
# will be overwritten later VVV
"number_of_legs": len([i for i in self.legs_dict.keys()]),
"leg_list": [i for i in self.legs_dict.keys()],
}
self.all_param.update(overwrite_default)
self.all_param.update(self.params_overwrite)
[docs]
def make_leg_param(self, leg_index: int, ee_name: Union[None, str, int]) -> Dict:
"""Based on the leg index/number (and end-effector), returns the parameters corresponding to the leg
Args:
leg_index: leg number to create the params for
ee_name: (Optional) end effector name or number
Raises:
Exception:
Exception(f"{leg_index} not in self.legs_dic")
Returns:
Dictionary of ROS2 parameters
"""
if ee_name is None:
ee_name = self.legs_dict.get(leg_index)
if ee_name is None:
raise Exception(f"{leg_index} not in self.legs_dic")
leg_param: Dict[str, Any] = deepcopy(self.all_param)
leg_param["leg_number"] = leg_index
leg_param["end_effector_name"] = str(ee_name)
is_very_first_leg = leg_index == list(self.legs_dict.keys())[0]
if not is_very_first_leg:
# only one leg should set and handle the body coords
# otherwise several legs will fight
# behavior to be updated and deleted
leg_param["start_coord"] = [np.nan, np.nan, np.nan]
if not self.USE_SIMU:
# if we don't use Rviz this coordinate publishing is useless.
# It is only for visuals
leg_param["start_coord"] = [np.nan, np.nan, np.nan]
return leg_param
[docs]
def lvl1_params(self) -> List[Dict]:
"""Returns parameters for all lvl1 nodes to be launched
Returns:
List of ros2 parameter dictionary, one per node.
"""
all_params = [self.make_leg_param(k, v) for k, v in self.legs_dict.items()]
for param in all_params:
param["services_to_wait"] = ["rviz_interface_alive"]
return all_params
[docs]
def lvl2_params(self) -> List[Dict]:
"""Returns parameters for all lvl2 nodes to be launched
Returns:
List of ros2 parameter dictionary, one per node.
"""
all_params = self.lvl1_params()
for param in all_params:
param["services_to_wait"] = ["joint_alive"]
return all_params
[docs]
def lvl3_params(self) -> List[Dict]:
"""Returns parameters for all lvl3 nodes to be launched
Returns:
List of ros2 parameter dictionary, one per node.
"""
all_params = self.lvl2_params()
for param in all_params:
param["services_to_wait"] = ["ik_alive"]
return all_params
[docs]
def lvl4_params(self) -> List[Dict]:
"""Returns parameters for all lvl4 nodes to be launched
Returns:
List of ros2 parameter dictionary, one per node.
"""
all_params = [deepcopy(self.all_param)]
lvl3 = self.lvl3_params()
all_leg_ind = [n["leg_number"] for n in lvl3]
for param in all_params:
param["leg_list"] = all_leg_ind
param["number_of_legs"] = len(all_leg_ind)
param["services_to_wait"] = [f"leg{n}/leg_alive" for n in param["leg_list"]]
return all_params
[docs]
def lvl5_params(self) -> List[Dict]:
"""Returns parameters for all lvl5 nodes to be launched
Returns:
List of ros2 parameter dictionary, one per node.
"""
all_params = self.lvl4_params()
for param in all_params:
param["services_to_wait"] = ["mover_alive"]
return all_params
[docs]
def state_publisher_lvl1(self) -> List[Node]:
""".. _builder-state-label:
Prepares all nodes meant to publish the state of the robot to external tools.
Along each lvl1 node, it creates:
- One (customized) joint_state_publisher continuously publishing joint angles
- One robot_state_publisher continuously publishing robot TF and description.
Returns:
List of Nodes to be launched (empty if lvl1 is not to be launched)
"""
if 1 not in self.lvl_to_launch():
return []
compiled_xacro = Command([f"xacro ", self.xacro_path])
node_list = []
for param in self.lvl1_params():
ns = f"leg{param['leg_number']}"
node_list.append(
Node(
package=self.ms_package,
executable="joint_state_publisher",
name="joint_state_publisher",
namespace=ns,
arguments=["--ros-args", "--log-level", "warn"],
parameters=[
{
"source_list": ["joint_read"],
"publish_default_positions": True,
}
],
remappings=[
# (intside node, outside node),
("joint_states", "continuous_joint_read"),
],
),
)
node_list.append(
Node(
package="robot_state_publisher",
executable="robot_state_publisher",
name="robot_state_publisher",
namespace=ns,
arguments=["--ros-args", "--log-level", "warn"],
parameters=[
{
"robot_description": ParameterValue(
compiled_xacro, value_type=str
),
}
],
remappings=[
# (intside node, outside node),
("joint_states", "continuous_joint_read"),
],
),
)
return node_list
[docs]
def get_node_lvl1(self, params: Dict[str, Any]) -> Node:
ns = f"leg{params['leg_number']}"
return Node(
package=self.ms_package,
namespace=ns,
executable="joint_node",
name=f"joint_node",
arguments=["--ros-args", "--log-level", "info"],
emulate_tty=True,
output="screen",
parameters=[params],
remappings=self.remaplvl1,
)
[docs]
def get_node_lvl2(self, params: Dict[str, Any]) -> Node:
ns = f"leg{params['leg_number']}"
return Node(
package=self.ms_package,
namespace=ns,
executable="ik_heavy_node",
name=f"ik",
arguments=["--ros-args", "--log-level", "info"],
emulate_tty=True,
output="screen",
parameters=[params],
remappings=[],
)
[docs]
def get_node_lvl3(self, params: Dict[str, Any]) -> Node:
ns = f"leg{params['leg_number']}"
return Node(
package=self.ms_package,
namespace=ns,
executable="leg_node",
name=f"leg",
arguments=["--ros-args", "--log-level", "info"],
emulate_tty=True,
output="screen",
parameters=[params],
remappings=[],
)
[docs]
def get_node_lvl4(self, params: Dict[str, Any]) -> Node:
return Node(
package=self.ms_package,
executable="mover_node",
name=f"mover",
arguments=["--ros-args", "--log-level", "info"],
emulate_tty=True,
output="screen",
parameters=[params],
remappings=[],
)
[docs]
def get_node_lvl5(self, params: Dict[str, Any]) -> Node:
return Node(
package=self.ms_package,
executable="gait_node",
name=f"gait",
arguments=["--ros-args", "--log-level", "info"],
emulate_tty=True,
output="screen",
parameters=[params],
remappings=[],
)
[docs]
def lvl1(self) -> List[Node]:
if 1 not in self.lvl_to_launch():
return []
node_list = []
for param in self.lvl1_params():
node_list.append(self.get_node_lvl1(param))
return node_list
[docs]
def lvl2(self) -> List[Node]:
if 2 not in self.lvl_to_launch():
return []
node_list = []
for param in self.lvl2_params():
node_list.append(self.get_node_lvl2(param))
return node_list
[docs]
def lvl3(self) -> List[Node]:
if 3 not in self.lvl_to_launch():
return []
node_list = []
for param in self.lvl3_params():
node_list.append(self.get_node_lvl3(param))
return node_list
[docs]
def lvl4(self) -> List[Node]:
if 4 not in self.lvl_to_launch():
return []
node_list = []
for param in self.lvl4_params():
node_list.append(self.get_node_lvl4(param))
return node_list
[docs]
def lvl5(self) -> List[Node]:
if 5 not in self.lvl_to_launch():
return []
node_list = []
for param in self.lvl5_params():
node_list.append(self.get_node_lvl5(param))
return node_list
[docs]
def make_levels(self) -> List[List[Node]]:
return [
self.lvl1() + self.state_publisher_lvl1(),
self.lvl2(),
self.lvl3(),
self.lvl4(),
self.lvl5(),
]
[docs]
def get_cli_argument(arg_name: str, default: T) -> Union[T, str]:
"""Returns the CLI argument as a string, or default is none inputed.
Can be much better optimised, I don't care.
"""
for arg in sys.argv:
if f"{arg_name}:=" in arg:
return arg.split(":=")[1]
return default