From a2a786bcbf0bd0692dcbab2e9196cb0379c70d0a Mon Sep 17 00:00:00 2001 From: Joseph Gonzalez Date: Fri, 20 Sep 2024 16:15:50 -0400 Subject: [PATCH] The validation of the chunks now is able to detect full or partial chunk and raise a proper error based on the mode selected, it is also possible to use the auto region detection with the mode "a" --- xarray/backends/zarr.py | 76 ++++++++++++++++++++------------ xarray/core/dataarray.py | 8 ++++ xarray/core/dataset.py | 8 ++++ xarray/tests/test_backends.py | 83 +++++++++++++++++++++++++++++++++-- 4 files changed, 143 insertions(+), 32 deletions(-) diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index af289d2ea7b..98936aae31a 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -112,7 +112,7 @@ def __getitem__(self, key): # could possibly have a work-around for 0d data here -def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, region): +def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, region, mode): """ Given encoding chunks (possibly None or []) and variable chunks (possibly None or []). @@ -163,7 +163,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, regi if len(enc_chunks_tuple) != ndim: # throw away encoding chunks, start over - return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks, region) + return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks, region, mode) for x in enc_chunks_tuple: if not isinstance(x, int): @@ -189,9 +189,19 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, regi # TODO: incorporate synchronizer to allow writes from multiple dask # threads if var_chunks and enc_chunks_tuple: + # If it is possible to write on partial chunks then it is not necessary to check + # the last one contained on the region + allow_partial_chunks = True + end = -1 + if mode == "r+": + # This mode forces to write only on full chunks, even on the last one + allow_partial_chunks = False + end = None + base_error = ( f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for " - f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. " + f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r} " + f"on the region {region}. " f"Writing this array in parallel with dask could lead to corrupted data." f"Consider either rechunking using `chunk()`, deleting " f"or modifying `encoding['chunks']`, or specify `safe_chunks=False`." @@ -200,27 +210,27 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, regi for zchunk, dchunks, interval in zip( enc_chunks_tuple, var_chunks, region, strict=True ): - if not safe_chunks or len(dchunks) <= 1: - # It is not necessary to perform any additional validation if the - # safe_chunks is False, or there are less than two dchunks + if not safe_chunks: continue - start = 0 + # The first border size is the amount of data that needs to be updated on the + # first chunk taking into account the region slice. + first_border_size = zchunk if interval.start: - # If the start of the interval is not None or 0, it means that the data - # is being appended or updated, and in both cases it is mandatory that - # the residue of the division between the first dchunk and the zchunk - # being equal to the border size - border_size = zchunk - interval.start % zchunk - if dchunks[0] % zchunk != border_size: - raise ValueError(base_error) - # Avoid validating the first chunk inside the loop - start = 1 + first_border_size = zchunk - interval.start % zchunk - for dchunk in dchunks[start:-1]: - if dchunk % zchunk: + if not allow_partial_chunks and first_border_size < zchunk: + # If the border is smaller than zchunk, then it is a partial chunk write + raise ValueError(first_border_size) + + for dchunk in dchunks[:end]: + if (dchunk - first_border_size) % zchunk: raise ValueError(base_error) + # The first border is only useful during the first iteration, + # so ignore it in the next validations + first_border_size = 0 + return enc_chunks_tuple raise AssertionError("We should never get here. Function logic must be wrong.") @@ -261,7 +271,12 @@ def _get_zarr_dims_and_attrs(zarr_obj, dimension_key, try_nczarr): def extract_zarr_variable_encoding( - variable, region, raise_on_invalid=False, name=None, safe_chunks=True + variable, + raise_on_invalid=False, + name=None, + safe_chunks=True, + region=None, + mode=None ): """ Extract zarr encoding dictionary from xarray Variable @@ -269,8 +284,11 @@ def extract_zarr_variable_encoding( Parameters ---------- variable : Variable - region: tuple[slice] + region: tuple[slice], optional raise_on_invalid : bool, optional + safe_chunks: bool, optional + name: str | Hashable, optional + mode: str, optional Returns ------- @@ -304,12 +322,13 @@ def extract_zarr_variable_encoding( del encoding[k] chunks = _determine_zarr_chunks( - encoding.get("chunks"), - variable.chunks, - variable.ndim, - name, - safe_chunks, - region, + enc_chunks=encoding.get("chunks"), + var_chunks=variable.chunks, + ndim=variable.ndim, + name=name, + safe_chunks=safe_chunks, + region=region, + mode=mode ) encoding["chunks"] = chunks return encoding @@ -845,6 +864,7 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No raise_on_invalid=vn in check_encoding_set, name=vn, safe_chunks=self._safe_chunks, + mode=self._mode ) if name not in existing_keys: @@ -927,9 +947,9 @@ def _validate_and_autodetect_region(self, ds) -> None: if not isinstance(region, dict): raise TypeError(f"``region`` must be a dict, got {type(region)}") if any(v == "auto" for v in region.values()): - if self._mode != "r+": + if self._mode not in ["r+", "a"]: raise ValueError( - f"``mode`` must be 'r+' when using ``region='auto'``, got {self._mode!r}" + f"``mode`` must be 'r+' or 'a' when using ``region='auto'``, got {self._mode!r}" ) region = self._auto_detect_regions(ds, region) diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py index 37369afbf96..1a308213ab3 100644 --- a/xarray/core/dataarray.py +++ b/xarray/core/dataarray.py @@ -4304,6 +4304,14 @@ def to_zarr( if Zarr arrays are written in parallel. This option may be useful in combination with ``compute=False`` to initialize a Zarr store from an existing DataArray with arbitrary chunk structure. + In addition to the many-to-one relationship validation, it also detects partial + chunks writes when using the region parameter, + these partial chunks are considered unsafe in the mode "r+" but safe in + the mode "a". + Note: Even with these validations it can still be unsafe to write + two or more chunked arrays in the same location in parallel if they are + not writing in independent regions, for those cases it is better to use + a synchronizer. storage_options : dict, optional Any additional parameters for the storage backend (ignored for local paths). diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 7b9b4819245..b1ce264cbc8 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -2509,6 +2509,14 @@ def to_zarr( if Zarr arrays are written in parallel. This option may be useful in combination with ``compute=False`` to initialize a Zarr from an existing Dataset with arbitrary chunk structure. + In addition to the many-to-one relationship validation, it also detects partial + chunks writes when using the region parameter, + these partial chunks are considered unsafe in the mode "r+" but safe in + the mode "a". + Note: Even with these validations it can still be unsafe to write + two or more chunked arrays in the same location in parallel if they are + not writing in independent regions, for those cases it is better to use + a synchronizer. storage_options : dict, optional Any additional parameters for the storage backend (ignored for local paths). diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index beaf22826ec..a7f13c12f8a 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -5991,9 +5991,10 @@ def test_zarr_region_append(self, tmp_path): } ) - # Don't allow auto region detection in append mode due to complexities in - # implementing the overlap logic and lack of safety with parallel writes - with pytest.raises(ValueError): + # Now it is valid to use auto region detection with the append mode, + # but it is still unsafe to modify dimensions or metadata using the region + # parameter. + with pytest.raises(KeyError): ds_new.to_zarr( tmp_path / "test.zarr", mode="a", append_dim="x", region="auto" ) @@ -6105,7 +6106,6 @@ def test_zarr_region_chunk_partial_offset(tmp_path): @requires_zarr @requires_dask def test_zarr_safe_chunk_append_dim(tmp_path): - # https://github.com/pydata/xarray/pull/8459#issuecomment-1819417545 store = tmp_path / "foo.zarr" data = np.ones((20,)) da = xr.DataArray(data, dims=["x"], coords={"x": range(20)}, name="foo").chunk(x=5) @@ -6151,3 +6151,78 @@ def test_zarr_safe_chunk_append_dim(tmp_path): # and it does not matter the size of the chunk da.isel(x=slice(7, 19)).chunk(x=-1).to_zarr(store, append_dim="x", safe_chunks=True) assert xr.open_zarr(store)["foo"].equals(da.isel(x=slice(0, 19))) + + +@requires_zarr +@requires_dask +def test_zarr_safe_chunk_region(tmp_path): + store = tmp_path / "foo.zarr" + + arr = xr.DataArray( + list(range(10)), + dims=["a"], + coords={"a": list(range(10))}, + name="foo" + ).chunk(a=3) + arr.to_zarr(store, mode="w") + + for mode in ["r+", "a"]: + with pytest.raises(ValueError): + # There are two Dask chunks on the same Zarr chunk, + # which means that it is unsafe in any mode + arr.isel(a=slice(0, 3)).chunk(a=(2, 1)).to_zarr(store, region="auto", mode=mode) + + with pytest.raises(ValueError): + # the first chunk is covering the border size, but it is not + # completely covering the second chunk, which means that it is + # unsafe in any mode + arr.isel(a=slice(1, 5)).chunk(a=(3, 1)).to_zarr(store, region="auto", mode=mode) + + with pytest.raises(ValueError): + # The first chunk is safe but the other two chunks are overlapping with + # the same Zarr chunk + arr.isel(a=slice(0, 5)).chunk(a=(3, 1, 1)).to_zarr(store, region="auto", mode=mode) + + # Fully update two contiguous chunks is safe in any mode + arr.isel(a=slice(3, 9)).to_zarr(store, region="auto", mode=mode) + + # Write the last chunk partially is safe in "a" mode + arr.isel(a=slice(3, 8)).to_zarr(store, region="auto", mode="a") + with pytest.raises(ValueError): + # with "r+" mode it is invalid to write partial chunk even on the last one + arr.isel(a=slice(3, 8)).to_zarr(store, region="auto", mode="r+") + + # This is safe with mode "a", the border size is covered by the first chunk of Dask + arr.isel(a=slice(1, 4)).chunk(a=(2, 1)).to_zarr(store, region="auto", mode="a") + + with pytest.raises(ValueError): + # This is considered unsafe in mode "r+" because it is writing in a partial chunk + arr.isel(a=slice(1, 4)).chunk(a=(2, 1)).to_zarr(store, region="auto", mode="r+") + + # This is safe on mode "a" because there is a single dask chunk + arr.isel(a=slice(1, 5)).chunk(a=(4,)).to_zarr(store, region="auto", mode="a") + + with pytest.raises(ValueError): + # This is unsafe on mode "r+", because there is a single dask + # chunk smaller than the Zarr chunk + arr.isel(a=slice(1, 5)).chunk(a=(4,)).to_zarr(store, region="auto", mode="r+") + + # The first chunk is completely covering the first Zarr chunk + # and the last chunk is a partial chunk + arr.isel(a=slice(0, 5)).chunk(a=(3, 2)).to_zarr(store, region="auto", mode="a") + + with pytest.raises(ValueError): + # The last chunk is partial, so it is considered unsafe on mode "r+" + arr.isel(a=slice(0, 5)).chunk(a=(3, 2)).to_zarr(store, region="auto", mode="r+") + + # The first chunk is covering the border size (2 elements) + # and also the second chunk (3 elements), so it is valid + arr.isel(a=slice(1, 8)).chunk(a=(5, 2)).to_zarr(store, region="auto", mode="a") + + with pytest.raises(ValueError): + # The first chunk is not fully covering the first zarr chunk + arr.isel(a=slice(1, 8)).chunk(a=(5, 2)).to_zarr(store, region="auto", mode="r+") + + with pytest.raises(ValueError): + # Validate that the border condition is not affecting the "r+" mode + arr.isel(a=slice(1, 9)).to_zarr(store, region="auto", mode="r+")