From 16b1a4341fa07b4a4039fd9187e3edb1ff1913cf Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Wed, 4 Oct 2023 18:25:20 +0200
Subject: [PATCH 1/2] Issue #28 initial implementation of parallel requests
 (on `/jobs`)

---
 src/openeo_aggregator/backend.py    |  34 +++++----
 src/openeo_aggregator/connection.py | 109 +++++++++++++++++++++++++++-
 2 files changed, 125 insertions(+), 18 deletions(-)

diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py
index 81cc47d8..bdabafce 100644
--- a/src/openeo_aggregator/backend.py
+++ b/src/openeo_aggregator/backend.py
@@ -599,22 +599,28 @@ def __init__(
     def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
         jobs = []
         federation_missing = set()
-        for con in self.backends:
-            with con.authenticated_from_request(request=flask.request, user=User(user_id)), \
-                    TimingLogger(f"get_user_jobs: {con.id}", logger=_log.debug):
+
+        results = self.backends.request_parallel(
+            path="/jobs", method="GET", expected_status=[200], authenticated_from_request=flask.request
+        )
+        for backend_id, success, result in results:
+            if success:
                 try:
-                    backend_jobs = con.list_jobs()
+                    for job in result["jobs"]:
+                        try:
+                            job["id"] = JobIdMapping.get_aggregator_job_id(
+                                backend_job_id=job["id"], backend_id=backend_id
+                            )
+                            jobs.append(BatchJobMetadata.from_api_dict(job))
+                        except Exception as e:
+                            _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
                 except Exception as e:
-                    # TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
-                    _log.warning(f"Failed to get job listing from backend {con.id!r}: {e!r}")
-                    federation_missing.add(con.id)
-                    backend_jobs = []
-                for job in backend_jobs:
-                    try:
-                        job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=con.id)
-                        jobs.append(BatchJobMetadata.from_api_dict(job))
-                    except Exception as e:
-                        _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
+                    _log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}")
+                    federation_missing.add(backend_id)
+            else:
+                # TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
+                _log.warning(f"Failed to get job listing from backend {backend_id!r}: {result!r}")
+                federation_missing.add(backend_id)
 
         if self.partitioned_job_tracker:
             for job in self.partitioned_job_tracker.list_user_jobs(user_id=user_id):
diff --git a/src/openeo_aggregator/connection.py b/src/openeo_aggregator/connection.py
index 92361c64..ea47ffe9 100644
--- a/src/openeo_aggregator/connection.py
+++ b/src/openeo_aggregator/connection.py
@@ -1,14 +1,28 @@
 import collections
+import concurrent.futures
 import contextlib
 import logging
 import re
-from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
 
 import flask
 import requests
 from openeo import Connection
 from openeo.capabilities import ComparableVersion
 from openeo.rest.auth.auth import BearerAuth, OpenEoApiAuthBase
+from openeo.rest.connection import RestApiConnection
+from openeo.util import TimingLogger
 from openeo_driver.backend import OidcProvider
 from openeo_driver.errors import (
     AuthenticationRequiredException,
@@ -31,6 +45,10 @@
 
 _log = logging.getLogger(__name__)
 
+# Type annotation aliases
+
+BackendId = str
+
 class LockedAuthException(InternalException):
     def __init__(self):
         super().__init__(message="Setting auth while locked.")
@@ -105,8 +123,11 @@ def _build_oidc_provider_map(self, configured_providers: List[OidcProvider]) ->
     def get_oidc_provider_map(self) -> Dict[str, str]:
         return self._oidc_provider_map
 
-    def _get_bearer(self, request: flask.Request) -> str:
-        """Extract authorization header from request and (optionally) transform for given backend """
+    def extract_bearer(self, request: flask.Request) -> str:
+        """
+        Extract authorization header from flask request
+        and (optionally) transform for current backend.
+        """
         if "Authorization" not in request.headers:
             raise AuthenticationRequiredException
         auth = request.headers["Authorization"]
@@ -134,7 +155,7 @@ def authenticated_from_request(
         Context manager to temporarily authenticate upstream connection based on current incoming flask request.
         """
         self._auth_locked = False
-        self.auth = BearerAuth(bearer=self._get_bearer(request=request))
+        self.auth = BearerAuth(bearer=self.extract_bearer(request=request))
         # TODO store and use `user` object?
         try:
             yield self
@@ -320,6 +341,86 @@ def map(
             # TODO: customizable exception handling: skip, warn, re-raise?
             yield con.id, res
 
+    def request_parallel(
+        self,
+        path: str,
+        *,
+        method: str = "GET",
+        parse_json: bool = True,
+        authenticated_from_request: Optional[flask.Request] = None,
+        expected_status: Union[int, Iterable[int], None] = None,
+        request_timeout: float = 5,
+        overall_timeout: float = 8,
+        max_workers=5,
+    ) -> List[Tuple[BackendId, bool, Any]]:
+        """
+        Request a given (relative) url on each backend in parallel
+        :param path: relative (openEO) path to request
+        :return:
+        """
+
+        def do_request(
+            root_url: str,
+            path: str,
+            *,
+            method: str = "GET",
+            headers: Optional[dict] = None,
+            auth: Optional[str] = None,
+        ) -> Union[dict, bytes]:
+            """Isolated request, to be handled by future."""
+            with TimingLogger(title=f"request_parallel {method} {path} on {root_url}", logger=_log):
+                con = RestApiConnection(root_url=root_url)
+                resp = con.request(
+                    method=method,
+                    path=path,
+                    headers=headers,
+                    auth=auth,
+                    timeout=request_timeout,
+                    expected_status=expected_status,
+                )
+            if parse_json:
+                return resp.json()
+            else:
+                return resp.content
+
+        connections: List[BackendConnection] = self.get_connections()
+        max_workers = min(max_workers, len(connections))
+
+        with TimingLogger(
+            title=f"request_parallel {method} {path} on {len(connections)} backends with thread pool {max_workers=}",
+            logger=_log,
+        ), concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+            # Submit all futures (one for each backend connection)
+            futures: List[Tuple[BackendId, concurrent.futures.Future]] = []
+            for con in connections:
+                if authenticated_from_request:
+                    auth = BearerAuth(bearer=con.extract_bearer(request=authenticated_from_request))
+                else:
+                    auth = None
+                future = executor.submit(
+                    do_request,
+                    root_url=con.root_url,
+                    path=path,
+                    method=method,
+                    headers=con.default_headers,
+                    auth=auth,
+                )
+                futures.append((con.id, future))
+
+            # Give futures some time to finish
+            concurrent.futures.wait([f for (_, f) in futures], timeout=overall_timeout)
+
+        # Collect results.
+        results: List[Tuple[BackendId, bool, Any]] = []
+        for backend_id, future in futures:
+            try:
+                result = future.result(timeout=0)
+                results.append((backend_id, True, result))
+            except Exception as e:
+                results.append((backend_id, False, e))
+
+        return results
+
 
 def streaming_flask_response(
     backend_response: requests.Response,

From c136b2f90844690540a97cebe49f021994193426 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Thu, 5 Oct 2023 11:20:20 +0200
Subject: [PATCH 2/2] Issue #28 Finetune parallelized `/jobs` requests

---
 CHANGELOG.md                        |  1 +
 src/openeo_aggregator/about.py      |  2 +-
 src/openeo_aggregator/backend.py    | 49 +++++++++++++-------------
 src/openeo_aggregator/connection.py | 22 +++++++++----
 tests/test_views.py                 | 36 +++++++++++++++++++--
 5 files changed, 77 insertions(+), 33 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 49c5807f..a1df36ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/
 ### Changed
 
 - Dockerfile: switch to `python:3.9-slim-bullseye` base image
+- Parallelize `/jobs` requests to upstream back-ends ([#28](https://github.com/Open-EO/openeo-aggregator/issues/28))
 
 ### Fixed
 
diff --git a/src/openeo_aggregator/about.py b/src/openeo_aggregator/about.py
index 7228ea02..f63d43d8 100644
--- a/src/openeo_aggregator/about.py
+++ b/src/openeo_aggregator/about.py
@@ -1,7 +1,7 @@
 import logging
 import sys
 
-__version__ = "0.11.0a1"
+__version__ = "0.11.1a1"
 
 
 def log_version_info():
diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py
index bdabafce..7f405d7e 100644
--- a/src/openeo_aggregator/backend.py
+++ b/src/openeo_aggregator/backend.py
@@ -597,42 +597,43 @@ def __init__(
         self.partitioned_job_tracker = partitioned_job_tracker
 
     def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
-        jobs = []
+        all_jobs = []
         federation_missing = set()
 
         results = self.backends.request_parallel(
             path="/jobs", method="GET", expected_status=[200], authenticated_from_request=flask.request
         )
-        for backend_id, success, result in results:
-            if success:
-                try:
-                    for job in result["jobs"]:
-                        try:
-                            job["id"] = JobIdMapping.get_aggregator_job_id(
-                                backend_job_id=job["id"], backend_id=backend_id
-                            )
-                            jobs.append(BatchJobMetadata.from_api_dict(job))
-                        except Exception as e:
-                            _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
-                except Exception as e:
-                    _log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}")
-                    federation_missing.add(backend_id)
-            else:
-                # TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
-                _log.warning(f"Failed to get job listing from backend {backend_id!r}: {result!r}")
+        for backend_id, result in results.successes.items():
+            try:
+                jobs = result["jobs"]
+                assert isinstance(jobs, list), "must be a list"
+            except Exception as e:
+                _log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}")
                 federation_missing.add(backend_id)
+            else:
+                for job in jobs:
+                    try:
+                        job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=backend_id)
+                        all_jobs.append(BatchJobMetadata.from_api_dict(job))
+                    except Exception as e:
+                        _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
+        for backend_id, exc in results.failures.items():
+            _log.warning(f"Failed to get job listing from backend {backend_id!r}: {exc!r}")
+            federation_missing.add(backend_id)
 
         if self.partitioned_job_tracker:
             for job in self.partitioned_job_tracker.list_user_jobs(user_id=user_id):
                 job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=JobIdMapping.AGG)
-                jobs.append(BatchJobMetadata.from_api_dict(job))
+                all_jobs.append(BatchJobMetadata.from_api_dict(job))
 
         federation_missing.update(self.backends.get_disabled_connection_ids())
-        return dict_no_none({
-            "jobs": jobs,
-            # TODO: experimental "federation:missing" https://github.com/openEOPlatform/architecture-docs/issues/179
-            "federation:missing": list(federation_missing) or None
-        })
+        return dict_no_none(
+            {
+                "jobs": all_jobs,
+                # TODO: experimental "federation:missing" https://github.com/openEOPlatform/architecture-docs/issues/179
+                "federation:missing": list(federation_missing) or None,
+            }
+        )
 
     def create_job(
         self,
diff --git a/src/openeo_aggregator/connection.py b/src/openeo_aggregator/connection.py
index ea47ffe9..571a1ea6 100644
--- a/src/openeo_aggregator/connection.py
+++ b/src/openeo_aggregator/connection.py
@@ -1,6 +1,7 @@
 import collections
 import concurrent.futures
 import contextlib
+import dataclasses
 import logging
 import re
 from typing import (
@@ -193,6 +194,14 @@ def request(*args, **kwargs):
 _ConnectionsCache = collections.namedtuple("_ConnectionsCache", ["expiry", "connections"])
 
 
+@dataclasses.dataclass(frozen=True)
+class ParallelResponse:
+    """Set of responses and failures info for a parallelized request."""
+
+    successes: Dict[BackendId, dict]
+    failures: Dict[BackendId, Exception]
+
+
 class MultiBackendConnection:
     """
     Collection of multiple connections to different backends
@@ -352,7 +361,7 @@ def request_parallel(
         request_timeout: float = 5,
         overall_timeout: float = 8,
         max_workers=5,
-    ) -> List[Tuple[BackendId, bool, Any]]:
+    ) -> ParallelResponse:
         """
         Request a given (relative) url on each backend in parallel
         :param path: relative (openEO) path to request
@@ -411,15 +420,16 @@ def do_request(
             concurrent.futures.wait([f for (_, f) in futures], timeout=overall_timeout)
 
         # Collect results.
-        results: List[Tuple[BackendId, bool, Any]] = []
+        successes = {}
+        failures = {}
         for backend_id, future in futures:
             try:
-                result = future.result(timeout=0)
-                results.append((backend_id, True, result))
+                successes[backend_id] = future.result(timeout=0)
             except Exception as e:
-                results.append((backend_id, False, e))
+                failures[backend_id] = e
+
+        return ParallelResponse(successes=successes, failures=failures)
 
-        return results
 
 def streaming_flask_response(
     backend_response: requests.Response,
diff --git a/tests/test_views.py b/tests/test_views.py
index 45e99f7c..ec96171b 100644
--- a/tests/test_views.py
+++ b/tests/test_views.py
@@ -1,12 +1,13 @@
 import logging
 import re
+import time
 from typing import List, Tuple
 
 import pytest
 import requests
 from openeo.rest import OpenEoApiError, OpenEoRestError
 from openeo.rest.connection import url_join
-from openeo.util import rfc3339
+from openeo.util import ContextTimer, rfc3339
 from openeo_driver.backend import ServiceMetadata
 from openeo_driver.errors import (
     JobNotFinishedException,
@@ -1155,7 +1156,7 @@ class TestBatchJobs:
     def test_list_jobs_no_auth(self, api100):
         api100.get("/jobs").assert_error(401, "AuthenticationRequired")
 
-    def test_list_jobs(self, api100, requests_mock, backend1, backend2):
+    def test_list_jobs_basic(self, api100, requests_mock, backend1, backend2):
         requests_mock.get(backend1 + "/jobs", json={"jobs": [
             {"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
             {"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
@@ -1174,6 +1175,37 @@ def test_list_jobs(self, api100, requests_mock, backend1, backend2):
             "links": [],
         }
 
+    def test_list_jobs_auth(self, api100, requests_mock, backend1, backend2):
+        def b1_get_jobs(request, context):
+            assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
+            return {
+                "jobs": [
+                    {"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
+                    {"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
+                ]
+            }
+
+        def b2_get_jobs(request, context):
+            assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
+            return {
+                "jobs": [
+                    {"id": "job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
+                ]
+            }
+
+        requests_mock.get(backend1 + "/jobs", json=b1_get_jobs)
+        requests_mock.get(backend2 + "/jobs", json=b2_get_jobs)
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        res = api100.get("/jobs").assert_status_code(200).json
+        assert res == {
+            "jobs": [
+                {"id": "b1-job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
+                {"id": "b1-job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
+                {"id": "b2-job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
+            ],
+            "links": [],
+        }
+
     @pytest.mark.parametrize("b2_oidc_pid", ["egi", "aho"])
     def test_list_jobs_oidc_pid_mapping(self, config, requests_mock, backend1, backend2, b2_oidc_pid):
         # Override /credentials/oidc of backend2 before building flask app and ApiTester
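Note: the pattern these two patches introduce — fan out one request per upstream backend on a thread pool, wait with an overall timeout, then split the finished futures into successes and failures — can be illustrated in isolation. Below is a minimal, self-contained Python sketch of that pattern; the `fan_out` helper and the dummy fetchers are hypothetical stand-ins for `MultiBackendConnection.request_parallel` and real HTTP calls, while `ParallelResponse` mirrors the dataclass added in patch 2.

import concurrent.futures
import dataclasses
from typing import Any, Callable, Dict


@dataclasses.dataclass(frozen=True)
class ParallelResponse:
    """Per-backend responses and failures (mirrors the dataclass in the patch)."""
    successes: Dict[str, Any]
    failures: Dict[str, Exception]


def fan_out(tasks: Dict[str, Callable[[], Any]], overall_timeout: float = 8) -> ParallelResponse:
    """Run one task per backend id in parallel and collect the outcomes."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, min(5, len(tasks)))) as executor:
        # Submit all tasks (one per backend), keyed by backend id.
        futures = {backend_id: executor.submit(task) for backend_id, task in tasks.items()}
        # Give the futures some time to finish (as in the patch; note that
        # leaving the `with` block still joins all worker threads on shutdown).
        concurrent.futures.wait(futures.values(), timeout=overall_timeout)
    # Split into successes and failures: result(timeout=0) returns the value of
    # a finished task, re-raises its exception, or raises TimeoutError if the
    # task never finished -- all of which land in `failures`.
    successes, failures = {}, {}
    for backend_id, future in futures.items():
        try:
            successes[backend_id] = future.result(timeout=0)
        except Exception as e:
            failures[backend_id] = e
    return ParallelResponse(successes=successes, failures=failures)


if __name__ == "__main__":
    def b1_jobs():
        return {"jobs": [{"id": "job03", "status": "running"}]}

    def b2_jobs():
        raise RuntimeError("backend down")

    result = fan_out({"b1": b1_jobs, "b2": b2_jobs})
    print(result.successes)  # {'b1': {'jobs': [{'id': 'job03', 'status': 'running'}]}}
    print(result.failures)   # {'b2': RuntimeError('backend down')}

This separation is what lets `get_user_jobs` in patch 2 treat the two outcome sets independently: successes feed the aggregated job listing, failures feed the "federation:missing" field.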