Skip to content

Commit

Permalink
Guard node resolving with a lock (#515)
Browse files Browse the repository at this point in the history
Co-authored-by: Martin Hjelmare <[email protected]>
  • Loading branch information
marcelveldt and MartinHjelmare committed Jan 29, 2024
1 parent 65a4d60 commit 874a7bf
Showing 1 changed file with 28 additions and 25 deletions.
53 changes: 28 additions & 25 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(
self.thread_credentials_set: bool = False
self.compressed_fabric_id: int | None = None
self._node_lock: dict[int, asyncio.Lock] = {}
self._resolve_lock = asyncio.Lock()

async def initialize(self) -> None:
"""Async initialize of controller."""
Expand Down Expand Up @@ -497,7 +498,8 @@ async def send_device_command(
cluster_cls: Cluster = ALL_CLUSTERS[cluster_id]
command_cls = getattr(cluster_cls.Commands, command_name)
command = dataclass_from_dict(command_cls, payload)
async with self._get_node_lock(node_id):
node_lock = self._get_node_lock(node_id)
async with node_lock:
return await self.chip_controller.SendCommand(
nodeid=node_id,
endpoint=endpoint_id,
Expand All @@ -514,12 +516,12 @@ async def read_attribute(
"""Read a single attribute (or Cluster) on a node."""
if self.chip_controller is None:
raise RuntimeError("Device Controller not initialized.")
node_lock = self._get_node_lock(node_id)
endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path)
assert self.server.loop is not None
future = self.server.loop.create_future()
device = await self._resolve_node(node_id)
node_lock = self._get_node_lock(node_id)
async with node_lock:
assert self.server.loop is not None
future = self.server.loop.create_future()
device = await self._resolve_node(node_id)
Attribute.Read(
future=future,
eventLoop=self.server.loop,
Expand Down Expand Up @@ -932,15 +934,15 @@ def resubscription_succeeded(
node.available = True
self.server.signal_event(EventType.NODE_UPDATED, node)

node_logger.info("Setting up attributes and events subscription.")
interval_floor = 0
interval_ceiling = (
random.randint(60, 300) if battery_powered else random.randint(30, 120)
)
self._last_subscription_attempt[node_id] = 0
future = loop.create_future()
device = await self._resolve_node(node_id)
async with node_lock:
node_logger.info("Setting up attributes and events subscription.")
interval_floor = 0
interval_ceiling = (
random.randint(60, 300) if battery_powered else random.randint(30, 120)
)
self._last_subscription_attempt[node_id] = 0
future = loop.create_future()
device = await self._resolve_node(node_id)
Attribute.Read(
future=future,
eventLoop=loop,
Expand All @@ -961,13 +963,13 @@ def resubscription_succeeded(
fabricFiltered=False,
autoResubscribe=True,
).raise_on_error()
sub: Attribute.SubscriptionTransaction = await future
sub: Attribute.SubscriptionTransaction = await future

sub.SetAttributeUpdateCallback(attribute_updated_callback)
sub.SetEventUpdateCallback(event_callback)
sub.SetErrorCallback(error_callback)
sub.SetResubscriptionAttemptedCallback(resubscription_attempted)
sub.SetResubscriptionSucceededCallback(resubscription_succeeded)
sub.SetAttributeUpdateCallback(attribute_updated_callback)
sub.SetEventUpdateCallback(event_callback)
sub.SetErrorCallback(error_callback)
sub.SetResubscriptionAttemptedCallback(resubscription_attempted)
sub.SetResubscriptionSucceededCallback(resubscription_succeeded)

# if we reach this point, it means the node could be resolved
# and the initial subscription succeeded, mark the node available.
Expand Down Expand Up @@ -1085,12 +1087,13 @@ async def _resolve_node(
attempt,
retries,
)
return await self._call_sdk(
self.chip_controller.GetConnectedDeviceSync,
nodeid=node_id,
allowPASE=False,
timeoutMs=None,
)
async with self._resolve_lock:
return await self._call_sdk(
self.chip_controller.GetConnectedDeviceSync,
nodeid=node_id,
allowPASE=False,
timeoutMs=None,
)
except (ChipStackError, TimeoutError) as err:
if attempt >= retries:
# when we're out of retries, raise NodeNotResolving
Expand Down

0 comments on commit 874a7bf

Please sign in to comment.