Skip to content

Commit

Permalink
debounce resubscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Jun 27, 2023
1 parent 6818b52 commit 2fda946
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def __init__(
# we keep the last events in memory so we can include them in the diagnostics dump
self.event_history: Deque[Attribute.EventReadResult] = deque(maxlen=25)
self._subscriptions: dict[int, Attribute.SubscriptionTransaction] = {}
self._attr_subscriptions: dict[int, list[tuple[Any, ...]] | str] = {}
self._resub_timer: dict[int, asyncio.TimerHandle] = {}
self._nodes: dict[int, MatterNodeData | None] = {}
self.wifi_credentials_set: bool = False
self.thread_credentials_set: bool = False
Expand Down Expand Up @@ -452,8 +454,17 @@ async def subscribe_attribute(
subkey=str(node_id),
value=node,
)

# (re)setup node subscription
asyncio.create_task(self._subscribe_node(node_id))
# this could potentially be called multiple times within a short timeframe
# so debounce it a bit
def resubscribe():
self._resub_timer.pop(node_id, None)
asyncio.create_task(self._subscribe_node(node_id))

if existing_timer := self._resub_timer.pop(node_id, None):
existing_timer.cancel()
self._resub_timer[node_id] = self.server.loop.call_later(5, resubscribe)

async def _subscribe_node(self, node_id: int) -> None:
"""
Expand All @@ -471,20 +482,11 @@ async def _subscribe_node(self, node_id: int) -> None:
)

node_logger = LOGGER.getChild(f"[node {node_id}]")
node_logger.debug("Setting up subscriptions...")
node_lock = self._get_node_lock(node_id)
# check if we already have an subscription for this node,
# if so, we need to unsubscribe first because a device can only maintain
# a very limited amount of concurrent subscriptions.
if sub := self._subscriptions.pop(node_id, None):
async with node_lock:
node_logger.info("Unsubscribing from existing subscription.")
await self._call_sdk(sub.Shutdown)

node = cast(MatterNodeData, self._nodes[node_id])
await self._resolve_node(node_id=node_id)

# work out attribute subscriptions
# work out all (current) attribute subscriptions
attr_subscriptions = []
for (
endpoint_id,
Expand All @@ -511,11 +513,25 @@ async def _subscribe_node(self, node_id: int) -> None:
# Wildcard endpoint, specific cluster
attr_subscriptions.append(cluster)

node_logger.debug("Setting up attributes and events subscription.")
if len(attr_subscriptions) > 50:
# prevent memory overload on node and fallback to wildcard sub if too many
# individual subscriptions
attr_subscriptions = "*"

# check if we already have an subscription for this node,
# if so, we need to unsubscribe first because a device can only maintain
# a very limited amount of concurrent subscriptions.
if sub := self._subscriptions.pop(node_id, None):
if self._attr_subscriptions.get(node_id) == attr_subscriptions:
# the current subscription already matches, no need to re-setup
node_logger.debug("Re-using existing subscription.")
return
async with node_lock:
node_logger.debug("Unsubscribing from existing subscription.")
await self._call_sdk(sub.Shutdown)

node_logger.debug("Setting up attributes and events subscription.")
self._attr_subscriptions[node_id] = attr_subscriptions
async with node_lock:
sub: Attribute.SubscriptionTransaction = await self.chip_controller.Read(
nodeid=node_id,
Expand Down Expand Up @@ -645,10 +661,12 @@ def resubscription_succeeded(
sub.SetResubscriptionAttemptedCallback(resubscription_attempted)
sub.SetResubscriptionSucceededCallback(resubscription_succeeded)
self._subscriptions[node_id] = sub

# if we reach this point, it means the node could be resolved
# and the initial subscription succeeded, mark the node available.
node.available = True
node_logger.info("Subscription succeeded")
cache = sub.GetAttributes()
self.server.signal_event(EventType.NODE_UPDATED, node)

def _get_next_node_id(self) -> int:
Expand Down

0 comments on commit 2fda946

Please sign in to comment.