Skip to content

Commit

Permalink
era5 data ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
Yang committed Jul 12, 2023
1 parent 2d2825c commit 8824477
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 13 deletions.
4 changes: 4 additions & 0 deletions src/zampy/conventions/ALMA.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@
"variable": "Qh",
"units": "watt/meter**2",
"positive_dir": "downward"
},
"surface_pressure": {
"variable": "Psurf",
"units": "pascal"
}
}
136 changes: 125 additions & 11 deletions src/zampy/datasets/era5.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
"""ERA5 dataset."""

from pathlib import Path
from typing import List
from typing import Union
import numpy as np
import xarray as xr
from zampy.datasets import converter
from zampy.datasets import utils
from zampy.datasets import validation
from zampy.datasets.dataset_protocol import Dataset
from zampy.datasets.dataset_protocol import SpatialBounds
from zampy.datasets.dataset_protocol import TimeBounds
from zampy.datasets.dataset_protocol import Variable
from zampy.datasets.dataset_protocol import copy_properties_file
from zampy.datasets.dataset_protocol import write_properties_file
from zampy.reference.variables import VARIABLE_REFERENCE_LOOKUP
from zampy.reference.variables import unit_registry
from zampy.utils import regrid


## Ignore missing class/method docstrings: they are implemented in the Dataset class.
Expand All @@ -31,6 +37,7 @@ class ERA5(Dataset): # noqa: D101
Variable(name="v10", unit=unit_registry.velocity),
)

# variable names used in cdsapi downloading request
variable_names = (
"mean_total_precipitation_rate",
"surface_thermal_radiation_downwards",
Expand Down Expand Up @@ -59,7 +66,7 @@ def download( # noqa: PLR0913
download_dir: Path,
time_bounds: TimeBounds,
spatial_bounds: SpatialBounds,
variable_names: list[str],
variable_names: List[str],
overwrite: bool = False,
) -> bool:
validation.validate_download_request(
Expand Down Expand Up @@ -94,22 +101,129 @@ def ingest(
ingest_dir: Path,
overwrite: bool = False,
) -> bool:
download_folder = download_dir / self.name
ingest_folder = ingest_dir / self.name
ingest_folder.mkdir(parents=True, exist_ok=True)

data_file_pattern = "era5_*.nc"
data_files = list(download_folder.glob(data_file_pattern))

for file in data_files:
convert_to_zampy(
ingest_folder,
file=file,
overwrite=overwrite,
)

copy_properties_file(download_folder, ingest_folder)

return True

# def load( # noqa: PLR0913
# self,
# ingest_dir: Path,
# time_bounds: TimeBounds,
# spatial_bounds: SpatialBounds,
# resolution: float,
# regrid_method: str,
# variable_names: List[str],
# ) -> None:
# pass
def load(  # noqa: PLR0913
    self,
    ingest_dir: Path,
    time_bounds: TimeBounds,
    spatial_bounds: SpatialBounds,
    resolution: float,
    regrid_method: str,
    variable_names: List[str],
) -> xr.Dataset:
    """Open the ingested ERA5 files for the requested variables.

    Args:
        ingest_dir: Root ingest directory; files are looked up in the
            ``<ingest_dir>/<self.name>`` subfolder.
        time_bounds: Its ``start`` and ``end`` delimit the ``time`` slice that
            is selected from the opened data.
        spatial_bounds: Forwarded unchanged to ``regrid.regrid_data``.
        resolution: Forwarded unchanged to ``regrid.regrid_data``.
        regrid_method: Forwarded unchanged to ``regrid.regrid_data``.
        variable_names: Requested variables. Only names that also appear in
            ``self.variable_names`` contribute files.

    Returns:
        The dataset returned by ``regrid.regrid_data``.
    """
    dataset_folder = ingest_dir / self.name

    # Walk the class-level names (not the request) so that the file order
    # follows `self.variable_names`.
    requested = [name for name in self.variable_names if name in variable_names]

    files: List[Path] = []
    for name in requested:
        files.extend(dataset_folder.glob(f"era5_{name}*.nc"))

    ds = xr.open_mfdataset(files, chunks={"latitude": 200, "longitude": 200})
    ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))
    return regrid.regrid_data(ds, spatial_bounds, resolution, regrid_method)

def convert(
    self,
    ingest_dir: Path,
    convention: Union[str, Path],
) -> bool:
    """Run the convention conversion on every ingested ERA5 file.

    Args:
        ingest_dir: Root ingest directory; files are looked up in the
            ``<ingest_dir>/<self.name>`` subfolder.
        convention: Convention name or path. It is checked with
            ``converter.check_convention`` before any file is opened.

    Returns:
        Always True. The converted datasets are currently discarded (see the
        TODO notes in the loop).
    """
    converter.check_convention(convention)
    ingest_folder = ingest_dir / self.name

    # Ingested ERA5 files are named "era5_*.nc". The previous pattern belonged
    # to a different dataset and never matched, so nothing was processed.
    data_file_pattern = "era5_*.nc"

    data_files = list(ingest_folder.glob(data_file_pattern))

    for file in data_files:
        # start conversion process
        print(f"Start processing file `{file.name}`.")
        # Chunk along the ERA5 grid dimensions, matching what `load` uses.
        ds = xr.open_dataset(file, chunks={"latitude": 200, "longitude": 200})
        ds = converter.convert(ds, dataset=self, convention=convention)
        # TODO: support derived variables
        # TODO: other calculations
        # call ds.compute()

    return True


def convert_to_zampy(
    ingest_folder: Path,
    file: Path,
    overwrite: bool = False,
) -> None:
    """Convert a downloaded nc file to a standard CF/Zampy netCDF file.

    The downloaded ERA5 data already follows the CF-1.6 convention. However, it
    uses (abbreviated) variable names instead of standard names, which prohibits
    the format conversion. Therefore the downloaded files are ingested and all
    variables are renamed to standard names.

    Args:
        ingest_folder: Folder where the files have to be written to.
        file: Path to the ERA5 nc file.
        overwrite: Overwrite an existing output file. If False, a file that
            already exists is skipped.
    """
    ncfile = ingest_folder / file.with_suffix(".nc").name
    if ncfile.exists() and not overwrite:
        print(f"File '{ncfile.name}' already exists, skipping...")
        return

    ds = parse_nc_file(file)
    try:
        ds.to_netcdf(path=ncfile)
    finally:
        # Release the handle on the downloaded file; otherwise one handle
        # stays open for every ingested file.
        ds.close()


# Maps the (abbreviated) variable names found in downloaded ERA5 netCDF files to
# the names used in the ingested files. `parse_nc_file` renames variables with
# this table and then indexes VARIABLE_REFERENCE_LOOKUP with the mapped name, so
# every value here must also be a key of that lookup.
# NOTE(review): confirm each value exists in VARIABLE_REFERENCE_LOOKUP.
var_reference_era5_to_zampy = {
    "mtpr": "mean_total_precipitation_rate",
    "strd": "surface_thermal_radiation",
    "ssrd": "surface_solar_radiation",
    "sp": "surface_pressure",
    "u10": "10m_u_component_of_wind",
    "v10": "10m_v_component_of_wind",
}


def parse_nc_file(file: Path) -> xr.Dataset:
    """Parse a downloaded ERA5 nc file into a CF/Zampy standard dataset.

    Every variable whose name is a key of ``var_reference_era5_to_zampy`` is
    renamed to the mapped name. Its ``units`` and ``description`` attributes are
    then set from the matching ``VARIABLE_REFERENCE_LOOKUP`` entry. Other
    variables are left untouched.

    Args:
        file: Path to the ERA5 nc file.

    Returns:
        CF/Zampy formatted xarray Dataset. It is still backed by the opened
        file, so the caller is responsible for closing it.

    Raises:
        KeyError: If a mapped name is missing from ``VARIABLE_REFERENCE_LOOKUP``.
    """
    # Open chunked: will be dask array -> file writing can be parallelized.
    # The chunk keys must be dimensions of the file; the ERA5 grid dimensions
    # are "latitude"/"longitude", the same keys `ERA5.load` uses.
    ds = xr.open_dataset(file, chunks={"latitude": 200, "longitude": 200})

    # Collect all renames first and apply them in one call.
    renames = {
        str(name): var_reference_era5_to_zampy[str(name)]  # str() pleases mypy
        for name in ds.variables
        if name in var_reference_era5_to_zampy
    }
    ds = ds.rename(renames)

    for variable_name in renames.values():
        reference = VARIABLE_REFERENCE_LOOKUP[variable_name]
        ds[variable_name].attrs["units"] = str(reference.unit)
        ds[variable_name].attrs["description"] = reference.desc

    # TODO: add dataset attributes.

    return ds
5 changes: 3 additions & 2 deletions src/zampy/datasets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import itertools
import urllib.request
from pathlib import Path
from typing import List
from typing import Optional
from typing import Union
import cdsapi
Expand Down Expand Up @@ -69,7 +70,7 @@ def get_file_size(fpath: Path) -> int:

def cds_request( # noqa: PLR0913
product: str,
variables: list[str],
variables: List[str],
time_bounds: TimeBounds,
spatial_bounds: SpatialBounds,
path: Path,
Expand Down Expand Up @@ -195,7 +196,7 @@ def cds_request( # noqa: PLR0913
)


def time_bounds_to_year_month(time_bounds: TimeBounds) -> list[tuple[str, str]]:
def time_bounds_to_year_month(time_bounds: TimeBounds) -> List[tuple[str, str]]:
"""Return year/month pairs."""
date_range = pd.date_range(start=time_bounds.start, end=time_bounds.end, freq="M")
year_month_pairs = [(str(date.year), str(date.month)) for date in date_range]
Expand Down
9 changes: 9 additions & 0 deletions src/zampy/reference/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Reference

To allow data ingestion into the unified `zampy` format and to enable variable conversion between different conventions, we need a unique identifier for each variable. Therefore we keep a `variables.py` file that stores the unique `zampy` variable names for reference.

### Ingestion

During data ingestion, all variables are converted to the `zampy` variable format.


1 change: 1 addition & 0 deletions src/zampy/reference/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Variable("air-temperature", unit_registry.kelvin),
Variable("dewpoint-temperature", unit_registry.kelvin),
Variable("relative-humidity", unit_registry.percent),
Variable("surface_pressure", unit_registry.pascal),
Variable(
"specific-humidity",
unit_registry.fraction,
Expand Down

0 comments on commit 8824477

Please sign in to comment.