Move to new cds #62

Open
wants to merge 39 commits into base: main
Changes from 29 commits
39 commits
2950aee
edit the doc due to new cds api
SarahAlidoost Oct 16, 2024
2c3dac3
add a version to cdsapi in pyproject
SarahAlidoost Oct 16, 2024
bba68a9
fix regrid.most_common due to changes in version xarray_regrid v0.4.0
SarahAlidoost Oct 16, 2024
b63505f
fix the use of regrid.most_common in land_cover.py due to changes in …
SarahAlidoost Oct 18, 2024
c17eb37
fix ruff error
SarahAlidoost Oct 18, 2024
1b99791
rename valid_time dim to time in ecmwf_dataset, see issue 59
SarahAlidoost Oct 18, 2024
692bae5
unify chunks before regrid
SarahAlidoost Oct 18, 2024
dcb9bb0
fix calculating freq in recipe
SarahAlidoost Oct 18, 2024
5ce238c
fix version syntax in fapar_lai
SarahAlidoost Oct 18, 2024
47b57ce
fix version in land_cover cds request
SarahAlidoost Oct 18, 2024
00aa356
fix version in test_fapar_lai
SarahAlidoost Oct 18, 2024
d9ab924
fix version in test_land_cover
SarahAlidoost Oct 18, 2024
4e6fe3e
fix version of land_cover request in test_cds_utils
SarahAlidoost Oct 18, 2024
ddb7c58
add spatial bounds for landcover, fix spatial bounds in target grid i…
SarahAlidoost Oct 21, 2024
868bc0e
increase timeout in cds request
SarahAlidoost Oct 21, 2024
1459245
fix test cds_utils
SarahAlidoost Oct 21, 2024
d0d57a7
fix mypy error
SarahAlidoost Oct 21, 2024
71c2224
remove parallel from lai to avoid segmentation fault error
SarahAlidoost Oct 21, 2024
75e5a67
update doc
SarahAlidoost Oct 21, 2024
5b82327
reduce time range in recipe of stemmus_scope
SarahAlidoost Oct 21, 2024
75df274
replace frequency H with h due to deprecation errors of pandas
SarahAlidoost Oct 21, 2024
f4a5882
add a few more checks in tests
SarahAlidoost Oct 21, 2024
540b9e7
fix linter error
SarahAlidoost Oct 21, 2024
3ae6603
fix ruff errors
SarahAlidoost Oct 21, 2024
5ba1f7f
fix a minor thing
SarahAlidoost Oct 21, 2024
42edfc3
add more tests for frequency in recipe
SarahAlidoost Oct 22, 2024
1eab581
remove unused import from test_recipe
SarahAlidoost Oct 22, 2024
49637d1
fix ruff format
SarahAlidoost Oct 22, 2024
c24bbf7
refactor get_url_size
SarahAlidoost Oct 22, 2024
0cab2df
Update docs/index.md
SarahAlidoost Oct 25, 2024
b3bdeb7
update test dataset, and related tests
SarahAlidoost Oct 30, 2024
56c5a5b
change freq M to ME in cds_utils
SarahAlidoost Oct 30, 2024
0f70d77
add acheck before ds coarsen in eth_canopy_height
SarahAlidoost Oct 30, 2024
8b6070f
fix tests init
SarahAlidoost Oct 30, 2024
09ddc53
use dask unique instead of numpy unique
SarahAlidoost Nov 15, 2024
d03b0f4
fix linter errors
SarahAlidoost Nov 15, 2024
ab1cbc3
fix ruff format errors
SarahAlidoost Nov 15, 2024
09d38f6
add parallel=True to FaparLAI
SarahAlidoost Nov 15, 2024
eca46d1
fix get unique values in land cover
SarahAlidoost Nov 15, 2024
23 changes: 15 additions & 8 deletions docs/configuration.md
@@ -23,15 +23,25 @@ The configuration file should contain the `working_directory`, for instance:
working_directory: /path_to_a_working_directory/ #for example: /home/bart/Zampy
```

If you need access to data on CDS or ADS server, you should add your CDS or ADS credentials to `zampy_config.yml`:
The old Climate Data Store (CDS) was shut down on 3 September 2024. For more
information, see
[the-new-climate-data-store-beta](https://forum.ecmwf.int/t/the-new-climate-data-store-beta-cds-beta-is-now-live/3315).
To use the new CDS/ADS, you need an ECMWF account; your existing CDS/ADS
credentials will not work.

If you need access to data on the CDS or ADS servers, you should add your CDS/ADS
credentials to `zampy_config.yml`. To find your key, go to [CDS how to
api](https://cds.climate.copernicus.eu/how-to-api), or [ADS how to
api](https://ads.atmosphere.copernicus.eu/how-to-api). You can skip the steps
related to `.cdsapirc` and simply add the key to `zampy_config.yml`:

```yaml
cdsapi:
url: # for example https://cds.climate.copernicus.eu/api/v2
key: # for example 12345:xhashd-232jcsha-dsaj429-cdjajd29319
url: # for example https://cds.climate.copernicus.eu/api
key: # for example xhashd-232jcsha-dsaj429-cdjajd29319
adsapi:
url: # for example https://ads.atmosphere.copernicus.eu/api/v2
key: # for example 12345:xhashd-232jcsha-dsaj429-cdjajd29319
url: # for example https://ads.atmosphere.copernicus.eu/api
key: # for example xhashd-232jcsha-dsaj429-cdjajd29319
```
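
As an optional sanity check (not part of zampy itself), the same credentials can be passed directly to `cdsapi`. This assumes `cdsapi>=0.7.2`, which accepts the new-style key without the old `UID:` prefix; the values below are placeholders:

```python
import cdsapi

# Placeholder values: use the url and key you put in zampy_config.yml.
client = cdsapi.Client(
    url="https://cds.climate.copernicus.eu/api",
    key="xhashd-232jcsha-dsaj429-cdjajd29319",
)
print(client.url)  # confirms the client points at the new endpoint
```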

## Instructions for CDS/ADS datasets
@@ -45,9 +55,6 @@ To download the following datasets, users need access to CDS/ADS via `cdsapi`/`a
- ADS
- CAMS EGG4 (e.g. co2)

To generate these API keys, you need to be a registered user on *CDS* via the [registration page](https://cds.climate.copernicus.eu/user/register?destination=%2F%23!%2Fhome), or on *ADS* via the [registration page](https://ads.atmosphere.copernicus.eu/user/register?destination=%2F%23!%2Fhome).

Before submitting any request with `zampy`, please put your `cdsapi`/`adsapi` credentials in `zampy_config.yml`. Here is a short [instruction](https://cds.climate.copernicus.eu/api-how-to) about how to find your CDS/ADS API key. You can skip the steps related to `.cdsapirc` and simply add the key to `zampy_config.yml`.

### Agree to the Terms of Use on CDS/ADS

10 changes: 9 additions & 1 deletion docs/index.md
@@ -52,7 +52,7 @@ download:
cams:
variables:
- co2_concentration

convert:
convention: ALMA
frequency: 1H # outputs at 1 hour frequency. Pandas-like freq-keyword.
@@ -67,6 +67,14 @@ When you have your recipe created and saved on your disk, you can execute your
zampy /path_to_recipe/sample_recipe.yml
```

>NOTE: You may receive an error message from CDS/ADS if not all the required
>licences have been accepted. Follow the instructions in the error message to
>accept the licences and run the recipe again.
SarahAlidoost marked this conversation as resolved.

When the downloading process starts, you can also check the status of your requests
in your CDS/ADS profile.


### Interact with `zampy` in notebooks

It is possible to use `zampy` directly in Python via its Python API. This is not recommended, as it is more difficult to reproduce the workflow if there is no recipe.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -62,7 +62,7 @@ dependencies = [
"pint",
"cf_xarray", # required to auto-pint CF compliant datasets.
"pint-xarray",
"cdsapi",
"cdsapi>=0.7.2",
"xarray-regrid", # for regridding
]
dynamic = ["version"]
4 changes: 2 additions & 2 deletions recipes/STEMMUS_SCOPE_input.yml
@@ -2,7 +2,7 @@
name: "STEMMUS_SCOPE_input"

download:
time: ["2020-01-01", "2020-06-30"]
time: ["2020-01-01", "2020-02-15"]
bbox: [60, 10, 50, 0] # NESW
datasets:
era5_land:
@@ -37,5 +37,5 @@ download:

convert:
convention: ALMA
frequency: 1H # outputs at 1 hour frequency. Pandas-like freq-keyword.
frequency: 1h # outputs at 1 hour frequency. Pandas-like freq-keyword.
resolution: 0.25 # output resolution in degrees.
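
The change from `1H` to `1h` follows the pandas deprecation of uppercase offset aliases (pandas 2.2 and later); a quick illustration of the two spellings:

```python
import pandas as pd

# "1h" is the current spelling of the hourly frequency alias.
pd.date_range("2020-01-01", periods=3, freq="1h")

# freq="1H" still works for now, but emits a FutureWarning on pandas >= 2.2.
```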
23 changes: 18 additions & 5 deletions src/zampy/datasets/cds_utils.py
@@ -35,10 +35,10 @@
"01", "02", "03", "04", "05", "06", "07", "08", "09", "10",
"11", "12", "13", "14", "15", "16", "17", "18", "19", "20",
"21", "22", "23", "24", "25", "26", "27", "28", "29", "30",
"31",
"31",
] # fmt: skip

ALL_HOURS = [
ALL_HOURS = [
"00:00", "01:00", "02:00", "03:00", "04:00", "05:00", "06:00",
"07:00", "08:00", "09:00", "10:00", "11:00", "12:00", "13:00",
"14:00", "15:00", "16:00", "17:00", "18:00", "19:00", "20:00",
@@ -97,11 +97,13 @@ def cds_request(

url, api_key = cds_api_key(fname)

# TODO: expose timeout, see issue 64
c = cdsapi.Client(
url=url,
key=api_key,
verify=True,
quiet=True,
timeout=300,
)
# choose retrieve function
retrieve_func = RETRIEVE_FUNCTION[fname]
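
The timeout is hard-coded to 300 seconds for now (see the TODO referencing issue 64). Purely as an illustration of how it might later be exposed, not what this PR implements, a sketch could look like:

```python
import cdsapi

def make_cds_client(url: str, api_key: str, timeout: int = 300) -> cdsapi.Client:
    # Hypothetical helper: same arguments as above, with the timeout configurable.
    return cdsapi.Client(
        url=url,
        key=api_key,
        verify=True,
        quiet=True,
        timeout=timeout,
    )
```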
@@ -124,7 +126,8 @@ def cds_request_land_cover(
dataset: str,
time_bounds: TimeBounds,
path: Path,
overwrite: bool,
spatial_bounds: SpatialBounds | None = None,
overwrite: bool = False,
) -> None:
"""Download land cover data via CDS API.

@@ -136,6 +139,7 @@
dataset: Dataset name for retrieval via `cdsapi`.
time_bounds: Zampy time bounds object.
path: File path to which the data should be saved.
spatial_bounds: Zampy spatial bounds object.
overwrite: If an existing file (of the same size!) should be overwritten.
"""
fname = PRODUCT_FNAME[dataset]
@@ -152,18 +156,27 @@
years_months = time_bounds_to_year_month(time_bounds)
years = {year for (year, _) in years_months}

if spatial_bounds is not None:
area = [
spatial_bounds.north,
spatial_bounds.west,
spatial_bounds.south,
spatial_bounds.east,
]

for year in tqdm(years):
if int(year) < 2016:
version = "v2.0.7cds"
version = "v2_0_7cds"
SarahAlidoost marked this conversation as resolved.
else:
version = "v2.1.1"
version = "v2_1_1"
r = c.retrieve(
dataset,
{
"variable": "all",
"format": "zip",
"year": year,
"version": version,
"area": area,
},
)
fpath = path / f"{fname}_LCCS_MAP_300m_{year}.zip"
10 changes: 9 additions & 1 deletion src/zampy/datasets/ecmwf_dataset.py
@@ -120,11 +120,19 @@ def load(
files += (ingest_dir / self.name).glob(f"{self.name}_{var}*.nc")

ds = xr.open_mfdataset(files, chunks={"latitude": 200, "longitude": 200})
ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))

# rename valid_time to time
if "valid_time" in ds.dims:
ds = ds.rename({"valid_time": "time"})

ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))
grid = xarray_regrid.create_regridding_dataset(
make_grid(spatial_bounds, resolution)
)

# this is needed before regrid
ds = ds.unify_chunks()
SarahAlidoost marked this conversation as resolved.

ds = ds.regrid.linear(grid)

return ds
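
A small standalone illustration of the `valid_time` rename guard above, using a toy dataset that mimics the new CDS netCDF output (the real files come via `cdsapi`):

```python
import numpy as np
import pandas as pd
import xarray as xr

# Toy dataset: the new CDS files name the time dimension "valid_time".
ds = xr.Dataset(
    {"t2m": ("valid_time", np.random.rand(3))},
    coords={"valid_time": pd.date_range("2020-01-01", periods=3, freq="1h")},
)

if "valid_time" in ds.dims:
    ds = ds.rename({"valid_time": "time"})

assert "time" in ds.dims and "valid_time" not in ds.dims
```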
5 changes: 2 additions & 3 deletions src/zampy/datasets/fapar_lai.py
@@ -150,8 +150,7 @@ def load(
variable_names: list[str],
) -> xr.Dataset:
files = list((ingest_dir / self.name).glob("*.nc"))

ds = xr.open_mfdataset(files, parallel=True)
ds = xr.open_mfdataset(files)
Contributor:
The parallel kwarg was to ensure the dataset is opened as Dask arrays, to avoid any memory issues. Will that go OK now?

Member Author:
If the parallel kwarg is True, the open and preprocess steps will be performed in parallel using Dask. So, we need to configure Dask with the number of jobs or CPUs for parallel processing. Otherwise dask.distributed.Client() will use all available cores by default, and this causes a segmentation fault error. This is the case in cli.py where n_workers is not set, see here.

Contributor:
OK. I do think that making use of dask.distributed is quite important for performance. You can configure the defaults with a config file https://docs.dask.org/en/latest/configuration.html#specify-configuration

> dask.distributed.Client() will use all available cores by default and this causes a segmentation fault error

I guess on some systems? Which seems like something else should be causing the segfault. On my laptop it's no problem starting as many workers as cpu threads, and it's the default behavior of dask for a reason.

Member Author:
Thanks for explaining. I did some tests, and it looks like the Dask configuration didn’t fix the issue. When I added parallel=True back, the segmentation fault errors showed up again on macOS and Linux, but not on Windows, see GA workflows below. We only use parallel=True for the fapar_lai.py and not for others. Do you know why it’s needed? After refactoring the code as below, the tests are passing locally. What do you think?

```python
client = Client()
ds = xr.open_mfdataset(files, parallel=True)
client.close()
```
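
A slightly more explicit variant of that idea is to size the Dask cluster by hand so that `parallel=True` cannot oversubscribe the machine; the worker counts and the path below are assumptions for illustration, not part of this PR:

```python
from pathlib import Path

import xarray as xr
from dask.distributed import Client

files = sorted(Path("/path/to/ingest/fapar-lai").glob("*.nc"))  # placeholder path

# A small, explicitly sized local cluster for the parallel open/decode step.
client = Client(n_workers=2, threads_per_worker=1)
try:
    ds = xr.open_mfdataset(files, parallel=True)
finally:
    client.close()
```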

ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))

grid = xarray_regrid.create_regridding_dataset(
@@ -223,7 +222,7 @@ def download_fapar_lai(
"format": "zip",
"variable": "lai",
"horizontal_resolution": "1km",
"product_version": "V3",
"product_version": "v3",
"satellite": "spot" if year < 2014 else "proba",
"sensor": "vgt",
"month": f"{month:0>2}",
50 changes: 39 additions & 11 deletions src/zampy/datasets/land_cover.py
@@ -82,6 +82,7 @@ def download(
cds_utils.cds_request_land_cover(
dataset=self.cds_dataset,
time_bounds=time_bounds,
spatial_bounds=spatial_bounds,
path=download_folder,
overwrite=overwrite,
)
@@ -134,16 +135,30 @@ def load(
)
raise ValueError(msg)
files = list((ingest_dir / self.name).glob(f"{self.name}_*.nc"))

ds = xr.open_mfdataset(files, chunks={"latitude": 200, "longitude": 200})
ds = ds.sel(time=slice(time_bounds.start, time_bounds.end))

grid = xarray_regrid.create_regridding_dataset(
utils.make_grid(spatial_bounds, resolution)
)
ds = ds.regrid.most_common(grid, time_dim="time", max_mem=1e9)

return ds
ds_regrid = {}
for variable in variable_names:
# select the variable to be regridded
da = ds[variable]

# get values for most common method
if "flag_values" in da.attrs:
regrid_values = da.attrs["flag_values"]
else:
regrid_values = np.unique(da.values)
Contributor:
Do note that this is a very expensive operation, as every single value in the dataarray has to be checked...

It might be better to drop some variables or handle this better, otherwise it won't work on a large area (or be very slow).

The vars current_pixel_state and change_count have a valid_min and valid_max attr.

observation_count does too, but the range is very large, and it is probably irrelevant after regridding (doesn't make physical sense anymore).

Member Author:
Good point, I replaced np.unique with da.unique.
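
A minimal sketch of what the dask-based alternative could look like (toy data; the exact call in the later commit may differ):

```python
import dask.array as dsk
import numpy as np
import xarray as xr

# Toy land-cover-like array backed by dask chunks.
da = xr.DataArray(
    np.random.randint(0, 5, size=(4, 4)), dims=("latitude", "longitude")
).chunk({"latitude": 2, "longitude": 2})

# Derive the category values lazily rather than materializing the whole
# array in memory with np.unique.
if "flag_values" in da.attrs:
    regrid_values = da.attrs["flag_values"]
else:
    regrid_values = dsk.unique(da.data).compute()
```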


da_regrid = da.regrid.most_common(grid, values=regrid_values)

# make sure dtype is the same
ds_regrid[variable] = da_regrid.astype(da.dtype)
SarahAlidoost marked this conversation as resolved.

return xr.Dataset(ds_regrid)

def convert(
self,
@@ -207,27 +222,40 @@ def extract_netcdf_to_zampy(file: Path) -> xr.Dataset:

# only keep land cover class variable
with xr.open_dataset(unzip_folder / zipped_file_name) as ds:
var_list = [var for var in ds.data_vars]
var_list = list(ds.data_vars)
raw_variable = "lccs_class"
var_list.remove(raw_variable)
ds = ds.drop_vars(var_list) # noqa: PLW2901

ds = ds.sortby(["lat", "lon"]) # noqa: PLW2901
ds = ds.rename({"lat": "latitude", "lon": "longitude"}) # noqa: PLW2901
new_grid = xarray_regrid.Grid(
north=90,
Contributor:
I originally regridded the full world, as this would then only have to be done once during ingestion to the zampy format. Now this has to be redone every time the config is different

Member Author:
Now that spatial bounds can be specified for land cover (see issue #63), regridding to the whole globe, which is also memory expensive, is no longer needed.

east=180,
south=-90,
west=-180,
north=ds["latitude"].max().item(),
east=ds["longitude"].max().item(),
south=ds["latitude"].min().item(),
west=ds["longitude"].min().item(),
resolution_lat=0.05,
resolution_lon=0.05,
)

target_dataset = xarray_regrid.create_regridding_dataset(new_grid)

ds_regrid = ds.regrid.most_common(
target_dataset, time_dim="time", max_mem=1e9
)
# select the variable to be regridded
da = ds[raw_variable]

# get values for most common method
if "flag_values" in da.attrs:
regrid_values = da.attrs["flag_values"]
else:
regrid_values = np.unique(da.values)

da_regrid = da.regrid.most_common(target_dataset, values=regrid_values)

# make sure dtype is the same
da_regrid = da_regrid.astype(da.dtype)

# convert dataarray to dataset
ds_regrid = da_regrid.to_dataset()

# rename variable to follow the zampy convention
variable_name = "land_cover"
7 changes: 5 additions & 2 deletions src/zampy/datasets/utils.py
@@ -43,10 +43,13 @@ def download_url(url: str, fpath: Path, overwrite: bool) -> None:
print(f"File '{fpath.name}' already exists, skipping...")


def get_url_size(url: str) -> int:
def get_url_size(url: str) -> int | None:
"""Return the size (bytes) of a given URL."""
response = requests.head(url)
return int(response.headers["Content-Length"])
content_length = response.headers.get("Content-Length")
if content_length:
return int(content_length)
return None


def get_file_size(fpath: Path) -> int:
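
Since `get_url_size` can now return `None` (for servers that omit `Content-Length`), callers need a small guard; a hedged usage sketch, with a placeholder URL:

```python
from zampy.datasets.utils import get_url_size  # module path as in this diff

size = get_url_size("https://example.com/data.nc")  # placeholder URL
if size is None:
    print("Server did not report Content-Length; skipping the size pre-check.")
else:
    print(f"Remote file is {size / 1e6:.1f} MB")
```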
14 changes: 10 additions & 4 deletions src/zampy/recipe.py
@@ -137,13 +137,19 @@ def run(self) -> None:
ds = converter.convert(ds, dataset, convention=self.convention)

if "time" in ds.dims: # Dataset with only DEM (e.g.) has no time dim.
freq = xr.infer_freq(ds["time"])
if freq is None: # fallback:
freq = (
data_freq = None

if len(ds["time"]) == 1:
data_freq = pd.Timedelta(self.frequency)
elif len(ds["time"]) > 3: # see pandas _FrequencyInferer
freq = xr.infer_freq(ds["time"])
data_freq = pd.to_timedelta(pd.tseries.frequencies.to_offset(freq))

if data_freq is None: # fallback:
data_freq = pd.Timedelta(
ds["time"].isel(time=1).to_numpy()
- ds["time"].isel(time=0).to_numpy()
)
data_freq = pd.to_timedelta(pd.tseries.frequencies.to_offset(freq))

if data_freq < pd.Timedelta(self.frequency):
ds = ds.resample(time=self.frequency).mean()
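
To illustrate the intent of the frequency handling above, here is a small self-contained sketch; the dataset and the target frequency are made up for the example, and zampy runs this logic inside the recipe's `run()` method:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Toy hourly dataset standing in for a converted zampy dataset.
times = pd.date_range("2020-01-01", periods=6, freq="1h")
ds = xr.Dataset({"t2m": ("time", np.arange(6.0))}, coords={"time": times})

target_frequency = "3h"  # assumed recipe frequency for this sketch

freq = xr.infer_freq(ds["time"])  # enough samples here, so infer_freq succeeds
data_freq = pd.to_timedelta(pd.tseries.frequencies.to_offset(freq))

if data_freq < pd.Timedelta(target_frequency):
    ds = ds.resample(time=target_frequency).mean()  # coarsen 1-hourly data to 3-hourly means
```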
4 changes: 3 additions & 1 deletion tests/test_cds_utils.py
@@ -130,6 +130,7 @@ def test_cds_request_land_cover(mock_retrieve, valid_path_config):
dataset,
time_bounds,
path,
SpatialBounds(54, 56, 1, 3),
overwrite,
)

@@ -139,7 +140,8 @@
"variable": "all",
"format": "zip",
"year": "1996",
"version": "v2.0.7cds",
"version": "v2_0_7cds",
"area": [54, 3, 1, 56],
},
)

3 changes: 3 additions & 0 deletions tests/test_datasets/test_era5.py
Contributor:
The test data is still based on the old cds files. Might be good to replace those with the new format...

Member Author:
fixed.

@@ -128,6 +128,9 @@ def test_load(self):
np.testing.assert_allclose(ds.latitude.values, expected_lat)
np.testing.assert_allclose(ds.longitude.values, expected_lon)

# check if valid_time not in the dataset
assert "valid_time" not in ds.dims

def test_convert(self, dummy_dir):
"""Test convert function."""
_, era5_dataset = self.ingest_dummy_data(dummy_dir)
3 changes: 3 additions & 0 deletions tests/test_datasets/test_era5_land.py
@@ -126,6 +126,9 @@ def test_load(self):
np.testing.assert_allclose(ds.latitude.values, expected_lat)
np.testing.assert_allclose(ds.longitude.values, expected_lon)

# check if valid_time not in the dataset
assert "valid_time" not in ds.dims

def test_convert(self, dummy_dir):
"""Test convert function."""
_, era5_land_dataset = self.ingest_dummy_data(dummy_dir)
2 changes: 1 addition & 1 deletion tests/test_datasets/test_fapar_lai.py
@@ -61,7 +61,7 @@ def test_download(self, mock_retrieve, valid_path_config, dummy_dir):
"format": "zip",
"variable": "lai",
"horizontal_resolution": "1km",
"product_version": "V3",
"product_version": "v3",
"satellite": "proba",
"sensor": "vgt",
"month": "01",