Skip to content

Commit

Permalink
Implement enum to represent CAN protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
lumagi committed Mar 7, 2023
1 parent b2ec42c commit 14cba14
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 16 deletions.
2 changes: 1 addition & 1 deletion can/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from .util import set_logging_level

from .message import Message
from .bus import BusABC, BusState
from .bus import BusABC, BusState, CANProtocol
from .thread_safe_bus import ThreadSafeBus
from .notifier import Notifier
from .interfaces import VALID_INTERFACES
Expand Down
29 changes: 23 additions & 6 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ class BusState(Enum):
ERROR = auto()


class CANProtocol(Enum):
"""The CAN protocol type supported by a :clas:`can.BusABC` instance"""

CAN_20 = auto()
CAN_FD = auto()
CAN_XL = auto()


class BusABC(metaclass=ABCMeta):
"""The CAN Bus Abstract Base Class that serves as the basis
for all concrete interfaces.
Expand All @@ -48,7 +56,7 @@ class BusABC(metaclass=ABCMeta):
def __init__(
self,
channel: Any,
is_fd: bool = False,
protocol: CANProtocol = CANProtocol.CAN_20,
can_filters: Optional[can.typechecking.CanFilters] = None,
**kwargs: object
):
Expand All @@ -60,8 +68,12 @@ def __init__(
:param channel:
The can interface identifier. Expected type is backend dependent.
:param is_fd:
Indicates that this bus supports CAN-FD.
:param protocol:
The CAN protocol currently used by this bus instance. This value
is determined at initialization time (based on the initialization
parameters or because the bus interface only supports a specific
protocol) and does not change during the lifetime of a bus
instance.
:param can_filters:
See :meth:`~can.BusABC.set_filters` for details.
Expand All @@ -75,7 +87,7 @@ def __init__(
:raises ~can.exceptions.CanInitializationError:
If the bus cannot be initialized
"""
self._is_fd = is_fd
self._can_protocol = protocol
self._periodic_tasks: List[_SelfRemovingCyclicTask] = []
self.set_filters(can_filters)

Expand Down Expand Up @@ -448,8 +460,13 @@ def state(self, new_state: BusState) -> None:
raise NotImplementedError("Property is not implemented.")

@property
def is_fd(self) -> bool:
return self._is_fd
def protocol(self) -> CANProtocol:
"""Return the CAN protocol used by this bus instance.
This value is set at initialization time and does not change
during the lifetime of a bus instance.
"""
return self._can_protocol

@staticmethod
def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]:
Expand Down
13 changes: 9 additions & 4 deletions can/interfaces/pcan/pcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from can import (
BusABC,
BusState,
CANProtocol,
BitTiming,
BitTimingFd,
Message,
Expand Down Expand Up @@ -339,7 +340,11 @@ def __init__(
raise PcanCanInitializationError(self._get_formatted_error(result))

super().__init__(
channel=channel, is_fd=is_fd, state=state, bitrate=bitrate, **kwargs
channel=channel,
protocol=CANProtocol.CAN_FD if is_fd else CANProtocol.CAN_20,
state=state,
bitrate=bitrate,
**kwargs,
)

def _find_channel_by_dev_id(self, device_id):
Expand Down Expand Up @@ -486,7 +491,7 @@ def _recv_internal(
end_time = time.time() + timeout if timeout is not None else None

while True:
if self.is_fd:
if self.protocol == CANProtocol.CAN_FD:
result, pcan_msg, pcan_timestamp = self.m_objPCANBasic.ReadFD(
self.m_PcanHandle
)
Expand Down Expand Up @@ -548,7 +553,7 @@ def _recv_internal(
error_state_indicator = bool(pcan_msg.MSGTYPE & PCAN_MESSAGE_ESI.value)
is_error_frame = bool(pcan_msg.MSGTYPE & PCAN_MESSAGE_ERRFRAME.value)

if self.is_fd:
if self.protocol == CANProtocol.CAN_FD:
dlc = dlc2len(pcan_msg.DLC)
timestamp = boottimeEpoch + (pcan_timestamp.value / (1000.0 * 1000.0))
else:
Expand Down Expand Up @@ -594,7 +599,7 @@ def send(self, msg, timeout=None):
if msg.error_state_indicator:
msgType |= PCAN_MESSAGE_ESI.value

if self.is_fd:
if self.protocol == CANProtocol.CAN_FD:
# create a TPCANMsg message structure
CANMsg = TPCANMsgFD()

Expand Down
10 changes: 5 additions & 5 deletions test/test_pcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from parameterized import parameterized

import can
from can.bus import BusState
from can import BusState, CANProtocol
from can.exceptions import CanInitializationError
from can.interfaces.pcan import PcanBus, PcanError
from can.interfaces.pcan.basic import *
Expand Down Expand Up @@ -53,7 +53,7 @@ def test_bus_creation(self) -> None:
self.bus = can.Bus(interface="pcan")

self.assertIsInstance(self.bus, PcanBus)
self.assertFalse(self.bus.is_fd)
self.assertEqual(self.bus.protocol, CANProtocol.CAN_20)

self.MockPCANBasic.assert_called_once()
self.mock_pcan.Initialize.assert_called_once()
Expand Down Expand Up @@ -81,7 +81,7 @@ def test_bus_creation_fd(self, clock_param: str, clock_val: int) -> None:
)

self.assertIsInstance(self.bus, PcanBus)
self.assertTrue(self.bus.is_fd)
self.assertEqual(self.bus.protocol, CANProtocol.CAN_FD)

self.MockPCANBasic.assert_called_once()
self.mock_pcan.Initialize.assert_not_called()
Expand Down Expand Up @@ -459,7 +459,7 @@ def test_constructor_bit_timing(self):

bitrate_arg = self.mock_pcan.Initialize.call_args[0][1]
self.assertEqual(bitrate_arg.value, 0x472F)
self.assertFalse(bus.is_fd)
self.assertEqual(bus.protocol, CANProtocol.CAN_20)

def test_constructor_bit_timing_fd(self):
timing = can.BitTimingFd(
Expand All @@ -474,7 +474,7 @@ def test_constructor_bit_timing_fd(self):
data_sjw=1,
)
bus = can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing)
self.assertTrue(bus.is_fd)
self.assertEqual(bus.protocol, CANProtocol.CAN_FD)

bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1]

Expand Down

0 comments on commit 14cba14

Please sign in to comment.