Skip to content

Commit

Permalink
Utilities.py types. (#1250)
Browse files Browse the repository at this point in the history
* Types

Signed-off-by: Michael Carlstrom <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
InvincibleRMC and clalancette authored Jul 30, 2024
1 parent 9de752e commit dc72d8c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
14 changes: 13 additions & 1 deletion rclpy/rclpy/impl/implementation_singleton.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,19 @@
# ...
"""


from typing import List, Protocol, Sequence

from rpyutils import import_c_library
package = 'rclpy'

rclpy_implementation = import_c_library('._rclpy_pybind11', package)
rclpy_implementation: 'rclpyHandle' = import_c_library('._rclpy_pybind11', package)


class rclpyHandle(Protocol):

def rclpy_remove_ros_args(self, pycli_args: Sequence[str]) -> List[str]:
...

def rclpy_get_rmw_implementation_identifier(self) -> str:
...
10 changes: 5 additions & 5 deletions rclpy/rclpy/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
g_context_lock = threading.Lock()


def get_default_context(*, shutting_down=False) -> Context:
def get_default_context(*, shutting_down: bool = False) -> Context:
"""Return the global default context singleton."""
global g_context_lock
with g_context_lock:
Expand Down Expand Up @@ -70,7 +70,7 @@ def ok(*, context: Optional[Context] = None) -> bool:
return context.ok()


def shutdown(*, context=None) -> None:
def shutdown(*, context: Optional[Context] = None) -> None:
"""
Shutdown the given ``Context``.
Expand All @@ -82,7 +82,7 @@ def shutdown(*, context=None) -> None:
context.shutdown()


def try_shutdown(*, context=None) -> None:
def try_shutdown(*, context: Optional[Context] = None) -> None:
"""
Shutdown the given ``Context`` if not already shutdown.
Expand Down Expand Up @@ -131,8 +131,8 @@ def get_available_rmw_implementations() -> Set[str]:
# filter by implementations in environment variable if provided
rmw_implementations = os.environ.get('RMW_IMPLEMENTATIONS')
if rmw_implementations:
rmw_implementations = rmw_implementations.split(os.pathsep)
missing_rmw_implementations = set(rmw_implementations) - \
rmw_implementations_array = rmw_implementations.split(os.pathsep)
missing_rmw_implementations = set(rmw_implementations_array) - \
available_rmw_implementations
if missing_rmw_implementations:
# TODO(sloretz) function name suggets to me it would return available ones even
Expand Down

0 comments on commit dc72d8c

Please sign in to comment.