From f997cdeedf781ade325804192fffd99205fbf41d Mon Sep 17 00:00:00 2001 From: Richardvdketterij Date: Mon, 12 Feb 2024 14:35:04 +0100 Subject: [PATCH 1/5] Implement canceling of goals, fix the execution and preemption timeout timers, make 'outcomes' a param, add extra intermediate internal state and use state enum instead of global var, add function to wait for cancelation request response --- smach_ros/smach_ros/simple_action_state.py | 307 ++++++++++++++------- 1 file changed, 202 insertions(+), 105 deletions(-) diff --git a/smach_ros/smach_ros/simple_action_state.py b/smach_ros/smach_ros/simple_action_state.py index 67ada37..3c97009 100644 --- a/smach_ros/smach_ros/simple_action_state.py +++ b/smach_ros/smach_ros/simple_action_state.py @@ -4,27 +4,34 @@ from rclpy.duration import Duration import rclpy.action from rclpy.action.client import GoalStatus +from rclpy.task import Future +from enum import Enum import threading import traceback +import time + import smach from .ros_state import RosState __all__ = ['SimpleActionState'] -class SimpleActionState(RosState): - """Simple action client state. - - Use this class to represent an actionlib as a state in a state machine. - """ +class ActionState(Enum): # Meta-states for this action WAITING_FOR_SERVER = 0 INACTIVE = 1 ACTIVE = 2 - PREEMPTING = 3 - COMPLETED = 4 + CANCELING = 3 + CANCELED = 4 + COMPLETED = 5 + + +class SimpleActionState(RosState): + """Simple action client state. + Use this class to represent an ActionClient as a state in a state machine. + """ def __init__(self, node, @@ -47,11 +54,11 @@ def __init__(self, # Keys input_keys = [], output_keys = [], - outcomes = [], + outcomes = ['succeeded', 'aborted', 'preempted'], # Timeouts - exec_timeout = None, - preempt_timeout = Duration(seconds=60.0), - server_wait_timeout = Duration(seconds=60.0) + exec_timeout: Duration = None, + preempt_timeout: Duration = Duration(seconds=60.0), + server_wait_timeout: Duration = Duration(seconds=60.0), ): """Constructor for SimpleActionState action client wrapper. @@ -117,7 +124,7 @@ def __init__(self, """ # Initialize base class - RosState.__init__(self, node, outcomes=['succeeded','aborted','preempted']) + RosState.__init__(self, node, outcomes=outcomes) # Set action properties self._action_name = action_name @@ -135,7 +142,7 @@ def __init__(self, raise smach.InvalidStateError("Goal object given to SimpleActionState that IS a function object") sl = action_spec.Goal.get_fields_and_field_types().keys() if not all([s in sl for s in goal_slots]): - raise smach.InvalidStateError("Goal slots specified are not valid slots. Available slots: %s; specified slots: %s" % (sl, goal_slots)) + raise smach.InvalidStateError(f"Goal slots specified are not valid slots. Available slots: {sl}; specified slots: {goal_slots}") if goal_cb and not hasattr(goal_cb, '__call__'): raise smach.InvalidStateError("Goal callback object given to SimpleActionState that IS NOT a function object") @@ -208,12 +215,12 @@ def __init__(self, # Declare some status variables self._activate_time = self.node.get_clock().now() - self._preempt_time = self.node.get_clock().now() self._duration = Duration(seconds=0.0) - self._status = SimpleActionState.WAITING_FOR_SERVER + self._status = ActionState.WAITING_FOR_SERVER - # Construct action client, and wait for it to come active + # Construct action client and goal handle self._action_client = rclpy.action.ActionClient(self.node, action_spec, action_name) + self._client_goal_handle = None self._execution_timer_thread = None # Condition variables for threading synchronization @@ -221,27 +228,64 @@ def __init__(self, def _execution_timer(self): """Internal method for cancelling a timed out goal after a timeout.""" - rate = self.node.create_rate(10) - clock = self.node.get_clock() - while self._status == SimpleActionState.ACTIVE and rclpy.ok(): + self.node.get_logger().debug( + f"Starting execution timer for action '{self._action_name}' " + f"with timeout {self._exec_timeout.nanoseconds / 1e9} seconds." + ) + while self._status == ActionState.ACTIVE and rclpy.ok(): try: - rate.sleep() - except: - if rclpy.ok(): - self.node.get_logger().error("Failed to sleep while running '%s'" % self._action_name) - if clock.now() - self._activate_time > self._exec_timeout: - self.node.get_logger().warn("Action %s timed out after %d seconds." % (self._action_name, self._exec_timeout.to_sec())) - # Cancel the goal - self._action_client.cancel_goal() - - ### smach State API + # Check for preemption while checking for timeout + if self.preempt_requested(): + self.node.get_logger().info( + f"Preempting while waiting for goal (request) result '{self._action_name}'.") + self._status = ActionState.CANCELED + with self._done_cond: + self._done_cond.notify() + if self.node.get_clock().now() - self._activate_time > self._exec_timeout: + self.node.get_logger().warn( + f"Action {self._action_name} timed out after {self._exec_timeout.nanoseconds / 1e9} seconds. " + f"Trying to cancel goal with timeout {self._preempt_timeout.nanoseconds / 1e9}.") + self._status = self.__cancel_goal_and_wait( + timeout=self._preempt_timeout) + with self._done_cond: + self._done_cond.notify() + time.sleep(0.1) + except Exception as e: + self.node.get_logger().error( + f"Caught exception: {str(e)} Failed to sleep while running '{self._action_name}'") + break + + # Smach State API def request_preempt(self): - self.node.get_logger().info("Preempt requested on action '%s'" % (self._action_name)) + """Request preemption of this state and cancel the goal with a timeout.""" + self.node.get_logger().info(f"Preempt requested on action '{self._action_name}'") RosState.request_preempt(self) - if self._status == SimpleActionState.ACTIVE: - self.node.get_logger().info("Preempt on action '%s' cancelling goal: \n%s" % (self._action_name, str(self._goal))) - # Cancel the goal - self._action_client.cancel_goal() + + # If status is active, try to cancel goal with a timeout + if self._status == ActionState.ACTIVE and rclpy.ok(): + self.node.get_logger().info( + f"Preempt on action '{self._action_name}' trying to cancel goal: {self._client_goal_handle} " + f"with timeout {self._preempt_timeout.nanoseconds / 1e9}") + self._status = self.__cancel_goal_and_wait(self._preempt_timeout) + with self._done_cond: + self._done_cond.notify() + + def __cancel_goal_and_wait(self, timeout) -> ActionState: + """Cancel goal and wait for the result.""" + if self._client_goal_handle is not None: + self._cancel_goal() + self._cancel_activation_time = self.node.get_clock().now() + + while (self.node.get_clock().now() - self._cancel_activation_time) <= timeout: + if self._goal_status == GoalStatus.STATUS_CANCELED: + self.node.get_logger().info( + f"Preempt on action '{self._action_name}'. Goal cancel succeeded within timeout .") + return ActionState.CANCELED + time.sleep(0.1) + + self.node.get_logger().warn( + f"Preempt on action '{self._action_name}' Goal cancel failed within timeout.") + return ActionState.CANCELING def execute(self, ud): """Called when executing a state. @@ -249,35 +293,31 @@ def execute(self, ud): goal with a non-blocking call to the action client. """ - # Check for preemption before executing - if self.preempt_requested(): - self.node.get_logger().info("Preempting %s before sending goal." % self._action_name) - self.service_preempt() - return 'preempted' - # Make sure we're connected to the action server try: start_time = self.node.get_clock().now() while not self._action_client.server_is_ready(): + # Check for preemption while waiting for server if self.preempt_requested(): - self.node.get_logger().info("Preempting while waiting for server '%s'." % self._action_name) + self.node.get_logger().info(f"Preempting while waiting for server '{self._action_name}'.") self.service_preempt() return 'preempted' elif not rclpy.ok(): - self.node.get_logger().info("Shutting down while waiting for service '%s'." % self._action_name) + self.node.get_logger().info(f"Shutting down while waiting for server '{self._action_name}'.") return 'aborted' elif self._action_client.wait_for_server(1.0): - self.node.get_logger().debug("Connected to server '%s'" % self._action_name) + self.node.get_logger().debug(f"Connected to server '{self._action_name}'") elif self.node.get_clock().now() - start_time > self._server_wait_timeout: self.node.get_logger().warn("Server connection timeout reached") return 'aborted' else: - self.node.get_logger().warn("Still waiting for server '%s'..." % self._action_name) - except: - self.node.get_logger().warn("Terminated while waiting for server '%s'." % self._action_name) + self.node.get_logger().warn(f"Still waiting for server '{self._action_name}'...") + except Exception as e: + self.node.get_logger().error(f"Caught exception: {str(e)} Failed to connect to server '{self._action_name}'. Aborting.") return 'aborted' - self._status = SimpleActionState.INACTIVE + # Server is ready, set status to inactive + self._status = ActionState.INACTIVE # Grab goal key, if set if self._goal_key is not None: @@ -301,31 +341,36 @@ def execute(self, ud): **self._goal_cb_kwargs) if goal_update is not None: self._goal = goal_update - except: - self.node.get_logger().error("Could not execute goal callback: "+traceback.format_exc()) + except Exception as e: + self.node.get_logger().error( + f"Caught exception: {str(e)} Could not execute goal callback: {traceback.format_exc()}") return 'aborted' - # Make sure the necessary paramters have been set + # Make sure the necessary parameters have been set if self._goal is None and self._goal_cb is None: - self.node.get_logger().error("Attempting to activate action "+self._action_name+" with no goal or goal callback set. Did you construct the SimpleActionState properly?") + self.node.get_logger().error( + f"Attempting to activate action {self._action_name} with no goal or goal callback set. " + f"Did you construct the SimpleActionState properly?") return 'aborted' - # Dispatch goal via non-blocking call to action client + # Activate the state before sending the goal self._activate_time = self.node.get_clock().now() - self._status = SimpleActionState.ACTIVE + self._status = ActionState.ACTIVE - # Wait on done condition - self._done_cond.acquire() - send_future = self._action_client.send_goal_async(self._goal, feedback_callback=self._goal_feedback_cb) - send_future.add_done_callback(self._goal_active_cb) + with self._done_cond: + # Dispatch goal via non-blocking call to action server + send_future = self._action_client.send_goal_async( + self._goal, feedback_callback=self._goal_feedback_cb) + send_future.add_done_callback(self._goal_active_cb) - # Preempt timeout watch thread - if self._exec_timeout: - self._execution_timer_thread = threading.Thread(name=self._action_name+'/preempt_watchdog', target=self._execution_timer) - self._execution_timer_thread.start() + # Execution timeout watch thread + if self._exec_timeout: + self._execution_timer_thread = threading.Thread( + name=self._action_name + '/exec_timeout_watchdog', target=self._execution_timer) + self._execution_timer_thread.start() - # Wait for action to finish - self._done_cond.wait() + # Wait for action to finish + self._done_cond.wait() # Call user result callback if defined result_cb_outcome = None @@ -340,10 +385,14 @@ def execute(self, ud): self._goal_status, self._goal_result) if result_cb_outcome is not None and result_cb_outcome not in self.get_registered_outcomes(): - self.node.get_logger().error("Result callback for action "+self._action_name+", "+str(self._result_cb)+" was not registered with the result_cb_outcomes argument. The result callback returned '"+str(result_cb_outcome)+"' but the only registered outcomes are: "+str(self.get_registered_outcomes())) + self.node.get_logger().error( + f"Result callback for action {self._action_name}, {self._result_cb} was not registered with " + f"the result_cb_outcomes argument. The result callback returned '{result_cb_outcome}' " + f"but the only registered outcomes are: {self.get_registered_outcomes()}") return 'aborted' - except: - self.node.get_logger().error("Could not execute result callback: "+traceback.format_exc()) + except Exception as e: + self.node.get_logger().error( + f"Caught exception: {str(e)} Could not execute result callback: {traceback.format_exc()}. Aborting") return 'aborted' if self._result_key is not None: @@ -354,21 +403,30 @@ def execute(self, ud): for key in self._result_slots: ud[key] = getattr(self._goal_result, key) - # Check status - if self._status == SimpleActionState.INACTIVE: - # Set the outcome on the result state - if self._goal_status == GoalStatus.STATUS_SUCCEEDED: - outcome = 'succeeded' - elif self._goal_status == GoalStatus.STATUS_CANCELED and self.preempt_requested(): - outcome = 'preempted' + # Check for preemption after executing + if self.preempt_requested(): + self.node.get_logger().info(f"Preempting {self._action_name} after goal.") + self.service_preempt() + return 'preempted' + + # Check status and set the outcome to the result state + if self._status == ActionState.COMPLETED: + # Goal request and result are succeeded + outcome = 'succeeded' + elif self._status == ActionState.CANCELED: + # Preempted or goal cancelled + if self.preempt_requested(): self.service_preempt() - else: - # All failures at this level are captured by aborting, even if we timed out - # This is an important distinction between local preemption, and preemption - # from above. - outcome = 'aborted' + outcome = 'preempted' + elif self._status == ActionState.INACTIVE: + # Aborted via execution timeout + outcome = 'aborted' + elif self._status == ActionState.CANCELING: + # Preempting or exec timeout but goal not cancelled + outcome = 'aborted' else: - # We terminated without going inactive + # All failures at this level are captured by aborting, even if we + # timed out self.node.get_logger().warn("Action state terminated without going inactive first.") outcome = 'aborted' @@ -377,56 +435,95 @@ def execute(self, ud): outcome = result_cb_outcome # Set status inactive - self._status = SimpleActionState.INACTIVE - self._done_cond.release() + self._status = ActionState.INACTIVE return outcome - ### Action client callbacks + # Action client callbacks def _goal_active_cb(self, future): """Goal Active Callback - This callback starts the timer that watches for the timeout specified for this action. + Accept or reject a client request to begin an action. """ - gh = future.result() - if not gh.accepted: - self.node.get_logger().debug("Action "+self._action_name+" has been rejected!") + goal_handle = future.result() + + if not goal_handle.accepted: + self.node.get_logger().debug(f"Goal request from action {self._action_name} has been rejected!") return - result_future = gh.get_result_async() - result_future.add_done_callback(self._goal_done_cb) + else: + self.node.get_logger().debug(f"Goal request from action {self._action_name} has been accepted!") + + # Goal accepted so save goal handle + self._client_goal_handle = goal_handle + + result_future = self._client_goal_handle.get_result_async() + result_future.add_done_callback(self._get_result_callback) def _goal_feedback_cb(self, feedback): """Goal Feedback Callback""" - self.node.get_logger().debug("Action "+self._action_name+" sent feedback {}".format(feedback)) + self.node.get_logger().debug(f"Action {self._action_name} sent feedback {feedback}") - def _goal_done_cb(self, future): + def _get_result_callback(self, future: Future): """Goal Done Callback This callback resets the active flags and reports the duration of the action. - Also, if the user has defined a result_cb, it is called here before the - method returns. """ + def get_result_str(i): - strs = ('STATUS_UNKONWN','STATUS_ACCEPTED','STATUS_EXECUTING','STATUS_CANCELING','STATUS_SUCCEEDED','STATUS_CANCELED','STATUS_ABORTED') + strs = ( + 'STATUS_UNKONWN', + 'STATUS_ACCEPTED', + 'STATUS_EXECUTING', + 'STATUS_CANCELING', + 'STATUS_SUCCEEDED', + 'STATUS_CANCELED', + 'STATUS_ABORTED') if i < len(strs): return strs[i] else: - return 'UNKNOWN ('+str(i)+')' + return 'UNKNOWN (' + str(i) + ')' - gh = future.result() + if future.done(): + goal_result = future.result() # Calculate duration self._duration = self.node.get_clock().now() - self._activate_time - self.node.get_logger().debug("Action "+self._action_name+" terminated after "\ - +str(self._duration)+" nanoseconds with result "\ - +get_result_str(gh.status)+".") + self.node.get_logger().debug(f"Action {self._action_name} terminated after {self._duration} nanoseconds " + f"with result {get_result_str(goal_result.status)}.") # Store goal state - self._goal_status = gh.status - self._goal_result = gh.result + self._goal_status = goal_result.status + self._goal_result = goal_result.result + + # Update status + if self._goal_status == GoalStatus.STATUS_SUCCEEDED: + self._status = ActionState.COMPLETED + elif self._goal_status == GoalStatus.STATUS_CANCELED: + self._status = ActionState.CANCELED + elif self._goal_status == GoalStatus.STATUS_ABORTED: + self._status = ActionState.CANCELED - # Rest status - self._status = SimpleActionState.INACTIVE + self.node.get_logger().debug(f"Goal completed: {self._client_goal_handle}") + + # Reset the goal handle + self._client_goal_handle = None # Notify done - self._done_cond.acquire() - self._done_cond.notify() - self._done_cond.release() + with self._done_cond: + self._done_cond.notify() + + def _cancel_goal(self): + """Cancel Goal""" + self.node.get_logger().debug("Canceling goal: " + str(self._client_goal_handle)) + if self._client_goal_handle is not None: + self.node.get_logger().info("Canceling goal") + future = self._client_goal_handle.cancel_goal_async() + future.add_done_callback(self._cancel_goal_callback) + else: + self.node.get_logger().warn("Client goal handle is None, cannot cancel goal.") + + def _cancel_goal_callback(self, future): + """Cancel Goal Callback""" + cancel_response = future.result() + if len(cancel_response.goals_canceling) > 0: + self.node.get_logger().debug("Cancel request accepted") + else: + self.node.get_logger().debug("Cancel request rejected") From 679321a64f03f7dfa024d289cca470c9b270a710 Mon Sep 17 00:00:00 2001 From: Richardvdketterij Date: Mon, 12 Feb 2024 14:50:29 +0100 Subject: [PATCH 2/5] remove redundant import and redundent exit status --- smach_ros/smach_ros/simple_action_state.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/smach_ros/smach_ros/simple_action_state.py b/smach_ros/smach_ros/simple_action_state.py index 3c97009..7fae6cd 100644 --- a/smach_ros/smach_ros/simple_action_state.py +++ b/smach_ros/smach_ros/simple_action_state.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 import rclpy -import rclpy.time from rclpy.duration import Duration import rclpy.action from rclpy.action.client import GoalStatus @@ -236,8 +235,7 @@ def _execution_timer(self): try: # Check for preemption while checking for timeout if self.preempt_requested(): - self.node.get_logger().info( - f"Preempting while waiting for goal (request) result '{self._action_name}'.") + self.node.get_logger().info(f"Preempting while waiting for goal (request) result '{self._action_name}'.") self._status = ActionState.CANCELED with self._done_cond: self._done_cond.notify() @@ -418,9 +416,6 @@ def execute(self, ud): if self.preempt_requested(): self.service_preempt() outcome = 'preempted' - elif self._status == ActionState.INACTIVE: - # Aborted via execution timeout - outcome = 'aborted' elif self._status == ActionState.CANCELING: # Preempting or exec timeout but goal not cancelled outcome = 'aborted' From 356bfc01c2650387f039c3327237f6950aac3695 Mon Sep 17 00:00:00 2001 From: Richardvdketterij Date: Tue, 13 Feb 2024 11:51:50 +0100 Subject: [PATCH 3/5] remove redundent if and typo --- smach_ros/smach_ros/simple_action_state.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/smach_ros/smach_ros/simple_action_state.py b/smach_ros/smach_ros/simple_action_state.py index 7fae6cd..5e8fb4d 100644 --- a/smach_ros/smach_ros/simple_action_state.py +++ b/smach_ros/smach_ros/simple_action_state.py @@ -250,7 +250,7 @@ def _execution_timer(self): time.sleep(0.1) except Exception as e: self.node.get_logger().error( - f"Caught exception: {str(e)} Failed to sleep while running '{self._action_name}'") + f"Caught exception: {str(e)}. Failed to sleep while running '{self._action_name}'") break # Smach State API @@ -476,8 +476,7 @@ def get_result_str(i): else: return 'UNKNOWN (' + str(i) + ')' - if future.done(): - goal_result = future.result() + goal_result = future.result() # Calculate duration self._duration = self.node.get_clock().now() - self._activate_time From 831c6f7bff45542af46a498baeea5a879c924ded Mon Sep 17 00:00:00 2001 From: Richardvdketterij Date: Thu, 15 Feb 2024 14:08:30 +0100 Subject: [PATCH 4/5] private function should have 1 underscore --- smach_ros/smach_ros/simple_action_state.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/smach_ros/smach_ros/simple_action_state.py b/smach_ros/smach_ros/simple_action_state.py index 5e8fb4d..a90d7ed 100644 --- a/smach_ros/smach_ros/simple_action_state.py +++ b/smach_ros/smach_ros/simple_action_state.py @@ -243,7 +243,7 @@ def _execution_timer(self): self.node.get_logger().warn( f"Action {self._action_name} timed out after {self._exec_timeout.nanoseconds / 1e9} seconds. " f"Trying to cancel goal with timeout {self._preempt_timeout.nanoseconds / 1e9}.") - self._status = self.__cancel_goal_and_wait( + self._status = self._cancel_goal_and_wait( timeout=self._preempt_timeout) with self._done_cond: self._done_cond.notify() @@ -264,11 +264,11 @@ def request_preempt(self): self.node.get_logger().info( f"Preempt on action '{self._action_name}' trying to cancel goal: {self._client_goal_handle} " f"with timeout {self._preempt_timeout.nanoseconds / 1e9}") - self._status = self.__cancel_goal_and_wait(self._preempt_timeout) + self._status = self._cancel_goal_and_wait(self._preempt_timeout) with self._done_cond: self._done_cond.notify() - def __cancel_goal_and_wait(self, timeout) -> ActionState: + def _cancel_goal_and_wait(self, timeout) -> ActionState: """Cancel goal and wait for the result.""" if self._client_goal_handle is not None: self._cancel_goal() From b21fd960e2d498fd9633cdf00c3ef0c673237c32 Mon Sep 17 00:00:00 2001 From: Richardvdketterij Date: Thu, 15 Feb 2024 14:11:36 +0100 Subject: [PATCH 5/5] change exception types --- smach_ros/smach_ros/simple_action_state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smach_ros/smach_ros/simple_action_state.py b/smach_ros/smach_ros/simple_action_state.py index a90d7ed..9c6b212 100644 --- a/smach_ros/smach_ros/simple_action_state.py +++ b/smach_ros/smach_ros/simple_action_state.py @@ -248,7 +248,7 @@ def _execution_timer(self): with self._done_cond: self._done_cond.notify() time.sleep(0.1) - except Exception as e: + except (RuntimeError, TypeError) as e: self.node.get_logger().error( f"Caught exception: {str(e)}. Failed to sleep while running '{self._action_name}'") break