From 1f2394486e05e17020f903becc85779d7bb65987 Mon Sep 17 00:00:00 2001 From: Stephen Schueth Date: Fri, 19 Jan 2024 13:33:06 -0600 Subject: [PATCH 1/3] Added VariableRateCyclicTaskABC and updated ThreadBasedCyclicSendTask --- can/broadcastmanager.py | 145 ++++++++++++++++++++++++++++------------ can/bus.py | 17 ++++- 2 files changed, 120 insertions(+), 42 deletions(-) diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index a610b7a8a..1b2afef55 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -67,12 +67,13 @@ def __init__( :raises ValueError: If the given messages are invalid """ messages = self._check_and_convert_messages(messages) - - # Take the Arbitration ID of the first element - self.arbitration_id = messages[0].arbitration_id + self.msgs_len = len(messages) + # Take the Arbitration ID of each message and put them into a list + self.arbitration_id = [self.messages[idx].arbitration_id for idx in range(self.msgs_len)] self.period = period self.period_ns = int(round(period * 1e9)) self.messages = messages + self.msg_index = 0 @staticmethod def _check_and_convert_messages( @@ -81,8 +82,7 @@ def _check_and_convert_messages( """Helper function to convert a Message or Sequence of messages into a tuple, and raises an error when the given value is invalid. - Performs error checking to ensure that all Messages have the same - arbitration ID and channel. + Performs error checking to ensure that all Messages have the same channel. Should be called when the cyclic task is initialized. @@ -97,12 +97,6 @@ def _check_and_convert_messages( raise ValueError("Must be at least a list or tuple of length 1") messages = tuple(messages) - all_same_id = all( - message.arbitration_id == messages[0].arbitration_id for message in messages - ) - if not all_same_id: - raise ValueError("All Arbitration IDs should be the same") - all_same_channel = all( message.channel == messages[0].channel for message in messages ) @@ -154,16 +148,17 @@ def _check_modified_messages(self, messages: Tuple[Message, ...]) -> None: :raises ValueError: If the given messages are invalid """ - if len(self.messages) != len(messages): + if self.msgs_len != len(messages): raise ValueError( "The number of new cyclic messages to be sent must be equal to " "the number of messages originally specified for this task" ) - if self.arbitration_id != messages[0].arbitration_id: - raise ValueError( - "The arbitration ID of new cyclic messages cannot be changed " - "from when the task was created" - ) + for idx in range(self.msgs_len): + if self.arbitration_id[idx] != messages[idx].arbitration_id: + raise ValueError( + "The arbitration ID of new cyclic messages cannot be changed " + "from when the task was created" + ) def modify_data(self, messages: Union[Sequence[Message], Message]) -> None: """Update the contents of the periodically sent messages, without @@ -185,6 +180,68 @@ def modify_data(self, messages: Union[Sequence[Message], Message]) -> None: self.messages = messages +class VariableRateCyclicTaskABC(CyclicSendTaskABC, abc.ABC): + """A Cyclic task that supports a group period and intra-message period.""" + def _check_and_apply_period_intra( + self, period_intra: Optional[float] + ) -> None: + """ + Helper function that checks if the given period_intra is valid and applies the + variable rate attributes to be used in the cyclic task. + + :param period_intra: + The period in seconds to send intra-message. + + :raises ValueError: If the given period_intra is invalid + """ + self._is_variable_rate = False + self._run_cnt_msgs = None + self._run_cnt_max = None + self._run_cnt = None + + if period_intra is not None: + if not isinstance(period_intra, float): + raise ValueError("period_intra must be a float") + if period_intra <= 0: + raise ValueError("period_intra must be greater than 0") + if self.msgs_len <= 1: + raise ValueError("period_intra can only be used with multiple messages") + if period_intra*self.msgs_len >= self.period: + raise ValueError("period_intra per intra-message must be less than period") + period_ms = int(round(self.period * 1000, 0)) + period_intra_ms = int(round(period_intra * 1000, 0)) + (_run_period_ms, msg_cnts, group_cnts) = self._find_gcd(period_ms, period_intra_ms) + self._is_variable_rate = True + self._run_cnt_msgs = [i*msg_cnts for i in range(self.msgs_len)] + self._run_cnt_max = group_cnts + self._run_cnt = 0 + # Override period, period_ms, and period_ns to be the variable period + self.period = _run_period_ms / 1000 + self.period_ms = _run_period_ms + self.period_ns = _run_period_ms * 1000000 + + @staticmethod + def _find_gcd( + period_ms: int, + period_intra_ms: int, + ) -> Tuple[int, int, int]: + """ + Helper function that finds the greatest common divisor between period_ms and period_intra_ms. + + :returns: + Tuple of (gcd_ms, m_steps, n_steps) + * gcd_ms: greatest common divisor in milliseconds + * m_steps: number of steps to send intra-message + * n_steps: number of steps to send message group + """ + gcd_ms = min(period_ms, period_intra_ms) + while gcd_ms > 1: + if period_ms % gcd_ms == 0 and period_intra_ms % gcd_ms == 0: + break + gcd_ms -= 1 + m_steps = int(period_intra_ms / gcd_ms) + n_steps = int(period_ms / gcd_ms) + return (gcd_ms, m_steps, n_steps) class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC): """A Cyclic send task that supports switches send frequency after a set time.""" @@ -214,7 +271,7 @@ def __init__( class ThreadBasedCyclicSendTask( - LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC + LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC, VariableRateCyclicTaskABC ): """Fallback cyclic send task using daemon thread.""" @@ -227,6 +284,7 @@ def __init__( duration: Optional[float] = None, on_error: Optional[Callable[[Exception], bool]] = None, modifier_callback: Optional[Callable[[Message], None]] = None, + period_intra: Optional[float] = None, ) -> None: """Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`. @@ -253,9 +311,11 @@ def __init__( ) self.on_error = on_error self.modifier_callback = modifier_callback + self._check_and_apply_period_intra(period_intra) if USE_WINDOWS_EVENTS: - self.period_ms = int(round(period * 1000, 0)) + if not self._is_variable_rate: + self.period_ms = int(round(period * 1000, 0)) try: self.event = win32event.CreateWaitableTimerEx( None, @@ -289,7 +349,7 @@ def start(self) -> None: self.thread.start() def _run(self) -> None: - msg_index = 0 + self.msg_index = 0 msg_due_time_ns = time.perf_counter_ns() if USE_WINDOWS_EVENTS: @@ -297,33 +357,33 @@ def _run(self) -> None: win32event.WaitForSingleObject(self.event.handle, 0) while not self.stopped: + msg_send = (self._run_cnt in self._run_cnt_msgs) if self._is_variable_rate else True if self.end_time is not None and time.perf_counter() >= self.end_time: break - - # Prevent calling bus.send from multiple threads - with self.send_lock: - try: - if self.modifier_callback is not None: - self.modifier_callback(self.messages[msg_index]) - self.bus.send(self.messages[msg_index]) - except Exception as exc: # pylint: disable=broad-except - log.exception(exc) - - # stop if `on_error` callback was not given - if self.on_error is None: - self.stop() - raise exc - - # stop if `on_error` returns False - if not self.on_error(exc): - self.stop() - break + if msg_send: + # Prevent calling bus.send from multiple threads + with self.send_lock: + try: + if self.modifier_callback is not None: + self.modifier_callback(self.messages[self.msg_index]) + self.bus.send(self.messages[self.msg_index]) + except Exception as exc: # pylint: disable=broad-except + log.exception(exc) + + # stop if `on_error` callback was not given + if self.on_error is None: + self.stop() + raise exc + + # stop if `on_error` returns False + if not self.on_error(exc): + self.stop() + break + self.msg_index = (self.msg_index + 1) % self.msgs_len if not USE_WINDOWS_EVENTS: msg_due_time_ns += self.period_ns - msg_index = (msg_index + 1) % len(self.messages) - if USE_WINDOWS_EVENTS: win32event.WaitForSingleObject( self.event.handle, @@ -334,3 +394,6 @@ def _run(self) -> None: delay_ns = msg_due_time_ns - time.perf_counter_ns() if delay_ns > 0: time.sleep(delay_ns / NANOSECONDS_IN_SECOND) + + if self._is_variable_rate: + self._run_cnt = (self._run_cnt + 1) % self._run_cnt_max diff --git a/can/bus.py b/can/bus.py index c3a906757..3f551a817 100644 --- a/can/bus.py +++ b/can/bus.py @@ -210,6 +210,7 @@ def send_periodic( duration: Optional[float] = None, store_task: bool = True, modifier_callback: Optional[Callable[[Message], None]] = None, + period_intra: Optional[float] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. @@ -235,6 +236,10 @@ def send_periodic( Function which should be used to modify each message's data before sending. The callback modifies the :attr:`~can.Message.data` of the message and returns ``None``. + :param period_intra: + Period in seconds between each message when sending multiple messages + in a sequence. If not provided, the period will be used for each + message. :return: A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the task's @@ -266,7 +271,7 @@ def send_periodic( # Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later task = cast( _SelfRemovingCyclicTask, - self._send_periodic_internal(msgs, period, duration, modifier_callback), + self._send_periodic_internal(msgs, period, duration, modifier_callback, period_intra), ) # we wrap the task's stop method to also remove it from the Bus's list of tasks periodic_tasks = self._periodic_tasks @@ -294,6 +299,7 @@ def _send_periodic_internal( period: float, duration: Optional[float] = None, modifier_callback: Optional[Callable[[Message], None]] = None, + period_intra: Optional[float] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Default implementation of periodic message sending using threading. @@ -306,6 +312,14 @@ def _send_periodic_internal( :param duration: The duration between sending each message at the given rate. If no duration is provided, the task will continue indefinitely. + :param modifier_callback: + Function which should be used to modify each message's data before + sending. The callback modifies the :attr:`~can.Message.data` of the + message and returns ``None``. + :param period_intra: + Period in seconds between each message when sending multiple messages + in a sequence. If not provided, the period will be used for each + message. :return: A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the @@ -323,6 +337,7 @@ def _send_periodic_internal( period=period, duration=duration, modifier_callback=modifier_callback, + period_intra=period_intra, ) return task From 183e4b5f09d5fb901183d5e30aeff3eeaed5f895 Mon Sep 17 00:00:00 2001 From: Stephen Schueth Date: Mon, 22 Jan 2024 17:55:26 -0600 Subject: [PATCH 2/3] fixed bug within broadcastmanager causing error with self.messages --- can/broadcastmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 1b2afef55..0fff2e630 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -68,11 +68,11 @@ def __init__( """ messages = self._check_and_convert_messages(messages) self.msgs_len = len(messages) + self.messages = messages # Take the Arbitration ID of each message and put them into a list self.arbitration_id = [self.messages[idx].arbitration_id for idx in range(self.msgs_len)] self.period = period self.period_ns = int(round(period * 1e9)) - self.messages = messages self.msg_index = 0 @staticmethod From 63ab27441899b70c4137db9744e9854146d58fec Mon Sep 17 00:00:00 2001 From: Stephen Schueth Date: Wed, 24 Jan 2024 10:02:18 -0600 Subject: [PATCH 3/3] updated socketcan to include period_intra attribute, logic not implemented; updated default values in _check_and_apply_period_intra() --- can/broadcastmanager.py | 6 +++--- can/interfaces/socketcan/socketcan.py | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 0fff2e630..50301e34e 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -195,9 +195,9 @@ def _check_and_apply_period_intra( :raises ValueError: If the given period_intra is invalid """ self._is_variable_rate = False - self._run_cnt_msgs = None - self._run_cnt_max = None - self._run_cnt = None + self._run_cnt_msgs = [0] + self._run_cnt_max = 1 + self._run_cnt = 0 if period_intra is not None: if not isinstance(period_intra, float): diff --git a/can/interfaces/socketcan/socketcan.py b/can/interfaces/socketcan/socketcan.py index cdf4afac6..5b5153a17 100644 --- a/can/interfaces/socketcan/socketcan.py +++ b/can/interfaces/socketcan/socketcan.py @@ -23,6 +23,7 @@ LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC, + VariableRateCyclicTaskABC, ) from can.interfaces.socketcan import constants from can.interfaces.socketcan.utils import find_available_interfaces, pack_filters @@ -303,7 +304,7 @@ def _compose_arbitration_id(message: Message) -> int: class CyclicSendTask( - LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC + LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC, VariableRateCyclicTaskABC ): """ A SocketCAN cyclic send task supports: @@ -320,6 +321,7 @@ def __init__( messages: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, + period_intra: Optional[float] = None, ) -> None: """Construct and :meth:`~start` a task. @@ -339,7 +341,6 @@ def __init__( # - self.period # - self.duration super().__init__(messages, period, duration) - self.bcm_socket = bcm_socket self.task_id = task_id self._tx_setup(self.messages) @@ -809,6 +810,7 @@ def _send_periodic_internal( period: float, duration: Optional[float] = None, modifier_callback: Optional[Callable[[Message], None]] = None, + period_intra: Optional[float] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. @@ -849,7 +851,7 @@ def _send_periodic_internal( msgs_channel = str(msgs[0].channel) if msgs[0].channel else None bcm_socket = self._get_bcm_socket(msgs_channel or self.channel) task_id = self._get_next_task_id() - task = CyclicSendTask(bcm_socket, task_id, msgs, period, duration) + task = CyclicSendTask(bcm_socket, task_id, msgs, period, duration, period_intra) return task # fallback to thread based cyclic task