Skip to content

Commit

Permalink
Adjust subscription logic a bit (#416)
Browse files Browse the repository at this point in the history
Co-authored-by: Martin Hjelmare <[email protected]>
  • Loading branch information
marcelveldt and MartinHjelmare authored Oct 25, 2023
1 parent 93c6a65 commit d0cb830
Showing 1 changed file with 36 additions and 28 deletions.
64 changes: 36 additions & 28 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from functools import partial
import logging
import random
from typing import TYPE_CHECKING, Any, Callable, Iterable, Type, TypeVar, cast

from chip.ChipDeviceCtrl import CommissionableNode
Expand Down Expand Up @@ -564,16 +565,11 @@ async def _subscribe_node(self, node_id: int) -> None:
# Wildcard endpoint, specific cluster
attr_subscriptions.append(cluster)

if len(attr_subscriptions) > 50:
# prevent memory overload on node and fallback to wildcard sub if too many
# individual subscriptions (e.g. bridges)
attr_subscriptions = "*" # type: ignore[assignment]

if not node.attribute_subscriptions:
# temp fix for backwards compatbility with HA releases below 2023.7
# fallback to wildcard subscriptions if we have no explicit
# node subscriptions defined.
# TODO: remove this after HA release 2023.8
if len(attr_subscriptions) > 9:
# strictly speaking, a matter device can only handle 9 individual subscriptions
# (3 subscriptions of 3 paths per fabric)
# although the device can probably handle more, we play it safe and opt for
# wildcard as soon as we have more than 9 paths to watch for.
attr_subscriptions = "*" # type: ignore[assignment]

# check if we already have a subscription for this node,
Expand All @@ -591,21 +587,31 @@ async def _subscribe_node(self, node_id: int) -> None:
self._attr_subscriptions[node_id] = attr_subscriptions
async with node_lock:
node_logger.debug("Setting up attributes and events subscription.")
# Use a report interval of 0, X which means we want to receive state changes
# as soon as possible (the 0 as floor) but we want to receive a report
# at least once every X seconds, this is also used to detect the node is still alive.
# A resubscription will be initiated automatically by the sdk
# if there was no report within the interval.
# NOTE 1: The report interval ceiling is subject to a lot of discussion
# as setting it too low causes a lot of (unneeded) traffic and causes network
# congestion as well as drains batteries on sleeping devices.
# Preferred would be to set the interval as high as possible (30 mins or even 1 hour)
# but that would also mean that detecting that a device is offline would be delayed
# by that amount of time as the interval ceiling is also meant as liveness detection.
# For now we settle on (more or less) 10 minutes but we might need to increase this
# even more in the future.
# see also: https://github.com/project-chip/connectedhomeip/issues/29804
# NOTE 2: We randomize the interval a bit to prevent all nodes reporting
# at the exact same time, causing congestion.
interval_floor = 0
interval_ceiling = random.randint(500, 700)
sub: Attribute.SubscriptionTransaction = await self.chip_controller.Read(
nodeid=node_id,
# In order to prevent network congestion due to wildcard subscriptions on all nodes,
# we keep a list of attributes we are explicitly interested in.
attributes=attr_subscriptions,
# simply subscribe to urgent device events only (e.g. button press etc.)
# non urgent events are disagnostic reports etc. for which we have no usecase (yet).
# non urgent events are diagnostic reports etc. for which we have no usecase (yet).
events=[("*", 1)],
# Use a report interval of 0, 300 which means we want to receive state changes
# as soon as possible (the 0 as floor) but we want to receive a report
# at least once every 5 minutes (300 as ceiling).
# This is also used to detect the node is still alive.
# A resubscription will be initiated automatically by the sdk
# if there was no report within the interval.
reportInterval=(0, 300),
reportInterval=(interval_floor, interval_ceiling),
# Use fabricfiltered as False to detect changes made by other controllers
# and to be able to provide a list of all fabrics attached to the device
fabricFiltered=False,
Expand Down Expand Up @@ -717,15 +723,17 @@ def resubscription_attempted(
if nextResubscribeIntervalMsec / 1000 > MAX_POLL_INTERVAL:
# workaround to handle devices that are unplugged
# from power for a longer period of time
# cancel subscription and add this node to our node polling job
# TODO: fix this once OerationalNodeDiscovery is available:
# where the sdk is extending the poll timeout at every attempt
# until even 1.5 hours, which is way too long.
# instead, a device that is back alive should be detected using mDNS,
# which is not yet implemented in the core sdk.
# For now, we just override the timeout.
# NOTE 1: fix this once OperationalNodeDiscovery is available:
# https://github.com/project-chip/connectedhomeip/pull/26718
sub.Shutdown()
self._subscriptions.pop(node_id)
assert self.server.loop
self.server.loop.create_task(
self._check_interview_and_subscription(node_id, MAX_POLL_INTERVAL)
)
# https://github.com/project-chip/connectedhomeip/issues/29663
# NOTE 2: We could also just implement zeroconf/mdns ourselves
# to listen for the announcements.
sub.OverrideLivenessTimeoutMs(MAX_POLL_INTERVAL * 1000)

def resubscription_succeeded(
transaction: Attribute.SubscriptionTransaction,
Expand Down

0 comments on commit d0cb830

Please sign in to comment.