Skip to content

Commit

Permalink
Restore single subscription to a node instead of dual (#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Dec 27, 2023
1 parent 7b5b82f commit 64e9da9
Showing 1 changed file with 68 additions and 108 deletions.
176 changes: 68 additions & 108 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ def __init__(
self.server = server
# we keep the last events in memory so we can include them in the diagnostics dump
self.event_history: deque[Attribute.EventReadResult] = deque(maxlen=25)
self._subscriptions: dict[
int,
tuple[Attribute.SubscriptionTransaction, Attribute.SubscriptionTransaction],
] = {}
self._subscriptions: dict[int, Attribute.SubscriptionTransaction] = {}
self._attr_subscriptions: dict[int, list[Attribute.AttributePath]] = {}
self._resub_debounce_timer: dict[int, asyncio.TimerHandle] = {}
self._sub_retry_timer: dict[int, asyncio.TimerHandle] = {}
Expand Down Expand Up @@ -148,9 +145,8 @@ async def stop(self) -> None:
raise RuntimeError("Device Controller not initialized.")

# unsubscribe all node subscriptions
for subs in self._subscriptions.values():
for sub in subs:
await self._call_sdk(sub.Shutdown)
for sub in self._subscriptions.values():
await self._call_sdk(sub.Shutdown)
self._subscriptions = {}
await self._call_sdk(self.chip_controller.Shutdown)
LOGGER.debug("Stopped.")
Expand Down Expand Up @@ -514,9 +510,8 @@ async def remove_node(self, node_id: int) -> None:
)

# shutdown any existing subscriptions
if attr_subs := self._subscriptions.pop(node_id, None):
for attr_sub in attr_subs:
await self._call_sdk(attr_sub.Shutdown)
if sub := self._subscriptions.pop(node_id, None):
await self._call_sdk(sub.Shutdown)

# pop any existing interview/subscription reschedule timer
self._sub_retry_timer.pop(node_id, None)
Expand Down Expand Up @@ -618,7 +613,9 @@ async def _subscribe_node(self, node_id: int) -> None:
node = cast(MatterNodeData, self._nodes[node_id])

# work out all (current) attribute subscriptions
attr_subscriptions: list[Attribute.AttributePath] = []
attr_subscriptions: list[Attribute.AttributePath] = list(
BASE_SUBSCRIBE_ATTRIBUTES
)
for (
endpoint_id,
cluster_id,
Expand All @@ -631,35 +628,29 @@ async def _subscribe_node(self, node_id: int) -> None:
)
if attr_path in attr_subscriptions:
continue
if cluster_id in (
Clusters.BridgedDeviceBasicInformation.id,
Clusters.BasicInformation.id,
):
# already watched in base subscription
continue
attr_subscriptions.append(attr_path)

if node.is_bridge or len(attr_subscriptions) > 3:
# a matter device can only handle 3 attribute paths per subscription
# and a maximum of 3 concurrent subscriptions per fabric
# although the device can probably handle more, we play it safe and opt for
# wildcard as soon as we have more than 3 paths to watch for.
# note that we create 2 subscriptions to the device as we we watch some base
# attributes in the first (lifeline) subscription.
if node.is_bridge or len(attr_subscriptions) > 9:
# A matter device can officially only handle 3 attribute paths per subscription
# and a maximum of 3 concurrent subscriptions per fabric.
# We cheat a bit here and use one single subscription for up to 9 paths,
# because in our experience that is more stable than multiple subscriptions
# to the same device. If we have more than 9 paths to watch for a node,
# we switch to a wildcard subscription.
attr_subscriptions = [Attribute.AttributePath()] # wildcard

# check if we already have setup subscriptions for this node,
# if so, we need to unsubscribe first unless nothing changed
# in the attribute paths we want to subscribe.
if prev_subs := self._subscriptions.pop(node_id, None):
if prev_sub := self._subscriptions.get(node_id, None):
if self._attr_subscriptions.get(node_id) == attr_subscriptions:
# the current subscription already matches, no need to re-setup
node_logger.debug("Re-using existing subscription.")
return
async with node_lock:
node_logger.debug("Unsubscribing from existing subscription.")
for prev_sub in prev_subs:
await self._call_sdk(prev_sub.Shutdown)
await self._call_sdk(prev_sub.Shutdown)
del self._subscriptions[node_id]

# store our list of subscriptions for this node
self._attr_subscriptions[node_id] = attr_subscriptions
Expand All @@ -671,82 +662,9 @@ async def _subscribe_node(self, node_id: int) -> None:
== Clusters.ThreadNetworkDiagnostics.Enums.RoutingRoleEnum.kSleepyEndDevice
)

async with node_lock:
node_logger.info("Setting up attributes and events subscription.")
interval_floor = 0
interval_ceiling = (
random.randint(60, 300) if battery_powered else random.randint(30, 60)
)
# we set-up 2 subscriptions to the node (we may maximum use 3 subs per node)
# the first subscription is a base subscription with the mandatory clusters/attributes
# we need to watch and can be considered as a lifeline to quickly notice if the
# device is online/offline while the second interval actually subscribes to
# the attributes and/or events.
base_sub = await self._setup_subscription(
node,
attr_subscriptions=list(BASE_SUBSCRIBE_ATTRIBUTES),
interval_floor=interval_floor,
interval_ceiling=interval_ceiling,
# subscribe to urgent device events only (e.g. button press etc.) only
event_subscriptions=[
Attribute.EventPath(
EndpointId=None, Cluster=None, Event=None, Urgent=1
)
],
)
attr_sub = await self._setup_subscription(
node,
attr_subscriptions=attr_subscriptions,
interval_floor=interval_floor,
interval_ceiling=interval_ceiling,
)
# if we reach this point, it means the node could be resolved
# and the initial subscription succeeded, mark the node available.
self._subscriptions[node_id] = (base_sub, attr_sub)
node.available = True
# update attributes with current state from read request
# NOTE: Make public method upstream for retrieving the attributeTLVCache
# pylint: disable=protected-access
for sub in (base_sub, attr_sub):
tlv_attributes = sub._readTransaction._cache.attributeTLVCache
node.attributes.update(parse_attributes_from_read_result(tlv_attributes))
node_logger.info("Subscription succeeded")
self.server.signal_event(EventType.NODE_UPDATED, node)

async def _setup_subscription(
self,
node: MatterNodeData,
attr_subscriptions: list[Attribute.AttributePath],
interval_floor: int = 0,
interval_ceiling: int = 60,
event_subscriptions: list[Attribute.EventPath] | None = None,
) -> Attribute.SubscriptionTransaction:
"""Handle Setup of a single Node AttributePath(s) subscription."""
node_id = node.node_id
node_logger = LOGGER.getChild(f"[node {node_id}]")
assert self.chip_controller is not None
node_logger.debug("Setting up attributes and events subscription.")
self._last_subscription_attempt[node_id] = 0
loop = cast(asyncio.AbstractEventLoop, self.server.loop)
future = loop.create_future()
device = await self._resolve_node(node_id)
Attribute.Read(
future=future,
eventLoop=loop,
device=device.deviceProxy,
devCtrl=self.chip_controller,
attributes=attr_subscriptions,
events=event_subscriptions,
returnClusterObject=False,
subscriptionParameters=Attribute.SubscriptionParameters(
interval_floor, interval_ceiling
),
# Use fabricfiltered as False to detect changes made by other controllers
# and to be able to provide a list of all fabrics attached to the device
fabricFiltered=False,
autoResubscribe=True,
).raise_on_error()
sub: Attribute.SubscriptionTransaction = await future

# set-up the actual subscription

def attribute_updated_callback(
path: Attribute.TypedAttributePath,
Expand Down Expand Up @@ -874,12 +792,54 @@ def resubscription_succeeded(
node.available = True
self.server.signal_event(EventType.NODE_UPDATED, node)

sub.SetAttributeUpdateCallback(attribute_updated_callback)
sub.SetEventUpdateCallback(event_callback)
sub.SetErrorCallback(error_callback)
sub.SetResubscriptionAttemptedCallback(resubscription_attempted)
sub.SetResubscriptionSucceededCallback(resubscription_succeeded)
return sub
async with node_lock:
node_logger.info("Setting up attributes and events subscription.")
interval_floor = 0
interval_ceiling = (
random.randint(60, 300) if battery_powered else random.randint(30, 120)
)
self._last_subscription_attempt[node_id] = 0
future = loop.create_future()
device = await self._resolve_node(node_id)
Attribute.Read(
future=future,
eventLoop=loop,
device=device.deviceProxy,
devCtrl=self.chip_controller,
attributes=attr_subscriptions,
events=[
Attribute.EventPath(
EndpointId=None, Cluster=None, Event=None, Urgent=1
)
],
returnClusterObject=False,
subscriptionParameters=Attribute.SubscriptionParameters(
interval_floor, interval_ceiling
),
# Use fabricfiltered as False to detect changes made by other controllers
# and to be able to provide a list of all fabrics attached to the device
fabricFiltered=False,
autoResubscribe=True,
).raise_on_error()
sub: Attribute.SubscriptionTransaction = await future

sub.SetAttributeUpdateCallback(attribute_updated_callback)
sub.SetEventUpdateCallback(event_callback)
sub.SetErrorCallback(error_callback)
sub.SetResubscriptionAttemptedCallback(resubscription_attempted)
sub.SetResubscriptionSucceededCallback(resubscription_succeeded)

# if we reach this point, it means the node could be resolved
# and the initial subscription succeeded, mark the node available.
self._subscriptions[node_id] = sub
node.available = True
# update attributes with current state from read request
# NOTE: Make public method upstream for retrieving the attributeTLVCache
# pylint: disable=protected-access
tlv_attributes = sub._readTransaction._cache.attributeTLVCache
node.attributes.update(parse_attributes_from_read_result(tlv_attributes))
node_logger.info("Subscription succeeded")
self.server.signal_event(EventType.NODE_UPDATED, node)

def _get_next_node_id(self) -> int:
"""Return next node_id."""
Expand Down

0 comments on commit 64e9da9

Please sign in to comment.