diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 0610b44e..b3970b0d 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -75,6 +75,8 @@ def __init__( # we keep the last events in memory so we can include them in the diagnostics dump self.event_history: Deque[Attribute.EventReadResult] = deque(maxlen=25) self._subscriptions: dict[int, Attribute.SubscriptionTransaction] = {} + self._attr_subscriptions: dict[int, list[tuple[Any, ...]] | str] = {} + self._resub_timer: dict[int, asyncio.TimerHandle] = {} self._nodes: dict[int, MatterNodeData | None] = {} self.wifi_credentials_set: bool = False self.thread_credentials_set: bool = False @@ -452,8 +454,17 @@ async def subscribe_attribute( subkey=str(node_id), value=node, ) + # (re)setup node subscription - asyncio.create_task(self._subscribe_node(node_id)) + # this could potentially be called multiple times within a short timeframe + # so debounce it a bit + def resubscribe(): + self._resub_timer.pop(node_id, None) + asyncio.create_task(self._subscribe_node(node_id)) + + if existing_timer := self._resub_timer.pop(node_id, None): + existing_timer.cancel() + self._resub_timer[node_id] = self.server.loop.call_later(5, resubscribe) async def _subscribe_node(self, node_id: int) -> None: """ @@ -471,20 +482,11 @@ async def _subscribe_node(self, node_id: int) -> None: ) node_logger = LOGGER.getChild(f"[node {node_id}]") - node_logger.debug("Setting up subscriptions...") node_lock = self._get_node_lock(node_id) - # check if we already have an subscription for this node, - # if so, we need to unsubscribe first because a device can only maintain - # a very limited amount of concurrent subscriptions. - if sub := self._subscriptions.pop(node_id, None): - async with node_lock: - node_logger.info("Unsubscribing from existing subscription.") - await self._call_sdk(sub.Shutdown) - node = cast(MatterNodeData, self._nodes[node_id]) await self._resolve_node(node_id=node_id) - # work out attribute subscriptions + # work out all (current) attribute subscriptions attr_subscriptions = [] for ( endpoint_id, @@ -511,11 +513,25 @@ async def _subscribe_node(self, node_id: int) -> None: # Wildcard endpoint, specific cluster attr_subscriptions.append(cluster) - node_logger.debug("Setting up attributes and events subscription.") if len(attr_subscriptions) > 50: # prevent memory overload on node and fallback to wildcard sub if too many # individual subscriptions attr_subscriptions = "*" + + # check if we already have an subscription for this node, + # if so, we need to unsubscribe first because a device can only maintain + # a very limited amount of concurrent subscriptions. + if sub := self._subscriptions.pop(node_id, None): + if self._attr_subscriptions.get(node_id) == attr_subscriptions: + # the current subscription already matches, no need to re-setup + node_logger.debug("Re-using existing subscription.") + return + async with node_lock: + node_logger.debug("Unsubscribing from existing subscription.") + await self._call_sdk(sub.Shutdown) + + node_logger.debug("Setting up attributes and events subscription.") + self._attr_subscriptions[node_id] = attr_subscriptions async with node_lock: sub: Attribute.SubscriptionTransaction = await self.chip_controller.Read( nodeid=node_id, @@ -645,10 +661,12 @@ def resubscription_succeeded( sub.SetResubscriptionAttemptedCallback(resubscription_attempted) sub.SetResubscriptionSucceededCallback(resubscription_succeeded) self._subscriptions[node_id] = sub + # if we reach this point, it means the node could be resolved # and the initial subscription succeeded, mark the node available. node.available = True node_logger.info("Subscription succeeded") + cache = sub.GetAttributes() self.server.signal_event(EventType.NODE_UPDATED, node) def _get_next_node_id(self) -> int: