diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 398114a59..0ac9b6adc 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -269,9 +269,10 @@ def __init__( self.start() def stop(self) -> None: - if USE_WINDOWS_EVENTS: - win32event.CancelWaitableTimer(self.event.handle) self.stopped = True + if USE_WINDOWS_EVENTS: + # Reset and signal any pending wait by setting the timer to 0 + win32event.SetWaitableTimer(self.event.handle, 0, 0, None, None, False) def start(self) -> None: self.stopped = False diff --git a/test/simplecyclic_test.py b/test/simplecyclic_test.py index 650a1fddf..21e88e9f0 100644 --- a/test/simplecyclic_test.py +++ b/test/simplecyclic_test.py @@ -152,6 +152,41 @@ def test_stopping_perodic_tasks(self): bus.shutdown() + def test_restart_perodic_tasks(self): + period = 0.01 + safe_timeout = period * 5 + + msg = can.Message( + is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7] + ) + + with can.ThreadSafeBus(interface="virtual", receive_own_messages=True) as bus: + task = bus.send_periodic(msg, period) + self.assertIsInstance(task, can.broadcastmanager.RestartableCyclicTaskABC) + + # Test that the task is sending messages + sleep(safe_timeout) + assert not bus.queue.empty(), "messages should have been transmitted" + + # Stop the task and check that messages are no longer being sent + bus.stop_all_periodic_tasks(remove_tasks=False) + sleep(safe_timeout) + while not bus.queue.empty(): + bus.recv(timeout=period) + sleep(safe_timeout) + assert bus.queue.empty(), "messages should not have been transmitted" + + # Restart the task and check that messages are being sent again + task.start() + sleep(safe_timeout) + assert not bus.queue.empty(), "messages should have been transmitted" + + # Stop all tasks and wait for the thread to exit + bus.stop_all_periodic_tasks() + if isinstance(task, can.broadcastmanager.ThreadBasedCyclicSendTask): + # Avoids issues where the thread is still running when the bus is shutdown + task.thread.join(safe_timeout) + @unittest.skipIf(IS_CI, "fails randomly when run on CI server") def test_thread_based_cyclic_send_task(self): bus = can.ThreadSafeBus(interface="virtual")