# Source code for easy_robot_control.lazy_joint_state_publisher

"""Overloading the joint_state_publisher package 
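so that it only publishes the joints whose names have already been received
through ``source_cb``; joints never seen there are removed from every
outgoing message.

Relies on monkey-patching: the publisher object ``self.pub`` and the module
attribute ``jsp_lib.JointStatePublisher`` are both replaced at runtime."""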

import joint_state_publisher.joint_state_publisher as jsp_lib
import sensor_msgs.msg
from joint_state_publisher.joint_state_publisher import JointStatePublisher as Jsp


class dummy_pub:
    """Minimal stand-in for a publisher object whose ``publish`` does nothing.

    Instances are plain objects, so ``publish`` can be overridden per instance
    by assigning a callable to the ``publish`` attribute.
    """

    def publish(self, msg):
        """Accept ``msg`` and discard it.

        Args:
            msg: Any object; it is ignored.

        Returns:
            None, always.
        """
        return None
class LazyJointStatePublisher(Jsp):
    """Joint state publisher that only forwards joints seen by ``source_cb``.

    Joint names are collected from every message handled by ``source_cb``.
    Every message published through ``self.pub`` is first stripped of all
    joints whose names are not in that collection.
    """

    def __init__(self, description_file):
        """Initialise the parent publisher and intercept its output.

        Args:
            description_file: Forwarded unchanged to the parent constructor.
        """
        super().__init__(description_file)
        # Names of every joint received so far through source_cb.
        self.active_joints = set()
        # Replace self.pub by a stand-in whose publish() runs
        # self.delete_inactive_from_msg on the message before handing it to
        # the real publisher, which is kept in self.pure_pub.
        self.pure_pub = self.pub
        self.pub = dummy_pub()
        self.pub.publish = lambda msg: self.pure_pub.publish(
            self.delete_inactive_from_msg(msg)
        )

    def source_cb(self, msg):
        """Mark the joints named in ``msg`` as active, then defer to the parent.

        Args:
            msg: Message whose ``name`` attribute is an iterable of joint names.
        """
        # The set only ever grows: a joint stays active once it has been seen.
        self.active_joints.update(msg.name)
        super().source_cb(msg)

    def delete_inactive_from_msg(
        self, msg: sensor_msgs.msg.JointState
    ) -> sensor_msgs.msg.JointState:
        """Return a copy of ``msg`` without the joints that are not active.

        A new ``JointState`` is built; ``msg`` itself is not modified. The
        header is reused as-is, and the relative order of the kept joints is
        preserved.

        Args:
            msg: Joint state message to filter.

        Returns:
            A new message holding only the entries whose name is in
            ``self.active_joints``.
        """
        new = sensor_msgs.msg.JointState()
        new.header = msg.header
        indexes = [
            index for index, name in enumerate(msg.name) if name in self.active_joints
        ]
        new.name = [msg.name[i] for i in indexes]
        # Empty value arrays are left at the new message's default rather
        # than being indexed.
        if msg.position:
            new.position = [msg.position[i] for i in indexes]
        if msg.velocity:
            new.velocity = [msg.velocity[i] for i in indexes]
        if msg.effort:
            new.effort = [msg.effort[i] for i in indexes]
        return new
def main():
    """Run ``jsp_lib.main()`` with ``LazyJointStatePublisher`` patched in.

    The module attribute ``jsp_lib.JointStatePublisher`` is rebound to
    ``LazyJointStatePublisher`` before ``jsp_lib.main()`` is called.
    """
    # NOTE(review): presumably jsp_lib.main() looks the class up by this module
    # attribute when it builds the node - confirm against the installed
    # joint_state_publisher version.
    jsp_lib.JointStatePublisher = LazyJointStatePublisher
    jsp_lib.main()
# Call main() only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()