Skip to content

Commit

Permalink
Fix ThreadBasedCyclicSendTask thread not being stopped on Windows (#1679
Browse files Browse the repository at this point in the history
)

Also adding unit test to cover RestartableCyclicTaskABC

Co-authored-by: Pierre-Luc Tessier Gagné <[email protected]>
  • Loading branch information
pierreluctg and Pierre-Luc Tessier Gagné authored Oct 16, 2023
1 parent b2689ae commit e869fb7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
5 changes: 3 additions & 2 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,10 @@ def __init__(
self.start()

def stop(self) -> None:
if USE_WINDOWS_EVENTS:
win32event.CancelWaitableTimer(self.event.handle)
self.stopped = True
if USE_WINDOWS_EVENTS:
# Reset and signal any pending wait by setting the timer to 0
win32event.SetWaitableTimer(self.event.handle, 0, 0, None, None, False)

def start(self) -> None:
self.stopped = False
Expand Down
35 changes: 35 additions & 0 deletions test/simplecyclic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,41 @@ def test_stopping_perodic_tasks(self):

bus.shutdown()

def test_restart_perodic_tasks(self):
period = 0.01
safe_timeout = period * 5

msg = can.Message(
is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7]
)

with can.ThreadSafeBus(interface="virtual", receive_own_messages=True) as bus:
task = bus.send_periodic(msg, period)
self.assertIsInstance(task, can.broadcastmanager.RestartableCyclicTaskABC)

# Test that the task is sending messages
sleep(safe_timeout)
assert not bus.queue.empty(), "messages should have been transmitted"

# Stop the task and check that messages are no longer being sent
bus.stop_all_periodic_tasks(remove_tasks=False)
sleep(safe_timeout)
while not bus.queue.empty():
bus.recv(timeout=period)
sleep(safe_timeout)
assert bus.queue.empty(), "messages should not have been transmitted"

# Restart the task and check that messages are being sent again
task.start()
sleep(safe_timeout)
assert not bus.queue.empty(), "messages should have been transmitted"

# Stop all tasks and wait for the thread to exit
bus.stop_all_periodic_tasks()
if isinstance(task, can.broadcastmanager.ThreadBasedCyclicSendTask):
# Avoids issues where the thread is still running when the bus is shutdown
task.thread.join(safe_timeout)

@unittest.skipIf(IS_CI, "fails randomly when run on CI server")
def test_thread_based_cyclic_send_task(self):
bus = can.ThreadSafeBus(interface="virtual")
Expand Down

0 comments on commit e869fb7

Please sign in to comment.