Skip to content

Commit

Permalink
Issue 28: Fine-tune parallelized `/jobs` requests
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Oct 5, 2023
1 parent 16b1a43 commit c136b2f
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/
### Changed

- Dockerfile: switch to `python:3.9-slim-bullseye` base image
- Parallelize `/jobs` requests to upstream back-ends ([#28](https://github.com/Open-EO/openeo-aggregator/issues/28))

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys

__version__ = "0.11.0a1"
__version__ = "0.11.1a1"


def log_version_info():
Expand Down
49 changes: 25 additions & 24 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,42 +597,43 @@ def __init__(
self.partitioned_job_tracker = partitioned_job_tracker

def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
jobs = []
all_jobs = []
federation_missing = set()

results = self.backends.request_parallel(
path="/jobs", method="GET", expected_status=[200], authenticated_from_request=flask.request
)
for backend_id, success, result in results:
if success:
try:
for job in result["jobs"]:
try:
job["id"] = JobIdMapping.get_aggregator_job_id(
backend_job_id=job["id"], backend_id=backend_id
)
jobs.append(BatchJobMetadata.from_api_dict(job))
except Exception as e:
_log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
except Exception as e:
_log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}")
federation_missing.add(backend_id)
else:
# TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
_log.warning(f"Failed to get job listing from backend {backend_id!r}: {result!r}")
for backend_id, result in results.successes.items():
try:
jobs = result["jobs"]
assert isinstance(jobs, list), "must be a list"
except Exception as e:
_log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}")
federation_missing.add(backend_id)
else:
for job in jobs:
try:
job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=backend_id)
all_jobs.append(BatchJobMetadata.from_api_dict(job))
except Exception as e:
_log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
for backend_id, exc in results.failures.items():
_log.warning(f"Failed to get job listing from backend {backend_id!r}: {exc!r}")
federation_missing.add(backend_id)

if self.partitioned_job_tracker:
for job in self.partitioned_job_tracker.list_user_jobs(user_id=user_id):
job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=JobIdMapping.AGG)
jobs.append(BatchJobMetadata.from_api_dict(job))
all_jobs.append(BatchJobMetadata.from_api_dict(job))

federation_missing.update(self.backends.get_disabled_connection_ids())
return dict_no_none({
"jobs": jobs,
# TODO: experimental "federation:missing" https://github.com/openEOPlatform/architecture-docs/issues/179
"federation:missing": list(federation_missing) or None
})
return dict_no_none(
{
"jobs": all_jobs,
# TODO: experimental "federation:missing" https://github.com/openEOPlatform/architecture-docs/issues/179
"federation:missing": list(federation_missing) or None,
}
)

def create_job(
self,
Expand Down
22 changes: 16 additions & 6 deletions src/openeo_aggregator/connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import collections
import concurrent.futures
import contextlib
import dataclasses
import logging
import re
from typing import (
Expand Down Expand Up @@ -193,6 +194,14 @@ def request(*args, **kwargs):
_ConnectionsCache = collections.namedtuple("_ConnectionsCache", ["expiry", "connections"])


@dataclasses.dataclass(frozen=True)
class ParallelResponse:
    """Set of responses and failures info for a parallelized request."""

    # Per-backend decoded response payloads for the requests that completed
    # successfully (backend id -> parsed JSON body).
    successes: Dict[BackendId, dict]
    # Per-backend exception for the requests that failed, e.g. an HTTP error
    # or a timeout raised when collecting the future's result.
    failures: Dict[BackendId, Exception]


class MultiBackendConnection:
"""
Collection of multiple connections to different backends
Expand Down Expand Up @@ -352,7 +361,7 @@ def request_parallel(
request_timeout: float = 5,
overall_timeout: float = 8,
max_workers=5,
) -> List[Tuple[BackendId, bool, Any]]:
) -> ParallelResponse:
"""
Request a given (relative) url on each backend in parallel
:param path: relative (openEO) path to request
Expand Down Expand Up @@ -411,15 +420,16 @@ def do_request(
concurrent.futures.wait([f for (_, f) in futures], timeout=overall_timeout)

# Collect results.
results: List[Tuple[BackendId, bool, Any]] = []
successes = {}
failures = {}
for backend_id, future in futures:
try:
result = future.result(timeout=0)
results.append((backend_id, True, result))
successes[backend_id] = future.result(timeout=0)
except Exception as e:
results.append((backend_id, False, e))
failures[backend_id] = e

return ParallelResponse(successes=successes, failures=failures)

return results


def streaming_flask_response(
Expand Down
36 changes: 34 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
import re
import time
from typing import List, Tuple

import pytest
import requests
from openeo.rest import OpenEoApiError, OpenEoRestError
from openeo.rest.connection import url_join
from openeo.util import rfc3339
from openeo.util import ContextTimer, rfc3339
from openeo_driver.backend import ServiceMetadata
from openeo_driver.errors import (
JobNotFinishedException,
Expand Down Expand Up @@ -1155,7 +1156,7 @@ class TestBatchJobs:
def test_list_jobs_no_auth(self, api100):
api100.get("/jobs").assert_error(401, "AuthenticationRequired")

def test_list_jobs(self, api100, requests_mock, backend1, backend2):
def test_list_jobs_basic(self, api100, requests_mock, backend1, backend2):
requests_mock.get(backend1 + "/jobs", json={"jobs": [
{"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
{"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
Expand All @@ -1174,6 +1175,37 @@ def test_list_jobs(self, api100, requests_mock, backend1, backend2):
"links": [],
}

def test_list_jobs_auth(self, api100, requests_mock, backend1, backend2):
    """Listing jobs must forward the user's bearer token to each upstream back-end
    and merge the per-backend listings with backend-prefixed job ids."""

    def backend1_jobs(request, context):
        # The aggregator must pass the incoming Authorization header through unchanged.
        assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
        return {
            "jobs": [
                {"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
                {"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
            ]
        }

    def backend2_jobs(request, context):
        assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
        return {
            "jobs": [
                {"id": "job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
            ]
        }

    requests_mock.get(backend1 + "/jobs", json=backend1_jobs)
    requests_mock.get(backend2 + "/jobs", json=backend2_jobs)
    api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
    response = api100.get("/jobs").assert_status_code(200).json
    # Aggregated listing: job ids are prefixed with their backend id ("b1-", "b2-").
    assert response == {
        "jobs": [
            {"id": "b1-job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
            {"id": "b1-job08", "status": "running", "created": "2021-06-08T12:34:56Z", "title": "Job number 8."},
            {"id": "b2-job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
        ],
        "links": [],
    }

@pytest.mark.parametrize("b2_oidc_pid", ["egi", "aho"])
def test_list_jobs_oidc_pid_mapping(self, config, requests_mock, backend1, backend2, b2_oidc_pid):
# Override /credentials/oidc of backend2 before building flask app and ApiTester
Expand Down

0 comments on commit c136b2f

Please sign in to comment.