From c136b2f90844690540a97cebe49f021994193426 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 5 Oct 2023 11:20:20 +0200 Subject: [PATCH] Issue 28 Finetune parallelized `/jobs` requests --- CHANGELOG.md | 1 + src/openeo_aggregator/about.py | 2 +- src/openeo_aggregator/backend.py | 49 +++++++++++++++-------------- src/openeo_aggregator/connection.py | 22 +++++++++---- tests/test_views.py | 36 +++++++++++++++++++-- 5 files changed, 77 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 49c5807f..a1df36ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/ ### Changed - Dockerfile: switch to `python:3.9-slim-bullseye` base image +- Parallelize `/jobs` requests to upstream back-ends ([#28](https://github.com/Open-EO/openeo-aggregator/issues/28)) ### Fixed diff --git a/src/openeo_aggregator/about.py b/src/openeo_aggregator/about.py index 7228ea02..f63d43d8 100644 --- a/src/openeo_aggregator/about.py +++ b/src/openeo_aggregator/about.py @@ -1,7 +1,7 @@ import logging import sys -__version__ = "0.11.0a1" +__version__ = "0.11.1a1" def log_version_info(): diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index bdabafce..7f405d7e 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -597,42 +597,43 @@ def __init__( self.partitioned_job_tracker = partitioned_job_tracker def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]: - jobs = [] + all_jobs = [] federation_missing = set() results = self.backends.request_parallel( path="/jobs", method="GET", expected_status=[200], authenticated_from_request=flask.request ) - for backend_id, success, result in results: - if success: - try: - for job in result["jobs"]: - try: - job["id"] = JobIdMapping.get_aggregator_job_id( - backend_job_id=job["id"], backend_id=backend_id - ) - jobs.append(BatchJobMetadata.from_api_dict(job)) - except Exception as e: - _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True) - except Exception as e: - _log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}") - federation_missing.add(backend_id) - else: - # TODO: user warning https://github.com/Open-EO/openeo-api/issues/412 - _log.warning(f"Failed to get job listing from backend {backend_id!r}: {result!r}") + for backend_id, result in results.successes.items(): + try: + jobs = result["jobs"] + assert isinstance(jobs, list), "must be a list" + except Exception as e: + _log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}") federation_missing.add(backend_id) + else: + for job in jobs: + try: + job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=backend_id) + all_jobs.append(BatchJobMetadata.from_api_dict(job)) + except Exception as e: + _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True) + for backend_id, exc in results.failures.items(): + _log.warning(f"Failed to get job listing from backend {backend_id!r}: {exc!r}") + federation_missing.add(backend_id) if self.partitioned_job_tracker: for job in self.partitioned_job_tracker.list_user_jobs(user_id=user_id): job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=JobIdMapping.AGG) - jobs.append(BatchJobMetadata.from_api_dict(job)) + all_jobs.append(BatchJobMetadata.from_api_dict(job)) federation_missing.update(self.backends.get_disabled_connection_ids()) - return dict_no_none({ - "jobs": jobs, - # TODO: experimental "federation:missing" https://github.com/openEOPlatform/architecture-docs/issues/179 - "federation:missing": list(federation_missing) or None - }) + return dict_no_none( + { + "jobs": all_jobs, + # TODO: experimental "federation:missing" https://github.com/openEOPlatform/architecture-docs/issues/179 + "federation:missing": list(federation_missing) or None, + } + ) def create_job( self, diff --git a/src/openeo_aggregator/connection.py b/src/openeo_aggregator/connection.py index ea47ffe9..571a1ea6 100644 --- a/src/openeo_aggregator/connection.py +++ b/src/openeo_aggregator/connection.py @@ -1,6 +1,7 @@ import collections import concurrent.futures import contextlib +import dataclasses import logging import re from typing import ( @@ -193,6 +194,14 @@ def request(*args, **kwargs): _ConnectionsCache = collections.namedtuple("_ConnectionsCache", ["expiry", "connections"]) +@dataclasses.dataclass(frozen=True) +class ParallelResponse: + """Set of responses and failures info for a parallelized request.""" + + successes: Dict[BackendId, dict] + failures: Dict[BackendId, Exception] + + class MultiBackendConnection: """ Collection of multiple connections to different backends @@ -352,7 +361,7 @@ def request_parallel( request_timeout: float = 5, overall_timeout: float = 8, max_workers=5, - ) -> List[Tuple[BackendId, bool, Any]]: + ) -> ParallelResponse: """ Request a given (relative) url on each backend in parallel :param path: relative (openEO) path to request @@ -411,15 +420,16 @@ def do_request( concurrent.futures.wait([f for (_, f) in futures], timeout=overall_timeout) # Collect results. - results: List[Tuple[BackendId, bool, Any]] = [] + successes = {} + failures = {} for backend_id, future in futures: try: - result = future.result(timeout=0) - results.append((backend_id, True, result)) + successes[backend_id] = future.result(timeout=0) except Exception as e: - results.append((backend_id, False, e)) + failures[backend_id] = e + + return ParallelResponse(successes=successes, failures=failures) - return results def streaming_flask_response( diff --git a/tests/test_views.py b/tests/test_views.py index 45e99f7c..ec96171b 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -1,12 +1,13 @@ import logging import re +import time from typing import List, Tuple import pytest import requests from openeo.rest import OpenEoApiError, OpenEoRestError from openeo.rest.connection import url_join -from openeo.util import rfc3339 +from openeo.util import ContextTimer, rfc3339 from openeo_driver.backend import ServiceMetadata from openeo_driver.errors import ( JobNotFinishedException, @@ -1155,7 +1156,7 @@ class TestBatchJobs: def test_list_jobs_no_auth(self, api100): api100.get("/jobs").assert_error(401, "AuthenticationRequired") - def test_list_jobs(self, api100, requests_mock, backend1, backend2): + def test_list_jobs_basic(self, api100, requests_mock, backend1, backend2): requests_mock.get(backend1 + "/jobs", json={"jobs": [ {"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"}, {"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."}, @@ -1174,6 +1175,37 @@ def test_list_jobs(self, api100, requests_mock, backend1, backend2): "links": [], } + def test_list_jobs_auth(self, api100, requests_mock, backend1, backend2): + def b1_get_jobs(request, context): + assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"] + return { + "jobs": [ + {"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"}, + {"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."}, + ] + } + + def b2_get_jobs(request, context): + assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"] + return { + "jobs": [ + {"id": "job05", "status": "running", "created": "2021-06-05T12:34:56Z"}, + ] + } + + requests_mock.get(backend1 + "/jobs", json=b1_get_jobs) + requests_mock.get(backend2 + "/jobs", json=b2_get_jobs) + api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) + res = api100.get("/jobs").assert_status_code(200).json + assert res == { + "jobs": [ + {"id": "b1-job03", "status": "running", "created": "2021-06-03T12:34:56Z"}, + {"id": "b1-job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."}, + {"id": "b2-job05", "status": "running", "created": "2021-06-05T12:34:56Z"}, + ], + "links": [], + } + @pytest.mark.parametrize("b2_oidc_pid", ["egi", "aho"]) def test_list_jobs_oidc_pid_mapping(self, config, requests_mock, backend1, backend2, b2_oidc_pid): # Override /credentials/oidc of backend2 before building flask app and ApiTester