From 8824477ddc2a53e922d9cf298f0a1813c31cc299 Mon Sep 17 00:00:00 2001 From: Yang Date: Wed, 12 Jul 2023 16:34:28 +0200 Subject: [PATCH] era5 data ingestion --- src/zampy/conventions/ALMA.json | 4 + src/zampy/datasets/era5.py | 136 ++++++++++++++++++++++++++++--- src/zampy/datasets/utils.py | 5 +- src/zampy/reference/README.md | 9 ++ src/zampy/reference/variables.py | 1 + 5 files changed, 142 insertions(+), 13 deletions(-) create mode 100644 src/zampy/reference/README.md diff --git a/src/zampy/conventions/ALMA.json b/src/zampy/conventions/ALMA.json index 0e62826..fb9efc0 100644 --- a/src/zampy/conventions/ALMA.json +++ b/src/zampy/conventions/ALMA.json @@ -32,5 +32,9 @@ "variable": "Qh", "units": "watt/meter**2", "positive_dir": "downward" + }, + "surface_pressure": { + "variable": "Psurf", + "units": "pascal" } } \ No newline at end of file diff --git a/src/zampy/datasets/era5.py b/src/zampy/datasets/era5.py index 54e374f..83aa560 100644 --- a/src/zampy/datasets/era5.py +++ b/src/zampy/datasets/era5.py @@ -1,16 +1,22 @@ """ERA5 dataset.""" from pathlib import Path +from typing import List from typing import Union import numpy as np +import xarray as xr +from zampy.datasets import converter from zampy.datasets import utils from zampy.datasets import validation from zampy.datasets.dataset_protocol import Dataset from zampy.datasets.dataset_protocol import SpatialBounds from zampy.datasets.dataset_protocol import TimeBounds from zampy.datasets.dataset_protocol import Variable +from zampy.datasets.dataset_protocol import copy_properties_file from zampy.datasets.dataset_protocol import write_properties_file +from zampy.reference.variables import VARIABLE_REFERENCE_LOOKUP from zampy.reference.variables import unit_registry +from zampy.utils import regrid ## Ignore missing class/method docstrings: they are implemented in the Dataset class. @@ -31,6 +37,7 @@ class ERA5(Dataset): # noqa: D101 Variable(name="v10", unit=unit_registry.velocity), ) + # variable names used in cdsapi downloading request variable_names = ( "mean_total_precipitation_rate", "surface_thermal_radiation_downwards", @@ -59,7 +66,7 @@ def download( # noqa: PLR0913 download_dir: Path, time_bounds: TimeBounds, spatial_bounds: SpatialBounds, - variable_names: list[str], + variable_names: List[str], overwrite: bool = False, ) -> bool: validation.validate_download_request( @@ -94,22 +101,129 @@ def ingest( ingest_dir: Path, overwrite: bool = False, ) -> bool: + download_folder = download_dir / self.name + ingest_folder = ingest_dir / self.name + ingest_folder.mkdir(parents=True, exist_ok=True) + + data_file_pattern = "era5_*.nc" + data_files = list(download_folder.glob(data_file_pattern)) + + for file in data_files: + convert_to_zampy( + ingest_folder, + file=file, + overwrite=overwrite, + ) + + copy_properties_file(download_folder, ingest_folder) + return True - # def load( # noqa: PLR0913 - # self, - # ingest_dir: Path, - # time_bounds: TimeBounds, - # spatial_bounds: SpatialBounds, - # resolution: float, - # regrid_method: str, - # variable_names: List[str], - # ) -> None: - # pass + def load( # noqa: PLR0913 + self, + ingest_dir: Path, + time_bounds: TimeBounds, + spatial_bounds: SpatialBounds, + resolution: float, + regrid_method: str, + variable_names: List[str], + ) -> xr.Dataset: + files: List[Path] = [] + for var in self.variable_names: + if var in variable_names: + files += (ingest_dir / self.name).glob(f"era5_{var}*.nc") + + ds = xr.open_mfdataset(files, chunks={"latitude": 200, "longitude": 200}) + ds = ds.sel(time=slice(time_bounds.start, time_bounds.end)) + ds = regrid.regrid_data(ds, spatial_bounds, resolution, regrid_method) + + return ds def convert( self, ingest_dir: Path, convention: Union[str, Path], ) -> bool: + converter.check_convention(convention) + ingest_folder = ingest_dir / self.name + + data_file_pattern = "ETH_GlobalCanopyHeight_10m_2020_*.nc" + + data_files = list(ingest_folder.glob(data_file_pattern)) + + for file in data_files: + # start conversion process + print(f"Start processing file `{file.name}`.") + ds = xr.open_dataset(file, chunks={"x": 50, "y": 50}) + ds = converter.convert(ds, dataset=self, convention=convention) + # TODO: support derived variables + # TODO: other calculations + # call ds.compute() + return True + + +def convert_to_zampy( + ingest_folder: Path, + file: Path, + overwrite: bool = False, +) -> None: + """Convert the downloaded nc files to standard CF/Zampy netCDF files. + + The downloaded ERA5 data already follows CF1.6 convention. However, it uses + (abbreviated) variable name instead of standard name, which prohibits the format + conversion. Therefore we need to ingest the downloaded files and rename all + variables to standard names. + + Args: + ingest_folder: Folder where the files have to be written to. + file: Path to the ERA5 nc file. + overwrite: Overwrite all existing files. If False, file that already exist will + be skipped. + """ + ncfile = ingest_folder / file.with_suffix(".nc").name + if ncfile.exists() and not overwrite: + print(f"File '{ncfile.name}' already exists, skipping...") + else: + ds = parse_nc_file(file) + + ds.to_netcdf(path=ncfile) + + +var_reference_era5_to_zampy = { + "mtpr": "mean_total_precipitation_rate", + "strd": "surface_thermal_radiation", + "ssrd": "surface_solar_radiation", + "sp": "surface_pressure", + "u10": "10m_u_component_of_wind", + "v10": "10m_v_component_of_wind", +} + + +def parse_nc_file(file: Path) -> xr.Dataset: + """Parse the downloaded ERA5 nc files, to CF/Zampy standard dataset. + + Args: + file: Path to the ERA5 nc file. + + Returns: + CF/Zampy formatted xarray Dataset + """ + # Open chunked: will be dask array -> file writing can be parallelized. + ds = xr.open_dataset(file, chunks={"x": 50, "y": 50}) + + for variable in ds.variables: + if variable in var_reference_era5_to_zampy: + var = str(variable) # Cast to string to please mypy + variable_name = var_reference_era5_to_zampy[var] + ds = ds.rename({var: variable_name}) + ds[variable_name].attrs["units"] = str( + VARIABLE_REFERENCE_LOOKUP[variable_name].unit + ) + ds[variable_name].attrs["description"] = VARIABLE_REFERENCE_LOOKUP[ + variable_name + ].desc + + # TODO: add dataset attributes. + + return ds diff --git a/src/zampy/datasets/utils.py b/src/zampy/datasets/utils.py index 1aea9a8..580ced8 100644 --- a/src/zampy/datasets/utils.py +++ b/src/zampy/datasets/utils.py @@ -2,6 +2,7 @@ import itertools import urllib.request from pathlib import Path +from typing import List from typing import Optional from typing import Union import cdsapi @@ -69,7 +70,7 @@ def get_file_size(fpath: Path) -> int: def cds_request( # noqa: PLR0913 product: str, - variables: list[str], + variables: List[str], time_bounds: TimeBounds, spatial_bounds: SpatialBounds, path: Path, @@ -195,7 +196,7 @@ def cds_request( # noqa: PLR0913 ) -def time_bounds_to_year_month(time_bounds: TimeBounds) -> list[tuple[str, str]]: +def time_bounds_to_year_month(time_bounds: TimeBounds) -> List[tuple[str, str]]: """Return year/month pairs.""" date_range = pd.date_range(start=time_bounds.start, end=time_bounds.end, freq="M") year_month_pairs = [(str(date.year), str(date.month)) for date in date_range] diff --git a/src/zampy/reference/README.md b/src/zampy/reference/README.md new file mode 100644 index 0000000..630757d --- /dev/null +++ b/src/zampy/reference/README.md @@ -0,0 +1,9 @@ +### Reference + +To allow data ingestion into the unified `zampy` format and to enable variables conversion between different conventions, we need a unique identifier for each variable. Therefore we create a `variables.py` file to store the `zampy` unique variable names for reference. + +### Ingestion + +During data ingestion, all variables will be formatted to `zampy` variable format. + + \ No newline at end of file diff --git a/src/zampy/reference/variables.py b/src/zampy/reference/variables.py index 7daa865..5d7e9e2 100644 --- a/src/zampy/reference/variables.py +++ b/src/zampy/reference/variables.py @@ -22,6 +22,7 @@ Variable("air-temperature", unit_registry.kelvin), Variable("dewpoint-temperature", unit_registry.kelvin), Variable("relative-humidity", unit_registry.percent), + Variable("surface_pressure", unit_registry.pascal), Variable( "specific-humidity", unit_registry.fraction,