From d03cf0c51f0b33fc9031e128facfd7b37ababa51 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 28 Feb 2024 22:59:37 +0100 Subject: [PATCH] A collection of small (stability) fixes and tweaks (#597) --- matter_server/server/__main__.py | 59 ++++++++-- matter_server/server/client_handler.py | 18 ++- matter_server/server/device_controller.py | 136 +++++++++++++--------- matter_server/server/server.py | 26 +++++ 4 files changed, 170 insertions(+), 69 deletions(-) diff --git a/matter_server/server/__main__.py b/matter_server/server/__main__.py index 2175a9a2..9620612a 100644 --- a/matter_server/server/__main__.py +++ b/matter_server/server/__main__.py @@ -2,9 +2,14 @@ import argparse import asyncio +from contextlib import suppress import logging +from logging.handlers import RotatingFileHandler import os from pathlib import Path +import sys +import threading +from typing import Final from aiorun import run import coloredlogs @@ -20,6 +25,11 @@ DEFAULT_LISTEN_ADDRESS = None DEFAULT_STORAGE_PATH = os.path.join(Path.home(), ".matter_server") +FORMAT_DATE: Final = "%Y-%m-%d" +FORMAT_TIME: Final = "%H:%M:%S" +FORMAT_DATETIME: Final = f"{FORMAT_DATE} {FORMAT_TIME}" +MAX_LOG_FILESIZE = 1000000 * 10 # 10 MB + # Get parsed passed in arguments. parser = argparse.ArgumentParser( description="Matter Controller Server using WebSockets." @@ -86,6 +96,7 @@ def _setup_logging() -> None: + log_fmt = "%(asctime)s (%(threadName)s) %(levelname)s [%(name)s] %(message)s" custom_level_style = { **coloredlogs.DEFAULT_LEVEL_STYLES, "chip_automation": {"color": "green", "faint": True}, @@ -94,38 +105,68 @@ def _setup_logging() -> None: "chip_error": {"color": "red"}, } # Let coloredlogs handle all levels, we filter levels in the logging module - coloredlogs.install(level=logging.NOTSET, level_styles=custom_level_style) + coloredlogs.install( + level=logging.NOTSET, level_styles=custom_level_style, fmt=log_fmt + ) + + # Capture warnings.warn(...) and friends messages in logs. + # The standard destination for them is stderr, which may end up unnoticed. + # This way they're where other messages are, and can be filtered as usual. + logging.captureWarnings(True) - handlers = None + logging.basicConfig(level=args.log_level.upper()) + logger = logging.getLogger() + + # setup file handler if args.log_file: - handlers = [logging.FileHandler(args.log_file)] - logging.basicConfig(handlers=handlers, level=args.log_level.upper()) + log_filename = os.path.join(args.log_file) + file_handler = RotatingFileHandler( + log_filename, maxBytes=MAX_LOG_FILESIZE, backupCount=1 + ) + # rotate log at each start + with suppress(OSError): + file_handler.doRollover() + file_handler.setFormatter(logging.Formatter(log_fmt, datefmt=FORMAT_DATETIME)) + logger.addHandler(file_handler) stack.init_logging(args.log_level_sdk.upper()) - logging.getLogger().setLevel(args.log_level.upper()) + logger.setLevel(args.log_level.upper()) - if not logging.getLogger().isEnabledFor(logging.DEBUG): + if not logger.isEnabledFor(logging.DEBUG): logging.getLogger("PersistentStorage").setLevel(logging.WARNING) # Temporary disable the logger of chip.clusters.Attribute because it now logs # an error on every custom attribute that couldn't be parsed which confuses people. # We can restore the default log level again when we've patched the device controller # to handle the raw attribute data to deal with custom clusters. logging.getLogger("chip.clusters.Attribute").setLevel(logging.CRITICAL) - if not logging.getLogger().isEnabledFor(logging.DEBUG): # (temporary) raise the log level of zeroconf as its a logs an annoying # warning at startup while trying to bind to a loopback IPv6 interface logging.getLogger("zeroconf").setLevel(logging.ERROR) + # register global uncaught exception loggers + sys.excepthook = lambda *args: logger.exception( + "Uncaught exception", + exc_info=args, + ) + threading.excepthook = lambda args: logger.exception( + "Uncaught thread exception", + exc_info=( # type: ignore[arg-type] + args.exc_type, + args.exc_value, + args.exc_traceback, + ), + ) + def main() -> None: """Run main execution.""" - _setup_logging() - # make sure storage path exists if not os.path.isdir(args.storage_path): os.mkdir(args.storage_path) + _setup_logging() + # Init server server = MatterServer( args.storage_path, diff --git a/matter_server/server/client_handler.py b/matter_server/server/client_handler.py index 42541748..0c695303 100644 --- a/matter_server/server/client_handler.py +++ b/matter_server/server/client_handler.py @@ -193,12 +193,24 @@ async def _run_handler( result = await result self._send_message(SuccessResultMessage(msg.message_id, result)) except ChipStackError as err: - self._logger.exception("SDK Error during handling message: %s", msg) + self._logger.error( + "SDK Error during handling message: %s: %s", + msg.command, + str(err), + # only print the full stacktrace if debug logging is enabled + exc_info=err if self._logger.isEnabledFor(logging.DEBUG) else None, + ) self._send_message( ErrorResultMessage(msg.message_id, SDKStackError.error_code, str(err)) ) - except Exception as err: # pylint: disable=broad-except - self._logger.exception("Error handling message: %s", msg) + except Exception as err: # pylint: disable=broad-except # noqa: BLE001 + self._logger.error( + "SDK Error during handling message: %s: %s", + msg.command, + str(err), + # only print the full stacktrace if debug logging is enabled + exc_info=err if self._logger.isEnabledFor(logging.DEBUG) else None, + ) error_code = getattr(err, "error_code", MatterError.error_code) self._send_message(ErrorResultMessage(msg.message_id, error_code, str(err))) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 9e074dae..eb96aff7 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -62,11 +62,16 @@ DATA_KEY_LAST_NODE_ID = "last_node_id" LOGGER = logging.getLogger(__name__) -NODE_SUBSCRIPTION_CEILING = 60 -NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED = 1800 +MIN_NODE_SUBSCRIPTION_CEILING = 30 +MAX_NODE_SUBSCRIPTION_CEILING = 300 +MIN_NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED = 300 +MAX_NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED = 1800 MAX_COMMISSION_RETRIES = 3 NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE = 3 NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60 * 1000 +NODE_PING_TIMEOUT = 10 +NODE_PING_TIMEOUT_BATTERY_POWERED = 60 +NODE_MDNS_BACKOFF = 60 MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local." MDNS_TYPE_COMMISSIONABLE_NODE = "_matterc._udp.local." @@ -96,7 +101,7 @@ def __init__( self.event_history: deque[Attribute.EventReadResult] = deque(maxlen=25) self._subscriptions: dict[int, Attribute.SubscriptionTransaction] = {} self._nodes_in_setup: set[int] = set() - self._mdns_last_seen: dict[int, float] = {} + self._node_last_seen: dict[int, float] = {} self._nodes: dict[int, MatterNodeData] = {} self._last_known_ip_addresses: dict[int, list[str]] = {} self._last_subscription_attempt: dict[int, int] = {} @@ -550,8 +555,7 @@ async def send_device_command( cluster_cls: Cluster = ALL_CLUSTERS[cluster_id] command_cls = getattr(cluster_cls.Commands, command_name) command = dataclass_from_dict(command_cls, payload, allow_sdk_types=True) - node_lock = self._get_node_lock(node_id) - async with node_lock: + async with self._get_node_lock(node_id): return await self.chip_controller.SendCommand( nodeid=node_id, endpoint=endpoint_id, @@ -572,10 +576,9 @@ async def read_attribute( raise NodeNotReady(f"Node {node_id} is not (yet) available.") endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path) assert self.server.loop is not None - future = self.server.loop.create_future() - device = await self._resolve_node(node_id) - node_lock = self._get_node_lock(node_id) - async with node_lock: + async with self._get_node_lock(node_id): + future = self.server.loop.create_future() + device = await self._resolve_node(node_id) Attribute.Read( future=future, eventLoop=self.server.loop, @@ -620,10 +623,11 @@ async def write_attribute( value_type=attribute.attribute_type.Type, allow_sdk_types=True, ) - return await self.chip_controller.WriteAttribute( - nodeid=node_id, - attributes=[(endpoint_id, attribute)], - ) + async with self._get_node_lock(node_id): + return await self.chip_controller.WriteAttribute( + nodeid=node_id, + attributes=[(endpoint_id, attribute)], + ) @api_command(APICommand.REMOVE_NODE) async def remove_node(self, node_id: int) -> None: @@ -670,7 +674,12 @@ async def remove_node(self, node_id: int) -> None: ), ) except ChipStackError as err: - LOGGER.warning("Removing current fabric from device failed.", exc_info=err) + LOGGER.warning( + "Removing current fabric from device failed: %s", + str(err), + # only log stacktrace if we have debug logging enabled + exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, + ) return if ( result is None @@ -709,7 +718,7 @@ async def ping_node(self, node_id: int) -> NodePingResult: raise NodeNotExists( f"Node {node_id} does not exist or is not yet interviewed" ) - node_logger = LOGGER.getChild(f"[node {node_id}]") + node_logger = LOGGER.getChild(f"node_{node_id}") battery_powered = ( node.attributes.get(ROUTING_ROLE_ATTRIBUTE_PATH, 0) @@ -718,7 +727,11 @@ async def ping_node(self, node_id: int) -> NodePingResult: async def _do_ping(ip_address: str) -> None: """Ping IP and add to result.""" - timeout = 10 if battery_powered else 2 + timeout = ( + NODE_PING_TIMEOUT_BATTERY_POWERED + if battery_powered + else NODE_PING_TIMEOUT + ) if "%" in ip_address: # ip address contains an interface index clean_ip, interface_idx = ip_address.split("%", 1) @@ -767,7 +780,7 @@ async def get_node_ip_addresses( raise NodeNotExists( f"Node {node_id} does not exist or is not yet interviewed" ) - node_logger = LOGGER.getChild(f"[node {node_id}]") + node_logger = LOGGER.getChild(f"node_{node_id}") # query mdns for all IP's # ensure both fabric id and node id have 16 characters (prefix with zero's) mdns_name = f"{self.compressed_fabric_id:0{16}X}-{node_id:0{16}X}.{MDNS_TYPE_OPERATIONAL_NODE}" @@ -800,17 +813,15 @@ async def _subscribe_node(self, node_id: int) -> None: f"Node {node_id} does not exist or has not been interviewed." ) - node_logger = LOGGER.getChild(f"[node {node_id}]") - node_lock = self._get_node_lock(node_id) + node_logger = LOGGER.getChild(f"node_{node_id}") node = self._nodes[node_id] # check if we already have setup subscriptions for this node, # if so, we need to unsubscribe if prev_sub := self._subscriptions.get(node_id, None): - async with node_lock: - node_logger.debug("Unsubscribing from existing subscription.") - await self._call_sdk(prev_sub.Shutdown) - del self._subscriptions[node_id] + node_logger.info("Unsubscribing from existing subscription.") + await self._call_sdk(prev_sub.Shutdown) + del self._subscriptions[node_id] # determine if node is battery powered sleeping device # Endpoint 0, ThreadNetworkDiagnostics Cluster, routingRole attribute @@ -827,8 +838,7 @@ def attribute_updated_callback( path: Attribute.TypedAttributePath, transaction: Attribute.SubscriptionTransaction, ) -> None: - self._mdns_last_seen[node_id] = time.time() - assert loop is not None + self._node_last_seen[node_id] = time.time() new_value = transaction.GetAttribute(path) # failsafe: ignore ValueDecodeErrors # these are set by the SDK if parsing the value failed miserably @@ -955,7 +965,7 @@ def resubscription_succeeded( transaction: Attribute.SubscriptionTransaction, ) -> None: # pylint: disable=unused-argument, invalid-name - self._mdns_last_seen[node_id] = time.time() + self._node_last_seen[node_id] = time.time() node_logger.info("Re-Subscription succeeded") self._last_subscription_attempt[node_id] = 0 # mark node as available and signal consumers @@ -966,34 +976,36 @@ def resubscription_succeeded( node_logger.info("Setting up attributes and events subscription.") interval_floor = 0 interval_ceiling = ( - NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED + randint( # noqa: S311 + MIN_NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED, + MAX_NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED, + ) if battery_powered - else NODE_SUBSCRIPTION_CEILING + else randint( # noqa: S311 + MIN_NODE_SUBSCRIPTION_CEILING, MAX_NODE_SUBSCRIPTION_CEILING + ) ) self._last_subscription_attempt[node_id] = 0 future = loop.create_future() device = await self._resolve_node(node_id) - async with node_lock: - Attribute.Read( - future=future, - eventLoop=loop, - device=device.deviceProxy, - devCtrl=self.chip_controller, - attributes=[Attribute.AttributePath()], # wildcard - events=[ - Attribute.EventPath( - EndpointId=None, Cluster=None, Event=None, Urgent=1 - ) - ], - returnClusterObject=False, - subscriptionParameters=Attribute.SubscriptionParameters( - interval_floor, interval_ceiling - ), - # Use fabricfiltered as False to detect changes made by other controllers - # and to be able to provide a list of all fabrics attached to the device - fabricFiltered=False, - autoResubscribe=True, - ).raise_on_error() + Attribute.Read( + future=future, + eventLoop=loop, + device=device.deviceProxy, + devCtrl=self.chip_controller, + attributes=[Attribute.AttributePath()], # wildcard + events=[ + Attribute.EventPath(EndpointId=None, Cluster=None, Event=None, Urgent=1) + ], + returnClusterObject=False, + subscriptionParameters=Attribute.SubscriptionParameters( + interval_floor, interval_ceiling + ), + # Use fabricfiltered as False to detect changes made by other controllers + # and to be able to provide a list of all fabrics attached to the device + fabricFiltered=False, + autoResubscribe=True, + ).raise_on_error() sub: Attribute.SubscriptionTransaction = await future sub.SetAttributeUpdateCallback(attribute_updated_callback) @@ -1012,7 +1024,7 @@ def resubscription_succeeded( tlv_attributes = sub._readTransaction._cache.attributeTLVCache node.attributes.update(parse_attributes_from_read_result(tlv_attributes)) node_logger.info("Subscription succeeded") - self._mdns_last_seen[node_id] = time.time() + self._node_last_seen[node_id] = time.time() self.server.signal_event(EventType.NODE_UPDATED, node) def _get_next_node_id(self) -> int: @@ -1042,9 +1054,18 @@ async def _setup_node(self, node_id: int) -> None: # prevent duplicate setup actions return self._nodes_in_setup.add(node_id) - # pre-cache ip-addresses - await self.get_node_ip_addresses(node_id) try: + # Ping the node to rule out stale mdns reports and to prevent that we + # send an unreachable node to the sdk which is very slow with resolving it. + # This will also precache the ip addresses of the node for later use. + ping_result = await self.ping_node(node_id) + if not any(ping_result.values()): + LOGGER.warning( + "Skip set-up for node %s because it does not appear to be reachable...", + node_id, + ) + return + LOGGER.info("Setting-up node %s...", node_id) # (re)interview node (only) if needed node_data = self._nodes[node_id] if ( @@ -1060,7 +1081,6 @@ async def _setup_node(self, node_id: int) -> None: # NOTE: the node will be picked up by mdns discovery automatically # when it comes available again. return - # setup subscriptions for the node try: await self._subscribe_node(node_id) @@ -1174,18 +1194,20 @@ def _on_mdns_operational_node_state( # mdns events for matter devices arrive in bursts of (duplicate) messages # so we debounce this as we only use the mdns messages for operational node discovery # and we have other logic in place to determine node aliveness - now = time.time() - last_seen = self._mdns_last_seen.get(node_id, 0) - self._mdns_last_seen[node_id] = now - if now - last_seen < NODE_SUBSCRIPTION_CEILING: + last_seen = self._node_last_seen.get(node_id, 0) + if node.available and now - last_seen < NODE_MDNS_BACKOFF: return + self._node_last_seen[node_id] = now # we treat UPDATE state changes as ADD if the node is marked as # unavailable to ensure we catch a node being operational if node.available and state_change == ServiceStateChange.Updated: return + if node_id in self._nodes_in_setup: + # prevent duplicate setup actions + return LOGGER.info("Node %s (re)discovered on MDNS", node_id) # setup the node - this will (re) setup the subscriptions etc. asyncio.create_task(self._setup_node(node_id)) diff --git a/matter_server/server/server.py b/matter_server/server/server.py index 249813d2..029e7b68 100644 --- a/matter_server/server/server.py +++ b/matter_server/server/server.py @@ -8,6 +8,7 @@ import logging import os from pathlib import Path +import traceback from typing import Any, Callable, Set, cast import weakref @@ -38,6 +39,30 @@ DASHBOARD_DIR_EXISTS = DASHBOARD_DIR.exists() +def _global_loop_exception_handler(_: Any, context: dict[str, Any]) -> None: + """Handle all exception inside the core loop.""" + kwargs = {} + if exception := context.get("exception"): + kwargs["exc_info"] = (type(exception), exception, exception.__traceback__) + + logger = logging.getLogger(__package__) + if source_traceback := context.get("source_traceback"): + stack_summary = "".join(traceback.format_list(source_traceback)) + logger.error( + "Error doing job: %s: %s", + context["message"], + stack_summary, + **kwargs, # type: ignore[arg-type] + ) + return + + logger.error( + "Error doing task: %s", + context["message"], + **kwargs, # type: ignore[arg-type] + ) + + def mount_websocket(server: MatterServer, path: str) -> None: """Mount the websocket endpoint.""" clients: weakref.WeakSet[WebsocketClientHandler] = weakref.WeakSet() @@ -107,6 +132,7 @@ async def start(self) -> None: "CHIP Core version does not match CHIP Clusters version." ) self.loop = asyncio.get_running_loop() + self.loop.set_exception_handler(_global_loop_exception_handler) await self.device_controller.initialize() await self.storage.start() await self.device_controller.start()