From a3ecd7e84bc3bcce0e5935d50744d2a161669a40 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom <36806982+InvincibleRMC@users.noreply.github.com> Date: Wed, 27 Mar 2024 08:07:21 -0400 Subject: [PATCH] Add types to time_source.py (#1259) * Add types to time_source.py Signed-off-by: Michael Carlstrom --- rclpy/rclpy/time_source.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/rclpy/rclpy/time_source.py b/rclpy/rclpy/time_source.py index 8975397eb..aa0e2a10e 100644 --- a/rclpy/rclpy/time_source.py +++ b/rclpy/rclpy/time_source.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Optional, Set, TYPE_CHECKING import weakref from rcl_interfaces.msg import SetParametersResult @@ -23,16 +24,20 @@ from rclpy.time import Time import rosgraph_msgs.msg +if TYPE_CHECKING: + from rclpy.node import Node + from rclpy.subscription import Subscription + CLOCK_TOPIC = '/clock' USE_SIM_TIME_NAME = 'use_sim_time' class TimeSource: - def __init__(self, *, node=None): - self._clock_sub = None - self._node_weak_ref = None - self._associated_clocks = set() + def __init__(self, *, node: Optional['Node'] = None): + self._clock_sub: Optional['Subscription'] = None + self._node_weak_ref: Optional[weakref.ReferenceType['Node']] = None + self._associated_clocks: Set[ROSClock] = set() # Zero time is a special value that means time is uninitialzied self._last_time_set = Time(clock_type=ClockType.ROS_TIME) self._ros_time_is_active = False @@ -40,11 +45,11 @@ def __init__(self, *, node=None): self.attach_node(node) @property - def ros_time_is_active(self): + def ros_time_is_active(self) -> bool: return self._ros_time_is_active @ros_time_is_active.setter - def ros_time_is_active(self, enabled): + def ros_time_is_active(self, enabled: bool) -> None: if self._ros_time_is_active == enabled: return self._ros_time_is_active = enabled @@ -59,7 +64,7 @@ def ros_time_is_active(self, enabled): node.destroy_subscription(self._clock_sub) self._clock_sub = None - def _subscribe_to_clock_topic(self): + def _subscribe_to_clock_topic(self) -> None: if self._clock_sub is None: node = self._get_node() if node is not None: @@ -70,7 +75,7 @@ def _subscribe_to_clock_topic(self): QoSProfile(depth=1, reliability=ReliabilityPolicy.BEST_EFFORT) ) - def attach_node(self, node): + def attach_node(self, node: 'Node') -> None: from rclpy.node import Node if not isinstance(node, Node): raise TypeError('Node must be of type rclpy.node.Node') @@ -97,7 +102,7 @@ def attach_node(self, node): node.add_on_set_parameters_callback(self._on_parameter_event) - def detach_node(self): + def detach_node(self) -> None: # Remove the subscription to the clock topic. if self._clock_sub is not None: node = self._get_node() @@ -107,7 +112,7 @@ def detach_node(self): self._clock_sub = None self._node_weak_ref = None - def attach_clock(self, clock): + def attach_clock(self, clock: ROSClock) -> None: if not isinstance(clock, ROSClock): raise ValueError('Only clocks with type ROS_TIME can be attached.') @@ -115,14 +120,14 @@ def attach_clock(self, clock): clock._set_ros_time_is_active(self.ros_time_is_active) self._associated_clocks.add(clock) - def clock_callback(self, msg): + def clock_callback(self, msg: rosgraph_msgs.msg.Clock) -> None: # Cache the last message in case a new clock is attached. time_from_msg = Time.from_msg(msg.clock) self._last_time_set = time_from_msg for clock in self._associated_clocks: clock.set_ros_time_override(time_from_msg) - def _on_parameter_event(self, parameter_list): + def _on_parameter_event(self, parameter_list: List[Parameter]) -> SetParametersResult: successful = True reason = '' @@ -142,7 +147,7 @@ def _on_parameter_event(self, parameter_list): return SetParametersResult(successful=successful, reason=reason) - def _get_node(self): + def _get_node(self) -> Optional['Node']: if self._node_weak_ref is not None: return self._node_weak_ref() return None