Skip to content

Commit

Permalink
Add parameter for on_error to BusABC.send_periodic
Browse files Browse the repository at this point in the history
If there is an error while sending messages in a (background) periodic
task, then the `on_error` callback is called, if set. If the callback is
configured and returns `True` then the task continues, otherwise it is
aborted.

While it is possible to update a task with a callback after the task has
been created, this procedure is prone to a race condition where the
callback might not be configured in time for the first send event. Thus,
if the first send fails and the callback has not yet been configured,
the task will abort.

This commit solves the race condition issue by adding an argument to
`BusABC.send_periodic` to specify the callback. By including the
callback in the constructor it will be deterministically active for all
sends in the task. This fixes issue #1282.

This commit also adds myself to the CONTRIBUTORS list.
  • Loading branch information
andebjor committed Mar 16, 2022
1 parent af55b0a commit a0e1f92
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
3 changes: 2 additions & 1 deletion CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,5 @@ Felix Nieuwenhuizen
@fjburgos
@pkess
@felixn
@Tbruno25
@Tbruno25
@andebjor
19 changes: 16 additions & 3 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Contains the ABC bus implementation and its documentation.
"""

from typing import cast, Any, Iterator, List, Optional, Sequence, Tuple, Union
from typing import cast, Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union

import can.typechecking

Expand Down Expand Up @@ -181,6 +181,7 @@ def send_periodic(
period: float,
duration: Optional[float] = None,
store_task: bool = True,
on_error: Optional[Callable[[Exception], bool]] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.
Expand All @@ -191,6 +192,9 @@ def send_periodic(
- the Bus instance is shutdown
- :meth:`BusABC.stop_all_periodic_tasks()` is called
- the task's :meth:`CyclicTask.stop()` method is called.
- an error while sending and the (optional) `on_error` callback does
not return `True`. If the callback is not specified the task is
deactivated on error.
:param msgs:
Message(s) to transmit
Expand All @@ -202,6 +206,10 @@ def send_periodic(
:param store_task:
If True (the default) the task will be attached to this Bus instance.
Disable to instead manage tasks manually.
:param on_error:
Callable that accepts an exception if any error happened on a `bus`
while sending `msgs`, it shall return either ``True`` or ``False``
depending on desired behaviour of `ThreadBasedCyclicSendTask`.
:return:
A started task instance. Note the task can be stopped (and depending on
the backend modified) by calling the task's :meth:`stop` method.
Expand Down Expand Up @@ -232,7 +240,7 @@ def send_periodic(
# Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later
task = cast(
_SelfRemovingCyclicTask,
self._send_periodic_internal(msgs, period, duration),
self._send_periodic_internal(msgs, period, duration, on_error),
)

# we wrap the task's stop method to also remove it from the Bus's list of tasks
Expand Down Expand Up @@ -260,6 +268,7 @@ def _send_periodic_internal(
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
on_error: Optional[Callable[[Exception], bool]] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Default implementation of periodic message sending using threading.
Expand All @@ -272,6 +281,10 @@ def _send_periodic_internal(
:param duration:
The duration between sending each message at the given rate. If
no duration is provided, the task will continue indefinitely.
:param on_error:
Callable that accepts an exception if any error happened on a `bus`
while sending `msgs`, it shall return either ``True`` or ``False``
depending on desired behaviour of `ThreadBasedCyclicSendTask`.
:return:
A started task instance. Note the task can be stopped (and
depending on the backend modified) by calling the :meth:`stop`
Expand All @@ -283,7 +296,7 @@ def _send_periodic_internal(
threading.Lock()
)
task = ThreadBasedCyclicSendTask(
self, self._lock_send_periodic, msgs, period, duration
self, self._lock_send_periodic, msgs, period, duration, on_error
)
return task

Expand Down

0 comments on commit a0e1f92

Please sign in to comment.