motion_stack.ros2.utils package

Submodules

motion_stack.ros2.utils.conversion module

motion_stack.ros2.utils.conversion.ros_to_time(time)[source]
Return type:

Time

Parameters:

time (Time | Duration)

motion_stack.ros2.utils.conversion.ros_now(node)[source]
Return type:

Time

Parameters:

node (Node)

motion_stack.ros2.utils.conversion.time_to_ros(time)[source]
Return type:

Time

Parameters:

time (Time)

motion_stack.ros2.utils.conversion.transform_to_pose(tf, time)[source]
Return type:

Pose

Parameters:
  • tf (Transform)

  • time (Time)

motion_stack.ros2.utils.conversion.pose_to_transform(pose, sendNone=False)[source]
Return type:

Transform

Parameters:
  • pose (Pose)

  • sendNone (bool)

motion_stack.ros2.utils.executor module

motion_stack.ros2.utils.executor.error_catcher(func)[source]

This is a wrapper to catch and display exceptions.

Note

This only needs to be used on functions executed in callbacks. It is not necessary everywhere.

Example

@error_catcher
def foo(..):
    ...
Parameters:

func (Callable) – Function executed by a callback

Returns:

warpped function

motion_stack.ros2.utils.executor.future_list_complete(future_list)[source]

Returns True is all futures in the input list are done.

Parameters:

future_list (List[Future] | Future) – a list of futures

Returns:

True if all futures are done

Return type:

bool

class motion_stack.ros2.utils.executor.EZRate(parent, frequency, clock=None)[source]

Bases: object

Parameters:
  • parent (Node)

  • frequency (float)

  • clock (Clock | None)

sleep()[source]

sleeps (blocking) until next tick

Return type:

None

destroy()[source]

Destroys the object

Return type:

None

is_ready()[source]
Return type:

bool

class motion_stack.ros2.utils.executor.Ros2Spinner(node)[source]

Bases: Spinner

Parameters:

node (Node)

get_parameter(name, value_type, default=None)[source]
Return type:

Any

Parameters:
  • name (str)

  • value_type (type)

wait_for_lower_level(more_services={}, all_requiered=False)[source]

Blocks until all or any service is available.

Note

  • List of waited services is given by services_to_wait ros2 param

  • Overload this to wait for an additional service

Parameters:
  • more_services (Iterable[str]) – add more services

  • all_requiered (bool) – if True, all listed services must be available

now()[source]

quick: self.get_clock().now()

Return type:

Time

sleep(seconds)[source]

sleeps using the node’s clock.

Note

Special case for good old foxy

Parameters:

seconds (float) – time to sleep

Return type:

None

wait_on_futures(future_list, wait_Hz=10)[source]

Waits for the completion of a list of futures, checking completion at the provided rate.

Parameters:
  • future_list (List[Future] | Future) – List of Future to wait for

  • wait_Hz (float) – rate at which to wait

error(*args, force=False)[source]

Prints/Logs error if Yapping==True (default) or force==True.

Parameters:
  • object – Thing to print

  • bool (force -) – if True the message will print whatever if self.Yapping is.

  • force (bool)

warn(*args, force=False)[source]

Prints/Logs warning if Yapping==True (default) or force==True.

Parameters:
  • object – Thing to print

  • bool (force -) – if True the message will print whatever if self.Yapping is.

  • force (bool)

info(*args, force=False)[source]

Prints/Logs info if Yapping==True (default) or force==True.

Parameters:
  • object – Thing to print

  • force (bool) – if True the message will print whatever if self.Yapping is.

debug(*args, force=False)[source]

Prints/Logs info if Yapping==True (default) or force==True.

Parameters:
  • object – Thing to print

  • force (bool) – if True the message will print whatever if self.Yapping is.

resolve_service_name(service, *, only_expand=False)[source]

Return a service name expanded and remapped.

Note

Overloaded to handle missing foxy

Parameters:
  • service (str) – service name to be expanded and remapped.

  • only_expand (bool) – if True, remapping rules won’t be applied.

Return type:

str

Returns:

a fully qualified service name, result of applying expansion and remapping to the given service.

setAndBlockForNecessaryNodes(necessary_node_names, silent_trial=3, intervalSec=0.5)[source]

Blocks for nodes to be alive

Parameters:
  • necessary_node_names (List[str] | str)

  • silent_trial (int | None)

  • intervalSec (float | None)

get_and_wait_Client(service_name, service_type, cbk_grp=None)[source]

Return the client to the corresponding service, wait for it ot be available.

Parameters:
  • str (service_name -)

  • service_type (service_type - Ros2)

  • cbk_grp (CallbackGroup | None) – Not important I think but it’s there

  • service_name (str)

Return type:

Client

Returns:

Return type:

Client

Parameters:
  • service_name (str)

  • cbk_grp (CallbackGroup | None)

create_EZrate(frequency, clock=None)[source]

Creates a better rate where rate.destroy actually destroys the rate

Parameters:
  • frequency (float) – frequency of the rate

  • clock (Clock | None) – clock to use

Returns:

EZRate manipulating a Rate object

Return type:

EZRate

execute_in_cbk_group(fun, callback_group=None)[source]

Executes the given function by adding it as a callback to a callback_group.

Note

Pretty sure that’s not how it should be done.

Parameters:
  • fun (Callable) – function to execute

  • callback_group (CallbackGroup | None) – callback group in which to execute the function

Returns:

holds the future results quardian: the guard condition object in the callback_group

Return type:

future

Return type:

Tuple[Future, GuardCondition]

motion_stack.ros2.utils.executor.my_main(node, multi_threaded=False)[source]

Main function used through the motion stack.

Parameters:
  • nodeClass – Node to spin

  • multiThreaded – using multithreaded executor or not

  • () (args)

  • multi_threaded (bool)

motion_stack.ros2.utils.files module

motion_stack.ros2.utils.files.get_src_folder(package_name)[source]

Absolute path to workspace/src/package_name.

Note

Meant for debugging. Avoid using this, you should build properly.

Parameters:

package_name (str) – workspace/src/package_name

Return type:

str

Returns: Absolute path as str

Return type:

str

Parameters:

package_name (str)

motion_stack.ros2.utils.joint_state module

subscribes to a JointState topic, converts the message then calls the callback.

Parameters:
  • node (Node) – node spinning

  • topic_name (str) – name of the JointState topic

  • callback (Callable[[List[JState]], Any]) – callback using not JointState but List[JState] and input

motion_stack.ros2.utils.joint_state.ros2js_wrap(func)[source]
Parameters:
  • callback – function with List[JState] as the input

  • func (Callable[[List[JState]], Any])

Returns:

function with JointState as the input

Return type:

Callable[[JointState], None]

motion_stack.ros2.utils.joint_state.ros2js(jsin)[source]

Converts JointState to a List[JState]

Return type:

List[JState]

Parameters:

jsin (JointState)

class motion_stack.ros2.utils.joint_state.JSCallableWrapper(original_callable)[source]

Bases: object

Parameters:

original_callable (Callable[[JointState], None])

motion_stack.ros2.utils.joint_state.publish_jstate(publisher, states)[source]

Publishes a List[JState] as several JointState messages.

Parameters:
  • publisher (Publisher) – ros2 publisher to use

  • states (List[JState]) – states to send

motion_stack.ros2.utils.joint_state.callable_js_publisher(node, topic_name, **kwargs)[source]

Creates a function publishing a JState on ROS2.

You can then call the function directly with a List[JState] when you wanna send something.

Parameters:
  • node (Node) – node handling the publisher

  • topic_name (str) – publisher name

  • **kwargs – kwargs for node.create_publisher

Returns:

A function, converting List[JState] to (several) JointState, then publishing

Return type:

Callable[[List[JState]], Any]

motion_stack.ros2.utils.joint_state.stateOrderinator3000(allStates)[source]

Converts a list of JState to multiple ros JointStates messages. Timestamp ignored.

Return type:

List[JointState]

Parameters:

allStates (Iterable[JState])

motion_stack.ros2.utils.lazy_joint_state_publisher module

Overloading the joint_state_publisher package so it does not publish joint states that are not actively published

Lots of black magic being used

class motion_stack.ros2.utils.lazy_joint_state_publisher.dummy_pub[source]

Bases: object

publish(msg)[source]
class motion_stack.ros2.utils.lazy_joint_state_publisher.LazyJointStatePublisher(description_file)[source]

Bases: JointStatePublisher

source_cb(msg)[source]
delete_inactive_from_msg(msg)[source]

Deletes joints that are not part of self.active_joints from a message

Return type:

JointState

Parameters:

msg (JointState)

motion_stack.ros2.utils.lazy_joint_state_publisher.main()[source]

motion_stack.ros2.utils.linking module

class motion_stack.ros2.utils.linking.CallablePublisher(node, topic_type, topic_name, qos=10, *args, **kwargs)[source]

Bases: object

Parameters:
  • node (Node)

  • topic_type (type)

  • topic_name (str)

  • qos (int)

Creates a callback to be execute on the node’s startup given arguments.

Parameters:
  • node (Node) – spinning node

  • lvl1 – lvl1 core

  • startup_callback (Callable)

  • argument (Any)

Return type:

Future

motion_stack.ros2.utils.task module