From 74b0ca316d1ee305ee873f03d8f865413ddc2887 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 29 Feb 2024 21:55:59 +0100 Subject: [PATCH] Add timeouts to sdk calls and subscription set-up (#604) --- matter_server/server/device_controller.py | 83 +++++++++++++++-------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index aecb237c..ff41f13b 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -14,6 +14,7 @@ import time from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast +import async_timeout from chip.ChipDeviceCtrl import DeviceProxyWrapper from chip.clusters import Attribute, Objects as Clusters from chip.clusters.Attribute import ValueDecodeFailure @@ -61,6 +62,8 @@ DATA_KEY_NODES = "nodes" DATA_KEY_LAST_NODE_ID = "last_node_id" +DEFAULT_CALL_TIMEOUT = 300 + LOGGER = logging.getLogger(__name__) MIN_NODE_SUBSCRIPTION_CEILING = 30 MAX_NODE_SUBSCRIPTION_CEILING = 300 @@ -990,25 +993,28 @@ def resubscription_succeeded( self._last_subscription_attempt[node_id] = 0 future = loop.create_future() device = await self._resolve_node(node_id) - Attribute.Read( - future=future, - eventLoop=loop, - device=device.deviceProxy, - devCtrl=self.chip_controller, - attributes=[Attribute.AttributePath()], # wildcard - events=[ - Attribute.EventPath(EndpointId=None, Cluster=None, Event=None, Urgent=1) - ], - returnClusterObject=False, - subscriptionParameters=Attribute.SubscriptionParameters( - interval_floor, interval_ceiling - ), - # Use fabricfiltered as False to detect changes made by other controllers - # and to be able to provide a list of all fabrics attached to the device - fabricFiltered=False, - autoResubscribe=True, - ).raise_on_error() - sub: Attribute.SubscriptionTransaction = await future + async with async_timeout.timeout(DEFAULT_CALL_TIMEOUT): + Attribute.Read( + future=future, + eventLoop=loop, + device=device.deviceProxy, + devCtrl=self.chip_controller, + attributes=[Attribute.AttributePath()], # wildcard + events=[ + Attribute.EventPath( + EndpointId=None, Cluster=None, Event=None, Urgent=1 + ) + ], + returnClusterObject=False, + subscriptionParameters=Attribute.SubscriptionParameters( + interval_floor, interval_ceiling + ), + # Use fabricfiltered as False to detect changes made by other controllers + # and to be able to provide a list of all fabrics attached to the device + fabricFiltered=False, + autoResubscribe=True, + ).raise_on_error() + sub: Attribute.SubscriptionTransaction = await future sub.SetAttributeUpdateCallback(attribute_updated_callback) sub.SetEventUpdateCallback(event_callback) @@ -1035,18 +1041,26 @@ def _get_next_node_id(self) -> int: self.server.storage.set(DATA_KEY_LAST_NODE_ID, next_node_id, force=True) return next_node_id - async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) -> _T: + async def _call_sdk( + self, + func: Callable[..., _T], + *args: Any, + call_timeout: int = DEFAULT_CALL_TIMEOUT, + **kwargs: Any, + ) -> _T: """Call function on the SDK in executor and return result.""" if self.server.loop is None: raise RuntimeError("Server not started.") - return cast( - _T, - await self.server.loop.run_in_executor( - self._sdk_executor, - partial(func, *args, **kwargs), - ), - ) + # prevent a single job in the executor blocking everything with a timeout. + async with async_timeout.timeout(call_timeout): + return cast( + _T, + await self.server.loop.run_in_executor( + self._sdk_executor, + partial(func, *args, **kwargs), + ), + ) async def _setup_node(self, node_id: int) -> None: """Handle set-up of subscriptions and interview (if needed) for known/discovered node.""" @@ -1079,17 +1093,26 @@ async def _setup_node(self, node_id: int) -> None: try: await self.interview_node(node_id) except (NodeNotResolving, NodeInterviewFailed) as err: - LOGGER.warning("Unable to interview Node %s", exc_info=err) + LOGGER.warning( + "Unable to interview Node %s: %s", + node_id, + str(err), + # log full stack trace if debug logging is enabled + exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, + ) # NOTE: the node will be picked up by mdns discovery automatically # when it comes available again. return # setup subscriptions for the node try: await self._subscribe_node(node_id) - except NodeNotResolving: + except (NodeNotResolving, TimeoutError) as err: LOGGER.warning( - "Unable to subscribe to Node %s as it is unavailable", + "Unable to subscribe to Node %s: %s", node_id, + str(err), + # log full stack trace if debug logging is enabled + exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, ) # NOTE: the node will be picked up by mdns discovery automatically # when it becomes available again.