Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown subscription after exactly 30 minutes #876

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from chip.clusters.ClusterObjects import ALL_ATTRIBUTES, ALL_CLUSTERS, Cluster
from chip.discovery import DiscoveryType
from chip.exceptions import ChipStackError
from chip.native import PyChipError
from zeroconf import BadTypeInNameException, IPVersion, ServiceStateChange, Zeroconf
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf

Expand Down Expand Up @@ -80,8 +81,8 @@
NODE_SUBSCRIPTION_CEILING_WIFI = 60
NODE_SUBSCRIPTION_CEILING_THREAD = 60
NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED = 600
NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE = 3
NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60 * 1000
NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE = 2
NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60
NODE_RESUBSCRIBE_FORCE_TIMEOUT = 5
NODE_PING_TIMEOUT = 10
NODE_PING_TIMEOUT_BATTERY_POWERED = 60
Expand Down Expand Up @@ -149,7 +150,8 @@ def __init__(
self._node_last_seen_on_mdns: dict[int, float] = {}
self._nodes: dict[int, MatterNodeData] = {}
self._last_known_ip_addresses: dict[int, list[str]] = {}
self._last_subscription_attempt: dict[int, int] = {}
self._resubscription_attempt: dict[int, int] = {}
self._first_resubscribe_attempt: dict[int, float] = {}
self._known_commissioning_params: dict[int, CommissioningParameters] = {}
self._known_commissioning_params_timers: dict[int, asyncio.TimerHandle] = {}
self._aiobrowser: AsyncServiceBrowser | None = None
Expand Down Expand Up @@ -1188,29 +1190,37 @@ def resubscription_attempted(
nextResubscribeIntervalMsec: int,
) -> None:
# pylint: disable=unused-argument, invalid-name
resubscription_attempt = self._resubscription_attempt[node_id]
node_logger.info(
"Previous subscription failed with Error: %s, re-subscribing in %s ms...",
terminationError,
nextResubscribeIntervalMsec,
"Subscription failed with %s, resubscription attempt %s",
str(PyChipError(code=terminationError)),
resubscription_attempt,
)
resubscription_attempt = self._last_subscription_attempt[node_id] + 1
self._last_subscription_attempt[node_id] = resubscription_attempt
self._resubscription_attempt[node_id] = resubscription_attempt + 1
if resubscription_attempt == 0:
self._first_resubscribe_attempt[node_id] = time.time()
# Mark node as unavailable and signal consumers.
# We debounce it a bit so we only mark the node unavailable
# after some resubscription attempts and we shutdown the subscription
# if the resubscription interval exceeds 30 minutes (TTL of mdns).
# The node will be auto picked up by mdns if it's alive again.
# after some resubscription attempts.
if resubscription_attempt >= NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE:
self._node_unavailable(node_id)
if nextResubscribeIntervalMsec > NODE_RESUBSCRIBE_TIMEOUT_OFFLINE:
# Shutdown the subscription if we tried to resubscribe for more than 30
# minutes (typical TTL of mDNS). We assume this device got powered off.
# When the device gets powered on again, it typically announces itself via
# mDNS again. The mDNS browsing code will setup the subscription again.
if (
time.time() - self._first_resubscribe_attempt[node_id]
> NODE_RESUBSCRIBE_TIMEOUT_OFFLINE
):
asyncio.create_task(self._node_offline(node_id))

def resubscription_succeeded(
transaction: Attribute.SubscriptionTransaction,
) -> None:
# pylint: disable=unused-argument, invalid-name
node_logger.info("Re-Subscription succeeded")
self._last_subscription_attempt[node_id] = 0
self._resubscription_attempt[node_id] = 0
self._first_resubscribe_attempt.pop(node_id, None)
# mark node as available and signal consumers
node = self._nodes[node_id]
if not node.available:
Expand All @@ -1233,7 +1243,7 @@ def resubscription_succeeded(
interval_ceiling = NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED
else:
interval_ceiling = NODE_SUBSCRIPTION_CEILING_THREAD
self._last_subscription_attempt[node_id] = 0
self._resubscription_attempt[node_id] = 0
# set-up the actual subscription
sub: Attribute.SubscriptionTransaction = (
await self._chip_device_controller.read_attribute(
Expand Down Expand Up @@ -1605,15 +1615,12 @@ def _node_unavailable(
async def _node_offline(self, node_id: int) -> None:
"""Mark node as offline."""
# shutdown existing subscriptions
node_logger = self.get_node_logger(LOGGER, node_id)
node_logger.info("Node considered offline, shutdown subscription")
await self._chip_device_controller.shutdown_subscription(node_id)

# mark node as unavailable (if it wasn't already)
node = self._nodes[node_id]
if not node.available:
return # nothing to do to
node.available = False
self.server.signal_event(EventType.NODE_UPDATED, node)
node_logger = self.get_node_logger(LOGGER, node_id)
node_logger.info("Marked node as offline")
self._node_unavailable(node_id)

async def _fallback_node_scanner(self) -> None:
"""Scan for operational nodes in the background that are missed by mdns."""
Expand Down