Skip to content

Commit

Permalink
Add a lock on the node setup
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Feb 28, 2024
1 parent c4e45dc commit 78666e6
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(
self.thread_credentials_set: bool = False
self.compressed_fabric_id: int | None = None
self._node_lock: dict[int, asyncio.Lock] = {}
self._node_setup_lock: asyncio.Lock = asyncio.Lock()
self._aiobrowser: AsyncServiceBrowser | None = None
self._aiozc: AsyncZeroconf | None = None

Expand Down Expand Up @@ -1042,35 +1043,38 @@ async def _setup_node(self, node_id: int) -> None:
self._nodes_in_setup.add(node_id)
# pre-cache ip-addresses
await self.get_node_ip_addresses(node_id)
try:
# (re)interview node (only) if needed
node_data = self._nodes[node_id]
if (
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
# we use a lock for the node setup process to process nodes sequentially
# to prevent a flood of the (thread) network when there are many nodes being setup.
async with self._node_setup_lock:
try:
# (re)interview node (only) if needed
node_data = self._nodes[node_id]
if (
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
try:
await self.interview_node(node_id)
except (NodeNotResolving, NodeInterviewFailed) as err:
LOGGER.warning("Unable to interview Node %s", exc_info=err)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return

# setup subscriptions for the node
try:
await self.interview_node(node_id)
except (NodeNotResolving, NodeInterviewFailed) as err:
LOGGER.warning("Unable to interview Node %s", exc_info=err)
await self._subscribe_node(node_id)
except NodeNotResolving:
LOGGER.warning(
"Unable to subscribe to Node %s as it is unavailable",
node_id,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return

# setup subscriptions for the node
try:
await self._subscribe_node(node_id)
except NodeNotResolving:
LOGGER.warning(
"Unable to subscribe to Node %s as it is unavailable",
node_id,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
finally:
self._nodes_in_setup.discard(node_id)
# when it becomes available again.
finally:
self._nodes_in_setup.discard(node_id)

async def _resolve_node(
self, node_id: int, retries: int = 2, attempt: int = 1
Expand Down

0 comments on commit 78666e6

Please sign in to comment.