Skip to content

Commit

Permalink
Issue #635 add get_job_db() and create_job_db() factories as well
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Oct 9, 2024
1 parent 8784c5a commit 3bbbabe
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 9 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Added `DataCube.load_stac()` to also support creating a `load_stac` based cube without a connection ([#638](https://github.com/Open-EO/openeo-python-client/issues/638))
- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame. ([#635](https://github.com/Open-EO/openeo-python-client/issues/635))
- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame.
Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension.
([#635](https://github.com/Open-EO/openeo-python-client/issues/635))



### Changed

Expand Down
51 changes: 43 additions & 8 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,7 @@ def run_jobs(
assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"

if isinstance(job_db, (str, Path)):
job_db_path = Path(job_db)
if job_db_path.suffix.lower() == ".csv":
job_db = CsvJobDatabase(path=job_db_path)
elif job_db_path.suffix.lower() == ".parquet":
job_db = ParquetJobDatabase(path=job_db_path)
else:
raise ValueError(f"Unsupported job database file type {job_db_path!r}")
job_db = get_job_db(path=job_db)

if not isinstance(job_db, JobDatabaseInterface):
raise ValueError(f"Unsupported job_db {job_db!r}")
Expand Down Expand Up @@ -697,7 +691,7 @@ def __init__(self):
super().__init__()
self._df = None

def initialize_from_df(self, df: pd.DataFrame, on_exists: str = "error"):
def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
"""
Initialize the job database from a given dataframe,
which will be first normalized to be compatible
Expand Down Expand Up @@ -851,3 +845,44 @@ def persist(self, df: pd.DataFrame):
self._merge_into_df(df)
self.path.parent.mkdir(parents=True, exist_ok=True)
self.df.to_parquet(self.path, index=False)


def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
"""
Factory to get a job database at a given path,
guessing the database type from filename extension.
:param path: path to job database file.
.. versionadded:: 0.33.0
"""
path = Path(path)
if path.suffix.lower() in {".csv"}:
job_db = CsvJobDatabase(path=path)
elif path.suffix.lower() in {".parquet", ".geoparquet"}:
job_db = ParquetJobDatabase(path=path)
else:
raise ValueError(f"Could not guess job database type from {path!r}")
return job_db


def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
"""
Factory to create a job database at given path,
initialized from a given dataframe,
and its database type guessed from filename extension.
:param path: Path to the job database file.
:param df: DataFrame to store in the job database.
:param on_exists: What to do when the job database already exists:
- "error": (default) raise an exception
- "skip": work with existing database, ignore given dataframe and skip any initialization
.. versionadded:: 0.33.0
"""
job_db = get_job_db(path)
if isinstance(job_db, FullDataFrameJobDatabase):
job_db.initialize_from_df(df=df, on_exists=on_exists)
else:
raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
return job_db
60 changes: 60 additions & 0 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
CsvJobDatabase,
MultiBackendJobManager,
ParquetJobDatabase,
create_job_db,
get_job_db,
)
from openeo.util import rfc3339

Expand Down Expand Up @@ -169,6 +171,35 @@ def start_job(row, connection, **kwargs):
assert set(result.status) == {"finished"}
assert set(result.backend_name) == {"foo", "bar"}

@pytest.mark.parametrize(
["filename", "expected_db_class"],
[
("jobz.csv", CsvJobDatabase),
("jobz.parquet", ParquetJobDatabase),
],
)
def test_create_job_db(self, tmp_path, requests_mock, sleep_mock, filename, expected_db_class):
"""
Basic run with `create_job_db()` usage
"""
manager = self._create_basic_mocked_manager(requests_mock, tmp_path)

def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
output_file = tmp_path / filename
job_db = create_job_db(path=output_file, df=df)

manager.run_jobs(job_db=job_db, start_job=start_job)
assert sleep_mock.call_count > 10

result = job_db.read()
assert len(result) == 5
assert set(result.status) == {"finished"}
assert set(result.backend_name) == {"foo", "bar"}

def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
manager = self._create_basic_mocked_manager(requests_mock, tmp_path)

Expand Down Expand Up @@ -897,3 +928,32 @@ def test_initialize_from_df(self, tmp_path):

df_from_disk = ParquetJobDatabase(path).read()
assert set(df_from_disk.columns) == expected_columns


@pytest.mark.parametrize(
["filename", "expected"],
[
("jobz.csv", CsvJobDatabase),
("jobz.parquet", ParquetJobDatabase),
],
)
def test_get_job_db(tmp_path, filename, expected):
path = tmp_path / filename
db = get_job_db(path)
assert isinstance(db, expected)
assert not path.exists()


@pytest.mark.parametrize(
["filename", "expected"],
[
("jobz.csv", CsvJobDatabase),
("jobz.parquet", ParquetJobDatabase),
],
)
def test_create_job_db(tmp_path, filename, expected):
df = pd.DataFrame({"year": [2023, 2024]})
path = tmp_path / filename
db = create_job_db(path=path, df=df)
assert isinstance(db, expected)
assert path.exists()

0 comments on commit 3bbbabe

Please sign in to comment.