Skip to content

Commit

Permalink
Revamp Callback Groups (#92)
Browse files Browse the repository at this point in the history
* Put MoveIt2 objects into their own ReentrantCallbackGroup

* Revert create_action_servers.py default callback group to MutuallyExclusive

* Updated watchdog callback groups

* Updated face detection callback groups

* Updated food detection callback groups

* Updated republisher callback groups
  • Loading branch information
amalnanavati authored Sep 18, 2023
1 parent b384040 commit 4cc3acc
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 16 deletions.
5 changes: 5 additions & 0 deletions ada_feeding/ada_feeding/behaviors/move_to.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import py_trees
from pymoveit2 import MoveIt2, MoveIt2State
from pymoveit2.robots import kinova
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from rclpy.qos import QoSPresetProfiles
from sensor_msgs.msg import JointState
Expand Down Expand Up @@ -120,12 +121,16 @@ def __init__(
# Create MoveIt 2 interface for moving the Jaco arm. This must be done
# in __init__ and not setup since the MoveIt2 interface must be
# initialized before the ROS2 node starts spinning.
# Using ReentrantCallbackGroup to align with the examples from pymoveit2.
# TODO: Assess whether ReentrantCallbackGroup is necessary for MoveIt2.
callback_group = ReentrantCallbackGroup()
self.moveit2 = MoveIt2(
node=self.node,
joint_names=kinova.joint_names(),
base_link_name=kinova.base_link_name(),
end_effector_name="forkTip",
group_name=kinova.MOVE_GROUP_ARM,
callback_group=callback_group,
)

# Subscribe to the joint state and track the distance to goal while the
Expand Down
5 changes: 5 additions & 0 deletions ada_feeding/ada_feeding/behaviors/toggle_collision_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import py_trees
from pymoveit2 import MoveIt2
from pymoveit2.robots import kinova
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node

# Local imports
Expand Down Expand Up @@ -55,12 +56,16 @@ def __init__(
# Create MoveIt 2 interface for moving the Jaco arm. This must be done
# in __init__ and not setup since the MoveIt2 interface must be
# initialized before the ROS2 node starts spinning.
# Using ReentrantCallbackGroup to align with the examples from pymoveit2.
# TODO: Assess whether ReentrantCallbackGroup is necessary for MoveIt2.
callback_group = ReentrantCallbackGroup()
self.moveit2 = MoveIt2(
node=self.node,
joint_names=kinova.joint_names(),
base_link_name=kinova.base_link_name(),
end_effector_name="forkTip",
group_name=kinova.MOVE_GROUP_ARM,
callback_group=callback_group,
)

def update(self) -> py_trees.common.Status:
Expand Down
4 changes: 4 additions & 0 deletions ada_feeding/ada_feeding/watchdog/ft_sensor_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@ def __init__(self, node: Node) -> None:

# Subscribe to the FT sensor topic
self.ft_sensor_topic = self._node.resolve_topic_name("~/ft_topic")
self.ft_sensor_callback_group = (
rclpy.callback_groups.MutuallyExclusiveCallbackGroup()
)
self.ft_sensor_subscription = self._node.create_subscription(
WrenchStamped,
self.ft_sensor_topic,
self.__ft_sensor_callback,
rclpy.qos.QoSPresetProfiles.SENSOR_DATA.value,
callback_group=self.ft_sensor_callback_group,
)

def __ft_sensor_callback(self, msg: WrenchStamped) -> None:
Expand Down
2 changes: 2 additions & 0 deletions ada_feeding/scripts/ada_planning_scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def __init__(self) -> None:
self.load_parameters()

# Initialize the MoveIt2 interface
# Using ReentrantCallbackGroup to align with the examples from pymoveit2.
# TODO: Assess whether ReentrantCallbackGroup is necessary for MoveIt2.
callback_group = ReentrantCallbackGroup()
self.moveit2 = MoveIt2(
node=self,
Expand Down
12 changes: 10 additions & 2 deletions ada_feeding/scripts/ada_watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,16 @@ def __init__(self) -> None:
)

# Publish at the specified rate
# TODO: Consider making this a separate thread that runs at a fixed rate,
# to avoid the callback queue getting backed up if the function takes
# longer than the specified rate to run.
self.timer_callback_group = (
rclpy.callback_groups.MutuallyExclusiveCallbackGroup()
)
timer_period = 1.0 / self.publish_rate_hz.value # seconds
self.timer = self.create_timer(timer_period, self.__check_conditions)
self.timer = self.create_timer(
timer_period, self.__check_conditions, self.timer_callback_group
)

def __load_parameters(self) -> None:
"""
Expand Down Expand Up @@ -199,7 +207,7 @@ def main(args=None):
rclpy.init(args=args)

# Use a MultiThreadedExecutor to enable processing topics concurrently
executor = MultiThreadedExecutor()
executor = MultiThreadedExecutor(num_threads=3)

# Create and spin the node
ada_watchdog = ADAWatchdog()
Expand Down
2 changes: 0 additions & 2 deletions ada_feeding/scripts/create_action_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ def __init__(self) -> None:
"""
super().__init__("create_action_servers")

self._default_callback_group = rclpy.callback_groups.ReentrantCallbackGroup()

# Read the parameters that specify what action servers to create.
action_server_params = self.read_params()

Expand Down
29 changes: 21 additions & 8 deletions ada_feeding_perception/ada_feeding_perception/face_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from geometry_msgs.msg import Point
import numpy as np
import rclpy
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.executors import MultiThreadedExecutor
Expand Down Expand Up @@ -55,8 +56,6 @@ def __init__(
"""
super().__init__("face_detection")

self._default_callback_group = rclpy.callback_groups.ReentrantCallbackGroup()

# Read the parameters
# NOTE: These parameters are only read once. Any changes after the node
# is initialized will not be reflected.
Expand Down Expand Up @@ -108,6 +107,7 @@ def __init__(
SetBool,
"~/toggle_face_detection",
self.toggle_face_detection_callback,
callback_group=MutuallyExclusiveCallbackGroup(),
)

# Create the publishers
Expand All @@ -131,7 +131,11 @@ def __init__(
self.latest_img_msg_lock = threading.Lock()
image_topic = "~/image"
self.img_subscription = self.create_subscription(
get_img_msg_type(image_topic, self), image_topic, self.camera_callback, 1
get_img_msg_type(image_topic, self),
image_topic,
self.camera_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)

depth_buffer_size = depth_buffer_size.value
Expand All @@ -144,6 +148,7 @@ def __init__(
aligned_depth_topic,
self.depth_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)

# Subscribe to the camera info
Expand All @@ -152,7 +157,11 @@ def __init__(
# (updated with subscription)
self.camera_matrix = [614, 0, 312, 0, 614, 223, 0, 0, 1]
self.camera_info_subscription = self.create_subscription(
CameraInfo, "~/camera_info", self.camera_info_callback, 1
CameraInfo,
"~/camera_info",
self.camera_info_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)

def read_params(
Expand Down Expand Up @@ -504,7 +513,9 @@ def run(self) -> None:
with self.latest_img_msg_lock:
rgb_msg = self.latest_img_msg
if rgb_msg is None:
self.get_logger().warn("No RGB image message received.", throttle_duration_sec=1)
self.get_logger().warn(
"No RGB image message received.", throttle_duration_sec=1
)
continue

# Detect the largest face in the RGB image
Expand All @@ -522,8 +533,10 @@ def run(self) -> None:
if is_face_detected_depth:
# Get the 3d location of the mouth
face_detection_msg.detected_mouth_center.header = rgb_msg.header
face_detection_msg.detected_mouth_center.point = self.get_stomion_point(
img_mouth_center[0], img_mouth_center[1], depth_mm
face_detection_msg.detected_mouth_center.point = (
self.get_stomion_point(
img_mouth_center[0], img_mouth_center[1], depth_mm
)
)
else:
is_face_detected = False
Expand All @@ -540,7 +553,7 @@ def main(args=None):
rclpy.init(args=args)

face_detection = FaceDetectionNode()
executor = MultiThreadedExecutor()
executor = MultiThreadedExecutor(num_threads=4)

# Spin in the background since detecting faces will block
# the main thread
Expand Down
4 changes: 3 additions & 1 deletion ada_feeding_perception/ada_feeding_perception/republisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Third-party imports
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import rclpy
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node

Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(self) -> None:
topic=self.from_topics[i],
callback=callback,
qos_profile=1, # TODO: we should get and mirror the QOS profile of the from_topic
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.subs.append(subscriber)

Expand Down Expand Up @@ -183,7 +185,7 @@ def main(args=None):
republisher = Republisher()

# Use a MultiThreadedExecutor to enable processing goals concurrently
executor = MultiThreadedExecutor()
executor = MultiThreadedExecutor(num_threads=len(republisher.subs))

rclpy.spin(republisher, executor=executor)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rclpy
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.action.server import ServerGoalHandle
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.parameter import Parameter
Expand Down Expand Up @@ -58,8 +59,6 @@ def __init__(self) -> None:

super().__init__("segment_from_point")

self._default_callback_group = rclpy.callback_groups.ReentrantCallbackGroup()

# Check if cuda is available
self.device = "cuda" if torch.cuda.is_available() else "cpu"

Expand Down Expand Up @@ -91,6 +90,7 @@ def __init__(self) -> None:
"~/camera_info",
self.camera_info_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.camera_info = None
self.camera_info_lock = threading.Lock()
Expand All @@ -106,6 +106,7 @@ def __init__(self) -> None:
aligned_depth_topic,
self.depth_image_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.latest_depth_img_msg = None
self.latest_depth_img_msg_lock = threading.Lock()
Expand All @@ -117,6 +118,7 @@ def __init__(self) -> None:
image_topic,
self.image_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.latest_img_msg = None
self.latest_img_msg_lock = threading.Lock()
Expand Down Expand Up @@ -244,6 +246,7 @@ def initialize_food_segmentation(self, model_name: str, model_path: str) -> None
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.get_logger().info("...Done!")

Expand Down Expand Up @@ -509,7 +512,7 @@ def main(args=None):
segment_from_point = SegmentFromPointNode()

# Use a MultiThreadedExecutor to enable processing goals concurrently
executor = MultiThreadedExecutor()
executor = MultiThreadedExecutor(num_threads=5)

rclpy.spin(segment_from_point, executor=executor)

Expand Down

0 comments on commit 4cc3acc

Please sign in to comment.