Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for spin_until_complete #1268

Open
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions rclpy/rclpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
This will invalidate all entities derived from the context.
"""

from typing import Callable
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -242,6 +243,52 @@ def spin(node: 'Node', executor: Optional['Executor'] = None) -> None:
executor.remove_node(node)


def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None:
"""
Execute work for some time.

Callbacks will be executed by the provided executor until the context associated with the
executor is shut down or the given time duration passes.

:param node: A node to add to the executor to check for work.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait (blocking).
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_for(duration_sec)
finally:
executor.remove_node(node)


def spin_until_complete(
node: 'Node',
condition: Callable[[], bool],
executor: Optional['Executor'] = None,
timeout_sec: Optional[float] = None,
) -> None:
Comment on lines +265 to +270
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not return something to indicate if the function returned due to the condition being complete or due to a timeout instead?

Copy link
Member Author

@christophebedard christophebedard Apr 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then that would mean that spin_until_future_complete should also return something similar?

Copy link
Member Author

@christophebedard christophebedard Apr 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so we agree before I do the changes: do we just want a bool (True means it returned due to the condition being complete, or False otherwise), or do we want something like rclcpp::SpinUntilCompleteReturnCode to indicate SUCCESS or INTERRUPTED or TIMEOUT?

"""
Execute work until the condition is complete.

Callbacks and other work will be executed by the provided executor until ``condition()``
returns ``True`` or the context associated with the executor is shutdown.
Comment on lines +272 to +275
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: no mention of the timeout here


:param node: A node to add to the executor to check for work.
:param condition: The callable condition to wait on. If this condition is not related to what
the executor is waiting on and the timeout is infinite, this could block forever.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait. Block until the condition is complete
if ``None`` or negative. Don't wait if 0.
"""
Comment on lines +265 to +283
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To go along with the discussion in the rclcpp version of this (ros2/rclcpp#2475 (comment)), I think we need to do a much better job of explaining that the executor must be woken up when the condition changes and condition's value must be changed before the signal to wake up is sent, otherwise you might deadlock here (if not using a timeout).

I think we could make an issue to do this as a follow up, as the docs can be improved post API freeze.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, will do it as a follow-up issue/PR.

executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_until_complete(condition, timeout_sec)
finally:
executor.remove_node(node)


def spin_until_future_complete(
node: 'Node',
future: Future,
Expand Down
63 changes: 52 additions & 11 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,32 +307,42 @@ def spin(self) -> None:
while self._context.ok() and not self._is_shutdown:
self.spin_once()

def spin_until_future_complete(
def spin_for(self, duration_sec: Optional[float] = None) -> None:
"""Execute callbacks until shutdown or timeout."""
self.spin_until_complete(lambda: False, duration_sec)

def spin_until_complete(
self,
future: Future,
timeout_sec: Optional[float] = None
condition: Callable[[], bool],
timeout_sec: Optional[float] = None,
) -> None:
"""Execute callbacks until a given future is done or a timeout occurs."""
# Make sure the future wakes this executor when it is done
future.add_done_callback(lambda x: self.wake())

"""Execute callbacks until a given condition is complete or a timeout occurs."""
if timeout_sec is None or timeout_sec < 0:
while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_sec)
while self._context.ok() and not condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_sec)
else:
start = time.monotonic()
end = start + timeout_sec
timeout_left = TimeoutObject(timeout_sec)

while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_left)
while self._context.ok() and not condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_left)
now = time.monotonic()

if now >= end:
return

timeout_left.timeout = end - now

def spin_until_future_complete(
self,
future: Future,
timeout_sec: Optional[float] = None,
) -> None:
"""Execute callbacks until a given future is done or a timeout occurs."""
future.add_done_callback(lambda x: self.wake())
self.spin_until_complete(future.done, timeout_sec)

def spin_once(self, timeout_sec: Optional[float] = None) -> None:
"""
Wait for and execute a single callback.
Expand All @@ -346,6 +356,23 @@ def spin_once(self, timeout_sec: Optional[float] = None) -> None:
"""
raise NotImplementedError()

def spin_once_until_complete(
self,
condition: Callable[[], bool],
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
) -> None:
"""
Wait for and execute a single callback.

This should behave in the same way as :meth:`spin_once`.
If needed by the implementation, it should awake other threads waiting.

:param condition: The callable condition to wait on.
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
Don't wait if 0.
"""
raise NotImplementedError()

def spin_once_until_future_complete(
self,
future: Future,
Expand Down Expand Up @@ -826,6 +853,13 @@ def _spin_once_impl(
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_complete(
self,
condition: Callable[[], bool],
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
) -> None:
self._spin_once_impl(timeout_sec, condition)

def spin_once_until_future_complete(
self,
future: Future,
Expand Down Expand Up @@ -898,6 +932,13 @@ def _spin_once_impl(
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_complete(
self,
condition: Callable[[], bool],
timeout_sec: float = None,
) -> None:
self._spin_once_impl(timeout_sec, condition)

def spin_once_until_future_complete(
self,
future: Future,
Expand Down
82 changes: 82 additions & 0 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,22 @@ def test_executor_add_node(self):
assert not executor.add_node(self.node)
assert id(executor) == id(self.node.executor)

def test_executor_spin_for(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

def timer_callback():
pass
timer = self.node.create_timer(1.0, timer_callback)

start = time.monotonic()
executor.spin_for(duration_sec=10.0)
end = time.monotonic()
self.assertGreaterEqual(end - start, 10.0)

timer.cancel()

def test_executor_spin_until_future_complete_timeout(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
Expand All @@ -402,6 +418,72 @@ def timer_callback():

timer.cancel()

def test_executor_spin_until_complete_condition_done(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

def timer_callback():
pass
timer = self.node.create_timer(0.1, timer_callback)

condition_var = False

def set_condition():
nonlocal condition_var
condition_var = True

def condition():
nonlocal condition_var
return condition_var

# Condition complete timeout_sec > 0
self.assertFalse(condition())
t = threading.Thread(target=lambda: set_condition())
t.start()
executor.spin_until_complete(condition, timeout_sec=1.0)
self.assertTrue(condition())
Comment on lines +441 to +445
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fairly weak test in my opinion, as you could comment out line 444 and have it pass most of the time. Also, if spin_until_complete() was broken (and did not return before the timeout if the condition was true) this would still pass.


# timeout_sec = None
condition_var = False
self.assertFalse(condition())
t = threading.Thread(target=lambda: set_condition())
t.start()
executor.spin_until_complete(condition, timeout_sec=None)
self.assertTrue(condition())

# Condition complete timeout < 0
condition_var = False
self.assertFalse(condition())
t = threading.Thread(target=lambda: set_condition())
t.start()
executor.spin_until_complete(condition, timeout_sec=-1)
self.assertTrue(condition())

timer.cancel()

def test_executor_spin_until_complete_do_not_wait(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

def timer_callback():
pass
timer = self.node.create_timer(0.1, timer_callback)

condition_var = False

def condition():
nonlocal condition_var
return condition_var

# Do not wait timeout_sec = 0
self.assertFalse(condition())
executor.spin_until_complete(condition, timeout_sec=0)
self.assertFalse(condition())

timer.cancel()

def test_executor_spin_until_future_complete_future_done(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
Expand Down