Skip to content

Commit

Permalink
Merge branch 'issue28-parallel-requests'
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Oct 5, 2023
2 parents b91e8cb + c136b2f commit 8f3b51f
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/
### Changed

- Dockerfile: switch to `python:3.9-slim-bullseye` base image
- Parallelize `/jobs` requests to upstream back-ends ([#28](https://github.com/Open-EO/openeo-aggregator/issues/28))

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys

__version__ = "0.11.0a1"
__version__ = "0.11.1a1"


def log_version_info():
Expand Down
47 changes: 27 additions & 20 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,36 +597,43 @@ def __init__(
self.partitioned_job_tracker = partitioned_job_tracker

def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
jobs = []
all_jobs = []
federation_missing = set()
for con in self.backends:
with con.authenticated_from_request(request=flask.request, user=User(user_id)), \
TimingLogger(f"get_user_jobs: {con.id}", logger=_log.debug):
try:
backend_jobs = con.list_jobs()
except Exception as e:
# TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
_log.warning(f"Failed to get job listing from backend {con.id!r}: {e!r}")
federation_missing.add(con.id)
backend_jobs = []
for job in backend_jobs:

results = self.backends.request_parallel(
path="/jobs", method="GET", expected_status=[200], authenticated_from_request=flask.request
)
for backend_id, result in results.successes.items():
try:
jobs = result["jobs"]
assert isinstance(jobs, list), "must be a list"
except Exception as e:
_log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}")
federation_missing.add(backend_id)
else:
for job in jobs:
try:
job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=con.id)
jobs.append(BatchJobMetadata.from_api_dict(job))
job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=backend_id)
all_jobs.append(BatchJobMetadata.from_api_dict(job))
except Exception as e:
_log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
for backend_id, exc in results.failures.items():
_log.warning(f"Failed to get job listing from backend {backend_id!r}: {exc!r}")
federation_missing.add(backend_id)

if self.partitioned_job_tracker:
for job in self.partitioned_job_tracker.list_user_jobs(user_id=user_id):
job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=JobIdMapping.AGG)
jobs.append(BatchJobMetadata.from_api_dict(job))
all_jobs.append(BatchJobMetadata.from_api_dict(job))

federation_missing.update(self.backends.get_disabled_connection_ids())
return dict_no_none({
"jobs": jobs,
# TODO: experimental "federation:missing" https://github.com/openEOPlatform/architecture-docs/issues/179
"federation:missing": list(federation_missing) or None
})
return dict_no_none(
{
"jobs": all_jobs,
# TODO: experimental "federation:missing" https://github.com/openEOPlatform/architecture-docs/issues/179
"federation:missing": list(federation_missing) or None,
}
)

def create_job(
self,
Expand Down
119 changes: 115 additions & 4 deletions src/openeo_aggregator/connection.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
import collections
import concurrent.futures
import contextlib
import dataclasses
import logging
import re
from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
Union,
)

import flask
import requests
from openeo import Connection
from openeo.capabilities import ComparableVersion
from openeo.rest.auth.auth import BearerAuth, OpenEoApiAuthBase
from openeo.rest.connection import RestApiConnection
from openeo.util import TimingLogger
from openeo_driver.backend import OidcProvider
from openeo_driver.errors import (
AuthenticationRequiredException,
Expand All @@ -31,6 +46,10 @@
_log = logging.getLogger(__name__)


# Type annotation aliases

BackendId = str

class LockedAuthException(InternalException):
def __init__(self):
super().__init__(message="Setting auth while locked.")
Expand Down Expand Up @@ -105,8 +124,11 @@ def _build_oidc_provider_map(self, configured_providers: List[OidcProvider]) ->
def get_oidc_provider_map(self) -> Dict[str, str]:
return self._oidc_provider_map

def _get_bearer(self, request: flask.Request) -> str:
"""Extract authorization header from request and (optionally) transform for given backend """
def extract_bearer(self, request: flask.Request) -> str:
"""
Extract authorization header from flask request
and (optionally) transform for current backend.
"""
if "Authorization" not in request.headers:
raise AuthenticationRequiredException
auth = request.headers["Authorization"]
Expand Down Expand Up @@ -134,7 +156,7 @@ def authenticated_from_request(
Context manager to temporarily authenticate upstream connection based on current incoming flask request.
"""
self._auth_locked = False
self.auth = BearerAuth(bearer=self._get_bearer(request=request))
self.auth = BearerAuth(bearer=self.extract_bearer(request=request))
# TODO store and use `user` object?
try:
yield self
Expand Down Expand Up @@ -172,6 +194,14 @@ def request(*args, **kwargs):
_ConnectionsCache = collections.namedtuple("_ConnectionsCache", ["expiry", "connections"])


@dataclasses.dataclass(frozen=True)
class ParallelResponse:
    """Set of responses and failures info for a parallelized request."""

    # Mapping of backend id to the (parsed) response payload of that backend's successful request.
    successes: Dict[BackendId, dict]
    # Mapping of backend id to the exception raised while requesting that backend.
    failures: Dict[BackendId, Exception]


class MultiBackendConnection:
"""
Collection of multiple connections to different backends
Expand Down Expand Up @@ -320,6 +350,87 @@ def map(
# TODO: customizable exception handling: skip, warn, re-raise?
yield con.id, res

def request_parallel(
    self,
    path: str,
    *,
    method: str = "GET",
    parse_json: bool = True,
    authenticated_from_request: Optional[flask.Request] = None,
    expected_status: Union[int, Iterable[int], None] = None,
    request_timeout: float = 5,
    overall_timeout: float = 8,
    max_workers: int = 5,
) -> ParallelResponse:
    """
    Issue the same request against all upstream back-ends in parallel
    (one worker thread per back-end) and collect per-backend results and failures.

    :param path: relative (openEO) path to request on each back-end
    :param method: HTTP method to use (e.g. "GET")
    :param parse_json: parse each response body as JSON (otherwise the raw ``bytes`` content is kept)
    :param authenticated_from_request: incoming flask request to extract the bearer token from,
        so each upstream request is authenticated on behalf of the current user (optional)
    :param expected_status: status code(s) accepted per response
        (other status codes make the worker raise, ending up in ``failures``)
    :param request_timeout: timeout (in seconds) for each individual back-end request
    :param overall_timeout: total time (in seconds) to wait for all requests before collecting results;
        requests still pending after this raise a timeout error into ``failures``
    :param max_workers: upper bound on thread pool size (also capped at the number of connections)
    :return: ``ParallelResponse`` mapping backend ids to successes and failures
    """

    def do_request(
        root_url: str,
        path: str,
        *,
        method: str = "GET",
        headers: Optional[dict] = None,
        auth: Optional[str] = None,
    ) -> Union[dict, bytes]:
        """Isolated single-backend request, to be handled by a future (runs in a worker thread)."""
        with TimingLogger(title=f"request_parallel {method} {path} on {root_url}", logger=_log):
            # Fresh connection per worker: avoids sharing auth/session state across threads.
            con = RestApiConnection(root_url=root_url)
            resp = con.request(
                method=method,
                path=path,
                headers=headers,
                auth=auth,
                timeout=request_timeout,
                expected_status=expected_status,
            )
            if parse_json:
                return resp.json()
            else:
                return resp.content

    connections: List[BackendConnection] = self.get_connections()
    # No point spawning more workers than there are back-ends to query.
    max_workers = min(max_workers, len(connections))

    with TimingLogger(
        title=f"request_parallel {method} {path} on {len(connections)} backends with thread pool {max_workers=}",
        logger=_log,
    ), concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all futures (one for each backend connection)
        futures: List[Tuple[BackendId, concurrent.futures.Future]] = []
        for con in connections:
            if authenticated_from_request:
                # Extract (and possibly backend-specifically transform) the bearer token
                # from the incoming request, so the upstream call acts as the current user.
                auth = BearerAuth(bearer=con.extract_bearer(request=authenticated_from_request))
            else:
                auth = None
            future = executor.submit(
                do_request,
                root_url=con.root_url,
                path=path,
                method=method,
                headers=con.default_headers,
                auth=auth,
            )
            futures.append((con.id, future))

        # Give futures some time to finish
        concurrent.futures.wait([f for (_, f) in futures], timeout=overall_timeout)

        # Collect results.
        # `timeout=0`: a future that is not done yet raises a timeout error,
        # which is recorded as that backend's failure instead of blocking further.
        successes = {}
        failures = {}
        for backend_id, future in futures:
            try:
                successes[backend_id] = future.result(timeout=0)
            except Exception as e:
                failures[backend_id] = e

    return ParallelResponse(successes=successes, failures=failures)



def streaming_flask_response(
backend_response: requests.Response,
Expand Down
36 changes: 34 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
import re
import time
from typing import List, Tuple

import pytest
import requests
from openeo.rest import OpenEoApiError, OpenEoRestError
from openeo.rest.connection import url_join
from openeo.util import rfc3339
from openeo.util import ContextTimer, rfc3339
from openeo_driver.backend import ServiceMetadata
from openeo_driver.errors import (
JobNotFinishedException,
Expand Down Expand Up @@ -1155,7 +1156,7 @@ class TestBatchJobs:
def test_list_jobs_no_auth(self, api100):
api100.get("/jobs").assert_error(401, "AuthenticationRequired")

def test_list_jobs(self, api100, requests_mock, backend1, backend2):
def test_list_jobs_basic(self, api100, requests_mock, backend1, backend2):
requests_mock.get(backend1 + "/jobs", json={"jobs": [
{"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
{"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
Expand All @@ -1174,6 +1175,37 @@ def test_list_jobs(self, api100, requests_mock, backend1, backend2):
"links": [],
}

def test_list_jobs_auth(self, api100, requests_mock, backend1, backend2):
    """Aggregated `/jobs` listing: the user's bearer token must be forwarded to each upstream back-end."""

    def b1_get_jobs(request, context):
        # Upstream request must carry the same Authorization header as the incoming request.
        assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
        return {
            "jobs": [
                {"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
                {"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
            ]
        }

    def b2_get_jobs(request, context):
        # Same check for the second back-end.
        assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
        return {
            "jobs": [
                {"id": "job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
            ]
        }

    requests_mock.get(backend1 + "/jobs", json=b1_get_jobs)
    requests_mock.get(backend2 + "/jobs", json=b2_get_jobs)
    api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
    res = api100.get("/jobs").assert_status_code(200).json
    # Job ids come back prefixed with their backend id (e.g. "b1-job03").
    assert res == {
        "jobs": [
            {"id": "b1-job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
            {"id": "b1-job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
            {"id": "b2-job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
        ],
        "links": [],
    }

@pytest.mark.parametrize("b2_oidc_pid", ["egi", "aho"])
def test_list_jobs_oidc_pid_mapping(self, config, requests_mock, backend1, backend2, b2_oidc_pid):
# Override /credentials/oidc of backend2 before building flask app and ApiTester
Expand Down

0 comments on commit 8f3b51f

Please sign in to comment.