Skip to content

Commit

Permalink
Move delayed interview to own callback (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Dec 22, 2023
1 parent 8e0978d commit dfdebec
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,7 @@ async def start(self) -> None:
# the first attempt to initialize so that we prioritize nodes
# that are probably available so they are back online as soon as
# possible and we're not stuck trying to initialize nodes that are offline
assert self.server.loop
self.server.loop.call_later(
5,
asyncio.create_task,
self._check_interview_and_subscription(node_id),
)

self._schedule_interview(node_id, 5)
else:
asyncio.create_task(self._check_interview_and_subscription(node_id))
LOGGER.info("Loaded %s nodes from stored configuration", len(self._nodes))
Expand Down Expand Up @@ -913,23 +907,6 @@ async def _check_interview_and_subscription(
if node_id not in self._nodes:
raise NodeNotExists(f"Node {node_id} does not exist.")

# pop any existing reschedule timer
self._sub_retry_timer.pop(node_id, None)

def reschedule() -> None:
"""(Re)Schedule interview and/or initial subscription for a node."""
assert self.server.loop is not None
self._sub_retry_timer[node_id] = self.server.loop.call_later(
reschedule_interval,
asyncio.create_task,
self._check_interview_and_subscription(
node_id,
# increase interval at each attempt with maximum of
# MAX_POLL_INTERVAL seconds (= 10 minutes)
min(reschedule_interval + 10, MAX_POLL_INTERVAL),
),
)

# (re)interview node (only) if needed
node_data = self._nodes.get(node_id)
if (
Expand All @@ -944,8 +921,13 @@ def reschedule() -> None:
"Unable to interview Node %s, will retry later in the background.",
node_id,
)
# reschedule self on error
reschedule()
# reschedule interview on error
# increase interval at each attempt with maximum of
# MAX_POLL_INTERVAL seconds (= 10 minutes)
self._schedule_interview(
node_id,
min(reschedule_interval + 10, MAX_POLL_INTERVAL),
)
return

# setup subscriptions for the node
Expand All @@ -962,7 +944,30 @@ def reschedule() -> None:
)
# TODO: fix this once OperationalNodeDiscovery is available:
# https://github.com/project-chip/connectedhomeip/pull/26718
reschedule()
self._schedule_interview(
node_id,
min(reschedule_interval + 10, MAX_POLL_INTERVAL),
)

def _schedule_interview(self, node_id: int, delay: int) -> None:
"""(Re)Schedule interview and/or initial subscription for a node."""
assert self.server.loop is not None
# cancel any existing (re)schedule timer
if existing := self._sub_retry_timer.pop(node_id, None):
existing.cancel()

def create_interview_task() -> None:
asyncio.create_task(
self._check_interview_and_subscription(
node_id,
)
)
# the handle to the timer can now be removed
self._sub_retry_timer.pop(node_id, None)

self._sub_retry_timer[node_id] = self.server.loop.call_later(
delay, create_interview_task
)

async def _resolve_node(
self, node_id: int, retries: int = 2, attempt: int = 1
Expand Down

0 comments on commit dfdebec

Please sign in to comment.