Squashed commit of the following:
commit f2ff761
Author: Marcel van der Veldt <[email protected]>
Date:   Thu Feb 29 20:28:55 2024 +0100

    adjust logger

commit e59c880
Author: Marcel van der Veldt <[email protected]>
Date:   Thu Feb 29 20:03:15 2024 +0100

    Add timeouts to sdk calls
marcelveldt committed Feb 29, 2024
1 parent bdaeb2c commit d651326
Showing 1 changed file with 53 additions and 30 deletions.
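
A minimal, self-contained sketch (not part of this commit) of the pattern the second commit applies: a blocking SDK-style call is pushed to an executor and the await on it is bounded with async_timeout, so a hung job raises asyncio.TimeoutError instead of stalling indefinitely. The call_blocking helper and its names are illustrative only.

import asyncio
import time
from functools import partial

import async_timeout

DEFAULT_CALL_TIMEOUT = 300  # seconds, mirroring the constant added in this commit


async def call_blocking(func, *args, call_timeout=DEFAULT_CALL_TIMEOUT, **kwargs):
    """Run a blocking function in the default executor, bounded by a timeout."""
    loop = asyncio.get_running_loop()
    # If the executor job does not finish in time, asyncio.TimeoutError is raised
    # here; note the worker thread itself keeps running until the call returns.
    async with async_timeout.timeout(call_timeout):
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))


async def main():
    print(await call_blocking(sum, [1, 2, 3]))  # fast call completes normally
    try:
        await call_blocking(time.sleep, 2, call_timeout=0.5)  # slow call is cut off
    except asyncio.TimeoutError:
        print("SDK-style call timed out")


asyncio.run(main())

One design note visible in the diff below: call_timeout is placed after *args in _call_sdk, so it is keyword-only and cannot collide with positional arguments meant for the wrapped SDK function.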
83 changes: 53 additions & 30 deletions matter_server/server/device_controller.py
@@ -14,6 +14,7 @@
 import time
 from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast
 
+import async_timeout
 from chip.ChipDeviceCtrl import DeviceProxyWrapper
 from chip.clusters import Attribute, Objects as Clusters
 from chip.clusters.Attribute import ValueDecodeFailure
@@ -61,6 +62,8 @@
 DATA_KEY_NODES = "nodes"
 DATA_KEY_LAST_NODE_ID = "last_node_id"
 
+DEFAULT_CALL_TIMEOUT = 300
+
 LOGGER = logging.getLogger(__name__)
 MIN_NODE_SUBSCRIPTION_CEILING = 30
 MAX_NODE_SUBSCRIPTION_CEILING = 300
@@ -996,25 +999,28 @@ def resubscription_succeeded(
         self._last_subscription_attempt[node_id] = 0
         future = loop.create_future()
         device = await self._resolve_node(node_id)
-        Attribute.Read(
-            future=future,
-            eventLoop=loop,
-            device=device.deviceProxy,
-            devCtrl=self.chip_controller,
-            attributes=[Attribute.AttributePath()],  # wildcard
-            events=[
-                Attribute.EventPath(EndpointId=None, Cluster=None, Event=None, Urgent=1)
-            ],
-            returnClusterObject=False,
-            subscriptionParameters=Attribute.SubscriptionParameters(
-                interval_floor, interval_ceiling
-            ),
-            # Use fabricfiltered as False to detect changes made by other controllers
-            # and to be able to provide a list of all fabrics attached to the device
-            fabricFiltered=False,
-            autoResubscribe=True,
-        ).raise_on_error()
-        sub: Attribute.SubscriptionTransaction = await future
+        async with async_timeout.timeout(DEFAULT_CALL_TIMEOUT):
+            Attribute.Read(
+                future=future,
+                eventLoop=loop,
+                device=device.deviceProxy,
+                devCtrl=self.chip_controller,
+                attributes=[Attribute.AttributePath()],  # wildcard
+                events=[
+                    Attribute.EventPath(
+                        EndpointId=None, Cluster=None, Event=None, Urgent=1
+                    )
+                ],
+                returnClusterObject=False,
+                subscriptionParameters=Attribute.SubscriptionParameters(
+                    interval_floor, interval_ceiling
+                ),
+                # Use fabricfiltered as False to detect changes made by other controllers
+                # and to be able to provide a list of all fabrics attached to the device
+                fabricFiltered=False,
+                autoResubscribe=True,
+            ).raise_on_error()
+            sub: Attribute.SubscriptionTransaction = await future
 
         sub.SetAttributeUpdateCallback(attribute_updated_callback)
         sub.SetEventUpdateCallback(event_callback)
@@ -1041,18 +1047,26 @@ def _get_next_node_id(self) -> int:
         self.server.storage.set(DATA_KEY_LAST_NODE_ID, next_node_id, force=True)
         return next_node_id
 
-    async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
+    async def _call_sdk(
+        self,
+        func: Callable[..., _T],
+        *args: Any,
+        call_timeout: int = DEFAULT_CALL_TIMEOUT,
+        **kwargs: Any,
+    ) -> _T:
         """Call function on the SDK in executor and return result."""
         if self.server.loop is None:
             raise RuntimeError("Server not started.")
 
-        return cast(
-            _T,
-            await self.server.loop.run_in_executor(
-                self._sdk_executor,
-                partial(func, *args, **kwargs),
-            ),
-        )
+        # prevent a single job in the executor blocking everything with a timeout.
+        async with async_timeout.timeout(call_timeout):
+            return cast(
+                _T,
+                await self.server.loop.run_in_executor(
+                    self._sdk_executor,
+                    partial(func, *args, **kwargs),
+                ),
+            )
 
     async def _setup_node(self, node_id: int) -> None:
         """Handle set-up of subscriptions and interview (if needed) for known/discovered node."""
@@ -1085,17 +1099,26 @@ async def _setup_node(self, node_id: int) -> None:
             try:
                 await self.interview_node(node_id)
             except (NodeNotResolving, NodeInterviewFailed) as err:
-                LOGGER.warning("Unable to interview Node %s", exc_info=err)
+                LOGGER.warning(
+                    "Unable to interview Node %s: %s",
+                    node_id,
+                    str(err),
+                    # log full stack trace if debug logging is enabled
+                    exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
+                )
                 # NOTE: the node will be picked up by mdns discovery automatically
                 # when it comes available again.
                 return
         # setup subscriptions for the node
         try:
             await self._subscribe_node(node_id)
-        except NodeNotResolving:
+        except (NodeNotResolving, TimeoutError) as err:
             LOGGER.warning(
-                "Unable to subscribe to Node %s as it is unavailable",
+                "Unable to subscribe to Node %s: %s",
                 node_id,
+                str(err),
+                # log full stack trace if debug logging is enabled
+                exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
             )
             # NOTE: the node will be picked up by mdns discovery automatically
             # when it becomes available again.
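
The first commit ("adjust logger") keeps these warnings to a single line in normal operation while attaching the full stack trace only when debug logging is enabled; it also fixes the original interview warning, which had a %s placeholder but never passed node_id. A standalone sketch of that logging pattern, with hypothetical connect_node/setup_node names standing in for the controller's SDK calls:

import logging

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)  # switch to DEBUG to also get tracebacks


def connect_node(node_id):
    """Stand-in for an SDK call that can fail; always times out here."""
    raise TimeoutError(f"node {node_id} did not respond")


def setup_node(node_id):
    try:
        connect_node(node_id)
    except (TimeoutError, OSError) as err:
        # Short warning by default; full traceback only when DEBUG is enabled.
        LOGGER.warning(
            "Unable to set up Node %s: %s",
            node_id,
            str(err),
            exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
        )


setup_node(1)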
