Skip to content

Commit

Permalink
Guard condition types. (#1252)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carlstrom <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
InvincibleRMC and clalancette authored Jul 31, 2024
1 parent a7aea8e commit adfcb2b
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions rclpy/rclpy/guard_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,55 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Optional, Protocol

from rclpy.callback_groups import CallbackGroup
from rclpy.context import Context
from rclpy.destroyable import DestroyableType
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.utilities import get_default_context


class GuardConditionHandle(DestroyableType, Protocol):

def trigger_guard_condition(self) -> None:
...


class GuardCondition:

def __init__(self, callback, callback_group, context=None):
def __init__(self, callback: Optional[Callable],
callback_group: Optional[CallbackGroup],
context: Optional[Context] = None) -> None:
"""
Create a GuardCondition.
.. warning:: Users should not create a guard condition with this constructor, instead they
should call :meth:`.Node.create_guard_condition`.
"""
self._context = get_default_context() if context is None else context

if self._context.handle is None:
raise RuntimeError('Context must be initialized a GuardCondition can be created')

with self._context.handle:
self.__gc = _rclpy.GuardCondition(self._context.handle)
self.__gc: GuardConditionHandle = _rclpy.GuardCondition(self._context.handle)
self.callback = callback
self.callback_group = callback_group
# True when the callback is ready to fire but has not been "taken" by an executor
self._executor_event = False
# True when the executor sees this has been triggered but has not yet been handled
self._executor_triggered = False

def trigger(self):
def trigger(self) -> None:
with self.__gc:
self.__gc.trigger_guard_condition()

@property
def handle(self):
def handle(self) -> GuardConditionHandle:
return self.__gc

def destroy(self):
def destroy(self) -> None:
"""
Destroy a container for a ROS guard condition.
Expand Down

0 comments on commit adfcb2b

Please sign in to comment.