From a7e1ad0825bd31e7144f0ead34262caa91747307 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 1 Mar 2024 11:29:12 +0100 Subject: [PATCH] add semaphore for concurrent node setups --- matter_server/server/device_controller.py | 80 ++++++++++++----------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 642b2586..5ddaf316 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -118,6 +118,7 @@ def __init__( self._sdk_executor = ThreadPoolExecutor( max_workers=1, thread_name_prefix="SDKExecutor" ) + self._node_setup_throttle = asyncio.Semaphore(10) async def initialize(self) -> None: """Async initialize of controller.""" @@ -1070,53 +1071,56 @@ async def _setup_node(self, node_id: int) -> None: return self._nodes_in_setup.add(node_id) try: - # Ping the node to rule out stale mdns reports and to prevent that we - # send an unreachable node to the sdk which is very slow with resolving it. - # This will also precache the ip addresses of the node for later use. - ping_result = await self.ping_node( - node_id, attempts=3, allow_cached_ips=False - ) - if not any(ping_result.values()): - LOGGER.warning( - "Skip set-up for node %s because it does not appear to be reachable...", - node_id, + async with self._node_setup_throttle: + # Ping the node to rule out stale mdns reports and to prevent that we + # send an unreachable node to the sdk which is very slow with resolving it. + # This will also precache the ip addresses of the node for later use. + ping_result = await self.ping_node( + node_id, attempts=3, allow_cached_ips=False ) - return - LOGGER.info("Setting-up node %s...", node_id) - # (re)interview node (only) if needed - node_data = self._nodes[node_id] - if ( - # re-interview if we dont have any node attributes (empty node) - not node_data.attributes - # re-interview if the data model schema has changed - or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION - ): + if not any(ping_result.values()): + LOGGER.warning( + "Skip set-up for node %s because it does not appear to be reachable...", + node_id, + ) + return + LOGGER.info("Setting-up node %s...", node_id) + # (re)interview node (only) if needed + node_data = self._nodes[node_id] + if ( + # re-interview if we dont have any node attributes (empty node) + not node_data.attributes + # re-interview if the data model schema has changed + or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION + ): + try: + await self.interview_node(node_id) + except (NodeNotResolving, NodeInterviewFailed) as err: + LOGGER.warning( + "Unable to interview Node %s: %s", + node_id, + str(err) or err.__class__.__name__, + # log full stack trace if debug logging is enabled + exc_info=err + if LOGGER.isEnabledFor(logging.DEBUG) + else None, + ) + # NOTE: the node will be picked up by mdns discovery automatically + # when it comes available again. + return + # setup subscriptions for the node try: - await self.interview_node(node_id) - except (NodeNotResolving, NodeInterviewFailed) as err: + await self._subscribe_node(node_id) + except (NodeNotResolving, ChipStackError) as err: LOGGER.warning( - "Unable to interview Node %s: %s", + "Unable to subscribe to Node %s: %s", node_id, str(err) or err.__class__.__name__, # log full stack trace if debug logging is enabled exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, ) # NOTE: the node will be picked up by mdns discovery automatically - # when it comes available again. - return - # setup subscriptions for the node - try: - await self._subscribe_node(node_id) - except (NodeNotResolving, ChipStackError) as err: - LOGGER.warning( - "Unable to subscribe to Node %s: %s", - node_id, - str(err) or err.__class__.__name__, - # log full stack trace if debug logging is enabled - exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, - ) - # NOTE: the node will be picked up by mdns discovery automatically - # when it becomes available again. + # when it becomes available again. finally: self._nodes_in_setup.discard(node_id)