From 459b2030b48c57efeee0604528a2a9be3ef5d17e Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 7 Feb 2024 20:12:35 +0100 Subject: [PATCH] Refactor the interview logic a small bit (#538) --- matter_server/server/const.py | 6 ++ matter_server/server/device_controller.py | 68 +++++------------------ 2 files changed, 20 insertions(+), 54 deletions(-) diff --git a/matter_server/server/const.py b/matter_server/server/const.py index b6cd8392..2a6140b0 100644 --- a/matter_server/server/const.py +++ b/matter_server/server/const.py @@ -1,10 +1,16 @@ """Server-only constants for the Python Matter Server.""" + import pathlib from typing import Final # The minimum schema version (of a client) the server can support MIN_SCHEMA_VERSION = 5 +# schema version of our data model +# only bump if the format of the data in MatterNodeData changed +# and a full re-interview is mandatory +DATA_MODEL_SCHEMA_VERSION = 6 + # the paa-root-certs path is hardcoded in the sdk at this time # and always uses the development subfolder # regardless of anything you pass into instantiating the controller diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 012eb9ab..2f35027d 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -47,7 +47,7 @@ MatterNodeEvent, NodePingResult, ) -from .const import PAA_ROOT_CERTS_DIR +from .const import DATA_MODEL_SCHEMA_VERSION, PAA_ROOT_CERTS_DIR from .helpers.paa_certificates import fetch_certificates if TYPE_CHECKING: @@ -501,7 +501,7 @@ async def interview_node(self, node_id: int) -> None: existing_info.date_commissioned if existing_info else datetime.utcnow() ), last_interview=datetime.utcnow(), - interview_version=SCHEMA_VERSION, + interview_version=DATA_MODEL_SCHEMA_VERSION, available=True, attributes=parse_attributes_from_read_result(read_response.tlvAttributes), ) @@ -1050,48 +1050,28 @@ async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) -> ), ) - async def _check_interview_and_subscription( - self, node_id: int, reschedule_interval: int = 30 - ) -> None: - """Handle interview (if needed) and subscription for known node.""" - + async def _setup_node(self, node_id: int) -> None: + """Handle set-up of subscriptions and interview (if needed) for known/discovered node.""" if node_id not in self._nodes: raise NodeNotExists(f"Node {node_id} does not exist.") # (re)interview node (only) if needed - node_data = self._nodes.get(node_id) + node_data = self._nodes[node_id] if ( - node_data is None - # re-interview if the schema has changed - or node_data.interview_version < SCHEMA_VERSION + # re-interview if we dont have any node attributes (empty node) + not node_data.attributes + # re-interview if the data model schema has changed + or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION ): try: await self.interview_node(node_id) - except NodeNotResolving: - LOGGER.warning( - "Unable to interview Node %s as it is unavailable", - node_id, - ) - # NOTE: the node will be picked up by mdns discovery automatically - # when it becomes available again. - except NodeInterviewFailed: - LOGGER.warning( - "Unable to interview Node %s, will retry later in the background.", - node_id, - ) - # reschedule interview on error - # increase interval at each attempt with maximum of - # MAX_POLL_INTERVAL seconds (= 10 minutes) - self._schedule_interview( - node_id, - min(reschedule_interval + 10, MAX_POLL_INTERVAL), - ) + except (NodeNotResolving, NodeInterviewFailed) as err: + LOGGER.warning("Unable to interview Node %s", exc_info=err) + # NOTE: the node will be picked up by mdns discovery automatically + # when it comes available again. return # setup subscriptions for the node - if node_id in self._subscriptions: - return - try: await self._subscribe_node(node_id) except NodeNotResolving: @@ -1102,26 +1082,6 @@ async def _check_interview_and_subscription( # NOTE: the node will be picked up by mdns discovery automatically # when it becomes available again. - def _schedule_interview(self, node_id: int, delay: int) -> None: - """(Re)Schedule interview and/or initial subscription for a node.""" - assert self.server.loop is not None - # cancel any existing (re)schedule timer - if existing := self._sub_retry_timer.pop(node_id, None): - existing.cancel() - - def create_interview_task() -> None: - asyncio.create_task( - self._check_interview_and_subscription( - node_id, - ) - ) - # the handle to the timer can now be removed - self._sub_retry_timer.pop(node_id, None) - - self._sub_retry_timer[node_id] = self.server.loop.call_later( - delay, create_interview_task - ) - async def _resolve_node( self, node_id: int, retries: int = 2, attempt: int = 1 ) -> DeviceProxyWrapper: @@ -1230,7 +1190,7 @@ async def _process_mdns_queue( continue # node is already set-up, no action needed LOGGER.info("Node %s discovered on MDNS", node_id) # setup the node - await self._check_interview_and_subscription(node_id) + await self._setup_node(node_id) elif state_change == ServiceStateChange.Removed: if not node.available: continue # node is already offline, nothing to do