Skip to content

Commit

Permalink
Extend refspec support to [path] entries (without offset/length) (#187)
Browse files Browse the repository at this point in the history
* Improve typing of ChunkEntry

* Handle kerchunk [path] with no offset/length

* Raise NotImplementedError on inlined data

* Explain the need for Dict type hint
  • Loading branch information
maresb committed Jul 22, 2024
1 parent 10ef7e5 commit 0ad4de5
Showing 1 changed file with 36 additions and 14 deletions.
50 changes: 36 additions & 14 deletions virtualizarr/manifests/manifest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
import re
from collections.abc import Iterable, Iterator
from typing import Any, Callable, NewType, Tuple, Union, cast
from typing import Any, Callable, Dict, NewType, Tuple, TypedDict, cast

import numpy as np
from pydantic import BaseModel, ConfigDict
from upath import UPath

from virtualizarr.types import ChunkKey

Expand All @@ -15,7 +16,13 @@
_CHUNK_KEY = rf"^{_INTEGER}+({_SEPARATOR}{_INTEGER})*$" # matches 1 integer, optionally followed by more integers each separated by a separator (i.e. a period)


ChunkDict = NewType("ChunkDict", dict[ChunkKey, dict[str, Union[str, int]]])
class ChunkDictEntry(TypedDict):
    """Plain-dict form of one chunk reference: a file path plus a byte range.

    This is the shape returned by ``ChunkEntry.dict()`` and the value type
    stored in ``ChunkDict``.
    """

    # Path of the file that holds the chunk's bytes.
    path: str
    # Start of the chunk within that file (0 when the reference is a bare path).
    offset: int
    # Number of bytes in the chunk.
    length: int


ChunkDict = NewType("ChunkDict", dict[ChunkKey, ChunkDictEntry])


class ChunkEntry(BaseModel):
Expand All @@ -35,16 +42,23 @@ def __repr__(self) -> str:
return f"ChunkEntry(path='{self.path}', offset={self.offset}, length={self.length})"

@classmethod
def from_kerchunk(
    cls, path_and_byte_range_info: tuple[str] | tuple[str, int, int]
) -> "ChunkEntry":
    """Build a ChunkEntry from a kerchunk-style chunk reference.

    Parameters
    ----------
    path_and_byte_range_info
        Either ``(path,)`` or ``(path, offset, length)``.

    Returns
    -------
    ChunkEntry
        For the three-element form, the values are used as given. For the
        one-element form, ``offset`` is 0 and ``length`` is the size
        reported by ``UPath(path).stat()``.

    Raises
    ------
    ValueError
        If the sequence has neither one nor three elements (raised by the
        tuple unpacking below).

    Notes
    -----
    The one-element form calls ``stat()`` on the path, so the file must be
    reachable when this method runs.
    """
    if len(path_and_byte_range_info) == 1:
        # Bare [path] reference: the chunk is taken to span the whole file.
        path = path_and_byte_range_info[0]
        offset = 0
        length = UPath(path).stat().st_size
    else:
        path, offset, length = path_and_byte_range_info
    return ChunkEntry(path=path, offset=offset, length=length)

def to_kerchunk(self) -> tuple[str, int, int]:
    """Write out in the format that kerchunk uses for chunk entries.

    Returns
    -------
    tuple[str, int, int]
        ``(path, offset, length)`` read from this entry's attributes.
    """
    return (self.path, self.offset, self.length)

def dict(self) -> ChunkDictEntry:
    """Return this entry as a ``ChunkDictEntry`` mapping.

    Returns
    -------
    ChunkDictEntry
        A dict with the keys ``path``, ``offset`` and ``length`` copied
        from this entry's attributes.
    """
    return ChunkDictEntry(path=self.path, offset=self.offset, length=self.length)


class ChunkManifest:
Expand Down Expand Up @@ -283,12 +297,20 @@ def to_zarr_json(self, filepath: str) -> None:
json.dump(entries, json_file, indent=4, separators=(", ", ": "))

@classmethod
def _from_kerchunk_chunk_dict(
    cls,
    # The type hint requires `Dict` instead of `dict` due to
    # the conflicting ChunkManifest.dict method.
    kerchunk_chunk_dict: Dict[ChunkKey, str | tuple[str] | tuple[str, int, int]],
) -> "ChunkManifest":
    """Build a ChunkManifest from a kerchunk-style mapping of chunk references.

    Parameters
    ----------
    kerchunk_chunk_dict
        Mapping from chunk key to a chunk reference. Each reference must be
        a tuple or list of the form ``(path,)`` or ``(path, offset, length)``;
        both sequence types are accepted at runtime even though the type
        hint names tuples only.

    Returns
    -------
    ChunkManifest
        A manifest whose entries are the references converted through
        ``ChunkEntry.from_kerchunk(...).dict()``.

    Raises
    ------
    NotImplementedError
        If a value is a ``str`` or ``bytes`` (inlined chunk data), which is
        not supported yet.
    TypeError
        If a value is neither ``str``/``bytes`` nor a tuple/list.
    """
    chunk_entries: dict[ChunkKey, ChunkDictEntry] = {}
    for k, v in kerchunk_chunk_dict.items():
        if isinstance(v, (str, bytes)):
            raise NotImplementedError("TODO: handle inlined data")
        elif not isinstance(v, (tuple, list)):
            raise TypeError(f"Unexpected type {type(v)} for chunk value: {v}")
        chunk_entries[k] = ChunkEntry.from_kerchunk(v).dict()
    return ChunkManifest(entries=chunk_entries)

def rename_paths(
self,
Expand Down

0 comments on commit 0ad4de5

Please sign in to comment.