Skip to content

Commit

Permalink
Process the mdns messages semi parallel (#537)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Feb 7, 2024
1 parent 86d68cc commit 415fe93
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class MatterDeviceController:
"""Class that manages the Matter devices."""

chip_controller: ChipDeviceController | None
fabric_id_hex: str

def __init__(
self,
Expand All @@ -123,7 +124,9 @@ def __init__(
self._resolve_lock = asyncio.Lock()
self._aiobrowser: AsyncServiceBrowser | None = None
self._aiozc: AsyncZeroconf | None = None
self._mdns_inprogress: set[int] = set()
self._mdns_queues: dict[
str, tuple[asyncio.Queue[ServiceStateChange], asyncio.Task]
] = {}

async def initialize(self) -> None:
"""Async initialize of controller."""
Expand All @@ -134,9 +137,10 @@ async def initialize(self) -> None:
self.chip_controller = self.server.stack.fabric_admin.NewController(
paaTrustStorePath=str(PAA_ROOT_CERTS_DIR)
)
self.compressed_fabric_id = await self._call_sdk(
self.chip_controller.GetCompressedFabricId
self.compressed_fabric_id = cast(
int, await self._call_sdk(self.chip_controller.GetCompressedFabricId)
)
self.fabric_id_hex = hex(self.compressed_fabric_id)[2:]
LOGGER.debug("CHIP Device Controller Initialized")

async def start(self) -> None:
Expand Down Expand Up @@ -181,7 +185,10 @@ async def stop(self) -> None:
for sub in self._subscriptions.values():
await self._call_sdk(sub.Shutdown)
self._subscriptions = {}
# shutdown mdns browser
# shutdown (and cleanup) mdns browser
for key in tuple(self._mdns_queues.keys()):
_, mdns_task = self._mdns_queues.pop(key)
mdns_task.cancel()
if self._aiobrowser:
await self._aiobrowser.async_cancel()
if self._aiozc:
Expand Down Expand Up @@ -1189,38 +1196,46 @@ def _on_mdns_service_state_change(
)
return
if service_type == MDNS_TYPE_OPERATIONAL_NODE:
asyncio.create_task(
self._on_mdns_operational_node_state(name, state_change)
)

async def _on_mdns_operational_node_state(
self, name: str, state_change: ServiceStateChange
name = name.lower()
if not name.startswith(self.fabric_id_hex):
# filter out messages that are not for our fabric
return
if existing := self._mdns_queues.get(name):
queue = existing[0]
else:
# we want mdns messages to be processes sequentially PER NODE but in
# PARALLEL overall, hence we create a node specific mdns queue per mdns name.
queue = asyncio.Queue()
task = asyncio.create_task(self._process_mdns_queue(name, queue))
self._mdns_queues[name] = (queue, task)
queue.put_nowait(state_change)

async def _process_mdns_queue(
self, name: str, queue: asyncio.Queue[ServiceStateChange]
) -> None:
"""Handle a (operational) Matter node MDNS state change."""
"""Process the incoming MDNS messages of an (operational) Matter node."""
# the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local.
# extract the node id from the name
node_id = int(name.split("-")[1].split(".")[0], 16)
if node_id not in self._nodes:
return # should not happen, but just in case
if node_id in self._mdns_inprogress:
# mdns records can potentially arrive multiplied so debounce any duplicates
return
try:
self._mdns_inprogress.add(node_id)
while True:
state_change = await queue.get()
if node_id not in self._nodes:
continue # this should not happen, but just in case
node = self._nodes[node_id]
if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated):
if state_change in (
ServiceStateChange.Added,
ServiceStateChange.Updated,
):
if node.available:
return # node is already set-up, no action needed
continue # node is already set-up, no action needed
LOGGER.info("Node %s discovered on MDNS", node_id)
# setup the node
await self._check_interview_and_subscription(node_id)
elif state_change == ServiceStateChange.Removed:
if not node.available:
return # node is already offline, nothing to do
continue # node is already offline, nothing to do
LOGGER.info("Node %s vanished according to MDNS", node_id)
await self._node_offline(node_id)
finally:
self._mdns_inprogress.remove(node_id)

async def _on_mdns_commissionable_node_state(
self, name: str, state_change: ServiceStateChange
Expand Down

0 comments on commit 415fe93

Please sign in to comment.