From d0e81306af6d075893636edc91956bfb73487015 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Tue, 2 Apr 2024 13:00:41 -0700 Subject: [PATCH] Add spin_until_complete and spin_for Co-authored-by: Hubert Liberacki Signed-off-by: Hubert Liberacki Signed-off-by: Christophe Bedard --- rclpy/rclpy/__init__.py | 47 +++++++++++++++++++++ rclpy/rclpy/executors.py | 63 +++++++++++++++++++++++----- rclpy/test/test_executor.py | 82 +++++++++++++++++++++++++++++++++++++ 3 files changed, 181 insertions(+), 11 deletions(-) diff --git a/rclpy/rclpy/__init__.py b/rclpy/rclpy/__init__.py index 6e94dce51..4ce9fb6ca 100644 --- a/rclpy/rclpy/__init__.py +++ b/rclpy/rclpy/__init__.py @@ -40,6 +40,7 @@ This will invalidate all entities derived from the context. """ +from typing import Callable from typing import List from typing import Optional from typing import TYPE_CHECKING @@ -242,6 +243,52 @@ def spin(node: 'Node', executor: Optional['Executor'] = None) -> None: executor.remove_node(node) +def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None: + """ + Execute work for some time. + + Callbacks will be executed by the provided executor until the context associated with the + executor is shut down or the given time duration passes. + + :param node: A node to add to the executor to check for work. + :param executor: The executor to use, or the global executor if ``None``. + :param timeout_sec: Seconds to wait (blocking). + """ + executor = get_global_executor() if executor is None else executor + try: + executor.add_node(node) + executor.spin_for(duration_sec) + finally: + executor.remove_node(node) + + +def spin_until_complete( + node: 'Node', + condition: Callable[[], bool], + executor: Optional['Executor'] = None, + timeout_sec: Optional[float] = None, +) -> None: + """ + Execute work until the condition is complete. + + Callbacks and other work will be executed by the provided executor until ``condition()`` + returns ``True`` or the context associated with the executor is shutdown. + + :param node: A node to add to the executor to check for work. + :param condition: The callable condition to wait on. If this condition is not related to what + the executor is waiting on and the timeout is infinite, this could block forever. + :param executor: The executor to use, or the global executor if ``None``. + :param timeout_sec: Seconds to wait. Block until the condition is complete + if ``None`` or negative. Don't wait if 0. + """ + executor = get_global_executor() if executor is None else executor + try: + executor.add_node(node) + executor.spin_until_complete(condition, timeout_sec) + finally: + executor.remove_node(node) + + def spin_until_future_complete( node: 'Node', future: Future, diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 78544c91c..e79a4e476 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -307,25 +307,26 @@ def spin(self) -> None: while self._context.ok() and not self._is_shutdown: self.spin_once() - def spin_until_future_complete( + def spin_for(self, duration_sec: Optional[float] = None) -> None: + """Execute callbacks until shutdown or timeout.""" + self.spin_until_complete(lambda: False, duration_sec) + + def spin_until_complete( self, - future: Future, - timeout_sec: Optional[float] = None + condition: Callable[[], bool], + timeout_sec: Optional[float] = None, ) -> None: - """Execute callbacks until a given future is done or a timeout occurs.""" - # Make sure the future wakes this executor when it is done - future.add_done_callback(lambda x: self.wake()) - + """Execute callbacks until a given condition is complete or a timeout occurs.""" if timeout_sec is None or timeout_sec < 0: - while self._context.ok() and not future.done() and not self._is_shutdown: - self.spin_once_until_future_complete(future, timeout_sec) + while self._context.ok() and not condition() and not self._is_shutdown: + self.spin_once_until_complete(condition, timeout_sec) else: start = time.monotonic() end = start + timeout_sec timeout_left = TimeoutObject(timeout_sec) - while self._context.ok() and not future.done() and not self._is_shutdown: - self.spin_once_until_future_complete(future, timeout_left) + while self._context.ok() and not condition() and not self._is_shutdown: + self.spin_once_until_complete(condition, timeout_left) now = time.monotonic() if now >= end: @@ -333,6 +334,15 @@ def spin_until_future_complete( timeout_left.timeout = end - now + def spin_until_future_complete( + self, + future: Future, + timeout_sec: Optional[float] = None, + ) -> None: + """Execute callbacks until a given future is done or a timeout occurs.""" + future.add_done_callback(lambda x: self.wake()) + self.spin_until_complete(future.done, timeout_sec) + def spin_once(self, timeout_sec: Optional[float] = None) -> None: """ Wait for and execute a single callback. @@ -346,6 +356,23 @@ def spin_once(self, timeout_sec: Optional[float] = None) -> None: """ raise NotImplementedError() + def spin_once_until_complete( + self, + condition: Callable[[], bool], + timeout_sec: Optional[Union[float, TimeoutObject]] = None, + ) -> None: + """ + Wait for and execute a single callback. + + This should behave in the same way as :meth:`spin_once`. + If needed by the implementation, it should awake other threads waiting. + + :param condition: The callable condition to wait on. + :param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative. + Don't wait if 0. + """ + raise NotImplementedError() + def spin_once_until_future_complete( self, future: Future, @@ -826,6 +853,13 @@ def _spin_once_impl( def spin_once(self, timeout_sec: Optional[float] = None) -> None: self._spin_once_impl(timeout_sec) + def spin_once_until_complete( + self, + condition: Callable[[], bool], + timeout_sec: Optional[Union[float, TimeoutObject]] = None, + ) -> None: + self._spin_once_impl(timeout_sec, condition) + def spin_once_until_future_complete( self, future: Future, @@ -898,6 +932,13 @@ def _spin_once_impl( def spin_once(self, timeout_sec: Optional[float] = None) -> None: self._spin_once_impl(timeout_sec) + def spin_once_until_complete( + self, + condition: Callable[[], bool], + timeout_sec: float = None, + ) -> None: + self._spin_once_impl(timeout_sec, condition) + def spin_once_until_future_complete( self, future: Future, diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 1d3d8d975..c2c7fb3d8 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -380,6 +380,22 @@ def test_executor_add_node(self): assert not executor.add_node(self.node) assert id(executor) == id(self.node.executor) + def test_executor_spin_for(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(1.0, timer_callback) + + start = time.monotonic() + executor.spin_for(duration_sec=10.0) + end = time.monotonic() + self.assertGreaterEqual(end - start, 10.0) + + timer.cancel() + def test_executor_spin_until_future_complete_timeout(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) @@ -402,6 +418,72 @@ def timer_callback(): timer.cancel() + def test_executor_spin_until_complete_condition_done(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(0.1, timer_callback) + + condition_var = False + + def set_condition(): + nonlocal condition_var + condition_var = True + + def condition(): + nonlocal condition_var + return condition_var + + # Condition complete timeout_sec > 0 + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=1.0) + self.assertTrue(condition()) + + # timeout_sec = None + condition_var = False + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=None) + self.assertTrue(condition()) + + # Condition complete timeout < 0 + condition_var = False + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=-1) + self.assertTrue(condition()) + + timer.cancel() + + def test_executor_spin_until_complete_do_not_wait(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(0.1, timer_callback) + + condition_var = False + + def condition(): + nonlocal condition_var + return condition_var + + # Do not wait timeout_sec = 0 + self.assertFalse(condition()) + executor.spin_until_complete(condition, timeout_sec=0) + self.assertFalse(condition()) + + timer.cancel() + def test_executor_spin_until_future_complete_future_done(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context)