Skip to content

Commit

Permalink
Refactor the interview logic a small bit (#538)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Feb 7, 2024
1 parent 415fe93 commit 459b203
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 54 deletions.
6 changes: 6 additions & 0 deletions matter_server/server/const.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
"""Server-only constants for the Python Matter Server."""

import pathlib
from typing import Final

# The minimum schema version (of a client) the server can support
MIN_SCHEMA_VERSION = 5

# schema version of our data model
# only bump if the format of the data in MatterNodeData changed
# and a full re-interview is mandatory
DATA_MODEL_SCHEMA_VERSION = 6

# the paa-root-certs path is hardcoded in the sdk at this time
# and always uses the development subfolder
# regardless of anything you pass into instantiating the controller
Expand Down
68 changes: 14 additions & 54 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
MatterNodeEvent,
NodePingResult,
)
from .const import PAA_ROOT_CERTS_DIR
from .const import DATA_MODEL_SCHEMA_VERSION, PAA_ROOT_CERTS_DIR
from .helpers.paa_certificates import fetch_certificates

if TYPE_CHECKING:
Expand Down Expand Up @@ -501,7 +501,7 @@ async def interview_node(self, node_id: int) -> None:
existing_info.date_commissioned if existing_info else datetime.utcnow()
),
last_interview=datetime.utcnow(),
interview_version=SCHEMA_VERSION,
interview_version=DATA_MODEL_SCHEMA_VERSION,
available=True,
attributes=parse_attributes_from_read_result(read_response.tlvAttributes),
)
Expand Down Expand Up @@ -1050,48 +1050,28 @@ async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) ->
),
)

async def _check_interview_and_subscription(
self, node_id: int, reschedule_interval: int = 30
) -> None:
"""Handle interview (if needed) and subscription for known node."""

async def _setup_node(self, node_id: int) -> None:
"""Handle set-up of subscriptions and interview (if needed) for known/discovered node."""
if node_id not in self._nodes:
raise NodeNotExists(f"Node {node_id} does not exist.")

# (re)interview node (only) if needed
node_data = self._nodes.get(node_id)
node_data = self._nodes[node_id]
if (
node_data is None
# re-interview if the schema has changed
or node_data.interview_version < SCHEMA_VERSION
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
try:
await self.interview_node(node_id)
except NodeNotResolving:
LOGGER.warning(
"Unable to interview Node %s as it is unavailable",
node_id,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
except NodeInterviewFailed:
LOGGER.warning(
"Unable to interview Node %s, will retry later in the background.",
node_id,
)
# reschedule interview on error
# increase interval at each attempt with maximum of
# MAX_POLL_INTERVAL seconds (= 10 minutes)
self._schedule_interview(
node_id,
min(reschedule_interval + 10, MAX_POLL_INTERVAL),
)
except (NodeNotResolving, NodeInterviewFailed) as err:
LOGGER.warning("Unable to interview Node %s", exc_info=err)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return

# setup subscriptions for the node
if node_id in self._subscriptions:
return

try:
await self._subscribe_node(node_id)
except NodeNotResolving:
Expand All @@ -1102,26 +1082,6 @@ async def _check_interview_and_subscription(
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.

def _schedule_interview(self, node_id: int, delay: int) -> None:
"""(Re)Schedule interview and/or initial subscription for a node."""
assert self.server.loop is not None
# cancel any existing (re)schedule timer
if existing := self._sub_retry_timer.pop(node_id, None):
existing.cancel()

def create_interview_task() -> None:
asyncio.create_task(
self._check_interview_and_subscription(
node_id,
)
)
# the handle to the timer can now be removed
self._sub_retry_timer.pop(node_id, None)

self._sub_retry_timer[node_id] = self.server.loop.call_later(
delay, create_interview_task
)

async def _resolve_node(
self, node_id: int, retries: int = 2, attempt: int = 1
) -> DeviceProxyWrapper:
Expand Down Expand Up @@ -1230,7 +1190,7 @@ async def _process_mdns_queue(
continue # node is already set-up, no action needed
LOGGER.info("Node %s discovered on MDNS", node_id)
# setup the node
await self._check_interview_and_subscription(node_id)
await self._setup_node(node_id)
elif state_change == ServiceStateChange.Removed:
if not node.available:
continue # node is already offline, nothing to do
Expand Down

0 comments on commit 459b203

Please sign in to comment.