Skip to content

Commit

Permalink
The validation of the chunks now is able to detect full or partial ch…
Browse files Browse the repository at this point in the history
…unk and raise a proper error based on the selected mode; it is also possible to use auto region detection with mode "a"
  • Loading branch information
josephnowak committed Sep 20, 2024
1 parent 0f14b77 commit a2a786b
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 32 deletions.
76 changes: 48 additions & 28 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __getitem__(self, key):
# could possibly have a work-around for 0d data here


def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, region):
def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, region, mode):
"""
Given encoding chunks (possibly None or []) and variable chunks
(possibly None or []).
Expand Down Expand Up @@ -163,7 +163,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, regi

if len(enc_chunks_tuple) != ndim:
# throw away encoding chunks, start over
return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks, region)
return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks, region, mode)

for x in enc_chunks_tuple:
if not isinstance(x, int):
Expand All @@ -189,9 +189,19 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, regi
# TODO: incorporate synchronizer to allow writes from multiple dask
# threads
if var_chunks and enc_chunks_tuple:
# If it is possible to write on partial chunks then it is not necessary to check
# the last one contained on the region
allow_partial_chunks = True
end = -1
if mode == "r+":
# This mode forces to write only on full chunks, even on the last one
allow_partial_chunks = False
end = None

base_error = (
f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for "
f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. "
f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r} "
f"on the region {region}. "
f"Writing this array in parallel with dask could lead to corrupted data."
f"Consider either rechunking using `chunk()`, deleting "
f"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
Expand All @@ -200,27 +210,27 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, regi
for zchunk, dchunks, interval in zip(
enc_chunks_tuple, var_chunks, region, strict=True
):
if not safe_chunks or len(dchunks) <= 1:
# It is not necessary to perform any additional validation if the
# safe_chunks is False, or there are less than two dchunks
if not safe_chunks:
continue

start = 0
# The first border size is the amount of data that needs to be updated on the
# first chunk taking into account the region slice.
first_border_size = zchunk
if interval.start:
# If the start of the interval is not None or 0, it means that the data
# is being appended or updated, and in both cases it is mandatory that
# the residue of the division between the first dchunk and the zchunk
# being equal to the border size
border_size = zchunk - interval.start % zchunk
if dchunks[0] % zchunk != border_size:
raise ValueError(base_error)
# Avoid validating the first chunk inside the loop
start = 1
first_border_size = zchunk - interval.start % zchunk

for dchunk in dchunks[start:-1]:
if dchunk % zchunk:
if not allow_partial_chunks and first_border_size < zchunk:
# If the border is smaller than zchunk, then it is a partial chunk write
raise ValueError(first_border_size)

for dchunk in dchunks[:end]:
if (dchunk - first_border_size) % zchunk:
raise ValueError(base_error)

# The first border is only useful during the first iteration,
# so ignore it in the next validations
first_border_size = 0

return enc_chunks_tuple

raise AssertionError("We should never get here. Function logic must be wrong.")
Expand Down Expand Up @@ -261,16 +271,24 @@ def _get_zarr_dims_and_attrs(zarr_obj, dimension_key, try_nczarr):


def extract_zarr_variable_encoding(
variable, region, raise_on_invalid=False, name=None, safe_chunks=True
variable,
raise_on_invalid=False,
name=None,
safe_chunks=True,
region=None,
mode=None
):
"""
Extract zarr encoding dictionary from xarray Variable
Parameters
----------
variable : Variable
region: tuple[slice]
region: tuple[slice], optional
raise_on_invalid : bool, optional
safe_chunks: bool, optional
name: str | Hashable, optional
mode: str, optional
Returns
-------
Expand Down Expand Up @@ -304,12 +322,13 @@ def extract_zarr_variable_encoding(
del encoding[k]

chunks = _determine_zarr_chunks(
encoding.get("chunks"),
variable.chunks,
variable.ndim,
name,
safe_chunks,
region,
enc_chunks=encoding.get("chunks"),
var_chunks=variable.chunks,
ndim=variable.ndim,
name=name,
safe_chunks=safe_chunks,
region=region,
mode=mode
)
encoding["chunks"] = chunks
return encoding
Expand Down Expand Up @@ -845,6 +864,7 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
raise_on_invalid=vn in check_encoding_set,
name=vn,
safe_chunks=self._safe_chunks,
mode=self._mode
)

if name not in existing_keys:
Expand Down Expand Up @@ -927,9 +947,9 @@ def _validate_and_autodetect_region(self, ds) -> None:
if not isinstance(region, dict):
raise TypeError(f"``region`` must be a dict, got {type(region)}")
if any(v == "auto" for v in region.values()):
if self._mode != "r+":
if self._mode not in ["r+", "a"]:
raise ValueError(
f"``mode`` must be 'r+' when using ``region='auto'``, got {self._mode!r}"
f"``mode`` must be 'r+' or 'a' when using ``region='auto'``, got {self._mode!r}"
)
region = self._auto_detect_regions(ds, region)

Expand Down
8 changes: 8 additions & 0 deletions xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -4304,6 +4304,14 @@ def to_zarr(
if Zarr arrays are written in parallel. This option may be useful in combination
with ``compute=False`` to initialize a Zarr store from an existing
DataArray with arbitrary chunk structure.
In addition to the many-to-one relationship validation, it also detects partial
chunk writes when using the region parameter;
these partial chunks are considered unsafe in mode "r+" but safe in
mode "a".
Note: Even with these validations it can still be unsafe to write
two or more chunked arrays in the same location in parallel if they are
not writing in independent regions, for those cases it is better to use
a synchronizer.
storage_options : dict, optional
Any additional parameters for the storage backend (ignored for local
paths).
Expand Down
8 changes: 8 additions & 0 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2509,6 +2509,14 @@ def to_zarr(
if Zarr arrays are written in parallel. This option may be useful in combination
with ``compute=False`` to initialize a Zarr from an existing
Dataset with arbitrary chunk structure.
In addition to the many-to-one relationship validation, it also detects partial
chunk writes when using the region parameter;
these partial chunks are considered unsafe in mode "r+" but safe in
mode "a".
Note: Even with these validations it can still be unsafe to write
two or more chunked arrays in the same location in parallel if they are
not writing in independent regions, for those cases it is better to use
a synchronizer.
storage_options : dict, optional
Any additional parameters for the storage backend (ignored for local
paths).
Expand Down
83 changes: 79 additions & 4 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -5991,9 +5991,10 @@ def test_zarr_region_append(self, tmp_path):
}
)

# Don't allow auto region detection in append mode due to complexities in
# implementing the overlap logic and lack of safety with parallel writes
with pytest.raises(ValueError):
# Now it is valid to use auto region detection with the append mode,
# but it is still unsafe to modify dimensions or metadata using the region
# parameter.
with pytest.raises(KeyError):
ds_new.to_zarr(
tmp_path / "test.zarr", mode="a", append_dim="x", region="auto"
)
Expand Down Expand Up @@ -6105,7 +6106,6 @@ def test_zarr_region_chunk_partial_offset(tmp_path):
@requires_zarr
@requires_dask
def test_zarr_safe_chunk_append_dim(tmp_path):
# https://github.com/pydata/xarray/pull/8459#issuecomment-1819417545
store = tmp_path / "foo.zarr"
data = np.ones((20,))
da = xr.DataArray(data, dims=["x"], coords={"x": range(20)}, name="foo").chunk(x=5)
Expand Down Expand Up @@ -6151,3 +6151,78 @@ def test_zarr_safe_chunk_append_dim(tmp_path):
# and it does not matter the size of the chunk
da.isel(x=slice(7, 19)).chunk(x=-1).to_zarr(store, append_dim="x", safe_chunks=True)
assert xr.open_zarr(store)["foo"].equals(da.isel(x=slice(0, 19)))


@requires_zarr
@requires_dask
def test_zarr_safe_chunk_region(tmp_path):
    """Validate safe_chunks behavior of region writes in "r+" vs "a" modes.

    Writes a 10-element array chunked as (3, 3, 3, 1) to a zarr store, then
    exercises region="auto" writes with various dask chunkings:
    - writes that put multiple dask chunks on one zarr chunk must raise
      ValueError in every mode;
    - partial-chunk writes (a dask chunk not fully covering a zarr chunk)
      must raise ValueError in mode "r+" but succeed in mode "a".
    """
    store = tmp_path / "foo.zarr"

    # Base array: zarr chunks of size 3 over 10 elements -> last chunk is size 1.
    arr = xr.DataArray(
        list(range(10)),
        dims=["a"],
        coords={"a": list(range(10))},
        name="foo"
    ).chunk(a=3)
    arr.to_zarr(store, mode="w")

    # These three cases are unsafe regardless of mode.
    for mode in ["r+", "a"]:
        with pytest.raises(ValueError):
            # There are two Dask chunks on the same Zarr chunk,
            # which means that it is unsafe in any mode
            arr.isel(a=slice(0, 3)).chunk(a=(2, 1)).to_zarr(store, region="auto", mode=mode)

        with pytest.raises(ValueError):
            # the first chunk is covering the border size, but it is not
            # completely covering the second chunk, which means that it is
            # unsafe in any mode
            arr.isel(a=slice(1, 5)).chunk(a=(3, 1)).to_zarr(store, region="auto", mode=mode)

        with pytest.raises(ValueError):
            # The first chunk is safe but the other two chunks are overlapping with
            # the same Zarr chunk
            arr.isel(a=slice(0, 5)).chunk(a=(3, 1, 1)).to_zarr(store, region="auto", mode=mode)

        # Fully update two contiguous chunks is safe in any mode
        arr.isel(a=slice(3, 9)).to_zarr(store, region="auto", mode=mode)

    # From here on, cases where "a" and "r+" disagree: "a" tolerates
    # partial-chunk writes, "r+" does not.

    # Write the last chunk partially is safe in "a" mode
    arr.isel(a=slice(3, 8)).to_zarr(store, region="auto", mode="a")
    with pytest.raises(ValueError):
        # with "r+" mode it is invalid to write partial chunk even on the last one
        arr.isel(a=slice(3, 8)).to_zarr(store, region="auto", mode="r+")

    # This is safe with mode "a", the border size is covered by the first chunk of Dask
    arr.isel(a=slice(1, 4)).chunk(a=(2, 1)).to_zarr(store, region="auto", mode="a")

    with pytest.raises(ValueError):
        # This is considered unsafe in mode "r+" because it is writing in a partial chunk
        arr.isel(a=slice(1, 4)).chunk(a=(2, 1)).to_zarr(store, region="auto", mode="r+")

    # This is safe on mode "a" because there is a single dask chunk
    arr.isel(a=slice(1, 5)).chunk(a=(4,)).to_zarr(store, region="auto", mode="a")

    with pytest.raises(ValueError):
        # This is unsafe on mode "r+", because there is a single dask
        # chunk smaller than the Zarr chunk
        arr.isel(a=slice(1, 5)).chunk(a=(4,)).to_zarr(store, region="auto", mode="r+")

    # The first chunk is completely covering the first Zarr chunk
    # and the last chunk is a partial chunk
    arr.isel(a=slice(0, 5)).chunk(a=(3, 2)).to_zarr(store, region="auto", mode="a")

    with pytest.raises(ValueError):
        # The last chunk is partial, so it is considered unsafe on mode "r+"
        arr.isel(a=slice(0, 5)).chunk(a=(3, 2)).to_zarr(store, region="auto", mode="r+")

    # The first chunk is covering the border size (2 elements)
    # and also the second chunk (3 elements), so it is valid
    arr.isel(a=slice(1, 8)).chunk(a=(5, 2)).to_zarr(store, region="auto", mode="a")

    with pytest.raises(ValueError):
        # The first chunk is not fully covering the first zarr chunk
        arr.isel(a=slice(1, 8)).chunk(a=(5, 2)).to_zarr(store, region="auto", mode="r+")

    with pytest.raises(ValueError):
        # Validate that the border condition is not affecting the "r+" mode
        arr.isel(a=slice(1, 9)).to_zarr(store, region="auto", mode="r+")

0 comments on commit a2a786b

Please sign in to comment.