diff --git a/pyproject.toml b/pyproject.toml index 9cfc2630..64201224 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -194,6 +194,8 @@ ignore = [ "PLC0414", # Misidentifies xarrary.DataArray as pandas.DataFrame "PD003", + # Conflicts with code block examples: + "D412", ] [tool.ruff.lint.per-file-ignores] diff --git a/src/ewatercycle/_forcings/_caravan_reference.py b/src/ewatercycle/_forcings/_caravan_reference.py new file mode 100644 index 00000000..59d1f7f7 --- /dev/null +++ b/src/ewatercycle/_forcings/_caravan_reference.py @@ -0,0 +1,107 @@ +"""Holds which caravan variables belong to which datasets.""" + +ERA5_VARS = [ # data vars for ERA5 + "dewpoint_temperature_2m_max", + "dewpoint_temperature_2m_mean", + "dewpoint_temperature_2m_min", + "potential_evaporation_sum", + "snow_depth_water_equivalent_max", + "snow_depth_water_equivalent_mean", + "snow_depth_water_equivalent_min", + "streamflow", + "surface_net_solar_radiation_max", + "surface_net_solar_radiation_mean", + "surface_net_solar_radiation_min", + "surface_net_thermal_radiation_max", + "surface_net_thermal_radiation_mean", + "surface_net_thermal_radiation_min", + "surface_pressure_max", + "surface_pressure_mean", + "surface_pressure_min", + "temperature_2m_max", + "temperature_2m_mean", + "temperature_2m_min", + "total_precipitation_sum", + "u_component_of_wind_10m_max", + "u_component_of_wind_10m_mean", + "u_component_of_wind_10m_min", + "v_component_of_wind_10m_max", + "v_component_of_wind_10m_mean", + "v_component_of_wind_10m_min", + "volumetric_soil_water_layer_1_max", + "volumetric_soil_water_layer_1_mean", + "volumetric_soil_water_layer_1_min", + "volumetric_soil_water_layer_2_max", + "volumetric_soil_water_layer_2_mean", + "volumetric_soil_water_layer_2_min", + "volumetric_soil_water_layer_3_max", + "volumetric_soil_water_layer_3_mean", + "volumetric_soil_water_layer_3_min", + "volumetric_soil_water_layer_4_max", + "volumetric_soil_water_layer_4_mean", + "volumetric_soil_water_layer_4_min", +] 
+ + +CAMELS_VARS = [ # data vars for daymet/nldas/mauer + "area_gages2", + "area_geospa_fabric", + "baseflow_index", + "carbonate_rocks_frac", + "clay_frac", + "dayl", + "dom_land_cover", + "dom_land_cover_frac", + "elev_mean", + "evspsblpot", + "frac_forest", + "gauge_lat", + "gauge_lon", + "gauge_name", + "geol_1st_class", + "geol_2nd_class", + "geol_permeability", + "geol_porostiy", + "glim_1st_class_frac", + "glim_2nd_class_frac", + "gvf_diff", + "gvf_max", + "hfd_mean", + "high_prec_timing", + "high_q_dur", + "high_q_freq", + "huc_02", + "lai_diff", + "lai_max", + "low_prec_timing", + "low_q_dur", + "low_q_freq", + "max_water_content", + "organic_frac", + "other_frac", + "p_seasonality", + "pr", + "q5", + "q95", + "q_mean", + "root_depth_50", + "root_depth_99", + "runoff_ratio", + "sand_frac", + "silt_frac", + "slope_fdc", + "slope_mean", + "soil_conductivity", + "soil_depth_pelletier", + "soil_depth_statsgo", + "soil_porosity", + "srad", + "stream_elas", + "swe", + "tas", + "tasmax", + "tasmin", + "vp", + "water_frac", + "zero_q_freq", +] diff --git a/src/ewatercycle/_forcings/caravan.py b/src/ewatercycle/_forcings/caravan.py index 8a535f1c..ededfb82 100644 --- a/src/ewatercycle/_forcings/caravan.py +++ b/src/ewatercycle/_forcings/caravan.py @@ -8,11 +8,13 @@ import xarray as xr from cartopy.io import shapereader +from ewatercycle._forcings._caravan_reference import CAMELS_VARS, ERA5_VARS from ewatercycle.base.forcing import DefaultForcing from ewatercycle.util import get_time COMMON_URL = "ca13056c-c347-4a27-b320-930c2a4dd207" -OPENDAP_URL = f"https://opendap.4tu.nl/thredds/dodsC/data2/djht/{COMMON_URL}/1/" +VERSION = 2 +OPENDAP_URL = f"https://opendap.4tu.nl/thredds/dodsC/data2/djht/{COMMON_URL}/{VERSION}/" SHAPEFILE_URL = ( f"https://data.4tu.nl/file/{COMMON_URL}/bbe94526-cf1a-4b96-8155-244f20094719" ) @@ -71,11 +73,11 @@ class CaravanForcing(DefaultForcing): HRU_id = 1022500 camels_forcing = sources['CaravanForcing'].generate( - start_time = 
experiment_start_date, - end_time = experiment_end_date, - directory = forcing_path, - basin_id = f"camels_0{HRU_id}" - ) + start_time = experiment_start_date, + end_time = experiment_end_date, + directory = forcing_path, + basin_id = f"camels_0{HRU_id}" + ) which gives something like: @@ -168,12 +170,24 @@ def generate( # type: ignore[override] If none is specified, will be downloaded automatically. kwargs: Additional keyword arguments. basin_id: The ID of the desired basin. Data sets can be explored using - `CaravanForcing.get_dataset(dataset_name)` or - `CaravanForcing.get_basin_id(dataset_name)` - where `dataset_name` is the name of a dataset in Caravan - (for example, "camels" or "camelsgb"). - For more information do `help(CaravanForcing.get_basin_id)` or see - https://www.ewatercycle.org/caravan-map/. + `CaravanForcing.get_dataset(dataset_name)` or + `CaravanForcing.get_basin_id(dataset_name)` + where `dataset_name` is the name of a dataset in Caravan + (for example, "camels" or "camelsgb"). + For more information do `help(CaravanForcing.get_basin_id)` or see + https://www.ewatercycle.org/caravan-map/. + data_source: The ID of the data source to be used. For some datasets + multiple data sources are available. Currently this is only + implemented for the (basins in the) "camels" (i.e. camels US) + dataset. If "data_source" is not specified, it defaults to + 'era5_land' (the default for caravan). Options for Camels are: + - 'nldas' + - 'maurer' + - 'daymet' + See the documentation of Camels for details on the differences + between these data sources: https://dx.doi.org/10.5065/D6MW2F4D + retrieve_shape: If the shapefile should be retrieved (optional). By + default True. """ if "basin_id" not in kwargs: msg = ( @@ -181,14 +195,38 @@ def generate( # type: ignore[override] " Caravan." 
) raise ValueError(msg) + basin_id = str(kwargs["basin_id"]) + if "data_source" not in kwargs: + data_source = "era5_land" + elif kwargs["data_source"] in ["era5_land", "nldas", "maurer", "daymet"]: + data_source = str(kwargs["data_source"]) + else: + msg = ( + "If 'data_source' is provided it needs to be one of:\n" + " 'era5_land', 'nldas', 'maurer', 'daymet'" + ) + raise ValueError(msg) + dataset: str = basin_id.split("_")[0] ds = cls.get_dataset(dataset) ds_basin = ds.sel(basin_id=basin_id.encode()) + + if dataset != "camels" and data_source != "era5_land": + msg = ( + "Alternative data sources are only implemented for the camels " + "(USA) dataset" + ) + raise ValueError(msg) + ds_basin = get_camels_data(ds_basin, data_source.encode()) + ds_basin_time = crop_ds(ds_basin, start_time, end_time) - if shape is None: + retrieve_shape = ( + bool(kwargs["retrieve_shape"]) if "retrieve_shape" in kwargs else True + ) + if retrieve_shape: shape = get_shapefiles(Path(directory), basin_id) if len(variables) == 0: @@ -197,36 +235,31 @@ def generate( # type: ignore[override] # only return the properties which are also in property vars properties = set(variables).intersection(PROPERTY_VARS) non_property_vars = set(variables) - properties - variable_names = non_property_vars.intersection( - RENAME_ERA5.keys() - ) # only take the vars also in Rename dict + + # only take the vars also in Rename dict (ie. pr, tas, etc.) 
+ variable_names = non_property_vars.intersection(set(RENAME_ERA5.values())) for prop in properties: ds_basin_time.coords.update({prop: ds_basin_time[prop].to_numpy()}) - ds_basin_time = ds_basin_time.rename(RENAME_ERA5) - variables = tuple([RENAME_ERA5[var] for var in variable_names]) - - # convert units to Kelvin for compatibility with CMOR MIP table units - for temp in ["tas", "tasmin", "tasmax"]: - ds_basin_time[temp].attrs.update({"height": "2m"}) - if (ds_basin_time[temp].attrs["unit"]) == "°C": - ds_basin_time[temp].values = ds_basin_time[temp].to_numpy() + 273.15 - ds_basin_time[temp].attrs["unit"] = "K" - - for var in ["evspsblpot", "pr"]: - if (ds_basin_time[var].attrs["unit"]) == "mm": - # mm/day --> kg m-2 s-1 - ds_basin_time[var].values = ds_basin_time[var].to_numpy() / (86400) - ds_basin_time[var].attrs["unit"] = "kg m-2 s-1" + ds_basin_time = convert_units(ds_basin_time) start_time_name = start_time[:10] end_time_name = end_time[:10] history_attrs = ds_basin_time.attrs["history"] - for var in variables: + for var in variable_names: ds_basin_time[var].attrs["history"] = history_attrs - ds_basin_time[var].to_netcdf( + + # Basin IDs can be duplicate because of problem in opendap data + # only select the one with data (if it exists) + valid_basin = ~ds_basin_time[var].isnull().all(dim="time") + if ds_basin_time[var]["basin_id"].size > 1 and valid_basin.any(): + dout = ds_basin_time[var][~ds_basin_time[var].isnull().all(dim="time")] + else: + dout = ds_basin_time[var] + + dout.isel(basin_id=0).to_netcdf( Path(directory) / f"{basin_id}_{start_time_name}_{end_time_name}_{var}.nc" ) @@ -235,15 +268,23 @@ def generate( # type: ignore[override] directory=Path(directory), start_time=start_time, end_time=end_time, - shape=Path(shape), + shape=Path(shape) if shape is not None else None, filenames={ var: f"{basin_id}_{start_time_name}_{end_time_name}_{var}.nc" - for var in variables + for var in variable_names }, ) forcing.save() return forcing + def 
to_xarray(self) -> xr.Dataset: + """Return this Forcing object as an xarray Dataset.""" + if len(self.filenames) == 0: + msg = "There are no variables stored in this Forcing object." + raise ValueError(msg) + fpaths = [self.directory / filename for _, filename in self.filenames.items()] + return xr.merge([xr.open_dataset(fpath, chunks="auto") for fpath in fpaths]) + def get_shapefiles(directory: Path, basin_id: str) -> Path: """Retrieve shapefiles from data 4TU.nl .""" @@ -319,3 +360,48 @@ def crop_ds(ds: xr.Dataset, start_time: str, end_time: str) -> xr.Dataset: return ds.isel( time=(ds["time"].to_numpy() >= start) & (ds["time"].to_numpy() <= end) ) + + +def get_camels_data(ds: xr.Dataset, data_source: bytes) -> xr.Dataset: + """Grab the right source of input data for the camels (USA) dataset. + + The way the dataset on OPENDAP is structured is not optimal, see the + discussion at: + https://github.com/eWaterCycle/ewatercycle/pull/433/ + + This function will select and rename the right variables. + """ + if data_source == b"era5_land": + ds_era5 = ds.sel(data_source=b"era5_land") + ds_era5 = ds_era5.drop_vars("data_source") + ds_era5 = ds_era5[[*PROPERTY_VARS, *ERA5_VARS, "streamflow"]] + return ds_era5.rename(RENAME_ERA5) + + ds_common = ds.sel(data_source=b"era5_land") + ds_common = ds_common.drop_vars("data_source") + ds_common = ds_common[[*PROPERTY_VARS, "streamflow"]] + ds_common = ds_common.rename({"streamflow": "Q"}) + + ds_camels = ds.sel(data_source=data_source) + ds_camels = ds_camels.drop_vars("data_source") + ds_camels = ds_camels[CAMELS_VARS] + + return xr.merge((ds_common, ds_camels)) + + +def convert_units(ds: xr.Dataset) -> xr.Dataset: + """Caravan uses degrees C, mm/d. 
We need Kelvin and kg m-2 s-1.""" + # convert units to Kelvin for compatibility with CMOR MIP table units + for temp in ["tas", "tasmin", "tasmax"]: + if ds[temp].attrs["unit"] in ["°C", "degC"]: + ds[temp].values = ds[temp].to_numpy() + 273.15 + ds[temp].attrs["unit"] = "K" + ds[temp].attrs.update({"height": "2m"}) + + for var in ["evspsblpot", "pr"]: + if ds[var].attrs["unit"] in ["mm", "mm/d"]: + # mm/day --> kg m-2 s-1 + ds[var].values = ds[var].to_numpy() / (86400) + ds[var].attrs["unit"] = "kg m-2 s-1" + + return ds diff --git a/tests/src/base/forcing_files/README.md b/tests/src/base/forcing_files/README.md index 749fd46f..1109c39f 100644 --- a/tests/src/base/forcing_files/README.md +++ b/tests/src/base/forcing_files/README.md @@ -4,6 +4,28 @@ The data only includes a year of forcing for one catchment. For own use, please download from the original source and cite correctly. The Caravan dataset itself is also a combination of data from seperate sources. -The Carvan dataset is originanly obtained from https://zenodo.org/records/7944025 and is explained in a paper by Kratzert, F. :'Caravan - A global community dataset for large-sample hydrology' found here: https://doi-org.tudelft.idm.oclc.org/10.1038/s41597-023-01975-w +The Caravan dataset is originally obtained from https://zenodo.org/records/7944025 and is explained in a paper by Kratzert, F. :'Caravan - A global community dataset for large-sample hydrology' found here: https://doi-org.tudelft.idm.oclc.org/10.1038/s41597-023-01975-w Distributed under Creative Commons Attribution 4.0 International. 
+ + +## Test data generation + +The test data file was generated with: + +```py +import xarray as xr + +from ewatercycle._forcings.caravan import OPENDAP_URL + +dataset="camels" +ds_dap = xr.open_dataset(f"{OPENDAP_URL}{dataset}.nc") + +ds_test = xr.concat( + (ds_dap.sel(basin_id=b"camels_01022500"), ds_dap.sel(basin_id=b"camels_03439000")), + dim="basin_id", +) + +# Can't write to hdf5 due to "name" variable +ds_test.to_netcdf("tests/src/base/forcing_files/test_caravan_file.nc", format="NETCDF3_CLASSIC") +``` diff --git a/tests/src/base/forcing_files/test_caravan_file.nc b/tests/src/base/forcing_files/test_caravan_file.nc index 69b8f9bd..7ea6292d 100644 Binary files a/tests/src/base/forcing_files/test_caravan_file.nc and b/tests/src/base/forcing_files/test_caravan_file.nc differ diff --git a/tests/src/base/test_forcing.py b/tests/src/base/test_forcing.py index aa10fd17..720d96a9 100644 --- a/tests/src/base/test_forcing.py +++ b/tests/src/base/test_forcing.py @@ -309,12 +309,12 @@ def test_retrieve_caravan_forcing(tmp_path: Path, mock_retrieve: mock.MagicMock) "high_prec_dur", "low_prec_freq", "low_prec_dur", - "total_precipitation_sum", - "potential_evaporation_sum", - "temperature_2m_mean", - "temperature_2m_min", - "temperature_2m_max", - "streamflow", + "pr", + "evspsblpot", + "tas", + "tasmin", + "tasmax", + "Q", ) basin_id = "camels_03439000" test_files_dir = Path(__file__).parent / "forcing_files" @@ -329,8 +329,8 @@ def test_retrieve_caravan_forcing(tmp_path: Path, mock_retrieve: mock.MagicMock) ) caravan_forcing.save() ds = caravan_forcing.to_xarray() - content = list(ds.data_vars.keys()) - expected = ["Q", "evspsblpot", "pr", "tas", "tasmax", "tasmin"] + content = set(ds.data_vars.keys()) + expected = {"Q", "evspsblpot", "pr", "tas", "tasmax", "tasmin"} assert content == expected mock_retrieve.assert_called_once_with(basin_id.split("_")[0]) @@ -354,8 +354,8 @@ def test_retrieve_caravan_forcing_empty_vars( ) caravan_forcing.save() ds = 
caravan_forcing.to_xarray() - content = list(ds.data_vars.keys()) - expected = ["Q", "evspsblpot", "pr", "tas", "tasmax", "tasmin"] + content = set(ds.data_vars.keys()) + expected = {"Q", "evspsblpot", "pr", "tas", "tasmax", "tasmin"} assert content == expected mock_retrieve.assert_called_once_with(basin_id.split("_")[0]) diff --git a/tests/src/observation/test_caravan.py b/tests/src/observation/test_caravan.py index 6137f1dd..8c5ff460 100644 --- a/tests/src/observation/test_caravan.py +++ b/tests/src/observation/test_caravan.py @@ -2,6 +2,7 @@ from pathlib import Path from unittest import mock +import numpy as np import pytest import xarray as xr from xarray.testing import assert_allclose @@ -42,35 +43,107 @@ def test_get_caravan_data(mock_retrieve): datetime.datetime(1981, 1, 3, 0, 0), ], }, - "basin_id": {"dims": (), "attrs": {}, "data": b"camels_03439000"}, + "basin_id": { + "dims": ("basin_id",), + "attrs": {}, + "data": [b"camels_03439000", b"camels_03439000"], + }, + "data_source": { + "dims": ("data_source",), + "attrs": {}, + "data": [b"era5_land", b"nldas", b"maurer", b"daymet"], + }, }, "attrs": { - "history": "Wed Mar 27 16:11:00 2024: /usr/bin/ncap2 -s time=double(time) -O Caravan/camels.nc Caravan/camels.nc\nMerged together from separate files; All forcing and state variables are derived from ERA5-Land hourly by ECMWF. Streamflow data was taken from the CAMELS (US) dataset by Newman et al. (2014).", - "NCO": "netCDF Operators version 5.0.6 (Homepage = http://nco.sf.net, Code = http://github.com/nco/nco)", + "title": "Basin mean forcing data for ", + "history": "Thu Jun 13 15:42:28 2024: /usr/local/bin/ncap2 -s time=double(time) -O a/500dc8bd-c096-404c-a1c8-182249c621f4.nc a/500dc8bd-c096-404c-a1c8-182249c621f4.nc\nConverted to netCDF by David Haasnoot for eWatercycle using CAMELS and combined with merged Caravan dataset", + "data_source_camels": "CAMELS-USA was compiled by A. Newman et al. 
`A large-sample watershed-scale hydrometeorological dataset for the contiguous USA` using daymet, nldas and maurer", + "data_source_caravan": "The Caravan dataset by F. Kratzert et al `Caravan - A global community dataset for large-sample hydrology` uses era5 data", + "url_source_data_camels": "https://dx.doi.org/10.5065/D6MW2F4D", + "url_source_data_caravan": "https://doi.org/10.1038/s41597-023-01975-w", + "NCO": "netCDF Operators version 5.2.4 (Homepage = http://nco.sf.net, Code = http://github.com/nco/nco, Citation = 10.1016/j.envsoft.2008.03.004)", + "_NCProperties": "version=2,netcdf=4.9.2,hdf5=1.12.2", + "DODS.strlen": 9, + "DODS.dimName": "string9", }, - "dims": {"time": 3}, + "dims": {"data_source": 4, "basin_id": 2, "time": 3}, "data_vars": { - "timezone": {"dims": (), "attrs": {}, "data": b"America/New_York"}, + "timezone": { + "dims": ("data_source", "basin_id"), + "attrs": {}, + "data": [ + [b"America/New_York", b"nan"], + [b"nan", b"nan"], + [b"nan", b"nan"], + [b"nan", b"nan"], + ], + }, "name": { - "dims": (), + "dims": ("data_source", "basin_id"), "attrs": {}, - "data": b"FRENCH BROAD RIVER AT ROSMAN, NC", + "data": [ + [b"FRENCH BROAD RIVER AT ROSMAN, NC", b"nan"], + [b"nan", b"nan"], + [b"nan", b"nan"], + [b"nan", b"nan"], + ], }, "country": { - "dims": (), + "dims": ("data_source", "basin_id"), "attrs": {}, - "data": b"United States of America", + "data": [ + [b"United States of America", b"nan"], + [b"nan", b"nan"], + [b"nan", b"nan"], + [b"nan", b"nan"], + ], + }, + "lat": { + "dims": ("data_source", "basin_id"), + "attrs": {"_ChunkSizes": [4, 1153]}, + "data": [ + [35.14333, np.nan], + [np.nan, np.nan], + [np.nan, np.nan], + [np.nan, np.nan], + ], + }, + "lon": { + "dims": ("data_source", "basin_id"), + "attrs": {"_ChunkSizes": [4, 1153]}, + "data": [ + [-82.82472, np.nan], + [np.nan, np.nan], + [np.nan, np.nan], + [np.nan, np.nan], + ], + }, + "area": { + "dims": ("data_source", "basin_id"), + "attrs": {"_ChunkSizes": [4, 1153]}, + "data": [ 
+ [177.99471516630686, np.nan], + [np.nan, np.nan], + [np.nan, np.nan], + [np.nan, np.nan], + ], }, - "lat": {"dims": (), "attrs": {}, "data": 35.14333}, - "lon": {"dims": (), "attrs": {}, "data": -82.82472}, - "area": {"dims": (), "attrs": {}, "data": 177.99471}, "streamflow": { - "dims": ("time",), - "attrs": { - "unit": "mm/d", - "long_name": "Observed streamflow", - }, - "data": [2.266e-06, 2.184e-06, 2.122e-06], + "dims": ("data_source", "basin_id", "time"), + "attrs": {"unit": "m3/s"}, + "data": [ + [ + [ + 2.266136469058591e-06, + 2.183731341335023e-06, + 2.121927679731787e-06, + ], + [np.nan, np.nan, np.nan], + ], + [[np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan]], + [[np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan]], + [[np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan]], + ], }, }, }