Skip to content

Commit

Permalink
More MCS Bugs (#2668)
Browse files Browse the repository at this point in the history
* Ensure the import isn't actually made into trino with failed data

* Additional fixes for error handling
  • Loading branch information
ravenac95 authored Dec 18, 2024
1 parent a5d16c6 commit 98036f5
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions warehouse/metrics_tools/compute/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ async def _handle_query_job_submit_request(
exported_dependent_tables_map = await self.resolve_dependent_tables(input)
except Exception as e:
self.logger.error(f"job[{job_id}] failed to export dependencies: {e}")
await self._notify_job_failed(job_id, False, e)
return
raise e
self.logger.debug(f"job[{job_id}] dependencies exported")

tasks = await self._batch_query_to_scheduler(
Expand All @@ -163,18 +162,22 @@ async def _handle_query_job_submit_request(
f"job[{job_id}] task failed with uncaught exception: {e}"
)
exceptions.append(e)
# Report failure early for any listening clients. The server
# will collect all errors for any internal reporting needed
await self._notify_job_failed(job_id, True, e)

# If there are any exceptions then we report those as failed and short
# circuit this method
if len(exceptions) > 0:
raise JobTasksFailed(job_id, len(exceptions), exceptions)

# Import the final result into the database
self.logger.info("job[{job_id}]: importing final result into the database")
await self.import_adapter.import_reference(calculation_export, final_export)

self.logger.debug(f"job[{job_id}]: notifying job completed")
await self._notify_job_completed(job_id)

if len(exceptions) > 0:
raise JobTasksFailed(job_id, len(exceptions), exceptions)

async def _batch_query_to_scheduler(
self,
job_id: str,
Expand Down

0 comments on commit 98036f5

Please sign in to comment.