Skip to content

Commit

Permalink
Add types to time_source.py (#1259)
Browse files Browse the repository at this point in the history
* Add types to time_source.py

Signed-off-by: Michael Carlstrom <[email protected]>
  • Loading branch information
InvincibleRMC authored Mar 27, 2024
1 parent 61e0409 commit a3ecd7e
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions rclpy/rclpy/time_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional, Set, TYPE_CHECKING
import weakref

from rcl_interfaces.msg import SetParametersResult
Expand All @@ -23,28 +24,32 @@
from rclpy.time import Time
import rosgraph_msgs.msg

if TYPE_CHECKING:
from rclpy.node import Node
from rclpy.subscription import Subscription

CLOCK_TOPIC = '/clock'
USE_SIM_TIME_NAME = 'use_sim_time'


class TimeSource:

def __init__(self, *, node=None):
self._clock_sub = None
self._node_weak_ref = None
self._associated_clocks = set()
def __init__(self, *, node: Optional['Node'] = None):
self._clock_sub: Optional['Subscription'] = None
self._node_weak_ref: Optional[weakref.ReferenceType['Node']] = None
self._associated_clocks: Set[ROSClock] = set()
# Zero time is a special value that means time is uninitialzied
self._last_time_set = Time(clock_type=ClockType.ROS_TIME)
self._ros_time_is_active = False
if node is not None:
self.attach_node(node)

@property
def ros_time_is_active(self):
def ros_time_is_active(self) -> bool:
return self._ros_time_is_active

@ros_time_is_active.setter
def ros_time_is_active(self, enabled):
def ros_time_is_active(self, enabled: bool) -> None:
if self._ros_time_is_active == enabled:
return
self._ros_time_is_active = enabled
Expand All @@ -59,7 +64,7 @@ def ros_time_is_active(self, enabled):
node.destroy_subscription(self._clock_sub)
self._clock_sub = None

def _subscribe_to_clock_topic(self):
def _subscribe_to_clock_topic(self) -> None:
if self._clock_sub is None:
node = self._get_node()
if node is not None:
Expand All @@ -70,7 +75,7 @@ def _subscribe_to_clock_topic(self):
QoSProfile(depth=1, reliability=ReliabilityPolicy.BEST_EFFORT)
)

def attach_node(self, node):
def attach_node(self, node: 'Node') -> None:
from rclpy.node import Node
if not isinstance(node, Node):
raise TypeError('Node must be of type rclpy.node.Node')
Expand All @@ -97,7 +102,7 @@ def attach_node(self, node):

node.add_on_set_parameters_callback(self._on_parameter_event)

def detach_node(self):
def detach_node(self) -> None:
# Remove the subscription to the clock topic.
if self._clock_sub is not None:
node = self._get_node()
Expand All @@ -107,22 +112,22 @@ def detach_node(self):
self._clock_sub = None
self._node_weak_ref = None

def attach_clock(self, clock):
def attach_clock(self, clock: ROSClock) -> None:
if not isinstance(clock, ROSClock):
raise ValueError('Only clocks with type ROS_TIME can be attached.')

clock.set_ros_time_override(self._last_time_set)
clock._set_ros_time_is_active(self.ros_time_is_active)
self._associated_clocks.add(clock)

def clock_callback(self, msg):
def clock_callback(self, msg: rosgraph_msgs.msg.Clock) -> None:
# Cache the last message in case a new clock is attached.
time_from_msg = Time.from_msg(msg.clock)
self._last_time_set = time_from_msg
for clock in self._associated_clocks:
clock.set_ros_time_override(time_from_msg)

def _on_parameter_event(self, parameter_list):
def _on_parameter_event(self, parameter_list: List[Parameter]) -> SetParametersResult:
successful = True
reason = ''

Expand All @@ -142,7 +147,7 @@ def _on_parameter_event(self, parameter_list):

return SetParametersResult(successful=successful, reason=reason)

def _get_node(self):
def _get_node(self) -> Optional['Node']:
if self._node_weak_ref is not None:
return self._node_weak_ref()
return None

0 comments on commit a3ecd7e

Please sign in to comment.