diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py
index 60ab3c0b..61422257 100644
--- a/matter_server/server/device_controller.py
+++ b/matter_server/server/device_controller.py
@@ -105,7 +105,6 @@ def __init__(
         self.thread_credentials_set: bool = False
         self.compressed_fabric_id: int | None = None
         self._node_lock: dict[int, asyncio.Lock] = {}
-        self._node_setup_lock: asyncio.Semaphore = asyncio.Semaphore(5)
         self._aiobrowser: AsyncServiceBrowser | None = None
         self._aiozc: AsyncZeroconf | None = None
         self._sdk_executor = ThreadPoolExecutor(max_workers=1)
@@ -1036,51 +1035,45 @@ async def _setup_node(self, node_id: int) -> None:
             # prevent duplicate setup actions
             return
         self._nodes_in_setup.add(node_id)
-        # ping the node to out stale mdns reports and to prevent that we
-        # send an unreachable node to the sdk which is very slow with resolving it
-        # this will also precache the ip addresses of the node for later use.
-        ping_result = await self.ping_node(node_id)
-        if not any(ping_result.values()):
-            LOGGER.warning(
-                "Skip set-up for node %s because it does not appear to be reachable...",
-                node_id,
-            )
-            return
-        # we use a lock for the node setup process to process nodes sequentially
-        # to prevent a flood of the (thread) network when there are many nodes being setup.
-        async with self._node_setup_lock:
-            # add this little random sleep here to do a bit of throttling
-            # can be optimized later
-            await asyncio.sleep(randint(0, 5))  # noqa: S311
+        try:
+            # ping the node to out stale mdns reports and to prevent that we
+            # send an unreachable node to the sdk which is very slow with resolving it
+            # this will also precache the ip addresses of the node for later use.
+            ping_result = await self.ping_node(node_id)
+            if not any(ping_result.values()):
+                LOGGER.warning(
+                    "Skip set-up for node %s because it does not appear to be reachable...",
+                    node_id,
+                )
+                return
             LOGGER.info("Setting-up node %s...", node_id)
-            try:
-                # (re)interview node (only) if needed
-                node_data = self._nodes[node_id]
-                if (
-                    # re-interview if we dont have any node attributes (empty node)
-                    not node_data.attributes
-                    # re-interview if the data model schema has changed
-                    or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
-                ):
-                    try:
-                        await self.interview_node(node_id)
-                    except (NodeNotResolving, NodeInterviewFailed) as err:
-                        LOGGER.warning("Unable to interview Node %s", exc_info=err)
-                        # NOTE: the node will be picked up by mdns discovery automatically
-                        # when it comes available again.
-                        return
-                # setup subscriptions for the node
+            # (re)interview node (only) if needed
+            node_data = self._nodes[node_id]
+            if (
+                # re-interview if we dont have any node attributes (empty node)
+                not node_data.attributes
+                # re-interview if the data model schema has changed
+                or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
+            ):
                 try:
-                    await self._subscribe_node(node_id)
-                except NodeNotResolving:
-                    LOGGER.warning(
-                        "Unable to subscribe to Node %s as it is unavailable",
-                        node_id,
-                    )
+                    await self.interview_node(node_id)
+                except (NodeNotResolving, NodeInterviewFailed) as err:
+                    LOGGER.warning("Unable to interview Node %s", exc_info=err)
                     # NOTE: the node will be picked up by mdns discovery automatically
-                    # when it becomes available again.
-            finally:
-                self._nodes_in_setup.discard(node_id)
+                    # when it comes available again.
+                    return
+            # setup subscriptions for the node
+            try:
+                await self._subscribe_node(node_id)
+            except NodeNotResolving:
+                LOGGER.warning(
+                    "Unable to subscribe to Node %s as it is unavailable",
+                    node_id,
+                )
+                # NOTE: the node will be picked up by mdns discovery automatically
+                # when it becomes available again.
+        finally:
+            self._nodes_in_setup.discard(node_id)
 
     async def _resolve_node(
         self, node_id: int, retries: int = 2, attempt: int = 1
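For context on the behavioral change: the rework removes the asyncio.Semaphore(5) gate and the random throttling sleep, and instead wraps the whole setup path, including the reachability ping and its early return, in a single try/finally so _nodes_in_setup.discard(node_id) always runs. Previously that early return sat outside the try/finally, so an unreachable node could remain stuck in _nodes_in_setup and block later setup attempts. A minimal, self-contained sketch of that guard pattern; the class and helper names (SetupTracker, _ping_node, setup_node) are illustrative only and not part of the project:

# Hedged sketch, not the real DeviceController: it only illustrates how the
# reworked _setup_node guards the "in setup" registry with try/finally so the
# node id is always released, even on an early return.
import asyncio
import logging

LOGGER = logging.getLogger(__name__)


class SetupTracker:
    """Toy stand-in for the controller's node-setup bookkeeping."""

    def __init__(self) -> None:
        self._nodes_in_setup: set[int] = set()

    async def _ping_node(self, node_id: int) -> bool:
        # Placeholder for ping_node(); pretend only even node ids are reachable.
        await asyncio.sleep(0)
        return node_id % 2 == 0

    async def setup_node(self, node_id: int) -> None:
        if node_id in self._nodes_in_setup:
            # prevent duplicate setup actions
            return
        self._nodes_in_setup.add(node_id)
        try:
            # Early return: with try/finally the node id is still released.
            if not await self._ping_node(node_id):
                LOGGER.warning("Skip set-up for node %s: not reachable", node_id)
                return
            LOGGER.info("Setting-up node %s...", node_id)
            # interview / subscribe steps would follow here
        finally:
            self._nodes_in_setup.discard(node_id)


async def main() -> None:
    tracker = SetupTracker()
    await asyncio.gather(*(tracker.setup_node(n) for n in range(4)))
    # every node id was released, reachable or not
    assert not tracker._nodes_in_setup


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

Dropping the semaphore also means nodes are now set up concurrently as they are discovered, rather than in batches of five with a 0-5 second random delay each.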