From 98036f5251524b663aa2191e5dde68e55d341c68 Mon Sep 17 00:00:00 2001
From: Reuven Gonzales
Date: Wed, 18 Dec 2024 14:09:32 -0800
Subject: [PATCH] More MCS Bugs (#2668)

* Ensure the import isn't actually made into trino with failed data

* Additional fixes for error handling
---
 warehouse/metrics_tools/compute/service.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/warehouse/metrics_tools/compute/service.py b/warehouse/metrics_tools/compute/service.py
index 89a747a1..d47bdcf1 100644
--- a/warehouse/metrics_tools/compute/service.py
+++ b/warehouse/metrics_tools/compute/service.py
@@ -141,8 +141,7 @@ async def _handle_query_job_submit_request(
             exported_dependent_tables_map = await self.resolve_dependent_tables(input)
         except Exception as e:
             self.logger.error(f"job[{job_id}] failed to export dependencies: {e}")
-            await self._notify_job_failed(job_id, False, e)
-            return
+            raise e
         self.logger.debug(f"job[{job_id}] dependencies exported")
 
         tasks = await self._batch_query_to_scheduler(
@@ -163,8 +162,15 @@ async def _handle_query_job_submit_request(
                     f"job[{job_id}] task failed with uncaught exception: {e}"
                 )
                 exceptions.append(e)
+                # Report failure early for any listening clients. The server
+                # will collect all errors for any internal reporting needed
                 await self._notify_job_failed(job_id, True, e)
 
+        # If there are any exceptions then we report those as failed and short
+        # circuit this method
+        if len(exceptions) > 0:
+            raise JobTasksFailed(job_id, len(exceptions), exceptions)
+
         # Import the final result into the database
         self.logger.info("job[{job_id}]: importing final result into the database")
         await self.import_adapter.import_reference(calculation_export, final_export)
@@ -172,9 +178,6 @@ async def _handle_query_job_submit_request(
         self.logger.debug(f"job[{job_id}]: notifying job completed")
         await self._notify_job_completed(job_id)
 
-        if len(exceptions) > 0:
-            raise JobTasksFailed(job_id, len(exceptions), exceptions)
-
     async def _batch_query_to_scheduler(
         self,
         job_id: str,
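
Note: the core of this change is ordering. Per-task failures are reported to listening clients as they happen, the collected errors are raised as JobTasksFailed before the import step, and the import plus the completion notification only run when no task failed. The sketch below is a minimal illustration of that ordering, not the service code itself: run_job, notify_failed, notify_completed and import_final_result are hypothetical stand-ins for the service's scheduler batching, its _notify_job_* calls and import_adapter.import_reference, and JobTasksFailed is redefined here only to keep the example self-contained.

# Minimal sketch of the fail-fast ordering this patch enforces, under the
# simplifying assumptions named above.
import asyncio
import typing as t


class JobTasksFailed(Exception):
    # Stand-in for the real exception type; same argument shape as in the diff.
    def __init__(self, job_id: str, failures: int, exceptions: t.List[Exception]):
        self.job_id = job_id
        self.failures = failures
        self.exceptions = exceptions
        super().__init__(f"job[{job_id}] failed with {failures} task errors")


async def run_job(
    job_id: str,
    tasks: t.Iterable[t.Awaitable[None]],
    notify_failed: t.Callable[[str, Exception], t.Awaitable[None]],
    notify_completed: t.Callable[[str], t.Awaitable[None]],
    import_final_result: t.Callable[[], t.Awaitable[None]],
) -> None:
    exceptions: t.List[Exception] = []
    for completed in asyncio.as_completed(list(tasks)):
        try:
            await completed
        except Exception as e:
            # Report each failure as soon as it happens so listening clients
            # are not left waiting, while still draining the remaining tasks
            # so every error is collected.
            exceptions.append(e)
            await notify_failed(job_id, e)

    # Short circuit before the import step: a job with any failed task must
    # never have its partial results imported.
    if len(exceptions) > 0:
        raise JobTasksFailed(job_id, len(exceptions), exceptions)

    # Only a fully successful job reaches the import and the completed
    # notification.
    await import_final_result()
    await notify_completed(job_id)

The first hunk follows the same reasoning: re-raising the dependency-export failure lets it surface through the job's normal failure path, rather than notifying once and silently returning from a job that was never fully set up.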