From 0b85b496fef8efc0eb6fc86dfd3ed71ada95a1de Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 11 Jul 2024 15:10:38 +0200 Subject: [PATCH] A small optimization to the node setup (#798) --- matter_server/server/device_controller.py | 122 ++++++++++++---------- 1 file changed, 65 insertions(+), 57 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 31d2caf7..db98fa32 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -132,7 +132,7 @@ def __init__( self._aiozc: AsyncZeroconf | None = None self._fallback_node_scanner_timer: asyncio.TimerHandle | None = None self._fallback_node_scanner_task: asyncio.Task | None = None - self._node_setup_throttle = asyncio.Semaphore(5) + self._thread_node_setup_throttle = asyncio.Semaphore(5) self._mdns_event_timer: dict[str, asyncio.TimerHandle] = {} self._polled_attributes: dict[int, set[str]] = {} self._custom_attribute_poller_timer: asyncio.TimerHandle | None = None @@ -1105,6 +1105,9 @@ async def _setup_node(self, node_id: int) -> None: node_logger = LOGGER.getChild(f"node_{node_id}") node_data = self._nodes[node_id] log_timers: dict[int, asyncio.TimerHandle] = {} + is_thread_node = ( + node_data.attributes.get(ROUTING_ROLE_ATTRIBUTE_PATH) is not None + ) async def log_node_long_setup(time_start: float) -> None: """Temporary measure to track a locked-up SDK issue in some (special) circumstances.""" @@ -1136,22 +1139,49 @@ async def log_node_long_setup(time_start: float) -> None: log_timers[node_id] = self._loop.call_later( 15 * 60, lambda: asyncio.create_task(log_node_long_setup(time_start)) ) + # release semaphore to give an additional free slot for setup + # otherwise no thread nodes will be setup if 5 are stuck in this state + if is_thread_node: + self._thread_node_setup_throttle.release() + + # use semaphore for thread based devices to (somewhat) + # throttle the traffic that setup/initial subscription generates + if is_thread_node: + await self._thread_node_setup_throttle.acquire() + time_start = time.time() + # we want to track nodes that take too long so we log it when we detect that + log_timers[node_id] = self._loop.call_later( + 15 * 60, lambda: asyncio.create_task(log_node_long_setup(time_start)) + ) + try: + node_logger.info("Setting-up node...") - async with self._node_setup_throttle: - time_start = time.time() - # we want to track nodes that take too long so we log it when we detect that - log_timers[node_id] = self._loop.call_later( - 15 * 60, lambda: asyncio.create_task(log_node_long_setup(time_start)) - ) + # try to resolve the node using the sdk first before do anything else try: - node_logger.info("Setting-up node...") + await self._chip_device_controller.find_or_establish_case_session( + node_id=node_id + ) + except NodeNotResolving as err: + node_logger.warning( + "Setup for node failed: %s", + str(err) or err.__class__.__name__, + # log full stack trace if verbose logging is enabled + exc_info=err if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else None, + ) + # NOTE: the node will be picked up by mdns discovery automatically + # when it comes available again. + return - # try to resolve the node using the sdk first before do anything else + # (re)interview node (only) if needed + if ( + # re-interview if we dont have any node attributes (empty node) + not node_data.attributes + # re-interview if the data model schema has changed + or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION + ): try: - await self._chip_device_controller.find_or_establish_case_session( - node_id=node_id - ) - except NodeNotResolving as err: + await self.interview_node(node_id) + except NodeInterviewFailed as err: node_logger.warning( "Setup for node failed: %s", str(err) or err.__class__.__name__, @@ -1164,52 +1194,30 @@ async def log_node_long_setup(time_start: float) -> None: # when it comes available again. return - # (re)interview node (only) if needed - if ( - # re-interview if we dont have any node attributes (empty node) - not node_data.attributes - # re-interview if the data model schema has changed - or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION - ): - try: - await self.interview_node(node_id) - except NodeInterviewFailed as err: - node_logger.warning( - "Setup for node failed: %s", - str(err) or err.__class__.__name__, - # log full stack trace if verbose logging is enabled - exc_info=err - if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) - else None, - ) - # NOTE: the node will be picked up by mdns discovery automatically - # when it comes available again. - return - - # setup subscriptions for the node - try: - await self._subscribe_node(node_id) - except ChipStackError as err: - node_logger.warning( - "Unable to subscribe to Node: %s", - str(err) or err.__class__.__name__, - # log full stack trace if verbose logging is enabled - exc_info=err - if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) - else None, - ) - # NOTE: the node will be picked up by mdns discovery automatically - # when it becomes available again. - return + # setup subscriptions for the node + try: + await self._subscribe_node(node_id) + except ChipStackError as err: + node_logger.warning( + "Unable to subscribe to Node: %s", + str(err) or err.__class__.__name__, + # log full stack trace if verbose logging is enabled + exc_info=err if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else None, + ) + # NOTE: the node will be picked up by mdns discovery automatically + # when it becomes available again. + return - # check if this node has any custom clusters that need to be polled - if polled_attributes := check_polled_attributes(node_data): - self._polled_attributes[node_id] = polled_attributes - self._schedule_custom_attributes_poller() + # check if this node has any custom clusters that need to be polled + if polled_attributes := check_polled_attributes(node_data): + self._polled_attributes[node_id] = polled_attributes + self._schedule_custom_attributes_poller() - finally: - log_timers[node_id].cancel() - self._nodes_in_setup.discard(node_id) + finally: + log_timers[node_id].cancel() + self._nodes_in_setup.discard(node_id) + if is_thread_node: + self._thread_node_setup_throttle.release() def _handle_endpoints_removed(self, node_id: int, endpoints: Iterable[int]) -> None: """Handle callback for when bridge endpoint(s) get deleted."""