Retry subscription setup if necessary #873

Merged · 7 commits · Sep 4, 2024
Changes from 1 commit
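In short, this change wraps node setup in a retry loop: setup is retried every 60 seconds and abandoned once the node has not been seen on mDNS for NODE_MDNS_SUBSCRIPTION_RETRY_TIMEOUT. A minimal sketch of that pattern, using assumed stand-in names (setup_once, last_seen_on_mdns) rather than the controller's real internals; the actual implementation is _setup_node_with_retry in the diff below:

import asyncio
import time

NODE_MDNS_SUBSCRIPTION_RETRY_TIMEOUT = 30 * 60  # seconds, mirrors the constant added in the diff

async def setup_with_retry(node_id: int, setup_once, last_seen_on_mdns: dict[int, float]) -> None:
    # Sketch only: setup_once and last_seen_on_mdns stand in for the controller's
    # _setup_node() and _node_last_seen_on_mdns mapping; the real code catches
    # specific setup errors (NodeNotResolving, NodeInterviewFailed, ChipStackError).
    while True:
        try:
            await setup_once(node_id)
            return
        except Exception:
            if time.time() - last_seen_on_mdns.get(node_id, 0) > NODE_MDNS_SUBSCRIPTION_RETRY_TIMEOUT:
                # Give up; mDNS discovery is expected to re-trigger setup when the node reappears.
                return
            await asyncio.sleep(60)  # retry delay matches the diff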
92 changes: 52 additions & 40 deletions matter_server/server/device_controller.py
@@ -85,8 +85,8 @@
NODE_RESUBSCRIBE_FORCE_TIMEOUT = 5
NODE_PING_TIMEOUT = 10
NODE_PING_TIMEOUT_BATTERY_POWERED = 60
NODE_MDNS_BACKOFF = 610 # must be higher than (highest) sub ceiling
FALLBACK_NODE_SCANNER_INTERVAL = 1800
NODE_MDNS_SUBSCRIPTION_RETRY_TIMEOUT = 30 * 60
FALLBACK_NODE_SCANNER_INTERVAL = 30 * 60
CUSTOM_ATTRIBUTES_POLLER_INTERVAL = 30

MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local."
@@ -146,7 +146,7 @@ def __init__(
self._thread_credentials_set: bool = False
self._nodes_in_setup: set[int] = set()
self._nodes_in_ota: set[int] = set()
self._node_last_seen: dict[int, float] = {}
self._node_last_seen_on_mdns: dict[int, float] = {}
self._nodes: dict[int, MatterNodeData] = {}
self._last_known_ip_addresses: dict[int, list[str]] = {}
self._last_subscription_attempt: dict[int, int] = {}
@@ -340,7 +340,7 @@ async def commission_with_code(
break

# make sure we start a subscription for this newly added node
await self._setup_node(node_id)
await self._setup_node_with_retry(node_id)
LOGGER.info("Commissioning of Node ID %s completed.", node_id)
# return full node object once we're complete
return self.get_node(node_id)
@@ -418,7 +418,7 @@ async def commission_on_network(
else:
break
# make sure we start a subscription for this newly added node
await self._setup_node(node_id)
await self._setup_node_with_retry(node_id)
LOGGER.info("Commissioning of Node ID %s completed.", node_id)
# return full node object once we're complete
return self.get_node(node_id)
@@ -1114,7 +1114,6 @@ def attribute_updated_callback_threadsafe(
path: Attribute.AttributePath,
transaction: Attribute.SubscriptionTransaction,
) -> None:
self._node_last_seen[node_id] = time.time()
new_value = transaction.GetTLVAttribute(path)
# failsafe: ignore ValueDecodeErrors
# these are set by the SDK if parsing the value failed miserably
@@ -1142,7 +1141,6 @@ def event_callback(
data,
transaction,
)
self._node_last_seen[node_id] = time.time()
node_event = MatterNodeEvent(
node_id=node_id,
endpoint_id=data.Header.EndpointId,
@@ -1203,7 +1201,6 @@ def resubscription_succeeded(
transaction: Attribute.SubscriptionTransaction,
) -> None:
# pylint: disable=unused-argument, invalid-name
self._node_last_seen[node_id] = time.time()
node_logger.info("Re-Subscription succeeded")
self._last_subscription_attempt[node_id] = 0
# mark node as available and signal consumers
@@ -1263,7 +1260,6 @@ def resubscription_succeeded(
report_interval_ceiling,
)

self._node_last_seen[node_id] = time.time()
self.server.signal_event(EventType.NODE_UPDATED, node)

def _get_next_node_id(self) -> int:
@@ -1272,16 +1268,11 @@ def _get_next_node_id(self) -> int:
self.server.storage.set(DATA_KEY_LAST_NODE_ID, next_node_id, force=True)
return next_node_id

async def _setup_node(self, node_id: int) -> None:
"""Handle set-up of subscriptions and interview (if needed) for known/discovered node."""
if node_id not in self._nodes:
raise NodeNotExists(f"Node {node_id} does not exist.")
if node_id in self._nodes_in_setup:
# prevent duplicate setup actions
return
self._nodes_in_setup.add(node_id)

node_logger = self.get_node_logger(LOGGER, node_id)
async def _setup_node(
self,
node_logger: logging.LoggerAdapter,
node_id: int,
) -> None:
node_data = self._nodes[node_id]
log_timers: dict[int, asyncio.TimerHandle] = {}
is_thread_node = (
@@ -1332,6 +1323,7 @@ async def log_node_long_setup(time_start: float) -> None:
log_timers[node_id] = self._loop.call_later(
15 * 60, lambda: asyncio.create_task(log_node_long_setup(time_start))
)

try:
node_logger.info("Setting-up node...")

@@ -1347,9 +1339,7 @@ async def log_node_long_setup(time_start: float) -> None:
# log full stack trace if verbose logging is enabled
exc_info=err if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return
raise err

# (re)interview node (only) if needed
if (
@@ -1369,9 +1359,7 @@ async def log_node_long_setup(time_start: float) -> None:
if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return
raise err

# setup subscriptions for the node
try:
@@ -1383,21 +1371,51 @@ async def log_node_long_setup(time_start: float) -> None:
# log full stack trace if verbose logging is enabled
exc_info=err if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
return
raise err

# check if this node has any custom clusters that need to be polled
if polled_attributes := check_polled_attributes(node_data):
self._polled_attributes[node_id] = polled_attributes
self._schedule_custom_attributes_poller()

finally:
log_timers[node_id].cancel()
self._nodes_in_setup.discard(node_id)
if is_thread_node:
self._thread_node_setup_throttle.release()

async def _setup_node_with_retry(self, node_id: int) -> None:
"""Handle set-up of subscriptions and interview (if needed) for known/discovered node."""
if node_id not in self._nodes:
raise NodeNotExists(f"Node {node_id} does not exist.")
if node_id in self._nodes_in_setup:
# prevent duplicate setup actions
return
self._nodes_in_setup.add(node_id)

node_logger = self.get_node_logger(LOGGER, node_id)

try:
while True:
try:
await self._setup_node(node_logger, node_id)
break
except (NodeNotResolving, NodeInterviewFailed, ChipStackError):
if (
time.time() - self._node_last_seen_on_mdns.get(node_id, 0)
> NODE_MDNS_SUBSCRIPTION_RETRY_TIMEOUT
):
# NOTE: assume the node will be picked up by mdns discovery later
# automatically when it becomes available again.
logging.info(
"Node setup not completed after %s minutes, giving up.",
NODE_MDNS_SUBSCRIPTION_RETRY_TIMEOUT // 60,
)
break

node_logger.info("Retrying node setup in 60 seconds...")
await asyncio.sleep(60)
finally:
self._nodes_in_setup.discard(node_id)

def _handle_endpoints_removed(self, node_id: int, endpoints: Iterable[int]) -> None:
"""Handle callback for when bridge endpoint(s) get deleted."""
node = self._nodes[node_id]
@@ -1479,9 +1497,7 @@ def _on_mdns_operational_node_state(
if not (node := self._nodes.get(node_id)):
return # this should not happen, but guard just in case

now = time.time()
last_seen = self._node_last_seen.get(node_id, 0)
self._node_last_seen[node_id] = now
self._node_last_seen_on_mdns[node_id] = time.time()

# we only treat UPDATE state changes as ADD if the node is marked as
# unavailable to ensure we catch a node being operational
@@ -1494,9 +1510,6 @@

if not self._chip_device_controller.node_has_subscription(node_id):
node_logger.info("Discovered on mDNS")
elif (now - last_seen) > NODE_MDNS_BACKOFF:
# node came back online after being offline for a while or restarted
node_logger.info("Re-discovered on mDNS")
elif state_change == ServiceStateChange.Added:
# Trigger node re-subscriptions when mDNS entry got added
node_logger.info("Activity on mDNS, trigger resubscribe")
@@ -1511,7 +1524,7 @@
return

# setup the node - this will (re) setup the subscriptions etc.
asyncio.create_task(self._setup_node(node_id))
asyncio.create_task(self._setup_node_with_retry(node_id))

def _on_mdns_commissionable_node_state(
self, name: str, state_change: ServiceStateChange
@@ -1609,13 +1622,12 @@ async def _fallback_node_scanner(self) -> None:
if node.available:
continue
now = time.time()
last_seen = self._node_last_seen.get(node_id, 0)
last_seen = self._node_last_seen_on_mdns.get(node_id, 0)
if now - last_seen < FALLBACK_NODE_SCANNER_INTERVAL:
continue
if await self.ping_node(node_id, attempts=3):
LOGGER.info("Node %s discovered using fallback ping", node_id)
self._node_last_seen[node_id] = now
await self._setup_node(node_id)
await self._setup_node_with_retry(node_id)

# reschedule self to run at next interval
self._schedule_fallback_scanner()