From af3b046df424c8dfe1fa63ccbf55e865b9636096 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Thu, 18 Jul 2024 15:12:53 +0200 Subject: [PATCH] Add functions to read reference metadata --- src/ctapipe/io/hdf5merger.py | 2 +- src/ctapipe/io/metadata.py | 83 +++++++++++++++---- src/ctapipe/io/tests/test_metadata.py | 111 +++++++++++--------------- 3 files changed, 116 insertions(+), 80 deletions(-) diff --git a/src/ctapipe/io/hdf5merger.py b/src/ctapipe/io/hdf5merger.py index 4161dba0878..58b98a78070 100644 --- a/src/ctapipe/io/hdf5merger.py +++ b/src/ctapipe/io/hdf5merger.py @@ -247,7 +247,7 @@ def _update_meta(self): def _read_meta(self, h5file): try: - return metadata.Reference.from_dict(metadata.read_metadata(h5file)) + return metadata.read_reference_metadata_hdf5(h5file) except Exception: raise CannotMerge( f"CTA Reference meta not found in input file: {h5file.filename}" diff --git a/src/ctapipe/io/metadata.py b/src/ctapipe/io/metadata.py index b4fd670144f..2aef43d111e 100644 --- a/src/ctapipe/io/metadata.py +++ b/src/ctapipe/io/metadata.py @@ -22,6 +22,7 @@ some_astropy_table.write("output.ecsv") """ +import gzip import os import uuid import warnings @@ -29,6 +30,8 @@ from contextlib import ExitStack import tables +from astropy.io import fits +from astropy.table import Table from astropy.time import Time from tables import NaturalNameWarning from traitlets import Enum, HasTraits, Instance, List, Unicode, UseEnum, default @@ -45,7 +48,8 @@ "Activity", "Instrument", "write_to_hdf5", - "read_metadata", + "read_hdf5_metadata", + "read_reference_metadata", ] @@ -295,6 +299,12 @@ def from_dict(cls, metadata): instrument=Instrument(**kwargs["instrument"]), ) + @classmethod + def from_fits(cls, header): + # for now, just use from_dict, but we might need special handling + # of some keys + return cls.from_dict(header) + def __repr__(self): return str(self.to_dict()) @@ -320,27 +330,72 @@ def write_to_hdf5(metadata, h5file, path="/"): node._v_attrs[key] = value # pylint: disable=protected-access -def read_metadata(h5file, path="/"): +def read_hdf5_metadata(h5file, path="/"): + """ + Read hdf5 attributes into a dict + """ + with ExitStack() as stack: + if not isinstance(h5file, tables.File): + h5file = stack.enter_context(tables.open_file(h5file)) + + node = h5file.get_node(path) + return {key: node._v_attrs[key] for key in node._v_attrs._f_list()} + + +def read_reference_metadata(path): + """Read CTAO data product metadata from path + + File is first opened to determine file format, then the metadata + is read. Supported are currently FITS and HDF5. """ - Read metadata from an hdf5 file + header_bytes = 8 + with open(path, "rb") as f: + first_bytes = f.read(header_bytes) + + if first_bytes.startswith(b"\x1f\x8b"): + with gzip.open(path, "rb") as f: + first_bytes = f.read(header_bytes) + + if first_bytes.startswith(b"\x89HDF"): + return _read_reference_metadata_hdf5(path) + + if first_bytes.startswith(b"SIMPLE"): + return _read_reference_metadata_fits(path) + + if first_bytes.startswith(b"# %ECSV"): + return Reference.from_dict(Table.read(path).meta) + + raise ValueError( + f"'{path}' is not one of the supported file formats: fits, hdf5, ecsv" + ) + + +def _read_reference_metadata_hdf5(h5file, path="/"): + meta = read_hdf5_metadata(h5file, path) + return Reference.from_dict(meta) + + +def _read_reference_metadata_ecsv(path): + return Reference.from_dict(Table.read(path).meta) + + +def _read_reference_metadata_fits(fitsfile, hdu: int | str = 0): + """ + Read reference metadata from a fits file Parameters ---------- - h5filename: string, Path, or `tables.file.File` + fitsfile: string, Path, or `tables.file.File` hdf5 file - path: string - default: '/' is the path to ctapipe global metadata + hdu: int or str + HDU index or name. Returns ------- - metadata: dictionary + reference_metadata: Reference """ with ExitStack() as stack: - if not isinstance(h5file, tables.File): - h5file = stack.enter_context(tables.open_file(h5file)) - - node = h5file.get_node(path) - metadata = {key: node._v_attrs[key] for key in node._v_attrs._f_list()} - return metadata + if not isinstance(fitsfile, fits.HDUList): + fitsfile = stack.enter_context(fits.open(fitsfile)) - raise OSError("Could not read metadata") + return Reference.from_fits(fitsfile[hdu].header) diff --git a/src/ctapipe/io/tests/test_metadata.py b/src/ctapipe/io/tests/test_metadata.py index a69e9be76ab..aa242b0448f 100644 --- a/src/ctapipe/io/tests/test_metadata.py +++ b/src/ctapipe/io/tests/test_metadata.py @@ -1,16 +1,19 @@ """ Test CTA Reference metadata functionality """ +import uuid +import pytest import tables +from astropy.io import fits +from astropy.table import Table from ctapipe.core.provenance import Provenance from ctapipe.io import metadata as meta -def test_construct_and_write_metadata(tmp_path): - """basic test of making a Reference object and writing it""" - +@pytest.fixture() +def reference(): prov = Provenance() prov.start_activity("test") prov.finish_activity() @@ -43,41 +46,58 @@ def test_construct_and_write_metadata(tmp_path): id_="threshold", ), ) + return reference + +def test_to_dict(reference): + """Test for Reference.to_dict""" ref_dict = reference.to_dict() assert ref_dict["CTA PRODUCT FORMAT"] == "hdf5" assert ref_dict["CTA PRODUCT DATA LEVELS"] == "DL1_IMAGES,DL1_PARAMETERS" + assert str(uuid.UUID(ref_dict["CTA PRODUCT ID"])) == ref_dict["CTA PRODUCT ID"] - import uuid # pylint: disable=import-outside-toplevel - assert str(uuid.UUID(ref_dict["CTA PRODUCT ID"])) == ref_dict["CTA PRODUCT ID"] +def test_from_dict(reference): + as_dict = reference.to_dict() + back = meta.Reference.from_dict(as_dict) + assert back.to_dict() == as_dict + + +@pytest.mark.parametrize("format", ("fits", "fits.gz")) +def test_reference_metadata_fits(tmp_path, format, reference): + """Test for writing reference metadata""" + path = tmp_path / f"test.{format}" + + hdul = fits.HDUList(fits.PrimaryHDU()) + hdul[0].header.update(reference.to_dict(fits=True)) + hdul.writeto(path) + + back = meta.read_reference_metadata(path) + assert back.to_dict() == reference.to_dict() + + +def test_reference_metadata_h5(tmp_path, reference): + path = tmp_path / "test.h5" - # check that we can write this to the header of a typical table file in multiple - # formats: - from astropy.table import Table # pylint: disable=import-outside-toplevel + with tables.open_file(path, "w") as f: + meta.write_to_hdf5(reference.to_dict(), f) - table = Table(dict(x=[1, 2, 3], y=[15.2, 15.2, 14.5])) - for path in [tmp_path / "test.fits", tmp_path / "test.ecsv"]: - if ".fits" in path.suffixes: - reference.format = "fits" - ref_dict = reference.to_dict(fits=True) - else: - reference.format = "ecsv" - ref_dict = reference.to_dict() + back = meta.read_reference_metadata(path) + assert back.to_dict() == reference.to_dict() - table.meta = ref_dict - table.write(path) - # write to pytables file +def test_reference_metadata_ecsv(tmp_path, reference): + path = tmp_path / "test.ecsv" - import tables # pylint: disable=import-outside-toplevel + t = Table({"a": [1, 2, 3], "b": [4, 5, 6]}) + t.meta.update(reference.to_dict()) + t.write(path) - with tables.open_file(tmp_path / "test.h5", mode="w") as h5file: - h5file.create_group(where="/", name="node") - meta.write_to_hdf5(ref_dict, h5file, path="/node") + back = meta.read_reference_metadata(path) + assert back.to_dict() == reference.to_dict() -def test_read_metadata(tmp_path): +def test_read_hdf5_metadata(tmp_path): # Testing one can read both a path as well as a PyTables file object filename = tmp_path / "test.h5" metadata_in = {"SOFTWARE": "ctapipe", "FOO": "BAR"} @@ -86,49 +106,10 @@ def test_read_metadata(tmp_path): h5file.create_group(where="/node", name="subnode", createparents=True) meta.write_to_hdf5(metadata_in, h5file, path=metadata_path) - metadata_out = meta.read_metadata(filename, path=metadata_path) + metadata_out = meta.read_hdf5_metadata(filename, path=metadata_path) assert metadata_out == metadata_in with tables.open_file(filename, "r") as file: - metadata_out = meta.read_metadata(file, path=metadata_path) + metadata_out = meta.read_hdf5_metadata(file, path=metadata_path) assert metadata_out == metadata_in - - -def test_from_dict(): - prov = Provenance() - prov.start_activity("test") - prov.finish_activity() - prov_activity = prov.finished_activities[0] - - reference = meta.Reference( - contact=meta.Contact( - name="Somebody", - email="a@b.com", - organization="CTA Consortium", - ), - product=meta.Product( - description="An Amazing Product", - creation_time="2020-10-11 15:23:31", - data_category="Sim", - data_levels=["DL1_IMAGES", "DL1_PARAMETERS"], - data_association="Subarray", - data_model_name="Unofficial DL1", - data_model_version="1.0", - data_model_url="https://example.org", - format="hdf5", - ), - process=meta.Process(type_="Simulation", subtype="Prod3b", id_="423442"), - activity=meta.Activity.from_provenance(prov_activity.provenance), - instrument=meta.Instrument( - site="CTA-North", - class_="Array", - type_="Layout H1B", - version="1.0", - id_="threshold", - ), - ) - - as_dict = reference.to_dict() - back = meta.Reference.from_dict(as_dict) - assert back.to_dict() == as_dict