diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 4a3e2261..6a9a9fdd 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -108,6 +108,7 @@ def __init__( self.thread_credentials_set: bool = False self.compressed_fabric_id: int | None = None self._node_lock: dict[int, asyncio.Lock] = {} + self._resolve_lock = asyncio.Lock() async def initialize(self) -> None: """Async initialize of controller.""" @@ -497,7 +498,8 @@ async def send_device_command( cluster_cls: Cluster = ALL_CLUSTERS[cluster_id] command_cls = getattr(cluster_cls.Commands, command_name) command = dataclass_from_dict(command_cls, payload) - async with self._get_node_lock(node_id): + node_lock = self._get_node_lock(node_id) + async with node_lock: return await self.chip_controller.SendCommand( nodeid=node_id, endpoint=endpoint_id, @@ -514,12 +516,12 @@ async def read_attribute( """Read a single attribute (or Cluster) on a node.""" if self.chip_controller is None: raise RuntimeError("Device Controller not initialized.") - node_lock = self._get_node_lock(node_id) endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path) + assert self.server.loop is not None + future = self.server.loop.create_future() + device = await self._resolve_node(node_id) + node_lock = self._get_node_lock(node_id) async with node_lock: - assert self.server.loop is not None - future = self.server.loop.create_future() - device = await self._resolve_node(node_id) Attribute.Read( future=future, eventLoop=self.server.loop, @@ -932,15 +934,15 @@ def resubscription_succeeded( node.available = True self.server.signal_event(EventType.NODE_UPDATED, node) + node_logger.info("Setting up attributes and events subscription.") + interval_floor = 0 + interval_ceiling = ( + random.randint(60, 300) if battery_powered else random.randint(30, 120) + ) + self._last_subscription_attempt[node_id] = 0 + future = loop.create_future() + device = await self._resolve_node(node_id) async with node_lock: - node_logger.info("Setting up attributes and events subscription.") - interval_floor = 0 - interval_ceiling = ( - random.randint(60, 300) if battery_powered else random.randint(30, 120) - ) - self._last_subscription_attempt[node_id] = 0 - future = loop.create_future() - device = await self._resolve_node(node_id) Attribute.Read( future=future, eventLoop=loop, @@ -961,13 +963,13 @@ def resubscription_succeeded( fabricFiltered=False, autoResubscribe=True, ).raise_on_error() - sub: Attribute.SubscriptionTransaction = await future + sub: Attribute.SubscriptionTransaction = await future - sub.SetAttributeUpdateCallback(attribute_updated_callback) - sub.SetEventUpdateCallback(event_callback) - sub.SetErrorCallback(error_callback) - sub.SetResubscriptionAttemptedCallback(resubscription_attempted) - sub.SetResubscriptionSucceededCallback(resubscription_succeeded) + sub.SetAttributeUpdateCallback(attribute_updated_callback) + sub.SetEventUpdateCallback(event_callback) + sub.SetErrorCallback(error_callback) + sub.SetResubscriptionAttemptedCallback(resubscription_attempted) + sub.SetResubscriptionSucceededCallback(resubscription_succeeded) # if we reach this point, it means the node could be resolved # and the initial subscription succeeded, mark the node available. @@ -1085,12 +1087,13 @@ async def _resolve_node( attempt, retries, ) - return await self._call_sdk( - self.chip_controller.GetConnectedDeviceSync, - nodeid=node_id, - allowPASE=False, - timeoutMs=None, - ) + async with self._resolve_lock: + return await self._call_sdk( + self.chip_controller.GetConnectedDeviceSync, + nodeid=node_id, + allowPASE=False, + timeoutMs=None, + ) except (ChipStackError, TimeoutError) as err: if attempt >= retries: # when we're out of retries, raise NodeNotResolving