diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py
index e3a6787d..926029a6 100644
--- a/matter_server/server/device_controller.py
+++ b/matter_server/server/device_controller.py
@@ -84,6 +84,7 @@ def __init__(
         self._interview_limit: asyncio.Semaphore = asyncio.Semaphore(
             INTERVIEW_TASK_LIMIT
         )
+        self._resolve_lock: asyncio.Lock = asyncio.Lock()
         self._node_lock: dict[int, asyncio.Lock] = {}

     async def initialize(self) -> None:
@@ -583,9 +584,9 @@ async def _subscribe_node(self, node_id: int) -> None:
             node_logger.debug("Unsubscribing from existing subscription.")
             await self._call_sdk(prev_sub.Shutdown)

-        node_logger.debug("Setting up attributes and events subscription.")
         self._attr_subscriptions[node_id] = attr_subscriptions
         async with node_lock:
+            node_logger.debug("Setting up attributes and events subscription.")
             sub: Attribute.SubscriptionTransaction = await self.chip_controller.Read(
                 nodeid=node_id,
                 # In order to prevent network congestion due to wildcard subscriptions on all nodes,
@@ -744,7 +745,7 @@ async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
         )

     async def _check_interview_and_subscription(
-        self, node_id: int, reschedule_interval: int = 300
+        self, node_id: int, reschedule_interval: int = 30
     ) -> None:
         """Handle interview (if needed) and subscription for known node."""

@@ -756,8 +757,8 @@ def reschedule() -> None:
                 asyncio.create_task,
                 self._check_interview_and_subscription(
                     node_id,
-                    # increase interval at each attempt with maximum of 1 hour
-                    min(reschedule_interval + 300, 3600),
+                    # increase interval at each attempt with maximum of 10 minutes
+                    min(reschedule_interval + 10, 600),
                 ),
             )

@@ -837,37 +838,23 @@ def _parse_attributes_from_read_result(
             result[attribute_path] = attr_value
         return result

-    async def _resolve_node(
-        self, node_id: int, retries: int = 3, allow_pase: bool = False
-    ) -> None:
+    async def _resolve_node(self, node_id: int, retries: int = 3) -> None:
         """Resolve a Node on the network."""
         node_lock = self._get_node_lock(node_id)
         if self.chip_controller is None:
             raise RuntimeError("Device Controller not initialized.")
         try:
-            if allow_pase:
-                # last attempt allows PASE connection (last resort)
-                LOGGER.debug(
-                    "Attempting to resolve node %s (with PASE connection)", node_id
+            async with node_lock, self._resolve_lock:
+                LOGGER.info("Attempting to resolve node %s...", node_id)
+                await self._call_sdk(
+                    self.chip_controller.ResolveNode,
+                    nodeid=node_id,
                 )
-                async with node_lock:
-                    await self._call_sdk(
-                        self.chip_controller.GetConnectedDeviceSync,
-                        nodeid=node_id,
-                        allowPASE=True,
-                        timeoutMs=30000,
-                    )
-                return
-            LOGGER.debug("Resolving node %s", node_id)
-            await self._call_sdk(self.chip_controller.ResolveNode, nodeid=node_id)
         except (ChipStackError, TimeoutError) as err:
-            if not retries:
+            if retries <= 1:
                 # when we're out of retries, raise NodeNotResolving
                 raise NodeNotResolving(f"Unable to resolve Node {node_id}") from err
-            async with node_lock:
-                await self._resolve_node(
-                    node_id=node_id, retries=retries - 1, allow_pase=retries - 1 == 0
-                )
+            await self._resolve_node(node_id=node_id, retries=retries - 1)
             await asyncio.sleep(2)

     def _handle_endpoints_removed(self, node_id: int, endpoints: Iterable[int]) -> None:
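For context, the reworked `_resolve_node` above serializes resolution attempts behind one controller-wide lock (in addition to the per-node lock) and retries with a short delay. The following is a minimal, self-contained sketch of that lock-and-retry pattern, not the actual controller code: `ResolverSketch`, `_do_resolve`, and the caught exception types are hypothetical stand-ins for the CHIP SDK call (`ResolveNode`) and its errors (`ChipStackError`/`TimeoutError`), and the backoff sleep is placed before the retry here for illustration.

```python
import asyncio


class NodeNotResolving(Exception):
    """Raised when a node cannot be resolved after all retries (stand-in)."""


class ResolverSketch:
    """Illustrates the lock-and-retry pattern used by _resolve_node (hypothetical names)."""

    def __init__(self) -> None:
        # one lock shared by all resolve attempts, plus one lock per node
        self._resolve_lock = asyncio.Lock()
        self._node_locks: dict[int, asyncio.Lock] = {}

    def _get_node_lock(self, node_id: int) -> asyncio.Lock:
        return self._node_locks.setdefault(node_id, asyncio.Lock())

    async def _do_resolve(self, node_id: int) -> None:
        # placeholder for the real SDK call; may raise on failure
        await asyncio.sleep(0)

    async def resolve_node(self, node_id: int, retries: int = 3) -> None:
        try:
            # holding both locks means only one resolve runs at a time,
            # and never concurrently with other work on the same node
            async with self._get_node_lock(node_id), self._resolve_lock:
                await self._do_resolve(node_id)
        except (TimeoutError, OSError) as err:
            if retries <= 1:
                raise NodeNotResolving(f"Unable to resolve Node {node_id}") from err
            await asyncio.sleep(2)  # back off briefly before the next attempt
            await self.resolve_node(node_id, retries - 1)


# example usage: asyncio.run(ResolverSketch().resolve_node(4335))
```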