Skip to content

Commit

Permalink
Improve node resolving for nodes offline at startup (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Jul 11, 2023
1 parent e815ff3 commit 5dcb8be
Showing 1 changed file with 13 additions and 26 deletions.
39 changes: 13 additions & 26 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(
self._interview_limit: asyncio.Semaphore = asyncio.Semaphore(
INTERVIEW_TASK_LIMIT
)
self._resolve_lock: asyncio.Lock = asyncio.Lock()
self._node_lock: dict[int, asyncio.Lock] = {}

async def initialize(self) -> None:
Expand Down Expand Up @@ -583,9 +584,9 @@ async def _subscribe_node(self, node_id: int) -> None:
node_logger.debug("Unsubscribing from existing subscription.")
await self._call_sdk(prev_sub.Shutdown)

node_logger.debug("Setting up attributes and events subscription.")
self._attr_subscriptions[node_id] = attr_subscriptions
async with node_lock:
node_logger.debug("Setting up attributes and events subscription.")
sub: Attribute.SubscriptionTransaction = await self.chip_controller.Read(
nodeid=node_id,
# In order to prevent network congestion due to wildcard subscriptions on all nodes,
Expand Down Expand Up @@ -744,7 +745,7 @@ async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) ->
)

async def _check_interview_and_subscription(
self, node_id: int, reschedule_interval: int = 300
self, node_id: int, reschedule_interval: int = 30
) -> None:
"""Handle interview (if needed) and subscription for known node."""

Expand All @@ -756,8 +757,8 @@ def reschedule() -> None:
asyncio.create_task,
self._check_interview_and_subscription(
node_id,
# increase interval at each attempt with maximum of 1 hour
min(reschedule_interval + 300, 3600),
# increase interval at each attempt with maximum of 10 minutes
min(reschedule_interval + 10, 600),
),
)

Expand Down Expand Up @@ -837,37 +838,23 @@ def _parse_attributes_from_read_result(
result[attribute_path] = attr_value
return result

async def _resolve_node(
self, node_id: int, retries: int = 3, allow_pase: bool = False
) -> None:
async def _resolve_node(self, node_id: int, retries: int = 3) -> None:
"""Resolve a Node on the network."""
node_lock = self._get_node_lock(node_id)
if self.chip_controller is None:
raise RuntimeError("Device Controller not initialized.")
try:
if allow_pase:
# last attempt allows PASE connection (last resort)
LOGGER.debug(
"Attempting to resolve node %s (with PASE connection)", node_id
async with node_lock, self._resolve_lock:
LOGGER.info("Attempting to resolve node %s...", node_id)
await self._call_sdk(
self.chip_controller.ResolveNode,
nodeid=node_id,
)
async with node_lock:
await self._call_sdk(
self.chip_controller.GetConnectedDeviceSync,
nodeid=node_id,
allowPASE=True,
timeoutMs=30000,
)
return
LOGGER.debug("Resolving node %s", node_id)
await self._call_sdk(self.chip_controller.ResolveNode, nodeid=node_id)
except (ChipStackError, TimeoutError) as err:
if not retries:
if retries <= 1:
# when we're out of retries, raise NodeNotResolving
raise NodeNotResolving(f"Unable to resolve Node {node_id}") from err
async with node_lock:
await self._resolve_node(
node_id=node_id, retries=retries - 1, allow_pase=retries - 1 == 0
)
await self._resolve_node(node_id=node_id, retries=retries - 1)
await asyncio.sleep(2)

def _handle_endpoints_removed(self, node_id: int, endpoints: Iterable[int]) -> None:
Expand Down

0 comments on commit 5dcb8be

Please sign in to comment.