diff --git a/README.md b/README.md index 1da61af..9a40b89 100644 --- a/README.md +++ b/README.md @@ -24,4 +24,17 @@ Tool for downloading Land Surface Model input data - Load recipes using Pydantic ([for example](https://github.com/phenology/springtime/blob/main/src/springtime/datasets/daymet.py)). - Support both a CLI & Python API. -Note: items in *italic* will not be worked on for now/low priority, but we want to allow space for these in the future. \ No newline at end of file +Note: items in *italic* will not be worked on for now/low priority, but we want to allow space for these in the future. + +## Instructions for CDS datasets (e.g. ERA5) +To download the following datasets, users need access to CDS via cdsapi: + +- ERA5 +- ERA5 land +- LAI + +First, you need to be a registered user on CDS via the [registration page](https://cds.climate.copernicus.eu/user/register?destination=%2F%23!%2Fhome). + +Before submitting any request with `zampy`, please configure your `.cdsapirc` file following the instructions on https://cds.climate.copernicus.eu/api-how-to. + +When downloading a dataset for the first time, it is **necessary to agree to the Terms of Use of every datasets that you intend to download**. This can only be done via the CDS website. When you try to download these datasets, you will be prompted to go to the terms of use and accept them. \ No newline at end of file diff --git a/demo/era5_dataset_demo.ipynb b/demo/era5_dataset_demo.ipynb new file mode 100644 index 0000000..5232850 --- /dev/null +++ b/demo/era5_dataset_demo.ipynb @@ -0,0 +1,1204 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Handle ERA5 dataset with Zampy\n", + "Demo notebook for developers." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "from zampy.datasets import ERA5\n", + "from zampy.datasets.dataset_protocol import TimeBounds, SpatialBounds\n", + "from pathlib import Path" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "work_dir = Path(\"/home/yangliu/EcoExtreML/temp\")\n", + "download_dir = work_dir / \"download\"\n", + "ingest_dir = work_dir / \"ingest\"\n", + "times = TimeBounds(np.datetime64(\"2010-01-01T00:00:00\"), np.datetime64(\"2010-01-31T23:00:00\"))\n", + "bbox_demo = SpatialBounds(54, 56, 1, 3)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Download dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "File 'era5_10m_v_component_of_wind_2010-1.nc' already exists, skipping...\n" + ] + }, + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "era5_dataset = ERA5()\n", + "era5_dataset.download(\n", + " download_dir=download_dir,\n", + " time_bounds=times,\n", + " spatial_bounds=bbox_demo,\n", + " variable_names=[\"10m_v_component_of_wind\"], #\"surface_pressure\", \"mean_total_precipitation_rate\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Data ingestion to the unified format in `zampy`." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "File 'era5_10m_v_component_of_wind_2010-1.nc' already exists, skipping...\n" + ] + }, + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "era5_dataset.ingest(download_dir, ingest_dir)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "ds = era5_dataset.load(\n", + " ingest_dir=ingest_dir,\n", + " time_bounds=times,\n", + " spatial_bounds=bbox_demo,\n", + " variable_names=[\"10m_v_component_of_wind\"],\n", + " resolution=1.0,\n", + " regrid_method=\"flox\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset>\n",
+       "Dimensions:                  (time: 744, latitude: 54, longitude: 54)\n",
+       "Coordinates:\n",
+       "  * time                     (time) datetime64[ns] 2010-01-01 ... 2010-01-31T...\n",
+       "  * latitude                 (latitude) float64 1.0 2.0 3.0 ... 52.0 53.0 54.0\n",
+       "  * longitude                (longitude) float64 3.0 4.0 5.0 ... 54.0 55.0 56.0\n",
+       "Data variables:\n",
+       "    10m_v_component_of_wind  (time, latitude, longitude) float32 dask.array<chunksize=(744, 54, 54), meta=np.ndarray>\n",
+       "Attributes:\n",
+       "    Conventions:  CF-1.6\n",
+       "    history:      2023-07-11 10:17:10 GMT by grib_to_netcdf-2.25.1: /opt/ecmw...
" + ], + "text/plain": [ + "\n", + "Dimensions: (time: 744, latitude: 54, longitude: 54)\n", + "Coordinates:\n", + " * time (time) datetime64[ns] 2010-01-01 ... 2010-01-31T...\n", + " * latitude (latitude) float64 1.0 2.0 3.0 ... 52.0 53.0 54.0\n", + " * longitude (longitude) float64 3.0 4.0 5.0 ... 54.0 55.0 56.0\n", + "Data variables:\n", + " 10m_v_component_of_wind (time, latitude, longitude) float32 dask.array\n", + "Attributes:\n", + " Conventions: CF-1.6\n", + " history: 2023-07-11 10:17:10 GMT by grib_to_netcdf-2.25.1: /opt/ecmw..." + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ds" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "10m_v_component_of_wind renamed to Wind_E.\n", + "Conversion of dataset 'era5' following ALMA convention is complete!\n" + ] + } + ], + "source": [ + "from zampy.datasets import converter\n", + "\n", + "ds_convert = converter.convert(ds, era5_dataset, \"ALMA\")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset>\n",
+       "Dimensions:    (time: 744, latitude: 54, longitude: 54)\n",
+       "Coordinates:\n",
+       "  * time       (time) datetime64[ns] 2010-01-01 ... 2010-01-31T23:00:00\n",
+       "  * latitude   (latitude) float64 1.0 2.0 3.0 4.0 5.0 ... 51.0 52.0 53.0 54.0\n",
+       "  * longitude  (longitude) float64 3.0 4.0 5.0 6.0 7.0 ... 53.0 54.0 55.0 56.0\n",
+       "Data variables:\n",
+       "    Wind_E     (time, latitude, longitude) float32 dask.array<chunksize=(744, 54, 54), meta=np.ndarray>\n",
+       "Attributes:\n",
+       "    Conventions:  ALMA\n",
+       "    history:      2023-07-11 10:17:10 GMT by grib_to_netcdf-2.25.1: /opt/ecmw...
" + ], + "text/plain": [ + "\n", + "Dimensions: (time: 744, latitude: 54, longitude: 54)\n", + "Coordinates:\n", + " * time (time) datetime64[ns] 2010-01-01 ... 2010-01-31T23:00:00\n", + " * latitude (latitude) float64 1.0 2.0 3.0 4.0 5.0 ... 51.0 52.0 53.0 54.0\n", + " * longitude (longitude) float64 3.0 4.0 5.0 6.0 7.0 ... 53.0 54.0 55.0 56.0\n", + "Data variables:\n", + " Wind_E (time, latitude, longitude) float32 dask.array\n", + "Attributes:\n", + " Conventions: ALMA\n", + " history: 2023-07-11 10:17:10 GMT by grib_to_netcdf-2.25.1: /opt/ecmw..." + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ds_convert" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ecoextreml", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.0" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/eth_dataset_demo.ipynb b/demo/eth_dataset_demo.ipynb similarity index 100% rename from eth_dataset_demo.ipynb rename to demo/eth_dataset_demo.ipynb diff --git a/pyproject.toml b/pyproject.toml index 4816f7e..ad83aae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,7 @@ dependencies = [ "cf_xarray", # required to auto-pint CF compliant datasets. "pint-xarray", "flox", # For better groupby methods. + "cdsapi", ] dynamic = ["version"] @@ -142,6 +143,7 @@ extend-select = [ ] ignore = [ "PLR2004", # magic value used in comparsion (i.e. `if ndays == 28: month_is_feb`). + "PLR0913", # too many arguments ] line-length = 88 exclude = ["docs", "build"] diff --git a/src/zampy/conventions/ALMA.json b/src/zampy/conventions/ALMA.json index 0e62826..8398410 100644 --- a/src/zampy/conventions/ALMA.json +++ b/src/zampy/conventions/ALMA.json @@ -23,6 +23,16 @@ "units": "watt/meter**2", "positive_dir": "downward" }, + "surface_solar_radiation_downwards": { + "variable": "SWdown", + "units": "watt/meter**2", + "positive_dir": "downward" + }, + "surface_thermal_radiation_downwards": { + "variable": "LWdown", + "units": "watt/meter**2", + "positive_dir": "downward" + }, "latent_heat_flux": { "variable": "Qle", "units": "watt/meter**2", @@ -32,5 +42,23 @@ "variable": "Qh", "units": "watt/meter**2", "positive_dir": "downward" + }, + "surface_pressure": { + "variable": "Psurf", + "units": "pascal" + }, + "eastward_component_of_wind": { + "variable": "Wind_E", + "units": "meter/second", + "positive_dir": "eastward" + }, + "northward_component_of_wind": { + "variable": "Wind_N", + "units": "meter/second", + "positive_dir": "northward" + }, + "total_precipitation": { + "variable": "Rainf", + "units": "millimeter/second" } } \ No newline at end of file diff --git a/src/zampy/datasets/__init__.py b/src/zampy/datasets/__init__.py index fdff791..e1b1724 100644 --- a/src/zampy/datasets/__init__.py +++ b/src/zampy/datasets/__init__.py @@ -1,7 +1,8 @@ """Datasets implementations.""" from zampy.datasets import dataset_protocol from zampy.datasets import validation +from zampy.datasets.era5 import ERA5 from zampy.datasets.eth_canopy_height import EthCanopyHeight -__all__ = ["dataset_protocol", "validation", "EthCanopyHeight"] +__all__ = ["dataset_protocol", "validation", "EthCanopyHeight", "ERA5"] diff --git a/src/zampy/datasets/converter.py b/src/zampy/datasets/converter.py index 02c21e1..f641950 100644 --- a/src/zampy/datasets/converter.py +++ b/src/zampy/datasets/converter.py @@ -7,11 +7,11 @@ import pint_xarray # noqa: F401 import xarray as xr from zampy.datasets.dataset_protocol import Dataset +from zampy.reference.variables import unit_registry -CONVENTIONS = { - "ALMA": "src/zampy/conventions/ALMA.json", -} +CONVENTIONS = ["ALMA"] +conventions_path = Path(__file__).resolve().parents[1] / "conventions" def check_convention(convention: Union[str, Path]) -> None: @@ -48,7 +48,9 @@ def convert( """ converted = False if isinstance(convention, str): - convention_file = Path(CONVENTIONS[convention]).open(mode="r", encoding="UTF8") + convention_file = (conventions_path / f"{convention}.json").open( + mode="r", encoding="UTF8" + ) else: convention_file = convention.open(mode="r", encoding="UTF8") @@ -58,7 +60,7 @@ def convert( convert_units = convention_dict[var.lower()]["units"] var_name = convention_dict[var.lower()]["variable"] var_units = data[var].attrs["units"] - if var_units != convert_units: + if unit_registry(var_units) != unit_registry(convert_units): # lazy dask array data = _convert_var(data, var, convert_units) data = data.rename({var: var_name}) @@ -75,6 +77,7 @@ def convert( f"Conversion of dataset '{dataset.name}' following {convention} " "convention is complete!" ) + data.attrs["Conventions"] = convention else: warnings.warn( f"All variables already follow the {convention} convention or " diff --git a/src/zampy/datasets/dataset_protocol.py b/src/zampy/datasets/dataset_protocol.py index 2803cff..77b7042 100644 --- a/src/zampy/datasets/dataset_protocol.py +++ b/src/zampy/datasets/dataset_protocol.py @@ -88,7 +88,7 @@ def __init__(self) -> None: ... @abstractmethod - def download( # noqa: PLR0913 + def download( self, download_dir: Path, time_bounds: TimeBounds, @@ -131,7 +131,7 @@ def ingest( ... @abstractmethod - def load( # noqa: PLR0913 + def load( self, ingest_dir: Path, time_bounds: TimeBounds, diff --git a/src/zampy/datasets/era5.py b/src/zampy/datasets/era5.py new file mode 100644 index 0000000..6c7c64d --- /dev/null +++ b/src/zampy/datasets/era5.py @@ -0,0 +1,247 @@ +"""ERA5 dataset.""" + +from pathlib import Path +from typing import List +from typing import Union +import numpy as np +import xarray as xr +from zampy.datasets import converter +from zampy.datasets import utils +from zampy.datasets import validation +from zampy.datasets.dataset_protocol import Dataset +from zampy.datasets.dataset_protocol import SpatialBounds +from zampy.datasets.dataset_protocol import TimeBounds +from zampy.datasets.dataset_protocol import Variable +from zampy.datasets.dataset_protocol import copy_properties_file +from zampy.datasets.dataset_protocol import write_properties_file +from zampy.reference.variables import VARIABLE_REFERENCE_LOOKUP +from zampy.reference.variables import unit_registry +from zampy.utils import regrid + + +## Ignore missing class/method docstrings: they are implemented in the Dataset class. +# ruff: noqa: D102 + + +class ERA5(Dataset): # noqa: D101 + name = "era5" + time_bounds = TimeBounds(np.datetime64("1940-01-01"), np.datetime64("2023-06-30")) + spatial_bounds = SpatialBounds(90, 180, -90, -180) + + raw_variables = ( + Variable(name="mtpr", unit=unit_registry.kilogram_per_square_meter_second), + Variable(name="strd", unit=unit_registry.joule_per_square_meter), + Variable(name="ssrd", unit=unit_registry.joule_per_square_meter), + Variable(name="sp", unit=unit_registry.pascal), + Variable(name="u10", unit=unit_registry.meter_per_second), + Variable(name="v10", unit=unit_registry.meter_per_second), + ) + + # variable names used in cdsapi downloading request + variable_names = ( + "mean_total_precipitation_rate", + "surface_thermal_radiation_downwards", + "surface_solar_radiation_downwards", + "surface_pressure", + "10m_u_component_of_wind", + "10m_v_component_of_wind", + ) + + license = "cc-by-4.0" + bib = """ + @article{hersbach2020era5, + title={The ERA5 global reanalysis}, + author={Hersbach, Hans et al.}, + journal={Quarterly Journal of the Royal Meteorological Society}, + volume={146}, + number={730}, + pages={1999--2049}, + year={2020}, + publisher={Wiley Online Library} + } + """ + + def download( + self, + download_dir: Path, + time_bounds: TimeBounds, + spatial_bounds: SpatialBounds, + variable_names: List[str], + overwrite: bool = False, + ) -> bool: + validation.validate_download_request( + self, + download_dir, + time_bounds, + spatial_bounds, + variable_names, + ) + + download_folder = download_dir / self.name + download_folder.mkdir(parents=True, exist_ok=True) + + utils.cds_request( + dataset="reanalysis-era5-single-levels", + variables=variable_names, + time_bounds=time_bounds, + spatial_bounds=spatial_bounds, + path=download_folder, + overwrite=overwrite, + ) + + write_properties_file( + download_folder, spatial_bounds, time_bounds, variable_names + ) + + return True + + def ingest( + self, + download_dir: Path, + ingest_dir: Path, + overwrite: bool = False, + ) -> bool: + download_folder = download_dir / self.name + ingest_folder = ingest_dir / self.name + ingest_folder.mkdir(parents=True, exist_ok=True) + + data_file_pattern = "era5_*.nc" + data_files = list(download_folder.glob(data_file_pattern)) + + for file in data_files: + convert_to_zampy( + ingest_folder, + file=file, + overwrite=overwrite, + ) + + copy_properties_file(download_folder, ingest_folder) + + return True + + def load( + self, + ingest_dir: Path, + time_bounds: TimeBounds, + spatial_bounds: SpatialBounds, + resolution: float, + regrid_method: str, + variable_names: List[str], + ) -> xr.Dataset: + files: List[Path] = [] + for var in self.variable_names: + if var in variable_names: + files += (ingest_dir / self.name).glob(f"era5_{var}*.nc") + + ds = xr.open_mfdataset(files, chunks={"latitude": 200, "longitude": 200}) + ds = ds.sel(time=slice(time_bounds.start, time_bounds.end)) + ds = regrid.regrid_data(ds, spatial_bounds, resolution, regrid_method) + + return ds + + def convert( + self, + ingest_dir: Path, + convention: Union[str, Path], + ) -> bool: + converter.check_convention(convention) + ingest_folder = ingest_dir / self.name + + data_file_pattern = "era5_*.nc" + + data_files = list(ingest_folder.glob(data_file_pattern)) + + for file in data_files: + # start conversion process + print(f"Start processing file `{file.name}`.") + ds = xr.open_dataset(file, chunks={"x": 50, "y": 50}) + ds = converter.convert(ds, dataset=self, convention=convention) + # TODO: support derived variables + # TODO: other calculations + # call ds.compute() + + return True + + +def convert_to_zampy( + ingest_folder: Path, + file: Path, + overwrite: bool = False, +) -> None: + """Convert the downloaded nc files to standard CF/Zampy netCDF files. + + The downloaded ERA5 data already follows CF1.6 convention. However, it uses + (abbreviated) variable name instead of standard name, which prohibits the format + conversion. Therefore we need to ingest the downloaded files and rename all + variables to standard names. + + Args: + ingest_folder: Folder where the files have to be written to. + file: Path to the ERA5 nc file. + overwrite: Overwrite all existing files. If False, file that already exist will + be skipped. + """ + ncfile = ingest_folder / file.with_suffix(".nc").name + if ncfile.exists() and not overwrite: + print(f"File '{ncfile.name}' already exists, skipping...") + else: + ds = parse_nc_file(file) + + ds.to_netcdf(path=ncfile) + + +var_reference_era5_to_zampy = { + "mtpr": "total_precipitation", + "strd": "surface_thermal_radiation_downwards", + "ssrd": "surface_solar_radiation_downwards", + "sp": "surface_pressure", + "u10": "eastward_component_of_wind", + "v10": "northward_component_of_wind", +} + +WATER_DENSITY = 997.0 # kg/m3 + + +def parse_nc_file(file: Path) -> xr.Dataset: + """Parse the downloaded ERA5 nc files, to CF/Zampy standard dataset. + + Args: + file: Path to the ERA5 nc file. + + Returns: + CF/Zampy formatted xarray Dataset + """ + # Open chunked: will be dask array -> file writing can be parallelized. + ds = xr.open_dataset(file, chunks={"x": 50, "y": 50}) + + for variable in ds.variables: + if variable in var_reference_era5_to_zampy: + var = str(variable) # Cast to string to please mypy + variable_name = var_reference_era5_to_zampy[var] + ds = ds.rename({var: variable_name}) + # convert radiation to flux J/m2 to W/m2 + # https://confluence.ecmwf.int/pages/viewpage.action?pageId=155337784 + if variable_name in ( + "surface_solar_radiation_downwards", + "surface_thermal_radiation_downwards", + ): + ds[variable_name] = ds[variable_name] / 3600 + # conversion precipitation kg/m2s to mm/s + elif variable_name == "total_precipitation": + ds[variable_name] = ds[variable_name] / WATER_DENSITY + ds[variable_name].attrs["units"] = "meter_per_second" + # convert from m/s to mm/s + ds = converter._convert_var( + ds, variable_name, VARIABLE_REFERENCE_LOOKUP[variable_name].unit + ) + + ds[variable_name].attrs["units"] = str( + VARIABLE_REFERENCE_LOOKUP[variable_name].unit + ) + ds[variable_name].attrs["description"] = VARIABLE_REFERENCE_LOOKUP[ + variable_name + ].desc + + # TODO: add dataset attributes. + + return ds diff --git a/src/zampy/datasets/eth_canopy_height.py b/src/zampy/datasets/eth_canopy_height.py index bae9322..9e8a2a0 100644 --- a/src/zampy/datasets/eth_canopy_height.py +++ b/src/zampy/datasets/eth_canopy_height.py @@ -55,7 +55,7 @@ class EthCanopyHeight(Dataset): # noqa: D101 data_url = "https://share.phys.ethz.ch/~pf/nlangdata/ETH_GlobalCanopyHeight_10m_2020_version1/3deg_cogs/" - def download( # noqa: PLR0913 + def download( self, download_dir: Path, time_bounds: TimeBounds, @@ -121,7 +121,7 @@ def ingest( return True - def load( # noqa: PLR0913 + def load( self, ingest_dir: Path, time_bounds: TimeBounds, diff --git a/src/zampy/datasets/utils.py b/src/zampy/datasets/utils.py index 1b27975..c6a6214 100644 --- a/src/zampy/datasets/utils.py +++ b/src/zampy/datasets/utils.py @@ -1,10 +1,24 @@ """Shared utilities from datasets.""" import urllib.request from pathlib import Path +from typing import List from typing import Optional +from typing import Tuple from typing import Union +import cdsapi +import pandas as pd import requests from tqdm import tqdm +from tqdm.contrib.itertools import product +from zampy.datasets.dataset_protocol import SpatialBounds +from zampy.datasets.dataset_protocol import TimeBounds + + +PRODUCT_FNAME = { + "reanalysis-era5-single-levels": "era5", + "reanalysis-era5-land": "era5-land", +} +CDSAPI_CONFIG_PATH = Path.home() / ".cdsapirc" class TqdmUpdate(tqdm): @@ -54,3 +68,96 @@ def get_file_size(fpath: Path) -> int: return 0 else: return fpath.stat().st_size + + +def cds_request( + dataset: str, + variables: List[str], + time_bounds: TimeBounds, + spatial_bounds: SpatialBounds, + path: Path, + overwrite: bool, +) -> None: + """Download data via CDS API. + + To raise a request via CDS API, the user needs to set up the + configuration file `.cdsapirc` following the instructions on + https://cds.climate.copernicus.eu/api-how-to. + + Following the efficiency tips of request, + https://confluence.ecmwf.int/display/CKB/Climate+Data+Store+%28CDS%29+documentation + The downloading is organized by asking for one month of data per request. + + Args: + dataset: Dataset name for retrieval via `cdsapi`. + variables: Zampy variable. + time_bounds: Zampy time bounds object. + spatial_bounds: Zampy spatial bounds object. + path: File path to which the data should be saved. + overwrite: If an existing file (of the same size!) should be overwritten. + """ + fname = PRODUCT_FNAME[dataset] + + with CDSAPI_CONFIG_PATH.open(encoding="utf8") as f: + url = f.readline().split(":", 1)[1].strip() + api_key = f.readline().split(":", 1)[1].strip() + + c = cdsapi.Client( + url=url, + key=api_key, + verify=True, + quiet=True, + ) + + # create list of year/month pairs + year_month_pairs = time_bounds_to_year_month(time_bounds) + + for (year, month), variable in product( + year_month_pairs, variables, position=0, leave=True + ): + # raise download request + r = c.retrieve( + dataset, + { + "product_type": "reanalysis", + "variable": [variable], + "year": year, + "month": month, + # fmt: off + "day": [ + "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", + "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", + "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", + "31", + ], + "time": [ + "00:00", "01:00", "02:00", "03:00", "04:00", "05:00", "06:00", + "07:00", "08:00", "09:00", "10:00", "11:00", "12:00", "13:00", + "14:00", "15:00", "16:00", "17:00", "18:00", "19:00", "20:00", + "21:00", "22:00", "23:00", + ], + # fmt: on + "area": [ + spatial_bounds.north, + spatial_bounds.west, + spatial_bounds.south, + spatial_bounds.east, + ], + "format": "netcdf", + }, + ) + # check existence and overwrite + fpath = path / f"{fname}_{variable}_{year}-{month}.nc" + + if get_file_size(fpath) != r.content_length or overwrite: + r.download(fpath) + tqdm.write(f"Download {fpath.name} successfully.") + else: + print(f"File '{fpath.name}' already exists, skipping...") + + +def time_bounds_to_year_month(time_bounds: TimeBounds) -> List[Tuple[str, str]]: + """Return year/month pairs.""" + date_range = pd.date_range(start=time_bounds.start, end=time_bounds.end, freq="M") + year_month_pairs = [(str(date.year), str(date.month)) for date in date_range] + return year_month_pairs diff --git a/src/zampy/reference/README.md b/src/zampy/reference/README.md new file mode 100644 index 0000000..630757d --- /dev/null +++ b/src/zampy/reference/README.md @@ -0,0 +1,9 @@ +### Reference + +To allow data ingestion into the unified `zampy` format and to enable variables conversion between different conventions, we need a unique identifier for each variable. Therefore we create a `variables.py` file to store the `zampy` unique variable names for reference. + +### Ingestion + +During data ingestion, all variables will be formatted to `zampy` variable format. + + \ No newline at end of file diff --git a/src/zampy/reference/variables.py b/src/zampy/reference/variables.py index 9ef9709..9968727 100644 --- a/src/zampy/reference/variables.py +++ b/src/zampy/reference/variables.py @@ -3,23 +3,42 @@ from zampy.datasets.dataset_protocol import Variable -unit_registry = UnitRegistry() -unit_registry.define("fraction = [] = frac") -unit_registry.define("percent = 1e-2 frac = pct") -unit_registry.define("ppm = 1e-6 fraction") -unit_registry.define("degree_north = degree = degree_N = degreeN") -unit_registry.define("degree_east = degree = degree_E = degreeE") +def unit_registration() -> UnitRegistry: + """Create unit registration for all custom units.""" + unit_registry = UnitRegistry() + unit_registry.define("fraction = [] = frac") + unit_registry.define("percent = 1e-2 frac = pct") + unit_registry.define("ppm = 1e-6 fraction") + unit_registry.define("degree_north = degree = degree_N = degreeN") + unit_registry.define("degree_east = degree = degree_E = degreeE") + unit_registry.define("watt_per_square_meter = watt/meter**2") + unit_registry.define("joule_per_square_meter = joule/meter**2") + unit_registry.define( + "kilogram_per_square_meter_second = kilogram/(meter**2*second)" + ) + unit_registry.define("milimeter_per_second = watt/meter**2") + return unit_registry + +unit_registry = unit_registration() # By default, we use the variable names and units following the CF convention: # https://cfconventions.org/Data/cf-standard-names/current/build/cf-standard-name-table.html VARIABLE_REFERENCE = ( - Variable("air-temperature", unit_registry.kelvin), - Variable("dewpoint-temperature", unit_registry.kelvin), - Variable("relative-humidity", unit_registry.percent), + Variable("air_temperature", unit_registry.kelvin), + Variable("dewpoint_temperature", unit_registry.kelvin), + Variable("relative_humidity", unit_registry.percent), + Variable("surface_pressure", unit_registry.pascal), + Variable("eastward_component_of_wind", unit_registry.meter_per_second), + Variable("northward_component_of_wind", unit_registry.meter_per_second), + Variable("surface_solar_radiation_downwards", unit_registry.watt_per_square_meter), + Variable( + "surface_thermal_radiation_downwards", unit_registry.watt_per_square_meter + ), + Variable("total_precipitation", unit_registry.millimeter_per_second), Variable( - "specific-humidity", + "specific_humidity", unit_registry.fraction, desc="Mass fraction of water in air.", ), diff --git a/tests/test_data/era5/era5_10m_u_component_of_wind_1996-1.nc b/tests/test_data/era5/era5_10m_u_component_of_wind_1996-1.nc new file mode 100644 index 0000000..202dfb3 Binary files /dev/null and b/tests/test_data/era5/era5_10m_u_component_of_wind_1996-1.nc differ diff --git a/tests/test_data/era5/era5_10m_v_component_of_wind_1996-1.nc b/tests/test_data/era5/era5_10m_v_component_of_wind_1996-1.nc new file mode 100644 index 0000000..f5ba129 Binary files /dev/null and b/tests/test_data/era5/era5_10m_v_component_of_wind_1996-1.nc differ diff --git a/tests/test_data/era5/era5_mean_total_precipitation_rate_1996-1.nc b/tests/test_data/era5/era5_mean_total_precipitation_rate_1996-1.nc new file mode 100644 index 0000000..d0018cc Binary files /dev/null and b/tests/test_data/era5/era5_mean_total_precipitation_rate_1996-1.nc differ diff --git a/tests/test_data/era5/era5_surface_pressure_1996-1.nc b/tests/test_data/era5/era5_surface_pressure_1996-1.nc new file mode 100644 index 0000000..a69a103 Binary files /dev/null and b/tests/test_data/era5/era5_surface_pressure_1996-1.nc differ diff --git a/tests/test_data/era5/era5_surface_solar_radiation_downwards_1996-1.nc b/tests/test_data/era5/era5_surface_solar_radiation_downwards_1996-1.nc new file mode 100644 index 0000000..559909f Binary files /dev/null and b/tests/test_data/era5/era5_surface_solar_radiation_downwards_1996-1.nc differ diff --git a/tests/test_data/era5/era5_surface_thermal_radiation_downwards_1996-1.nc b/tests/test_data/era5/era5_surface_thermal_radiation_downwards_1996-1.nc new file mode 100644 index 0000000..2528c69 Binary files /dev/null and b/tests/test_data/era5/era5_surface_thermal_radiation_downwards_1996-1.nc differ diff --git a/tests/test_data/era5/properties.json b/tests/test_data/era5/properties.json new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_datasets/test_era5.py b/tests/test_datasets/test_era5.py new file mode 100644 index 0000000..c3908b5 --- /dev/null +++ b/tests/test_datasets/test_era5.py @@ -0,0 +1,218 @@ +"""Unit test for ERA5 dataset.""" + +import json +from pathlib import Path +from unittest.mock import patch +import numpy as np +import pytest +import xarray as xr +from zampy.datasets import era5 +from zampy.datasets.dataset_protocol import SpatialBounds +from zampy.datasets.dataset_protocol import TimeBounds +from . import data_folder + + +@pytest.fixture(scope="function") +def valid_path_cds(tmp_path_factory): + """Create a dummy .cdsapirc file.""" + fn = tmp_path_factory.mktemp("usrhome") / ".cdsapirc" + with open(fn, mode="w", encoding="utf-8") as f: + f.write("url: a\nkey: 123:abc-def") + return fn + + +@pytest.fixture(scope="function") +def dummy_dir(tmp_path_factory): + """Create a dummpy directory for testing.""" + return tmp_path_factory.mktemp("data") + + +class TestERA5: + """Test the ERA5 class.""" + + @patch("cdsapi.Client.retrieve") + def test_download(self, mock_retrieve, valid_path_cds, dummy_dir): + """Test download functionality. + Here we mock the downloading and save property file to a fake path. + """ + times = TimeBounds(np.datetime64("2010-01-01"), np.datetime64("2010-01-31")) + bbox = SpatialBounds(54, 56, 1, 3) + variable = ["10m_v_component_of_wind"] + download_dir = Path(dummy_dir, "download") + + era5_dataset = era5.ERA5() + # create a dummy .cdsapirc + patching = patch("zampy.datasets.utils.CDSAPI_CONFIG_PATH", valid_path_cds) + with patching: + era5_dataset.download( + download_dir=download_dir, + time_bounds=times, + spatial_bounds=bbox, + variable_names=variable, + overwrite=True, + ) + + # make sure that the download is called + mock_retrieve.assert_called_once_with( + "reanalysis-era5-single-levels", + { + "product_type": "reanalysis", + "variable": variable, + "year": "2010", + "month": "1", + # fmt: off + "day": [ + "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", + "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", + "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", + "31", + ], + "time": [ + "00:00", "01:00", "02:00", "03:00", "04:00", "05:00", "06:00", + "07:00", "08:00", "09:00", "10:00", "11:00", "12:00", "13:00", + "14:00", "15:00", "16:00", "17:00", "18:00", "19:00", "20:00", + "21:00", "22:00", "23:00", + ], + # fmt: on + "area": [ + bbox.north, + bbox.west, + bbox.south, + bbox.east, + ], + "format": "netcdf", + }, + ) + + # check property file + with (download_dir / "era5" / "properties.json").open( + mode="r", encoding="utf-8" + ) as file: + json_dict = json.load(file) + # check property + assert json_dict["variable_names"] == variable + + def ingest_dummy_data(self, temp_dir): + """Ingest dummy tif data to nc for other tests.""" + era5_dataset = era5.ERA5() + era5_dataset.ingest(download_dir=data_folder, ingest_dir=Path(temp_dir)) + ds = xr.load_dataset( + Path( + temp_dir, + "era5", + "era5_10m_v_component_of_wind_1996-1.nc", + ) + ) + + return ds, era5_dataset + + def test_ingest(self, dummy_dir): + """Test ingest function.""" + ds, _ = self.ingest_dummy_data(dummy_dir) + assert type(ds) == xr.Dataset + + def test_load(self): + """Test load function.""" + times = TimeBounds(np.datetime64("1996-01-01"), np.datetime64("1996-01-02")) + bbox = SpatialBounds(39, -107, 37, -109) + variable = ["10m_v_component_of_wind"] + + era5_dataset = era5.ERA5() + + ds = era5_dataset.load( + ingest_dir=Path(data_folder), + time_bounds=times, + spatial_bounds=bbox, + variable_names=variable, + resolution=1.0, + regrid_method="flox", + ) + + # we assert the regridded coordinates + expected_lat = [37.0, 38.0, 39.0] + expected_lon = [-109.0, -108.0, -107.0] + + np.testing.assert_allclose(ds.latitude.values, expected_lat) + np.testing.assert_allclose(ds.longitude.values, expected_lon) + + def test_convert(self, dummy_dir): + """Test convert function.""" + _, era5_dataset = self.ingest_dummy_data(dummy_dir) + era5_dataset.convert(ingest_dir=Path(dummy_dir), convention="ALMA") + # TODO: finish this test when the function is complete. + + +def test_convert_to_zampy(dummy_dir): + """Test function for converting file to zampy format.""" + ingest_folder = Path(data_folder, "era5") + era5.convert_to_zampy( + ingest_folder=Path(dummy_dir), + file=Path(ingest_folder, "era5_10m_v_component_of_wind_1996-1.nc"), + overwrite=True, + ) + + ds = xr.load_dataset(Path(dummy_dir, "era5_10m_v_component_of_wind_1996-1.nc")) + + assert list(ds.data_vars)[0] == "northward_component_of_wind" + + +def test_parse_nc_file_10m_wind(): + """Test parsing netcdf file function with 10 meter velocity u/v component.""" + variables = { + "10m_v_component_of_wind": "northward_component_of_wind", + "10m_u_component_of_wind": "eastward_component_of_wind", + } + for variable in variables: + ds = era5.parse_nc_file(data_folder / "era5" / f"era5_{variable}_1996-1.nc") + expected_var_name = variables[variable] + assert list(ds.data_vars)[0] == expected_var_name + assert ds[expected_var_name].attrs["units"] == "meter_per_second" + + +def test_parse_nc_file_radiation(): + """Test parsing netcdf file function with surface radiation.""" + variables = { + "surface_thermal_radiation_downwards": "strd", + "surface_solar_radiation_downwards": "ssrd", + } + for variable in variables: + ds_original = xr.load_dataset( + data_folder / "era5" / f"era5_{variable}_1996-1.nc" + ) + ds = era5.parse_nc_file(data_folder / "era5" / f"era5_{variable}_1996-1.nc") + + assert list(ds.data_vars)[0] == variable + assert ds[variable].attrs["units"] == "watt_per_square_meter" + assert np.allclose( + ds_original[variables[variable]].values, + ds[variable].values * 3600, + equal_nan=True, + ) + + +def test_parse_nc_file_precipitation(): + """Test parsing netcdf file function with precipitation.""" + ds_original = xr.load_dataset( + data_folder / "era5" / "era5_mean_total_precipitation_rate_1996-1.nc" + ) + ds = era5.parse_nc_file( + data_folder / "era5" / "era5_mean_total_precipitation_rate_1996-1.nc" + ) + expected_var_name = "total_precipitation" + + assert list(ds.data_vars)[0] == expected_var_name + assert ds["total_precipitation"].attrs["units"] == "millimeter_per_second" + assert np.allclose( + ds_original["mtpr"].values, + ds["total_precipitation"].values * era5.WATER_DENSITY / 1000, + equal_nan=True, + ) + + +def test_parse_nc_file_pressure(): + """Test parsing netcdf file function with surface pressure.""" + ds = era5.parse_nc_file(data_folder / "era5" / "era5_surface_pressure_1996-1.nc") + expected_var_name = "surface_pressure" + + assert list(ds.data_vars)[0] == expected_var_name + assert ds["surface_pressure"].attrs["units"] == "pascal" diff --git a/tests/test_utils.py b/tests/test_utils.py index f77ad62..5239c2c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,7 +3,11 @@ import tempfile from pathlib import Path from unittest.mock import patch +import numpy as np +import pytest from zampy.datasets import utils +from zampy.datasets.dataset_protocol import SpatialBounds +from zampy.datasets.dataset_protocol import TimeBounds def test_tqdm_update(): @@ -66,3 +70,71 @@ def test_download_url(mock_urlretrieve): utils.download_url(url, fpath, overwrite) # assrt that the urlretrieve function is called. assert mock_urlretrieve.called + + +@pytest.fixture(scope="function") +def valid_path_cds(tmp_path_factory): + """Create a dummy .cdsapirc file.""" + fn = tmp_path_factory.mktemp("usrhome") / ".cdsapirc" + with open(fn, mode="w", encoding="utf-8") as f: + f.write("url: a\nkey: 123:abc-def") + return fn + + +@patch("cdsapi.Client.retrieve") +def test_cds_request(mock_retrieve, valid_path_cds): + """ "Test cds request for downloading data from CDS server.""" + product = "reanalysis-era5-single-levels" + variables = ["10m_v_component_of_wind"] + time_bounds = TimeBounds( + np.datetime64("2010-01-01T00:00:00"), np.datetime64("2010-01-31T23:00:00") + ) + spatial_bounds = SpatialBounds(54, 56, 1, 3) + path = Path(__file__).resolve().parent + overwrite = True + + # create a dummy .cdsapirc + patching = patch("zampy.datasets.utils.CDSAPI_CONFIG_PATH", valid_path_cds) + with patching: + utils.cds_request( + product, variables, time_bounds, spatial_bounds, path, overwrite + ) + + mock_retrieve.assert_called_with( + product, + { + "product_type": "reanalysis", + "variable": variables, + "year": "2010", + "month": "1", + # fmt: off + "day": [ + "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", + "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", + "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", + "31", + ], + "time": [ + "00:00", "01:00", "02:00", "03:00", "04:00", "05:00", "06:00", + "07:00", "08:00", "09:00", "10:00", "11:00", "12:00", "13:00", + "14:00", "15:00", "16:00", "17:00", "18:00", "19:00", "20:00", + "21:00", "22:00", "23:00", + ], + # fmt: on + "area": [ + spatial_bounds.north, + spatial_bounds.west, + spatial_bounds.south, + spatial_bounds.east, + ], + "format": "netcdf", + }, + ) + + +def test_time_bounds_to_year_month(): + """Test year and month pair converter function.""" + times = TimeBounds(np.datetime64("2010-01-01"), np.datetime64("2010-01-31")) + expected = [("2010", "1")] + year_month_pairs = utils.time_bounds_to_year_month(times) + assert expected == year_month_pairs