motion_stack.ros2.utils package
Submodules
motion_stack.ros2.utils.conversion module
- motion_stack.ros2.utils.conversion.ros_to_time(time)[source]
- Return type:
- Parameters:
time (Time | Duration)
- motion_stack.ros2.utils.conversion.time_to_ros(time)[source]
- Return type:
Time
- Parameters:
time (Time)
motion_stack.ros2.utils.executor module
- motion_stack.ros2.utils.executor.error_catcher(func)[source]
This is a wrapper to catch and display exceptions.
Note
This only needs to be used on functions executed in callbacks. It is not necessary everywhere.
Example
@error_catcher def foo(..): ...
- Parameters:
func (Callable) – Function executed by a callback
- Returns:
warpped function
- motion_stack.ros2.utils.executor.future_list_complete(future_list)[source]
Returns True is all futures in the input list are done.
- Parameters:
future_list (List[Future] | Future) – a list of futures
- Returns:
True if all futures are done
- Return type:
bool
- class motion_stack.ros2.utils.executor.EZRate(parent, frequency, clock=None)[source]
Bases:
object
- Parameters:
parent (Node)
frequency (float)
clock (Clock | None)
- class motion_stack.ros2.utils.executor.Ros2Spinner(node)[source]
Bases:
Spinner
- Parameters:
node (Node)
- get_parameter(name, value_type, default=None)[source]
- Return type:
Any
- Parameters:
name (str)
value_type (type)
- wait_for_lower_level(more_services={}, all_requiered=False)[source]
Blocks until all or any service is available.
Note
List of waited services is given by services_to_wait ros2 param
Overload this to wait for an additional service
- Parameters:
more_services (Iterable[str]) – add more services
all_requiered (bool) – if True, all listed services must be available
- sleep(seconds)[source]
sleeps using the node’s clock.
Note
Special case for good old foxy
- Parameters:
seconds (float) – time to sleep
- Return type:
None
- wait_on_futures(future_list, wait_Hz=10)[source]
Waits for the completion of a list of futures, checking completion at the provided rate.
- Parameters:
future_list (List[Future] | Future) – List of Future to wait for
wait_Hz (float) – rate at which to wait
- error(*args, force=False)[source]
Prints/Logs error if Yapping==True (default) or force==True.
- Parameters:
object – Thing to print
bool (force -) – if True the message will print whatever if self.Yapping is.
force (bool)
- warn(*args, force=False)[source]
Prints/Logs warning if Yapping==True (default) or force==True.
- Parameters:
object – Thing to print
bool (force -) – if True the message will print whatever if self.Yapping is.
force (bool)
- info(*args, force=False)[source]
Prints/Logs info if Yapping==True (default) or force==True.
- Parameters:
object – Thing to print
force (bool) – if True the message will print whatever if self.Yapping is.
- debug(*args, force=False)[source]
Prints/Logs info if Yapping==True (default) or force==True.
- Parameters:
object – Thing to print
force (bool) – if True the message will print whatever if self.Yapping is.
- resolve_service_name(service, *, only_expand=False)[source]
Return a service name expanded and remapped.
Note
Overloaded to handle missing foxy
- Parameters:
service (
str
) – service name to be expanded and remapped.only_expand (
bool
) – if True, remapping rules won’t be applied.
- Return type:
str
- Returns:
a fully qualified service name, result of applying expansion and remapping to the given service.
- setAndBlockForNecessaryNodes(necessary_node_names, silent_trial=3, intervalSec=0.5)[source]
Blocks for nodes to be alive
- Parameters:
necessary_node_names (List[str] | str)
silent_trial (int | None)
intervalSec (float | None)
- get_and_wait_Client(service_name, service_type, cbk_grp=None)[source]
Return the client to the corresponding service, wait for it ot be available.
- Parameters:
str (service_name -)
service_type (service_type - Ros2)
cbk_grp (CallbackGroup | None) – Not important I think but it’s there
service_name (str)
- Return type:
Client
Returns:
- Return type:
Client
- Parameters:
service_name (str)
cbk_grp (CallbackGroup | None)
- create_EZrate(frequency, clock=None)[source]
Creates a better rate where rate.destroy actually destroys the rate
- Parameters:
frequency (float) – frequency of the rate
clock (Clock | None) – clock to use
- Returns:
EZRate manipulating a Rate object
- Return type:
- execute_in_cbk_group(fun, callback_group=None)[source]
Executes the given function by adding it as a callback to a callback_group.
Note
Pretty sure that’s not how it should be done.
- Parameters:
fun (Callable) – function to execute
callback_group (CallbackGroup | None) – callback group in which to execute the function
- Returns:
holds the future results quardian: the guard condition object in the callback_group
- Return type:
future
- Return type:
Tuple
[Future
,GuardCondition
]
motion_stack.ros2.utils.files module
- motion_stack.ros2.utils.files.get_src_folder(package_name)[source]
Absolute path to workspace/src/package_name.
Note
Meant for debugging. Avoid using this, you should build properly.
- Parameters:
package_name (str) – workspace/src/package_name
- Return type:
str
Returns: Absolute path as str
- Return type:
str
- Parameters:
package_name (str)
motion_stack.ros2.utils.joint_state module
- motion_stack.ros2.utils.joint_state.link_subscription(node, topic_name, callback)[source]
subscribes to a JointState topic, converts the message then calls the callback.
- Parameters:
node (Node) – node spinning
topic_name (str) – name of the JointState topic
callback (Callable[[List[JState]], Any]) – callback using not JointState but List[JState] and input
- motion_stack.ros2.utils.joint_state.ros2js_wrap(func)[source]
- Parameters:
callback – function with List[JState] as the input
func (Callable[[List[JState]], Any])
- Returns:
function with JointState as the input
- Return type:
Callable
[[JointState
],None
]
- motion_stack.ros2.utils.joint_state.ros2js(jsin)[source]
Converts JointState to a List[JState]
- Return type:
List
[JState
]- Parameters:
jsin (JointState)
- class motion_stack.ros2.utils.joint_state.JSCallableWrapper(original_callable)[source]
Bases:
object
- Parameters:
original_callable (Callable[[JointState], None])
- motion_stack.ros2.utils.joint_state.publish_jstate(publisher, states)[source]
Publishes a List[JState] as several JointState messages.
- Parameters:
publisher (Publisher) – ros2 publisher to use
states (List[JState]) – states to send
- motion_stack.ros2.utils.joint_state.callable_js_publisher(node, topic_name, **kwargs)[source]
Creates a function publishing a JState on ROS2.
You can then call the function directly with a List[JState] when you wanna send something.
- Parameters:
node (Node) – node handling the publisher
topic_name (str) – publisher name
**kwargs – kwargs for node.create_publisher
- Returns:
A function, converting List[JState] to (several) JointState, then publishing
- Return type:
Callable
[[List
[JState
]],Any
]
motion_stack.ros2.utils.lazy_joint_state_publisher module
Overloading the joint_state_publisher package so it does not publish joint states that are not actively published
Lots of black magic being used