Skip to content

Commit

Permalink
remove setup lock as the lock to the sdk is enough to throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Feb 28, 2024
1 parent 929646e commit c9ebafd
Showing 1 changed file with 36 additions and 43 deletions.
79 changes: 36 additions & 43 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def __init__(
self.thread_credentials_set: bool = False
self.compressed_fabric_id: int | None = None
self._node_lock: dict[int, asyncio.Lock] = {}
self._node_setup_lock: asyncio.Semaphore = asyncio.Semaphore(5)
self._aiobrowser: AsyncServiceBrowser | None = None
self._aiozc: AsyncZeroconf | None = None
self._sdk_executor = ThreadPoolExecutor(max_workers=1)
Expand Down Expand Up @@ -1036,51 +1035,45 @@ async def _setup_node(self, node_id: int) -> None:
# prevent duplicate setup actions
return
self._nodes_in_setup.add(node_id)
# ping the node to out stale mdns reports and to prevent that we
# send an unreachable node to the sdk which is very slow with resolving it
# this will also precache the ip addresses of the node for later use.
ping_result = await self.ping_node(node_id)
if not any(ping_result.values()):
LOGGER.warning(
"Skip set-up for node %s because it does not appear to be reachable...",
node_id,
)
return
# we use a lock for the node setup process to process nodes sequentially
# to prevent a flood of the (thread) network when there are many nodes being setup.
async with self._node_setup_lock:
# add this little random sleep here to do a bit of throttling
# can be optimized later
await asyncio.sleep(randint(0, 5)) # noqa: S311
try:
# ping the node to out stale mdns reports and to prevent that we
# send an unreachable node to the sdk which is very slow with resolving it
# this will also precache the ip addresses of the node for later use.
ping_result = await self.ping_node(node_id)
if not any(ping_result.values()):
LOGGER.warning(
"Skip set-up for node %s because it does not appear to be reachable...",
node_id,
)
return
LOGGER.info("Setting-up node %s...", node_id)
try:
# (re)interview node (only) if needed
node_data = self._nodes[node_id]
if (
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
try:
await self.interview_node(node_id)
except (NodeNotResolving, NodeInterviewFailed) as err:
LOGGER.warning("Unable to interview Node %s", exc_info=err)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return
# setup subscriptions for the node
# (re)interview node (only) if needed
node_data = self._nodes[node_id]
if (
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
try:
await self._subscribe_node(node_id)
except NodeNotResolving:
LOGGER.warning(
"Unable to subscribe to Node %s as it is unavailable",
node_id,
)
await self.interview_node(node_id)
except (NodeNotResolving, NodeInterviewFailed) as err:
LOGGER.warning("Unable to interview Node %s", exc_info=err)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
finally:
self._nodes_in_setup.discard(node_id)
# when it comes available again.
return
# setup subscriptions for the node
try:
await self._subscribe_node(node_id)
except NodeNotResolving:
LOGGER.warning(
"Unable to subscribe to Node %s as it is unavailable",
node_id,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
finally:
self._nodes_in_setup.discard(node_id)

async def _resolve_node(
self, node_id: int, retries: int = 2, attempt: int = 1
Expand Down

0 comments on commit c9ebafd

Please sign in to comment.