Skip to content

Commit

Permalink
Add types to Node.py (#1346)
Browse files Browse the repository at this point in the history
* Add types

Signed-off-by: Michael Carlstrom <[email protected]>

* string around type

Signed-off-by: Michael Carlstrom <[email protected]>

* string around type

Signed-off-by: Michael Carlstrom <[email protected]>

* use error

Signed-off-by: Michael Carlstrom <[email protected]>

* flake8

Signed-off-by: Michael Carlstrom <[email protected]>

* switch error raise order

Signed-off-by: Michael Carlstrom <[email protected]>

* unify error

Signed-off-by: Michael Carlstrom <[email protected]>

---------

Signed-off-by: Michael Carlstrom <[email protected]>
  • Loading branch information
InvincibleRMC authored Aug 31, 2024
1 parent 3fa9e75 commit 8a2e2d3
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 101 deletions.
3 changes: 2 additions & 1 deletion rclpy/rclpy/callback_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from rclpy.client import Client
from rclpy.service import Service
from rclpy.waitable import Waitable
Entity = Union[Subscription, Timer, Client, Service, Waitable[Any]]
from rclpy.guard_condition import GuardCondition
Entity = Union[Subscription, Timer, Client, Service, Waitable[Any], GuardCondition]


class CallbackGroup:
Expand Down
2 changes: 1 addition & 1 deletion rclpy/rclpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class InvalidParameterTypeException(ParameterException):

from rclpy.parameter import Parameter

def __init__(self, desired_parameter: Parameter, expected_type: Parameter.Type) -> None:
def __init__(self, desired_parameter: Parameter, expected_type: str) -> None:
from rclpy.parameter import Parameter
ParameterException.__init__(
self,
Expand Down
142 changes: 131 additions & 11 deletions rclpy/rclpy/impl/_rclpy_pybind11.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ from typing import Any, Generic, Literal, overload, Sequence, TypedDict

from rclpy.clock import JumpHandle
from rclpy.clock_type import ClockType
from rclpy.qos import (QoSDurabilityPolicy, QoSHistoryPolicy, QoSLivelinessPolicy,
QoSReliabilityPolicy)
from rclpy.duration import Duration
from rclpy.parameter import Parameter
from rclpy.subscription import MessageInfo
from rclpy.type_support import MsgT

Expand Down Expand Up @@ -173,10 +173,118 @@ class Subscription(Destroyable, Generic[MsgT]):
"""Count the publishers from a subscription."""


class Node:
class Node(Destroyable):

def __init__(self, node_name: str, namespace_: str, context: Context,
pycli_args: list[str] | None, use_global_arguments: bool,
enable_rosout: bool) -> None: ...

@property
def pointer(self) -> int:
"""Get the address of the entity as an integer."""

def get_fully_qualified_name(self) -> str:
"""Get the fully qualified name of the node."""

def logger_name(self) -> str:
"""Get the name of the logger associated with a node."""

def get_node_name(self) -> str:
"""Get the name of a node."""

def get_namespace(self) -> str:
"""Get the namespace of a node."""

def get_count_publishers(self, topic_name: str) -> int:
"""Return the count of all the publishers known for that topic in the entire ROS graph."""

def get_count_subscribers(self, topic_name: str) -> int:
"""Return the count of all the subscribers known for that topic in the entire ROS graph."""

def get_count_clients(self, service_name: str) -> int:
"""Return the count of all the clients known for that service in the entire ROS graph."""

def get_count_services(self, service_name: str) -> int:
"""Return the count of all the servers known for that service in the entire ROS graph."""

def get_node_names_and_namespaces(self) -> list[tuple[str, str, str] | tuple[str, str]]:
"""Get the list of nodes discovered by the provided node."""

def get_node_names_and_namespaces_with_enclaves(self) -> list[tuple[str, str, str] |
tuple[str, str]]:
"""Get the list of nodes discovered by the provided node, with their enclaves."""

def get_action_client_names_and_types_by_node(self, remote_node_name: str,
remote_node_namespace: str) -> list[tuple[str,
list[str]]]:
"""Get action client names and types by node."""

def get_action_server_names_and_types_by_node(self, remote_node_name: str,
remote_node_namespace: str) -> list[tuple[str,
list[str]]]:
"""Get action server names and types by node."""

def get_action_names_and_types(self) -> list[tuple[str, list[str]]]:
"""Get action names and types."""

def get_parameters(self, pyparamter_cls: type[Parameter]) -> dict[str, Parameter]:
"""Get a list of parameters for the current node."""


def rclpy_resolve_name(node: Node, topic_name: str, only_expand: bool, is_service: bool) -> str:
"""Expand and remap a topic or service name."""


def rclpy_get_publisher_names_and_types_by_node(node: Node, no_demangle: bool, node_name: str,
node_namespace: str
) -> list[tuple[str, list[str]]]:
"""Get topic names and types for which a remote node has publishers."""


def rclpy_get_subscriber_names_and_types_by_node(node: Node, no_demangle: bool, node_name: str,
node_namespace: str
) -> list[tuple[str, list[str]]]:
"""Get topic names and types for which a remote node has subscribers."""


def rclpy_get_service_names_and_types_by_node(node: Node, node_name: str, node_namespace: str
) -> list[tuple[str, list[str]]]:
"""Get all service names and types in the ROS graph."""


def rclpy_get_client_names_and_types_by_node(node: Node, node_name: str, node_namespace: str
) -> list[tuple[str, list[str]]]:
"""Get service names and types for which a remote node has servers."""


def rclpy_get_service_names_and_types(node: Node) -> list[tuple[str, list[str]]]:
"""Get all service names and types in the ROS graph."""


class TypeHashDict(TypedDict):
version: int
value: bytes


class QoSDict(TypedDict):
pass


class TopicEndpointInfoDict(TypedDict):
node_name: str
node_namespace: str
topic_type: str
topic_type_hash: TypeHashDict
endpoint_type: int
endpoint_gid: list[int]
qos_profile: rmw_qos_profile_dict


def rclpy_get_publishers_info_by_topic(node: Node, topic_name: str, no_mangle: bool
) -> list[TopicEndpointInfoDict]:
"""Get publishers info for a topic."""


class Publisher(Destroyable, Generic[MsgT]):

def __init__(self, arg0: Node, arg1: type[MsgT], arg2: str, arg3: rmw_qos_profile_t) -> None:
Expand Down Expand Up @@ -257,14 +365,14 @@ PredefinedQosProfileTNames = Literal['qos_profile_sensor_data', 'qos_profile_def


class rmw_qos_profile_dict(TypedDict):
qos_history: QoSHistoryPolicy | int
qos_depth: int
qos_reliability: QoSReliabilityPolicy | int
qos_durability: QoSDurabilityPolicy | int
pyqos_lifespan: rcl_duration_t
pyqos_deadline: rcl_duration_t
qos_liveliness: QoSLivelinessPolicy | int
pyqos_liveliness_lease_duration: rcl_duration_t
depth: int
history: int
reliability: int
durability: int
lifespan: Duration
deadline: Duration
liveliness: int
liveliness_lease_duration: Duration
avoid_ros_namespace_conventions: bool


Expand Down Expand Up @@ -335,6 +443,18 @@ class WaitSet(Destroyable):
"""Wait until timeout is reached or event happened."""


class RCLError(RuntimeError):
pass


class NodeNameNonExistentError(RCLError):
pass


class InvalidHandle(RuntimeError):
pass


class SignalHandlerOptions(Enum):
_value_: int
NO = ...
Expand Down
Loading

0 comments on commit 8a2e2d3

Please sign in to comment.