Skip to content

Commit

Permalink
add semaphore for concurrent node setups
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Mar 1, 2024
1 parent 8214e08 commit a7e1ad0
Showing 1 changed file with 42 additions and 38 deletions.
80 changes: 42 additions & 38 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def __init__(
self._sdk_executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="SDKExecutor"
)
self._node_setup_throttle = asyncio.Semaphore(10)

async def initialize(self) -> None:
"""Async initialize of controller."""
Expand Down Expand Up @@ -1070,53 +1071,56 @@ async def _setup_node(self, node_id: int) -> None:
return
self._nodes_in_setup.add(node_id)
try:
# Ping the node to rule out stale mdns reports and to prevent that we
# send an unreachable node to the sdk which is very slow with resolving it.
# This will also precache the ip addresses of the node for later use.
ping_result = await self.ping_node(
node_id, attempts=3, allow_cached_ips=False
)
if not any(ping_result.values()):
LOGGER.warning(
"Skip set-up for node %s because it does not appear to be reachable...",
node_id,
async with self._node_setup_throttle:
# Ping the node to rule out stale mdns reports and to prevent that we
# send an unreachable node to the sdk which is very slow with resolving it.
# This will also precache the ip addresses of the node for later use.
ping_result = await self.ping_node(
node_id, attempts=3, allow_cached_ips=False
)
return
LOGGER.info("Setting-up node %s...", node_id)
# (re)interview node (only) if needed
node_data = self._nodes[node_id]
if (
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
if not any(ping_result.values()):
LOGGER.warning(
"Skip set-up for node %s because it does not appear to be reachable...",
node_id,
)
return
LOGGER.info("Setting-up node %s...", node_id)
# (re)interview node (only) if needed
node_data = self._nodes[node_id]
if (
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
try:
await self.interview_node(node_id)
except (NodeNotResolving, NodeInterviewFailed) as err:
LOGGER.warning(
"Unable to interview Node %s: %s",
node_id,
str(err) or err.__class__.__name__,
# log full stack trace if debug logging is enabled
exc_info=err
if LOGGER.isEnabledFor(logging.DEBUG)
else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return
# setup subscriptions for the node
try:
await self.interview_node(node_id)
except (NodeNotResolving, NodeInterviewFailed) as err:
await self._subscribe_node(node_id)
except (NodeNotResolving, ChipStackError) as err:
LOGGER.warning(
"Unable to interview Node %s: %s",
"Unable to subscribe to Node %s: %s",
node_id,
str(err) or err.__class__.__name__,
# log full stack trace if debug logging is enabled
exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return
# setup subscriptions for the node
try:
await self._subscribe_node(node_id)
except (NodeNotResolving, ChipStackError) as err:
LOGGER.warning(
"Unable to subscribe to Node %s: %s",
node_id,
str(err) or err.__class__.__name__,
# log full stack trace if debug logging is enabled
exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
# when it becomes available again.
finally:
self._nodes_in_setup.discard(node_id)

Expand Down

0 comments on commit a7e1ad0

Please sign in to comment.