Skip to content

Commit

Permalink
mdns changes
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Feb 22, 2024
2 parents b81eca2 + 3f9b441 commit 6358300
Showing 1 changed file with 53 additions and 54 deletions.
107 changes: 53 additions & 54 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime
from functools import partial
import logging
import time
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, TypeVar, cast

from chip.ChipDeviceCtrl import DeviceProxyWrapper
Expand Down Expand Up @@ -60,7 +61,8 @@
DATA_KEY_LAST_NODE_ID = "last_node_id"

LOGGER = logging.getLogger(__name__)
MAX_POLL_INTERVAL = 600
NODE_SUBSCRIPTION_CEILING = 30
NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED = 1800
MAX_COMMISSION_RETRIES = 3
NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE = 3
NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60 * 1000
Expand Down Expand Up @@ -92,6 +94,7 @@ def __init__(
# we keep the last events in memory so we can include them in the diagnostics dump
self.event_history: deque[Attribute.EventReadResult] = deque(maxlen=25)
self._subscriptions: dict[int, Attribute.SubscriptionTransaction] = {}
self._mdns_last_seen: dict[int, float] = {}
self._nodes: dict[int, MatterNodeData] = {}
self._last_subscription_attempt: dict[int, int] = {}
self.wifi_credentials_set: bool = False
Expand All @@ -101,9 +104,6 @@ def __init__(
self._resolve_lock = asyncio.Lock()
self._aiobrowser: AsyncServiceBrowser | None = None
self._aiozc: AsyncZeroconf | None = None
self._mdns_queues: dict[
str, tuple[asyncio.Queue[ServiceStateChange], asyncio.Task]
] = {}

async def initialize(self) -> None:
"""Async initialize of controller."""
Expand Down Expand Up @@ -175,9 +175,6 @@ async def stop(self) -> None:
await self._call_sdk(sub.Shutdown)
self._subscriptions = {}
# shutdown (and cleanup) mdns browser
for key in tuple(self._mdns_queues.keys()):
_, mdns_task = self._mdns_queues.pop(key)
mdns_task.cancel()
if self._aiobrowser:
await self._aiobrowser.async_cancel()
if self._aiozc:
Expand Down Expand Up @@ -673,7 +670,7 @@ async def subscribe_attribute(
The given attribute path(s) will be added to the list of attributes that
are watched for the given node. This is persistent over restarts.
"""
LOGGER.warning(
LOGGER.debug(
"The subscribe_attribute command has been deprecated and will be removed from"
" a future version. You no longer need to call this to subscribe to attribute changes."
)
Expand Down Expand Up @@ -755,11 +752,10 @@ async def _subscribe_node(self, node_id: int) -> None:
node = self._nodes[node_id]

# check if we already have setup subscriptions for this node,
# if so, we need to unsubscribe first unless nothing changed
# in the attribute paths we want to subscribe.
# if so, we need to unsubscribe
if prev_sub := self._subscriptions.get(node_id, None):
async with node_lock:
node_logger.info("Unsubscribing from existing subscription.")
node_logger.debug("Unsubscribing from existing subscription.")
await self._call_sdk(prev_sub.Shutdown)
del self._subscriptions[node_id]

Expand All @@ -778,6 +774,7 @@ def attribute_updated_callback(
path: Attribute.TypedAttributePath,
transaction: Attribute.SubscriptionTransaction,
) -> None:
self._mdns_last_seen[node_id] = time.time()
assert loop is not None
new_value = transaction.GetAttribute(path)
# failsafe: ignore ValueDecodeErrors
Expand Down Expand Up @@ -905,6 +902,7 @@ def resubscription_succeeded(
transaction: Attribute.SubscriptionTransaction,
) -> None:
# pylint: disable=unused-argument, invalid-name
self._mdns_last_seen[node_id] = time.time()
node_logger.info("Re-Subscription succeeded")
self._last_subscription_attempt[node_id] = 0
# mark node as available and signal consumers
Expand All @@ -914,7 +912,11 @@ def resubscription_succeeded(

node_logger.info("Setting up attributes and events subscription.")
interval_floor = 0
interval_ceiling = 600 if battery_powered else 120
interval_ceiling = (
NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED
if battery_powered
else NODE_SUBSCRIPTION_CEILING
)
self._last_subscription_attempt[node_id] = 0
future = loop.create_future()
device = await self._resolve_node(node_id)
Expand Down Expand Up @@ -957,6 +959,7 @@ def resubscription_succeeded(
tlv_attributes = sub._readTransaction._cache.attributeTLVCache
node.attributes.update(parse_attributes_from_read_result(tlv_attributes))
node_logger.info("Subscription succeeded")
self._mdns_last_seen[node_id] = time.time()
self.server.signal_event(EventType.NODE_UPDATED, node)

def _get_next_node_id(self) -> int:
Expand Down Expand Up @@ -1078,58 +1081,54 @@ def _on_mdns_service_state_change(
name: str,
state_change: ServiceStateChange,
) -> None:
LOGGER.debug("Received %s MDNS event for %s", state_change, name)
if service_type == MDNS_TYPE_COMMISSIONABLE_NODE:
asyncio.create_task(
self._on_mdns_commissionable_node_state(name, state_change)
)
return
if service_type == MDNS_TYPE_OPERATIONAL_NODE:
name = name.lower()
if self.fabric_id_hex not in name:
# filter out messages that are not for our fabric
return
LOGGER.debug("Received %s MDNS event for %s", state_change, name)
if state_change not in (
ServiceStateChange.Added,
ServiceStateChange.Updated,
):
# we're not interested in removals as this is already
# handled in the subscription logic
return
if existing := self._mdns_queues.get(name):
queue = existing[0]
else:
# we want mdns messages to be processes sequentially PER NODE but in
# PARALLEL overall, hence we create a node specific mdns queue per mdns name.
queue = asyncio.Queue()
task = asyncio.create_task(self._process_mdns_queue(name, queue))
self._mdns_queues[name] = (queue, task)
queue.put_nowait(state_change)

async def _process_mdns_queue(
self, name: str, queue: asyncio.Queue[ServiceStateChange]
self._on_mdns_operational_node_state(name, state_change)

def _on_mdns_operational_node_state(
self, name: str, state_change: ServiceStateChange
) -> None:
"""Process the incoming MDNS messages of an (operational) Matter node."""
"""Handle a (operational) Matter node MDNS state change."""
name = name.lower()
if self.fabric_id_hex not in name:
# filter out messages that are not for our fabric
return

if state_change == ServiceStateChange.Removed:
# we're not interested in removals as this is already
# handled in the subscription logic
return

# the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local.
# extract the node id from the name
node_id = int(name.split("-")[1].split(".")[0], 16)
while True:
state_change = await queue.get()
if node_id not in self._nodes:
continue # this should not happen, but just in case
node = self._nodes[node_id]
if state_change not in (
ServiceStateChange.Added,
ServiceStateChange.Updated,
):
# this should be already filtered out, but just in case
continue
if node.available:
# if the node is already set-up, no action is needed
continue
LOGGER.info("Node %s discovered on MDNS", node_id)
# setup the node
await self._setup_node(node_id)

if not (node := self._nodes.get(node_id)):
return # this should not happen, but guard just in case

# mdns events for matter devices arrive in bursts of (duplicate) messages
# so we debounce this as we only use the mdns messages for operational node discovery
# and we have other logic in place to determine node aliveness

now = time.time()
last_seen = self._mdns_last_seen.get(node_id, 0)
self._mdns_last_seen[node_id] = now
if now - last_seen < NODE_SUBSCRIPTION_CEILING:
return

# we treat UPDATE state changes as ADD if the node is marked as
# unavailable to ensure we catch a node being operational
if node.available and state_change == ServiceStateChange.Updated:
return

LOGGER.info("Node %s (re)discovered on MDNS", node_id)
# setup the node - this will (re) setup the subscriptions etc.
asyncio.create_task(self._setup_node(node_id))

async def _on_mdns_commissionable_node_state(
self, name: str, state_change: ServiceStateChange
Expand Down

0 comments on commit 6358300

Please sign in to comment.