From 3d01559fe81344cadb4aebd773b42d9c3d363e66 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Thu, 18 Jul 2024 16:11:59 +0200 Subject: [PATCH] Load reference metadata for input and output files and store in provenance --- src/ctapipe/core/provenance.py | 30 ++++++++++++++++++-- src/ctapipe/core/tests/test_provenance.py | 34 +++++++++++++++++------ src/ctapipe/io/datawriter.py | 2 +- src/ctapipe/io/hdf5merger.py | 2 +- src/ctapipe/tools/dump_instrument.py | 12 +++++--- src/ctapipe/tools/merge.py | 3 +- 6 files changed, 64 insertions(+), 19 deletions(-) diff --git a/src/ctapipe/core/provenance.py b/src/ctapipe/core/provenance.py index 02ee888ab75..258d565fa18 100644 --- a/src/ctapipe/core/provenance.py +++ b/src/ctapipe/core/provenance.py @@ -11,6 +11,7 @@ import platform import sys import uuid +import warnings from collections import UserList from contextlib import contextmanager from importlib import import_module @@ -57,6 +58,10 @@ def get_module_version(name): return "not installed" +class MissingReferenceMetadata(UserWarning): + """Warning raised if reference metadata could not be read from input file.""" + + class Provenance(metaclass=Singleton): """ Manage the provenance info for a stack of *activities* @@ -251,13 +256,19 @@ def register_input(self, url, role=None): role: str role name that this input satisfies """ - self._prov["input"].append(dict(url=url, role=role)) + reference_meta = self._get_reference_meta(url=url) + self._prov["input"].append( + dict(url=url, role=role, reference_meta=reference_meta) + ) def register_output(self, url, role=None): """ Add a URL of a file to the list of outputs (can be a filename or full url, if no URL specifier is given, assume 'file://') + Should only be called once the file is finalized, so that reference metadata + can be read. + Parameters ---------- url: str @@ -265,7 +276,10 @@ def register_output(self, url, role=None): role: str role name that this output satisfies """ - self._prov["output"].append(dict(url=url, role=role)) + reference_meta = self._get_reference_meta(url=url) + self._prov["output"].append( + dict(url=url, role=role, reference_meta=reference_meta) + ) def register_config(self, config): """add a dictionary of configuration parameters to this activity""" @@ -302,6 +316,18 @@ def sample_cpu_and_memory(self): def provenance(self): return self._prov + def _get_reference_meta(self, url): + # here to prevent circular imports / top-level cross-dependencies + from ..io.metadata import read_reference_metadata + + try: + return read_reference_metadata(url).to_dict() + except Exception: + warnings.warn( + f"Could not read reference metadata for input file: {url}", + MissingReferenceMetadata, + ) + def _get_python_packages(): return [ diff --git a/src/ctapipe/core/tests/test_provenance.py b/src/ctapipe/core/tests/test_provenance.py index f34ff6d9c3a..465e11ba27e 100644 --- a/src/ctapipe/core/tests/test_provenance.py +++ b/src/ctapipe/core/tests/test_provenance.py @@ -4,6 +4,7 @@ from ctapipe.core import Provenance from ctapipe.core.provenance import _ActivityProvenance +from ctapipe.io.metadata import Reference @pytest.fixture @@ -15,19 +16,18 @@ def provenance(monkeypatch): prov = Provenance() monkeypatch.setattr(prov, "_activities", []) monkeypatch.setattr(prov, "_finished_activities", []) - - prov.start_activity("test1") - prov.add_input_file("input.txt") - prov.add_output_file("output.txt") - prov.start_activity("test2") - prov.add_input_file("input_a.txt") - prov.add_input_file("input_b.txt") - prov.finish_activity("test2") - prov.finish_activity("test1") return prov def test_provenance_activity_names(provenance): + provenance.start_activity("test1") + provenance.add_input_file("input.txt") + provenance.add_output_file("output.txt") + provenance.start_activity("test2") + provenance.add_input_file("input_a.txt") + provenance.add_input_file("input_b.txt") + provenance.finish_activity("test2") + provenance.finish_activity("test1") assert set(provenance.finished_activity_names) == {"test2", "test1"} @@ -52,6 +52,8 @@ def test_provenence_contextmanager(): def test_provenance_json(provenance: Provenance): + provenance.start_activity("test1") + provenance.finish_activity("test1") data = json.loads(provenance.as_json()) activity = data[0] @@ -60,3 +62,17 @@ def test_provenance_json(provenance: Provenance): packages = activity["system"]["python"].get("packages") assert isinstance(packages, list) assert any(p["name"] == "numpy" for p in packages) + + +def test_provenance_input_reference_meta(provenance: Provenance, dl1_file): + provenance.start_activity("test1") + provenance.add_input_file(dl1_file, "events") + provenance.finish_activity("test1") + data = json.loads(provenance.as_json()) + + inputs = data[0]["input"] + assert len(inputs) == 1 + input_meta = inputs[0] + assert "reference_meta" in input_meta + assert "CTA PRODUCT ID" in input_meta["reference_meta"] + Reference.from_dict(input_meta["reference_meta"]) diff --git a/src/ctapipe/io/datawriter.py b/src/ctapipe/io/datawriter.py index d0f93905f66..3119bc96695 100644 --- a/src/ctapipe/io/datawriter.py +++ b/src/ctapipe/io/datawriter.py @@ -388,6 +388,7 @@ def finish(self): self._write_context_metadata_headers() self._writer.close() + PROV.add_output_file(str(self.output_path), role="DL1/Event") @property def datalevels(self): @@ -432,7 +433,6 @@ def _setup_output_path(self): ", use the `overwrite` option or choose another `output_path` " ) self.log.debug("output path: %s", self.output_path) - PROV.add_output_file(str(self.output_path), role="DL1/Event") # check that options make sense writable_things = [ diff --git a/src/ctapipe/io/hdf5merger.py b/src/ctapipe/io/hdf5merger.py index 58b98a78070..8ea754c266b 100644 --- a/src/ctapipe/io/hdf5merger.py +++ b/src/ctapipe/io/hdf5merger.py @@ -183,7 +183,6 @@ def __init__(self, output_path=None, **kwargs): mode="a" if appending else "w", filters=DEFAULT_FILTERS, ) - Provenance().add_output_file(str(self.output_path)) self.required_nodes = None self.data_model_version = None @@ -384,6 +383,7 @@ def __exit__(self, exc_type, exc_value, traceback): def close(self): if hasattr(self, "h5file"): self.h5file.close() + Provenance().add_output_file(str(self.output_path)) def _append_subarray(self, other): # focal length choice doesn't matter here, set to equivalent so we don't get diff --git a/src/ctapipe/tools/dump_instrument.py b/src/ctapipe/tools/dump_instrument.py index 7f169acdefb..a3ad471696b 100644 --- a/src/ctapipe/tools/dump_instrument.py +++ b/src/ctapipe/tools/dump_instrument.py @@ -91,11 +91,15 @@ def write_camera_definitions(self): try: geom_table.write(geom_filename, **args) - readout_table.write(readout_filename, **args) Provenance().add_output_file(geom_filename, "CameraGeometry") + except OSError as err: + self.log.exception("couldn't write camera geometry because: %s", err) + + try: + readout_table.write(readout_filename, **args) Provenance().add_output_file(readout_filename, "CameraReadout") except OSError as err: - self.log.warning("couldn't write camera definition because: %s", err) + self.log.exception("couldn't write camera definition because: %s", err) def write_optics_descriptions(self): """writes out optics files for each telescope type""" @@ -109,7 +113,7 @@ def write_optics_descriptions(self): tab.write(filename, **args) Provenance().add_output_file(filename, "OpticsDescription") except OSError as err: - self.log.warning( + self.log.exception( "couldn't write optics description '%s' because: %s", filename, err ) @@ -123,7 +127,7 @@ def write_subarray_description(self): tab.write(filename, **args) Provenance().add_output_file(filename, "SubarrayDescription") except OSError as err: - self.log.warning( + self.log.exception( "couldn't write subarray description '%s' because: %s", filename, err ) diff --git a/src/ctapipe/tools/merge.py b/src/ctapipe/tools/merge.py index ba9daad1549..3665d504cd4 100644 --- a/src/ctapipe/tools/merge.py +++ b/src/ctapipe/tools/merge.py @@ -161,7 +161,7 @@ def setup(self): ) sys.exit(1) - self.merger = HDF5Merger(parent=self) + self.merger = self.enter_context(HDF5Merger(parent=self)) if self.merger.output_path in self.input_files: raise ToolConfigurationError( "Output path contained in input files. Fix your configuration / cli arguments." @@ -195,7 +195,6 @@ def finish(self): current_activity = Provenance().current_activity.provenance self.merger.meta.activity = meta.Activity.from_provenance(current_activity) meta.write_to_hdf5(self.merger.meta.to_dict(), self.merger.h5file) - self.merger.close() def main():