From 18b53bd0097e462cbf63947e006e892f4fd32c20 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Mon, 13 May 2024 11:51:14 -0700 Subject: [PATCH 01/17] basic dmr parsing functionality --- virtualizarr/dmrpp.py | 94 ++++++++++++++++++++++++++++++++++++++++++ virtualizarr/xarray.py | 5 +++ 2 files changed, 99 insertions(+) create mode 100644 virtualizarr/dmrpp.py diff --git a/virtualizarr/dmrpp.py b/virtualizarr/dmrpp.py new file mode 100644 index 0000000..d82fba0 --- /dev/null +++ b/virtualizarr/dmrpp.py @@ -0,0 +1,94 @@ +import ast +import numpy as np +import xarray as xr + +from virtualizarr.zarr import ZArray +from xml.etree import ElementTree as ET +from virtualizarr.manifests import ManifestArray + +class DMRParser: + dap_namespace = "{http://xml.opendap.org/ns/DAP/4.0#}" + dmr_namespace = "{http://xml.opendap.org/dap/dmrpp/1.0.0#}" + dap_npdtype = {"Byte": "uint8", "UByte": "uint8","Int8": "int8","UInt8": "uint8","Int16": "int16","UInt16": "uint16","Int32": "int32","UInt32": "uint32", + "Int64": "int64", "UInt64": "uint64", "Url": "str", "Float32": "float32", "Float64": "float64", "String": "str" + } + + def __init__(self, dmr: str): + self.root = ET.fromstring(dmr) + self.data_filepath = self.root.attrib['name'] + self.global_dims = {} + + def parse_dataset(self): + # find all dimension names and sizes + for d in self.root.iterfind(self.dap_namespace + "Dimension"): + self.global_dims[d.attrib["name"]] = int(d.attrib["size"]) + vars_tags = [] + for dap_dtype in self.dap_npdtype: + vars_tags += self.root.findall(self.dap_namespace + dap_dtype) + # find all coordinate names (using Map tags) + coord_names = set() + for var_tag in vars_tags: + for map_tag in var_tag.iterfind(self.dap_namespace + "Map"): + coord_names.add(map_tag.attrib["name"].removeprefix('/')) + coords = {} + data_vars = {} + for var_tag in vars_tags: + if var_tag.attrib["name"] in coord_names: + coords[var_tag.attrib["name"]] = self.parse_variable(var_tag) + # if len(coords[v.attrib['name']].dims) == 1: + # dim1d, *_ = coords[v.attrib['name']].dims + # indexes[v.attrib['name']] = PandasIndex(coords[v.attrib['name']], dim1d) + else: + data_vars[var_tag.attrib['name']] = self.parse_variable(var_tag) + # find all dataset attributes + attrs = {} + for attr_tag in self.root.iterfind(self.dap_namespace + "Attribute"): + if attr_tag.attrib["type"] != "Container": + attrs.update(self.parse_attribute(attr_tag)) + return xr.Dataset(data_vars=data_vars, coords=xr.Coordinates(coords=coords, indexes={}), attrs=attrs) + + def parse_variable(self, root) -> xr.Variable: + # parse dimensions + dims = [] + for d in root.iterfind(self.dap_namespace + "Dim"): + dims.append(d.attrib["name"].removeprefix('/')) + shape = tuple([self.global_dims[d] for d in dims]) + # parse chunks + chunks = shape + chunks_tag = root.find(self.dmr_namespace + "chunks") + if chunks_tag.find(self.dmr_namespace + "chunkDimensionSizes") is not None: + dim_str = chunks_tag.find(self.dmr_namespace + "chunkDimensionSizes").text + chunks = tuple(map(int, dim_str.split())) + chunkmanifest = self.parse_chunks(chunks_tag, chunks) + # parse attributes + attrs = {} + for a in root.iterfind(self.dap_namespace + "Attribute"): + attrs.update(self.parse_attribute(a)) + # create ManifestArray and ZArray + dtype = np.dtype(self.dap_npdtype[root.tag.removeprefix(self.dap_namespace)]) + fill_value = attrs["_FillValue"] if "_FillValue" in attrs and attrs["_FillValue"] != '*' else None + zarray = ZArray(chunks=chunks, dtype=dtype, 
fill_value=fill_value, order='C', shape=shape, zarr_format=3) + marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) + # create encoding dict (and remove those keys from attrs) + encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} + encoding = {key: value for key, value in attrs.items() if key in encoding_keys} + attrs = {key: value for key, value in attrs.items() if key not in encoding_keys} + return xr.Variable(dims=dims, data=marr, attrs=attrs, encoding=encoding) + + def parse_attribute(self, root) -> dict: + attr = {} + values = [] + # if multiple Value tags are present, store as "key": "[v1, v2, ...]" + for r in root: + values.append(r.text) + attr[root.attrib["name"]] = values[0] if len(values) == 1 else str(values) + return attr + + def parse_chunks(self, root, chunks: tuple) -> dict: + chunkmanifest = {} + for r in root.iterfind(self.dmr_namespace + "chunk"): + chunk_pos = np.zeros(len(chunks), dtype=int) if "chunkPositionInArray" not in r.attrib else np.asarray(ast.literal_eval(r.attrib["chunkPositionInArray"])) + chunk_num = chunk_pos // chunks # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] + chunk_key = '.'.join(map(str, chunk_num)) # [0,0,1] -> "0.0.1" + chunkmanifest[chunk_key] = {"path": self.data_filepath, "offset": int(r.attrib["offset"]), "length": int(r.attrib["nBytes"])} + return chunkmanifest \ No newline at end of file diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 8ab69d0..cffae94 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -16,6 +16,7 @@ from xarray.backends import BackendArray from xarray.core.indexes import Index, PandasIndex from xarray.core.variable import IndexVariable +from virtualizarr.dmrpp import DMRParser import virtualizarr.kerchunk as kerchunk from virtualizarr.kerchunk import FileType, KerchunkStoreRefs @@ -99,6 +100,10 @@ def open_virtual_dataset( return open_virtual_dataset_from_v3_store( storepath=filepath, drop_variables=drop_variables, indexes=indexes ) + if filetype == "dmr++": + with open(filepath, "rb") as f: + parser = DMRParser(f.read()) + return parser.parse_dataset() else: # this is the only place we actually always need to use kerchunk directly # TODO avoid even reading byte ranges for variables that will be dropped later anyway? 
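Example usage of the reader added in this patch (a minimal sketch: the sidecar path is hypothetical, and it assumes a DMR++ file whose root Dataset "name" attribute points at the granule it describes, since that attribute is where this patch takes the data file path from):

    from virtualizarr.dmrpp import DMRParser

    # Read the DMR++ XML sidecar and build a virtual xr.Dataset whose
    # ManifestArrays record byte ranges inside the original granule.
    with open("20240513090000-granule.nc.dmrpp", "rb") as f:  # hypothetical path
        parser = DMRParser(f.read())
    vds = parser.parse_dataset()
    print(vds)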
From 47d89018f295056396e842aadcf88b3d1cd537ec Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 May 2024 15:17:08 +0000 Subject: [PATCH 02/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- virtualizarr/dmrpp.py | 78 ++++++++++++++++++++++++++++++++---------- virtualizarr/xarray.py | 2 +- 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/virtualizarr/dmrpp.py b/virtualizarr/dmrpp.py index d82fba0..7d29d54 100644 --- a/virtualizarr/dmrpp.py +++ b/virtualizarr/dmrpp.py @@ -1,21 +1,36 @@ import ast +from xml.etree import ElementTree as ET + import numpy as np import xarray as xr +from virtualizarr.manifests import ManifestArray from virtualizarr.zarr import ZArray -from xml.etree import ElementTree as ET -from virtualizarr.manifests import ManifestArray + class DMRParser: dap_namespace = "{http://xml.opendap.org/ns/DAP/4.0#}" dmr_namespace = "{http://xml.opendap.org/dap/dmrpp/1.0.0#}" - dap_npdtype = {"Byte": "uint8", "UByte": "uint8","Int8": "int8","UInt8": "uint8","Int16": "int16","UInt16": "uint16","Int32": "int32","UInt32": "uint32", - "Int64": "int64", "UInt64": "uint64", "Url": "str", "Float32": "float32", "Float64": "float64", "String": "str" + dap_npdtype = { + "Byte": "uint8", + "UByte": "uint8", + "Int8": "int8", + "UInt8": "uint8", + "Int16": "int16", + "UInt16": "uint16", + "Int32": "int32", + "UInt32": "uint32", + "Int64": "int64", + "UInt64": "uint64", + "Url": "str", + "Float32": "float32", + "Float64": "float64", + "String": "str", } def __init__(self, dmr: str): self.root = ET.fromstring(dmr) - self.data_filepath = self.root.attrib['name'] + self.data_filepath = self.root.attrib["name"] self.global_dims = {} def parse_dataset(self): @@ -29,7 +44,7 @@ def parse_dataset(self): coord_names = set() for var_tag in vars_tags: for map_tag in var_tag.iterfind(self.dap_namespace + "Map"): - coord_names.add(map_tag.attrib["name"].removeprefix('/')) + coord_names.add(map_tag.attrib["name"].removeprefix("/")) coords = {} data_vars = {} for var_tag in vars_tags: @@ -39,19 +54,23 @@ def parse_dataset(self): # dim1d, *_ = coords[v.attrib['name']].dims # indexes[v.attrib['name']] = PandasIndex(coords[v.attrib['name']], dim1d) else: - data_vars[var_tag.attrib['name']] = self.parse_variable(var_tag) + data_vars[var_tag.attrib["name"]] = self.parse_variable(var_tag) # find all dataset attributes attrs = {} for attr_tag in self.root.iterfind(self.dap_namespace + "Attribute"): if attr_tag.attrib["type"] != "Container": attrs.update(self.parse_attribute(attr_tag)) - return xr.Dataset(data_vars=data_vars, coords=xr.Coordinates(coords=coords, indexes={}), attrs=attrs) - + return xr.Dataset( + data_vars=data_vars, + coords=xr.Coordinates(coords=coords, indexes={}), + attrs=attrs, + ) + def parse_variable(self, root) -> xr.Variable: # parse dimensions dims = [] for d in root.iterfind(self.dap_namespace + "Dim"): - dims.append(d.attrib["name"].removeprefix('/')) + dims.append(d.attrib["name"].removeprefix("/")) shape = tuple([self.global_dims[d] for d in dims]) # parse chunks chunks = shape @@ -66,15 +85,26 @@ def parse_variable(self, root) -> xr.Variable: attrs.update(self.parse_attribute(a)) # create ManifestArray and ZArray dtype = np.dtype(self.dap_npdtype[root.tag.removeprefix(self.dap_namespace)]) - fill_value = attrs["_FillValue"] if "_FillValue" in attrs and attrs["_FillValue"] != '*' else None - zarray = ZArray(chunks=chunks, dtype=dtype, 
fill_value=fill_value, order='C', shape=shape, zarr_format=3) + fill_value = ( + attrs["_FillValue"] + if "_FillValue" in attrs and attrs["_FillValue"] != "*" + else None + ) + zarray = ZArray( + chunks=chunks, + dtype=dtype, + fill_value=fill_value, + order="C", + shape=shape, + zarr_format=3, + ) marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) # create encoding dict (and remove those keys from attrs) encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} encoding = {key: value for key, value in attrs.items() if key in encoding_keys} attrs = {key: value for key, value in attrs.items() if key not in encoding_keys} return xr.Variable(dims=dims, data=marr, attrs=attrs, encoding=encoding) - + def parse_attribute(self, root) -> dict: attr = {} values = [] @@ -83,12 +113,22 @@ def parse_attribute(self, root) -> dict: values.append(r.text) attr[root.attrib["name"]] = values[0] if len(values) == 1 else str(values) return attr - + def parse_chunks(self, root, chunks: tuple) -> dict: chunkmanifest = {} for r in root.iterfind(self.dmr_namespace + "chunk"): - chunk_pos = np.zeros(len(chunks), dtype=int) if "chunkPositionInArray" not in r.attrib else np.asarray(ast.literal_eval(r.attrib["chunkPositionInArray"])) - chunk_num = chunk_pos // chunks # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] - chunk_key = '.'.join(map(str, chunk_num)) # [0,0,1] -> "0.0.1" - chunkmanifest[chunk_key] = {"path": self.data_filepath, "offset": int(r.attrib["offset"]), "length": int(r.attrib["nBytes"])} - return chunkmanifest \ No newline at end of file + chunk_pos = ( + np.zeros(len(chunks), dtype=int) + if "chunkPositionInArray" not in r.attrib + else np.asarray(ast.literal_eval(r.attrib["chunkPositionInArray"])) + ) + chunk_num = ( + chunk_pos // chunks + ) # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] + chunk_key = ".".join(map(str, chunk_num)) # [0,0,1] -> "0.0.1" + chunkmanifest[chunk_key] = { + "path": self.data_filepath, + "offset": int(r.attrib["offset"]), + "length": int(r.attrib["nBytes"]), + } + return chunkmanifest diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index cffae94..1c7e762 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -16,9 +16,9 @@ from xarray.backends import BackendArray from xarray.core.indexes import Index, PandasIndex from xarray.core.variable import IndexVariable -from virtualizarr.dmrpp import DMRParser import virtualizarr.kerchunk as kerchunk +from virtualizarr.dmrpp import DMRParser from virtualizarr.kerchunk import FileType, KerchunkStoreRefs from virtualizarr.manifests import ChunkManifest, ManifestArray from virtualizarr.zarr import ( From aaf6af212d145e1a94fefc53d0e2b045f3f8023d Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Tue, 14 May 2024 18:03:04 -0500 Subject: [PATCH 03/17] Speedup DMR chunk key parsing --- virtualizarr/dmrpp.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/virtualizarr/dmrpp.py b/virtualizarr/dmrpp.py index 7d29d54..203ce38 100644 --- a/virtualizarr/dmrpp.py +++ b/virtualizarr/dmrpp.py @@ -1,4 +1,3 @@ -import ast from xml.etree import ElementTree as ET import numpy as np @@ -116,16 +115,17 @@ def parse_attribute(self, root) -> dict: def parse_chunks(self, root, chunks: tuple) -> dict: chunkmanifest = {} + default_num = [0 for i in range(len(chunks))] + chunk_key_template = ".".join(["{}" for i in range(len(chunks))]) for r in root.iterfind(self.dmr_namespace + "chunk"): - chunk_pos = ( - np.zeros(len(chunks), dtype=int) - if 
"chunkPositionInArray" not in r.attrib - else np.asarray(ast.literal_eval(r.attrib["chunkPositionInArray"])) - ) - chunk_num = ( - chunk_pos // chunks - ) # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] - chunk_key = ".".join(map(str, chunk_num)) # [0,0,1] -> "0.0.1" + chunk_num = default_num + if "chunkPositionInArray" in r.attrib: + # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] + chunk_pos = r.attrib["chunkPositionInArray"][1:-1].split(",") + chunk_num = [int(chunk_pos[i]) // chunks[i] + for i in range(len(chunks))] + # [0,0,1] -> "0.0.1" + chunk_key = chunk_key_template.format(*chunk_num) chunkmanifest[chunk_key] = { "path": self.data_filepath, "offset": int(r.attrib["offset"]), From 7b81eeb88f2b52eea965104886fe661cfb96d099 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 May 2024 23:26:09 +0000 Subject: [PATCH 04/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- virtualizarr/dmrpp.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/virtualizarr/dmrpp.py b/virtualizarr/dmrpp.py index 203ce38..1e66c47 100644 --- a/virtualizarr/dmrpp.py +++ b/virtualizarr/dmrpp.py @@ -122,8 +122,7 @@ def parse_chunks(self, root, chunks: tuple) -> dict: if "chunkPositionInArray" in r.attrib: # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] chunk_pos = r.attrib["chunkPositionInArray"][1:-1].split(",") - chunk_num = [int(chunk_pos[i]) // chunks[i] - for i in range(len(chunks))] + chunk_num = [int(chunk_pos[i]) // chunks[i] for i in range(len(chunks))] # [0,0,1] -> "0.0.1" chunk_key = chunk_key_template.format(*chunk_num) chunkmanifest[chunk_key] = { From 8334d0ab31d98f175343ae2164a8a7b43fec17f9 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Thu, 16 May 2024 11:06:20 -0700 Subject: [PATCH 05/17] added groups, docs, and bug fixes --- virtualizarr/dmrpp.py | 133 -------------- virtualizarr/readers/dmrpp.py | 331 ++++++++++++++++++++++++++++++++++ virtualizarr/xarray.py | 14 +- 3 files changed, 341 insertions(+), 137 deletions(-) delete mode 100644 virtualizarr/dmrpp.py create mode 100644 virtualizarr/readers/dmrpp.py diff --git a/virtualizarr/dmrpp.py b/virtualizarr/dmrpp.py deleted file mode 100644 index 1e66c47..0000000 --- a/virtualizarr/dmrpp.py +++ /dev/null @@ -1,133 +0,0 @@ -from xml.etree import ElementTree as ET - -import numpy as np -import xarray as xr - -from virtualizarr.manifests import ManifestArray -from virtualizarr.zarr import ZArray - - -class DMRParser: - dap_namespace = "{http://xml.opendap.org/ns/DAP/4.0#}" - dmr_namespace = "{http://xml.opendap.org/dap/dmrpp/1.0.0#}" - dap_npdtype = { - "Byte": "uint8", - "UByte": "uint8", - "Int8": "int8", - "UInt8": "uint8", - "Int16": "int16", - "UInt16": "uint16", - "Int32": "int32", - "UInt32": "uint32", - "Int64": "int64", - "UInt64": "uint64", - "Url": "str", - "Float32": "float32", - "Float64": "float64", - "String": "str", - } - - def __init__(self, dmr: str): - self.root = ET.fromstring(dmr) - self.data_filepath = self.root.attrib["name"] - self.global_dims = {} - - def parse_dataset(self): - # find all dimension names and sizes - for d in self.root.iterfind(self.dap_namespace + "Dimension"): - self.global_dims[d.attrib["name"]] = int(d.attrib["size"]) - vars_tags = [] - for dap_dtype in self.dap_npdtype: - vars_tags += self.root.findall(self.dap_namespace + dap_dtype) - # find all coordinate names (using Map tags) - coord_names = set() - for var_tag in 
vars_tags: - for map_tag in var_tag.iterfind(self.dap_namespace + "Map"): - coord_names.add(map_tag.attrib["name"].removeprefix("/")) - coords = {} - data_vars = {} - for var_tag in vars_tags: - if var_tag.attrib["name"] in coord_names: - coords[var_tag.attrib["name"]] = self.parse_variable(var_tag) - # if len(coords[v.attrib['name']].dims) == 1: - # dim1d, *_ = coords[v.attrib['name']].dims - # indexes[v.attrib['name']] = PandasIndex(coords[v.attrib['name']], dim1d) - else: - data_vars[var_tag.attrib["name"]] = self.parse_variable(var_tag) - # find all dataset attributes - attrs = {} - for attr_tag in self.root.iterfind(self.dap_namespace + "Attribute"): - if attr_tag.attrib["type"] != "Container": - attrs.update(self.parse_attribute(attr_tag)) - return xr.Dataset( - data_vars=data_vars, - coords=xr.Coordinates(coords=coords, indexes={}), - attrs=attrs, - ) - - def parse_variable(self, root) -> xr.Variable: - # parse dimensions - dims = [] - for d in root.iterfind(self.dap_namespace + "Dim"): - dims.append(d.attrib["name"].removeprefix("/")) - shape = tuple([self.global_dims[d] for d in dims]) - # parse chunks - chunks = shape - chunks_tag = root.find(self.dmr_namespace + "chunks") - if chunks_tag.find(self.dmr_namespace + "chunkDimensionSizes") is not None: - dim_str = chunks_tag.find(self.dmr_namespace + "chunkDimensionSizes").text - chunks = tuple(map(int, dim_str.split())) - chunkmanifest = self.parse_chunks(chunks_tag, chunks) - # parse attributes - attrs = {} - for a in root.iterfind(self.dap_namespace + "Attribute"): - attrs.update(self.parse_attribute(a)) - # create ManifestArray and ZArray - dtype = np.dtype(self.dap_npdtype[root.tag.removeprefix(self.dap_namespace)]) - fill_value = ( - attrs["_FillValue"] - if "_FillValue" in attrs and attrs["_FillValue"] != "*" - else None - ) - zarray = ZArray( - chunks=chunks, - dtype=dtype, - fill_value=fill_value, - order="C", - shape=shape, - zarr_format=3, - ) - marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) - # create encoding dict (and remove those keys from attrs) - encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} - encoding = {key: value for key, value in attrs.items() if key in encoding_keys} - attrs = {key: value for key, value in attrs.items() if key not in encoding_keys} - return xr.Variable(dims=dims, data=marr, attrs=attrs, encoding=encoding) - - def parse_attribute(self, root) -> dict: - attr = {} - values = [] - # if multiple Value tags are present, store as "key": "[v1, v2, ...]" - for r in root: - values.append(r.text) - attr[root.attrib["name"]] = values[0] if len(values) == 1 else str(values) - return attr - - def parse_chunks(self, root, chunks: tuple) -> dict: - chunkmanifest = {} - default_num = [0 for i in range(len(chunks))] - chunk_key_template = ".".join(["{}" for i in range(len(chunks))]) - for r in root.iterfind(self.dmr_namespace + "chunk"): - chunk_num = default_num - if "chunkPositionInArray" in r.attrib: - # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] - chunk_pos = r.attrib["chunkPositionInArray"][1:-1].split(",") - chunk_num = [int(chunk_pos[i]) // chunks[i] for i in range(len(chunks))] - # [0,0,1] -> "0.0.1" - chunk_key = chunk_key_template.format(*chunk_num) - chunkmanifest[chunk_key] = { - "path": self.data_filepath, - "offset": int(r.attrib["offset"]), - "length": int(r.attrib["nBytes"]), - } - return chunkmanifest diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py new file mode 100644 index 0000000..02206e7 --- /dev/null +++ 
b/virtualizarr/readers/dmrpp.py @@ -0,0 +1,331 @@ +from typing import Optional +from xml.etree import ElementTree as ET + +import numpy as np +import xarray as xr + +from virtualizarr.manifests import ManifestArray +from virtualizarr.manifests.manifest import validate_chunk_keys +from virtualizarr.types import ChunkKey +from virtualizarr.zarr import ZArray + + +class DMRParser: + """ + Parses a DMR file and creates a virtual xr.Dataset. + Handles groups, dimensions, coordinates, data variables, encoding, chunk manifests, and attributes. + """ + + _ns = { + "dap": "http://xml.opendap.org/ns/DAP/4.0#", + "dmr": "http://xml.opendap.org/dap/dmrpp/1.0.0#", + } + dap_np_dtype = { + "Byte": "uint8", + "UByte": "uint8", + "Int8": "int8", + "UInt8": "uint8", + "Int16": "int16", + "UInt16": "uint16", + "Int32": "int32", + "UInt32": "uint32", + "Int64": "int64", + "UInt64": "uint64", + "Url": "object", + "Float32": "float32", + "Float64": "float64", + "String": "object", + } + + def __init__(self, dmr: str, data_filepath: Optional[str] = None): + """ + Initialize the DMRParser with the given DMR data and data file path. + + Parameters + ---------- + dmr : str + The DMR file contents as a string. + + data_filepath : str, optional + The path to the actual data file that will be set in the chunk manifests. + If None, the data file path is taken from the DMR file. + """ + self.root = ET.fromstring(dmr) + self.data_filepath = ( + data_filepath if data_filepath is not None else self.root.attrib["name"] + ) + self._global_dims: dict[str, int] = {} + self._group: str | None = None + + def parse(self, group: Optional[str] = None) -> xr.Dataset: + """ + Parse the dataset from the dmrpp file + + Parameters + ---------- + group : str + The group to parse. If None, the entire dataset is parsed. + + Returns + ------- + An xr.Dataset wrapping virtualized zarr arrays. + """ + self._group = group + if self._group is not None: + self._group = ( + "/" + self._group.strip("/") + "/" + ) # ensure group is in form "/a/b/" + if self.data_filepath.endswith(".h5"): + return self._parse_hdf5_dataset() + group_tags = self.root.findall("dap:Group", self._ns) + if len(group_tags) > 0 and self._group is not None: + return self._parse_netcdf4_group(group_tags) + return self._parse_dataset() + + def _parse_netcdf4_group(self, group_tags: list[ET.Element]) -> xr.Dataset: + """ + Parse the dataset from the netcdf4 based dmrpp with groups, starting at the given group. + Set root to the given group. + + Parameters + ---------- + group_tags : list[ET.Element] + A list of ET elements representing the groups in the DMR file. + Each will be a <Group> tag. + Returns + ------- + xr.Dataset + """ + self.root = group_tags[0] + for group_tag in group_tags: + if self._group is not None and group_tag.attrib[ + "name" + ] == self._group.strip("/"): + self.root = group_tag + return self._parse_dataset() + + def _parse_hdf5_dataset(self) -> xr.Dataset: + """ + Parse the dataset from the HDF5 based dmrpp, starting at the given group. + Set root to the given group. + + Returns + ------- + xr.Dataset + """ + if self._group is None: + # NOTE: This will return an xr.DataTree with all groups in the future...
+ raise ValueError("HDF5 based DMR parsing requires a group to be specified") + # Make a new root containing only dims, vars, and attrs for the dataset specified by group + ds_root = ET.Element(self.root.tag, self.root.attrib) + dim_names: set[str] = set() + vars_tags: list[ET.Element] = [] + orignames = {} # store original names for renaming later + for dap_dtype in self.dap_np_dtype: + vars_tags += self.root.findall(f"dap:{dap_dtype}", self._ns) + # Add variables part of group to ds_root + for var_tag in vars_tags: + fullname_tag = var_tag.find( + "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns + ) + origname_tag = var_tag.find( + "./dap:Attribute[@name='origname']/dap:Value", self._ns + ) + if ( + fullname_tag is not None + and origname_tag is not None + and fullname_tag.text is not None + and origname_tag.text is not None + and fullname_tag.text == self._group + origname_tag.text + ): + ds_root.append(var_tag) + orignames[var_tag.attrib["name"]] = origname_tag.text + for dim_tag in var_tag.findall("dap:Dim", self._ns): + dim_names.add(dim_tag.attrib["name"][1:]) + # Add dimensions part of group to root2 + for dim_tag in self.root.iterfind("dap:Dimension", self._ns): + if dim_tag.attrib["name"] in dim_names: + ds_root.append(dim_tag) + # make an empty xml element + container_attr_tag: ET.Element = ET.Element("Attribute") + for attr_tag in self.root.findall("dap:Attribute", self._ns): + fullname_tag = attr_tag.find( + "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns + ) + if fullname_tag is not None and fullname_tag.text == self._group[:-1]: + container_attr_tag = attr_tag + # add all attributes for the group to the new root (except fullnamepath) + ds_root.extend( + [a for a in container_attr_tag if a.attrib["name"] != "fullnamepath"] + ) + self.root = ds_root + return self._parse_dataset().rename(orignames) + + def _parse_dataset(self) -> xr.Dataset: + """ + Parse the dataset using the root element of the DMR file. 
+ + Returns + ------- + xr.Dataset + """ + # find all dimension names and sizes + for dim_tag in self.root.iterfind("dap:Dimension", self._ns): + self._global_dims[dim_tag.attrib["name"]] = int(dim_tag.attrib["size"]) + vars_tags: list[ET.Element] = [] + for dap_dtype in self.dap_np_dtype: + vars_tags += self.root.findall(f"dap:{dap_dtype}", self._ns) + # find all coordinate names (using Map tags and coordinates attribute) + coord_names: set[str] = set() + for var_tag in vars_tags: + coord_tag = var_tag.find( + "./dap:Attribute[@name='coordinates']/dap:Value", self._ns + ) + if coord_tag is not None and coord_tag.text is not None: + coord_names.update(coord_tag.text.split(" ")) + for map_tag in var_tag.iterfind("dap:Map", self._ns): + coord_names.add(map_tag.attrib["name"].removeprefix("/")) + # if no coord_names are found or coords don't include dims, dims are used as coords + if len(coord_names) == 0 or len(coord_names) < len(self._global_dims): + coord_names = set(self._global_dims.keys()) + # find all coords + data variables + coords: dict[str, xr.Variable] = {} + data_vars: dict[str, xr.Variable] = {} + for var_tag in vars_tags: + if var_tag.attrib["name"] in coord_names: + coords[var_tag.attrib["name"]] = self._parse_variable(var_tag) + else: + data_vars[var_tag.attrib["name"]] = self._parse_variable(var_tag) + # find all dataset attributes + attrs: dict[str, str] = {} + for attr_tag in self.root.iterfind("dap:Attribute", self._ns): + if attr_tag.attrib["type"] != "Container": # container = nested attributes + attrs.update(self._parse_attribute(attr_tag)) + return xr.Dataset( + data_vars=data_vars, + coords=xr.Coordinates(coords=coords, indexes={}), + attrs=attrs, + ) + + def _parse_variable(self, var_tag: ET.Element) -> xr.Variable: + """ + Parse a variable from a DMR tag. + + Parameters + ---------- + var_tag : ET.Element + An ElementTree Element representing a variable in the DMR file. Will have DAP dtype as tag. 
+ + Returns + ------- + xr.Variable + """ + # parse dimensions + dims: list[str] = [] + for dim_tag in var_tag.iterfind("dap:Dim", self._ns): + dim = ( + dim_tag.attrib["name"] + if self._group is None + else dim_tag.attrib["name"].removeprefix(self._group) + ) + dims.append(dim.removeprefix("/")) + shape = tuple([self._global_dims[d] for d in dims]) + # parse chunks + chunks = shape + chunks_tag = var_tag.find("dmr:chunks", self._ns) + if chunks_tag is None: + raise ValueError( + f"No chunks tag found in DMR file for variable {var_tag.attrib['name']}" + ) + chunk_dim_tag = chunks_tag.find("dmr:chunkDimensionSizes", self._ns) + if chunk_dim_tag is not None and chunk_dim_tag.text is not None: + chunks = tuple( + map(int, chunk_dim_tag.text.split()) + ) # 1 1447 2895 -> (1, 1447, 2895) + chunkmanifest = self._parse_chunks(chunks_tag, chunks) + # parse attributes + attrs: dict[str, str] = {} + for attr_tag in var_tag.iterfind("dap:Attribute", self._ns): + attrs.update(self._parse_attribute(attr_tag)) + # create ManifestArray and ZArray + # convert DAP dtype to numpy dtype + dtype = np.dtype( + self.dap_np_dtype[var_tag.tag.removeprefix("{" + self._ns["dap"] + "}")] + ) + fill_value = ( + attrs["_FillValue"] + if "_FillValue" in attrs and attrs["_FillValue"] != "*" + else None + ) + zarray = ZArray( + chunks=chunks, + dtype=dtype, + fill_value=fill_value, + order="C", + shape=shape, + zarr_format=3, + ) + marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) + # create encoding dict (and remove those keys from attrs) + encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} + encoding = {key: value for key, value in attrs.items() if key in encoding_keys} + attrs = {key: value for key, value in attrs.items() if key not in encoding_keys} + return xr.Variable(dims=dims, data=marr, attrs=attrs, encoding=encoding) + + def _parse_attribute(self, attr_tag: ET.Element) -> dict: + """ + Parse an attribute from a DMR attr tag. + + Parameters + ---------- + attr_tag : ET.Element + An ElementTree Element with an <Attribute> tag. + + Returns + ------- + dict + """ + attr = {} + values = [] + # if multiple Value tags are present, store as "key": "[v1, v2, ...]" + for value_tag in attr_tag: + values.append(value_tag.text) + attr[attr_tag.attrib["name"]] = values[0] if len(values) == 1 else str(values) + return attr + + def _parse_chunks(self, chunks_tag: ET.Element, chunks: tuple) -> dict: + """ + Parse the chunk manifest from a DMR chunks tag. + + Parameters + ---------- + chunks_tag : ET.Element + An ElementTree Element with a <chunks> tag. + + chunks : tuple + Chunk sizes for each dimension.
+ + Returns + ------- + dict + """ + chunkmanifest: dict[ChunkKey, object] = {} + default_num: list[int] = [0 for i in range(len(chunks))] + chunk_key_template = ".".join(["{}" for i in range(len(chunks))]) + for chunk_tag in chunks_tag.iterfind("dmr:chunk", self._ns): + chunk_num = default_num + if "chunkPositionInArray" in chunk_tag.attrib: + # "[0,1023,10235]"" -> ["0","1023","10235"] + chunk_pos = chunk_tag.attrib["chunkPositionInArray"][1:-1].split(",") + # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] + chunk_num = [int(chunk_pos[i]) // chunks[i] for i in range(len(chunks))] + chunk_key = ChunkKey( + chunk_key_template.format(*chunk_num) + ) # [0,1,5] -> "0.1.5" + chunkmanifest[chunk_key] = { + "path": self.data_filepath, + "offset": int(chunk_tag.attrib["offset"]), + "length": int(chunk_tag.attrib["nBytes"]), + } + validate_chunk_keys(chunkmanifest.keys()) + return chunkmanifest diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 3e1068a..aff46ff 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -14,7 +14,6 @@ from xarray.core.variable import IndexVariable import virtualizarr.kerchunk as kerchunk -from virtualizarr.dmrpp import DMRParser from virtualizarr.kerchunk import FileType, KerchunkStoreRefs from virtualizarr.manifests import ChunkManifest, ManifestArray from virtualizarr.utils import _fsspec_openfile_from_filepath @@ -38,6 +37,7 @@ def open_virtual_dataset( loadable_variables: Iterable[str] | None = None, indexes: Mapping[str, Index] | None = None, virtual_array_class=ManifestArray, + group: Optional[str] = None, reader_options: Optional[dict] = { "storage_options": {"key": "", "secret": "", "anon": True} }, @@ -69,6 +69,8 @@ def open_virtual_dataset( virtual_array_class Virtual array class to use to represent the references to the chunks in each on-disk array. Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. + group : str, default None + Group path within the dataset to open. For example netcdf4 and hdf5 groups reader_options: dict, default {'storage_options':{'key':'', 'secret':'', 'anon':True}} Dict passed into Kerchunk file readers. Note: Each Kerchunk file reader has distinct arguments, so ensure reader_options match selected Kerchunk reader arguments. @@ -104,9 +106,13 @@ def open_virtual_dataset( storepath=filepath, drop_variables=drop_variables, indexes=indexes ) if filetype == "dmr++": - with open(filepath, "rb") as f: - parser = DMRParser(f.read()) - return parser.parse_dataset() + from virtualizarr.readers.dmrpp import DMRParser + + fpath = _fsspec_openfile_from_filepath( + filepath=filepath, reader_options=reader_options + ) + parser = DMRParser(fpath.read()) + return parser.parse(group=group) else: # this is the only place we actually always need to use kerchunk directly # TODO avoid even reading byte ranges for variables that will be dropped later anyway? 
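With the group support added in this patch, a nested product can be opened one group at a time; a minimal sketch mirroring the dmr++ branch above (the sidecar name is hypothetical, and the .h5 suffix matters because parse() routes HDF5-based DMR++ files through _parse_hdf5_dataset):

    from virtualizarr.readers.dmrpp import DMRParser

    # HDF5-based DMR++ parsing requires a group; leading/trailing slashes
    # are normalized, so "gt1r/heights" and "/gt1r/heights/" are equivalent.
    with open("ATL03_example.h5.dmrpp") as f:  # hypothetical ICESat-2-style sidecar
        parser = DMRParser(f.read())
    vds = parser.parse(group="/gt1r/heights")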
From 7580fdcc5cdc018e7a56f9847bd01e92c679853f Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Thu, 27 Jun 2024 15:40:47 -0700 Subject: [PATCH 06/17] rework hdf5 parser and group logic --- virtualizarr/manifests/manifest.py | 16 +- virtualizarr/readers/dmrpp.py | 353 ++++++++++++++++++----------- virtualizarr/xarray.py | 2 +- 3 files changed, 231 insertions(+), 140 deletions(-) diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py index 530ce17..9ec25e3 100644 --- a/virtualizarr/manifests/manifest.py +++ b/virtualizarr/manifests/manifest.py @@ -71,8 +71,8 @@ class ChunkManifest: """ _paths: np.ndarray[Any, np.dtypes.StringDType] # type: ignore[name-defined] - _offsets: np.ndarray[Any, np.dtype[np.int32]] - _lengths: np.ndarray[Any, np.dtype[np.int32]] + _offsets: np.ndarray[Any, np.dtype[np.int64]] + _lengths: np.ndarray[Any, np.dtype[np.int64]] def __init__(self, entries: dict) -> None: """ @@ -100,8 +100,8 @@ def __init__(self, entries: dict) -> None: # Initializing to empty implies that entries with path='' are treated as missing chunks paths = np.empty(shape=shape, dtype=np.dtypes.StringDType()) # type: ignore[attr-defined] - offsets = np.empty(shape=shape, dtype=np.dtype("int32")) - lengths = np.empty(shape=shape, dtype=np.dtype("int32")) + offsets = np.empty(shape=shape, dtype=np.dtype("int64")) + lengths = np.empty(shape=shape, dtype=np.dtype("int64")) # populate the arrays for key, entry in entries.items(): @@ -128,8 +128,8 @@ def __init__(self, entries: dict) -> None: def from_arrays( cls, paths: np.ndarray[Any, np.dtype[np.dtypes.StringDType]], # type: ignore[name-defined] - offsets: np.ndarray[Any, np.dtype[np.int32]], - lengths: np.ndarray[Any, np.dtype[np.int32]], + offsets: np.ndarray[Any, np.dtype[np.int64]], + lengths: np.ndarray[Any, np.dtype[np.int64]], ) -> "ChunkManifest": """ Create manifest directly from numpy arrays containing the path and byte range information. @@ -161,11 +161,11 @@ def from_arrays( raise ValueError( f"paths array must have a numpy variable-length string dtype, but got dtype {paths.dtype}" ) - if offsets.dtype != np.dtype("int32"): + if offsets.dtype != np.dtype("int64"): raise ValueError( - f"offsets array must have 32-bit integer dtype, but got dtype {offsets.dtype}" + f"offsets array must have 64-bit integer dtype, but got dtype {offsets.dtype}" ) - if lengths.dtype != np.dtype("int32"): + if lengths.dtype != np.dtype("int64"): raise ValueError( - f"lengths array must have 32-bit integer dtype, but got dtype {lengths.dtype}" + f"lengths array must have 64-bit integer dtype, but got dtype {lengths.dtype}" ) diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index 02206e7..b1b4947 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -1,26 +1,30 @@ +import os +import warnings +from collections import defaultdict from typing import Optional from xml.etree import ElementTree as ET import numpy as np import xarray as xr -from virtualizarr.manifests import ManifestArray -from virtualizarr.manifests.manifest import validate_chunk_keys +from virtualizarr.manifests import ChunkManifest, ManifestArray from virtualizarr.types import ChunkKey from virtualizarr.zarr import ZArray class DMRParser: """ - Parses a DMR file and creates a virtual xr.Dataset. + Parses a DMR++ file and creates a virtual xr.Dataset. Handles groups, dimensions, coordinates, data variables, encoding, chunk manifests, and attributes.
""" + # DAP and DMRPP XML namespaces _ns = { "dap": "http://xml.opendap.org/ns/DAP/4.0#", "dmr": "http://xml.opendap.org/dap/dmrpp/1.0.0#", } - dap_np_dtype = { + # DAP data types to numpy data types + _dap_np_dtype = { "Byte": "uint8", "UByte": "uint8", "Int8": "int8", @@ -36,6 +40,10 @@ class DMRParser: "Float64": "float64", "String": "object", } + # Default zlib compression value (-1 means default, currently level 6 is default) + _default_zlib_value = -1 + # Encoding keys that should be cast to float + _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} def __init__(self, dmr: str, data_filepath: Optional[str] = None): """ @@ -54,114 +62,176 @@ def __init__(self, dmr: str, data_filepath: Optional[str] = None): self.data_filepath = ( data_filepath if data_filepath is not None else self.root.attrib["name"] ) - self._global_dims: dict[str, int] = {} - self._group: str | None = None - def parse(self, group: Optional[str] = None) -> xr.Dataset: + def parse_dataset(self, group=None) -> xr.Dataset: """ Parse the dataset from the dmrpp file Parameters ---------- group : str - The group to parse. If None, the entire dataset is parsed. + The group to parse. If None, and no groups are present, the dataset is parsed. + If None and groups are present, the first group is parsed. Returns ------- An xr.Dataset wrapping virtualized zarr arrays. """ - self._group = group - if self._group is not None: - self._group = ( - "/" + self._group.strip("/") + "/" - ) # ensure group is in form "/a/b/" + if group is not None: + # group = "/" + group.strip("/") # ensure group is in form "/a/b" + group = os.path.normpath(group).removeprefix( + "/" + ) # ensure group is in form "a/b/c" if self.data_filepath.endswith(".h5"): - return self._parse_hdf5_dataset() - group_tags = self.root.findall("dap:Group", self._ns) - if len(group_tags) > 0 and self._group is not None: - return self._parse_netcdf4_group(group_tags) - return self._parse_dataset() + return self._parse_hdf5_dataset(self.root, group) + if self.data_filepath.endswith(".nc"): + return self._parse_netcdf4_dataset(self.root, group) + raise ValueError("DMR file must be HDF5 or netCDF4 based") - def _parse_netcdf4_group(self, group_tags: list[ET.Element]) -> xr.Dataset: + def _parse_netcdf4_dataset( + self, root: ET.Element, group: Optional[str] = None + ) -> xr.Dataset: """ Parse the dataset from the netcdf4 based dmrpp with groups, starting at the given group. Set root to the given group. Parameters ---------- - group_tags : list[ET.element] - A list of ET elements representing the groups in the DMR file. - Each will be a tag. + root : ET.Element + The root element of the DMR file. + + group : str + The group to parse. If None, and no groups are present, the dataset is parsed. + If None and groups are present, the first group is parsed. 
Returns ------- xr.Dataset """ - self.root = group_tags[0] + group_tags = root.findall("dap:Group", self._ns) + if len(group_tags) == 0: + if group is not None: + # no groups found and group specified -> warning + warnings.warn( + "No groups found in NetCDF4 DMR file; ignoring group parameter" + ) + # no groups found and no group specified -> parse dataset + return self._parse_dataset(root) + all_groups = self._split_netcdf4(root) + if group is None: + # groups found and no group specified -> parse first group + return self._parse_dataset(group_tags[0]) + if group in all_groups: + # groups found and group specified -> parse specified group + return self._parse_dataset(all_groups[group]) + else: + # groups found and specified group not found -> error + raise ValueError(f"Group {group} not found in NetCDF4 DMR file") + + def _split_netcdf4(self, root: ET.Element) -> dict[str, ET.Element]: + """ + Split the input element into several ET.Elements by netcdf4 group + E.g. {"left": <Group>, "right": <Group>} + + Returns + ------- + dict[str, ET.Element] + """ + group_tags = root.findall("dap:Group", self._ns) + all_groups: dict[str, ET.Element] = defaultdict( + lambda: ET.Element(root.tag, root.attrib) + ) for group_tag in group_tags: - if self._group is not None and group_tag.attrib[ - "name" - ] == self._group.strip("/"): - self.root = group_tag - return self._parse_dataset() + all_groups[os.path.normpath(group_tag.attrib["name"])] = group_tag + return all_groups - def _parse_hdf5_dataset(self) -> xr.Dataset: + def _parse_hdf5_dataset(self, root: ET.Element, group: str) -> xr.Dataset: """ - Parse the dataset from the HDF5 based dmrpp, starting at the given group. + Parse the dataset from the HDF5 based dmrpp with groups, starting at the given group. Set root to the given group. + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + group : str + The group to parse. If None, and no groups are present, the dataset is parsed. + If None and groups are present, the first group is parsed. + Returns ------- xr.Dataset """ - if self._group is None: - # NOTE: This will return an xr.DataTree with all groups in the future... - raise ValueError("HDF5 based DMR parsing requires a group to be specified") - # Make a new root containing only dims, vars, and attrs for the dataset specified by group - ds_root = ET.Element(self.root.tag, self.root.attrib) - dim_names: set[str] = set() + all_groups = self._split_hdf5(root=root) + if group in all_groups: + # replace aliased variable names with original names: gt1r_heights -> heights + orignames = {} + vars_tags: list[ET.Element] = [] + for dap_dtype in self._dap_np_dtype: + vars_tags += all_groups[group].findall(f"dap:{dap_dtype}", self._ns) + for var_tag in vars_tags: + origname_tag = var_tag.find( + "./dap:Attribute[@name='origname']/dap:Value", self._ns + ) + if origname_tag is not None and origname_tag.text is not None: + orignames[var_tag.attrib["name"]] = origname_tag.text + return self._parse_dataset(all_groups[group]).rename(orignames) + raise ValueError(f"Group {group} not found in HDF5 DMR file") + + def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: + """ + Split the input element into several ET.Elements by HDF5 group + E.g. {"gt1r/heights": <Dataset>, "gt1r/temperatures": <Dataset>} + + Parameters + ---------- + root : ET.Element + The root element of the DMR file.
+ + Returns + ------- + dict[str, ET.Element] + """ + all_groups: dict[str, ET.Element] = defaultdict( + lambda: ET.Element(root.tag, root.attrib) + ) + group_dims: dict[str, set[str]] = defaultdict( + set + ) # {"gt1r/heights": {"dim1", "dim2", ...}} vars_tags: list[ET.Element] = [] - orignames = {} # store original names for renaming later - for dap_dtype in self.dap_np_dtype: - vars_tags += self.root.findall(f"dap:{dap_dtype}", self._ns) - # Add variables part of group to ds_root + for dap_dtype in self._dap_np_dtype: + vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) + # Variables for var_tag in vars_tags: fullname_tag = var_tag.find( "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns ) - origname_tag = var_tag.find( - "./dap:Attribute[@name='origname']/dap:Value", self._ns - ) - if ( - fullname_tag is not None - and origname_tag is not None - and fullname_tag.text is not None - and origname_tag.text is not None - and fullname_tag.text == self._group + origname_tag.text - ): - ds_root.append(var_tag) - orignames[var_tag.attrib["name"]] = origname_tag.text - for dim_tag in var_tag.findall("dap:Dim", self._ns): - dim_names.add(dim_tag.attrib["name"][1:]) - # Add dimensions part of group to root2 - for dim_tag in self.root.iterfind("dap:Dimension", self._ns): - if dim_tag.attrib["name"] in dim_names: - ds_root.append(dim_tag) - # make an empty xml element - container_attr_tag: ET.Element = ET.Element("Attribute") - for attr_tag in self.root.findall("dap:Attribute", self._ns): + if fullname_tag is not None and fullname_tag.text is not None: + # '/gt1r/heights/ph_id_pulse' -> 'gt1r/heights' + group_name = os.path.dirname(fullname_tag.text).removeprefix("/") + all_groups[group_name].append(var_tag) + for dim_tag in var_tag.iterfind("dap:Dim", self._ns): + group_dims[group_name].add(dim_tag.attrib["name"].removeprefix("/")) + # Dimensions + for dim_tag in root.iterfind("dap:Dimension", self._ns): + for group_name, dims in group_dims.items(): + if dim_tag.attrib["name"] in dims: + all_groups[group_name].append(dim_tag) + # Attributes + for attr_tag in root.iterfind("dap:Attribute", self._ns): fullname_tag = attr_tag.find( "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns ) - if fullname_tag is not None and fullname_tag.text == self._group[:-1]: - container_attr_tag = attr_tag - # add all attributes for the group to the new root (except fullnamepath) - ds_root.extend( - [a for a in container_attr_tag if a.attrib["name"] != "fullnamepath"] - ) - self.root = ds_root - return self._parse_dataset().rename(orignames) + if fullname_tag is not None and fullname_tag.text is not None: + group_name = fullname_tag.text + # Add all attributes to the new dataset; fullnamepath is generally excluded + if group_name in all_groups: + all_groups[group_name].extend( + [a for a in attr_tag if a.attrib["name"] != "fullnamepath"] + ) + return all_groups - def _parse_dataset(self) -> xr.Dataset: + def _parse_dataset(self, root: ET.Element) -> xr.Dataset: """ Parse the dataset using the root element of the DMR file. 
@@ -169,13 +239,15 @@ def _parse_dataset(self) -> xr.Dataset: ------- xr.Dataset """ - # find all dimension names and sizes - for dim_tag in self.root.iterfind("dap:Dimension", self._ns): - self._global_dims[dim_tag.attrib["name"]] = int(dim_tag.attrib["size"]) + # Dimension names and sizes + dataset_dims: dict[str, int] = {} + for dim_tag in root.iterfind("dap:Dimension", self._ns): + dataset_dims[dim_tag.attrib["name"]] = int(dim_tag.attrib["size"]) + # Data variables and coordinates vars_tags: list[ET.Element] = [] - for dap_dtype in self.dap_np_dtype: - vars_tags += self.root.findall(f"dap:{dap_dtype}", self._ns) - # find all coordinate names (using Map tags and coordinates attribute) + for dap_dtype in self._dap_np_dtype: + vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) + # Coordinate names (using Map tags and coordinates attribute) coord_names: set[str] = set() for var_tag in vars_tags: coord_tag = var_tag.find( @@ -186,17 +258,21 @@ def _parse_dataset(self) -> xr.Dataset: for map_tag in var_tag.iterfind("dap:Map", self._ns): coord_names.add(map_tag.attrib["name"].removeprefix("/")) # if no coord_names are found or coords don't include dims, dims are used as coords - if len(coord_names) == 0 or len(coord_names) < len(self._global_dims): - coord_names = set(self._global_dims.keys()) - # find all coords + data variables + if len(coord_names) == 0 or len(coord_names) < len(dataset_dims): + coord_names = set(dataset_dims.keys()) + # Separate and parse coords + data variables coords: dict[str, xr.Variable] = {} data_vars: dict[str, xr.Variable] = {} for var_tag in vars_tags: if var_tag.attrib["name"] in coord_names: - coords[var_tag.attrib["name"]] = self._parse_variable(var_tag) + coords[var_tag.attrib["name"]] = self._parse_variable( + var_tag, dataset_dims + ) else: - data_vars[var_tag.attrib["name"]] = self._parse_variable(var_tag) - # find all dataset attributes + data_vars[var_tag.attrib["name"]] = self._parse_variable( + var_tag, dataset_dims + ) + # Attributes attrs: dict[str, str] = {} - for attr_tag in self.root.iterfind("dap:Attribute", self._ns): + for attr_tag in root.iterfind("dap:Attribute", self._ns): if attr_tag.attrib["type"] != "Container": # container = nested attributes @@ -207,7 +283,9 @@ def _parse_dataset(self) -> xr.Dataset: attrs=attrs, ) - def _parse_variable(self, var_tag: ET.Element) -> xr.Variable: + def _parse_variable( + self, var_tag: ET.Element, dataset_dims: dict[str, int] + ) -> xr.Variable: """ Parse a variable from a DMR tag. @@ -216,61 +294,67 @@ def _parse_variable(self, var_tag: ET.Element) -> xr.Variable: var_tag : ET.Element An ElementTree Element representing a variable in the DMR file. Will have DAP dtype as tag. + dataset_dims : dict + A dictionary of dimension names and sizes. E.g. {"time": 1, "lat": 1447, "lon": 2895} + Must contain at least all the dimensions used by the variable.
+ Returns ------- xr.Variable """ - # parse dimensions - dims: list[str] = [] + # Dimension names + dim_names: list[str] = [] for dim_tag in var_tag.iterfind("dap:Dim", self._ns): - dim = ( - dim_tag.attrib["name"] - if self._group is None - else dim_tag.attrib["name"].removeprefix(self._group) - ) - dims.append(dim.removeprefix("/")) - shape = tuple([self._global_dims[d] for d in dims]) - # parse chunks - chunks = shape + dim_names.append(os.path.basename(dim_tag.attrib["name"])) + # convert DAP dtype to numpy dtype + dtype = np.dtype( + self._dap_np_dtype[var_tag.tag.removeprefix("{" + self._ns["dap"] + "}")] + ) + # Chunks and Filters + filters = None + shape = tuple([dataset_dims[d] for d in dim_names]) + chunks_shape = shape chunks_tag = var_tag.find("dmr:chunks", self._ns) - if chunks_tag is None: - raise ValueError( - f"No chunks tag found in DMR file for variable {var_tag.attrib['name']}" - ) - chunk_dim_tag = chunks_tag.find("dmr:chunkDimensionSizes", self._ns) - if chunk_dim_tag is not None and chunk_dim_tag.text is not None: - chunks = tuple( - map(int, chunk_dim_tag.text.split()) - ) # 1 1447 2895 -> (1, 1447, 2895) - chunkmanifest = self._parse_chunks(chunks_tag, chunks) - # parse attributes + if chunks_tag is not None: + # Chunks + chunk_dim_tag = chunks_tag.find("dmr:chunkDimensionSizes", self._ns) + if chunk_dim_tag is not None and chunk_dim_tag.text is not None: + # 1 1447 2895 -> (1, 1447, 2895) + chunks_shape = tuple(map(int, chunk_dim_tag.text.split())) + chunkmanifest = self._parse_chunks(chunks_tag, chunks_shape) + # Filters + if "compressionType" in chunks_tag.attrib: + filters = [] + # shuffle deflate --> ["shuffle", "deflate"] + compression_types = chunks_tag.attrib["compressionType"].split(" ") + for c in compression_types: + if c == "shuffle": + filters.append({"id": "shuffle", "elementsize": dtype.itemsize}) + elif c == "deflate": + filters.append( + {"id": "zlib", "level": self._default_zlib_value} + ) + # Attributes attrs: dict[str, str] = {} for attr_tag in var_tag.iterfind("dap:Attribute", self._ns): attrs.update(self._parse_attribute(attr_tag)) - # create ManifestArray and ZArray - # convert DAP dtype to numpy dtype - dtype = np.dtype( - self.dap_np_dtype[var_tag.tag.removeprefix("{" + self._ns["dap"] + "}")] - ) fill_value = ( attrs["_FillValue"] if "_FillValue" in attrs and attrs["_FillValue"] != "*" else None ) + # create ManifestArray and ZArray zarray = ZArray( - chunks=chunks, + chunks=chunks_shape, dtype=dtype, fill_value=fill_value, + filters=filters, order="C", shape=shape, - zarr_format=3, ) marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) - # create encoding dict (and remove those keys from attrs) - encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} - encoding = {key: value for key, value in attrs.items() if key in encoding_keys} - attrs = {key: value for key, value in attrs.items() if key not in encoding_keys} - return xr.Variable(dims=dims, data=marr, attrs=attrs, encoding=encoding) + encoding = {k: attrs.get(k) for k in self._encoding_keys if k in attrs} + return xr.Variable(dims=dim_names, data=marr, attrs=attrs, encoding=encoding) def _parse_attribute(self, attr_tag: ET.Element) -> dict: """ @@ -287,13 +371,19 @@ def _parse_attribute(self, attr_tag: ET.Element) -> dict: """ attr = {} values = [] + dtype = np.dtype(self._dap_np_dtype[attr_tag.attrib["type"]]) # if multiple Value tags are present, store as "key": "[v1, v2, ...]" for value_tag in attr_tag: - values.append(value_tag.text) - 
attr[attr_tag.attrib["name"]] = values[0] if len(values) == 1 else str(values) + # cast attribute to dmr provided dtype + values.append(dtype.type(value_tag.text)) + attr[attr_tag.attrib["name"]] = ( + values[0] if len(values) == 1 else str(np.asarray(values)) + ) return attr - def _parse_chunks(self, chunks_tag: ET.Element, chunks: tuple) -> dict: + def _parse_chunks( + self, chunks_tag: ET.Element, chunks_shape: tuple[int, ...] + ) -> ChunkManifest: """ Parse the chunk manifest from a DMR chunks tag. @@ -303,29 +393,30 @@ def _parse_chunks(self, chunks_tag: ET.Element, chunks: tuple) -> dict: An ElementTree Element with a <chunks> tag. - chunks : tuple + chunks_shape : tuple - Chunk sizes for each dimension. + Chunk sizes for each dimension. E.g. (1, 1447, 2895) Returns ------- - dict + ChunkManifest """ chunkmanifest: dict[ChunkKey, object] = {} - default_num: list[int] = [0 for i in range(len(chunks))] - chunk_key_template = ".".join(["{}" for i in range(len(chunks))]) + default_num: list[int] = [0 for i in range(len(chunks_shape))] + chunk_key_template = ".".join(["{}" for i in range(len(chunks_shape))]) for chunk_tag in chunks_tag.iterfind("dmr:chunk", self._ns): chunk_num = default_num if "chunkPositionInArray" in chunk_tag.attrib: - # "[0,1023,10235]"" -> ["0","1023","10235"] + # "[0,1023,10235]" -> ["0","1023","10235"] chunk_pos = chunk_tag.attrib["chunkPositionInArray"][1:-1].split(",") # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] - chunk_num = [int(chunk_pos[i]) // chunks[i] for i in range(len(chunks))] - chunk_key = ChunkKey( - chunk_key_template.format(*chunk_num) - ) # [0,1,5] -> "0.1.5" + chunk_num = [ + int(chunk_pos[i]) // chunks_shape[i] + for i in range(len(chunks_shape)) + ] + # [0,1,5] -> "0.1.5" + chunk_key = ChunkKey(chunk_key_template.format(*chunk_num)) chunkmanifest[chunk_key] = { "path": self.data_filepath, "offset": int(chunk_tag.attrib["offset"]), "length": int(chunk_tag.attrib["nBytes"]), } - validate_chunk_keys(chunkmanifest.keys()) - return chunkmanifest + return ChunkManifest(entries=chunkmanifest) diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 58fd437..144701b 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -112,7 +112,7 @@ def open_virtual_dataset( filepath=filepath, reader_options=reader_options ) parser = DMRParser(fpath.read()) - return parser.parse(group=group) + return parser.parse_dataset(group=group) else: # this is the only place we actually always need to use kerchunk directly # TODO avoid even reading byte ranges for variables that will be dropped later anyway?
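The chunk-key arithmetic that this patch reworks can be checked in isolation; a standalone sketch using the same made-up numbers as the comments above:

    # chunkPositionInArray holds the chunk's starting indices in array space;
    # integer-dividing by the chunk shape gives its index on the chunk grid,
    # and joining with "." yields the Zarr-style key stored in the manifest.
    chunks_shape = (1, 1023, 2047)
    chunk_pos = "[0,1023,10235]"[1:-1].split(",")  # ['0', '1023', '10235']
    chunk_num = [int(p) // s for p, s in zip(chunk_pos, chunks_shape)]  # [0, 1, 5]
    assert ".".join(map(str, chunk_num)) == "0.1.5"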
From b1f9aee66961d23545d75f505822095470e49864 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Wed, 10 Jul 2024 13:41:11 -0700 Subject: [PATCH 07/17] update attrs cast to python dtype --- virtualizarr/readers/dmrpp.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index b1b4947..4ac7dcb 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -35,10 +35,10 @@ class DMRParser: "UInt32": "uint32", "Int64": "int64", "UInt64": "uint64", - "Url": "object", + "Url": "str", "Float32": "float32", "Float64": "float64", - "String": "object", + "String": "str", } # Default zlib compression value (-1 means default, currently level 6 is default) _default_zlib_value = -1 @@ -341,7 +341,7 @@ def _parse_variable( fill_value = ( attrs["_FillValue"] if "_FillValue" in attrs and attrs["_FillValue"] != "*" - else None + else np.nan ) # create ManifestArray and ZArray zarray = ZArray( @@ -374,11 +374,9 @@ def _parse_attribute(self, attr_tag: ET.Element) -> dict: dtype = np.dtype(self._dap_np_dtype[attr_tag.attrib["type"]]) # if multiple Value tags are present, store as "key": "[v1, v2, ...]" for value_tag in attr_tag: - # cast attribute to dmr provided dtype - values.append(dtype.type(value_tag.text)) - attr[attr_tag.attrib["name"]] = ( - values[0] if len(values) == 1 else str(np.asarray(values)) - ) + # cast attribute to native python type using dmr provided dtype + values.append(dtype.type(value_tag.text).item()) + attr[attr_tag.attrib["name"]] = values[0] if len(values) == 1 else values return attr def _parse_chunks( From ae2917648448465f390466e62abfa3789ffee4e6 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Sun, 14 Jul 2024 16:01:13 -0700 Subject: [PATCH 08/17] parser passing tests --- virtualizarr/kerchunk.py | 1 + virtualizarr/readers/dmrpp.py | 13 +++-- virtualizarr/tests/test_readers/test_dmrpp.py | 54 +++++++++++++++++++ virtualizarr/xarray.py | 6 +-- 4 files changed, 64 insertions(+), 10 deletions(-) create mode 100644 virtualizarr/tests/test_readers/test_dmrpp.py diff --git a/virtualizarr/kerchunk.py b/virtualizarr/kerchunk.py index 97f64b1..8d22f0c 100644 --- a/virtualizarr/kerchunk.py +++ b/virtualizarr/kerchunk.py @@ -42,6 +42,7 @@ class FileType(AutoName): tiff = auto() fits = auto() zarr = auto() + dmrpp = auto() class NumpyEncoder(json.JSONEncoder): diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index 4ac7dcb..1ff66c8 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -1,7 +1,7 @@ import os import warnings from collections import defaultdict -from typing import Optional +from typing import Any, Optional from xml.etree import ElementTree as ET import numpy as np @@ -312,6 +312,7 @@ def _parse_variable( ) # Chunks and Filters filters = None + fill_value = np.nan shape = tuple([dataset_dims[d] for d in dim_names]) chunks_shape = shape chunks_tag = var_tag.find("dmr:chunks", self._ns) @@ -335,14 +336,12 @@ def _parse_variable( {"id": "zlib", "level": self._default_zlib_value} ) # Attributes - attrs: dict[str, str] = {} + attrs: dict[str, Any] = {} for attr_tag in var_tag.iterfind("dap:Attribute", self._ns): attrs.update(self._parse_attribute(attr_tag)) - fill_value = ( - attrs["_FillValue"] - if "_FillValue" in attrs and attrs["_FillValue"] != "*" - else np.nan - ) + if "_FillValue" in attrs and attrs["_FillValue"] != "*": + fill_value 
= attrs["_FillValue"] + attrs.pop("_FillValue", None) # create ManifestArray and ZArray zarray = ZArray( chunks=chunks_shape, diff --git a/virtualizarr/tests/test_readers/test_dmrpp.py b/virtualizarr/tests/test_readers/test_dmrpp.py new file mode 100644 index 0000000..b6d8d24 --- /dev/null +++ b/virtualizarr/tests/test_readers/test_dmrpp.py @@ -0,0 +1,54 @@ +import pytest +import xarray as xr + +from virtualizarr import open_virtual_dataset +from virtualizarr.kerchunk import FileType + +urls = [ + ( + "https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20240713090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc.dmrpp", + "https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20240713090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc", + "netcdf4", + ) +] + + +def match_zlib_level(result: xr.Dataset, expected: xr.Dataset): + # Fix the zlib level in the result to match the expected + # Many dmrpp's currently do not have the zlib level in the metadata (so default of -1 is used) + # Usage of the virtual Dataset is not affected, but the comparison will fail + # Remove once NASA dmrpps are updated with new dmrpp version: https://github.com/OPENDAP/bes/issues/954 + for x in result.variables: + if result[x].data.zarray.dict()["filters"] is not None: + e_filters = [ + z + for z in expected[x].data.zarray.dict()["filters"] + if "id" in z and z["id"] == "zlib" + ][0] + r_filters = [ + z + for z in result[x].data.zarray.dict()["filters"] + if "id" in z and z["id"] == "zlib" + ][0] + r_filters["level"] = e_filters["level"] + + +@pytest.mark.parametrize("dmrpp_url, data_url, data_type", urls) +def test_dmrpp_reader(dmrpp_url: str, data_url: str, data_type: str): + import earthaccess + + fs = earthaccess.get_fsspec_https_session() + result = open_virtual_dataset( + dmrpp_url, + indexes={}, + filetype=FileType("dmrpp"), + reader_options={"storage_options": fs.storage_options}, + ) + expected = open_virtual_dataset( + data_url, + indexes={}, + filetype=FileType(data_type), + reader_options={"storage_options": fs.storage_options}, + ) + match_zlib_level(result, expected) + xr.testing.assert_identical(result, expected) diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index e8fc49e..aa725e7 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -59,7 +59,7 @@ def open_virtual_dataset( File path to open as a set of virtualized zarr arrays. filetype : FileType, default None Type of file to be opened. Used to determine which kerchunk file format backend to use. - Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3'}. + Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3', 'dmrpp'}. If not provided will attempt to automatically infer the correct filetype from header bytes. drop_variables: list[str], default is None Variables in the file to drop before returning. 
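# Aside — a minimal standalone sketch of the attribute-casting rule from
# PATCH 07 above, with hypothetical values: the <Attribute> text is cast
# through the mapped numpy dtype, and .item() then yields a native Python
# scalar that serializes cleanly into dataset metadata.
import numpy as np

dap_np_dtype = {"Int32": "int32", "Float32": "float32"}

def cast_attr(dap_type: str, text: str):
    # cast_attr is an illustrative helper, not a function in the codebase
    dtype = np.dtype(dap_np_dtype[dap_type])
    return dtype.type(text).item()

assert cast_attr("Int32", "42") == 42 and isinstance(cast_attr("Int32", "42"), int)
assert cast_attr("Float32", "0.5") == 0.5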
@@ -125,13 +125,13 @@ def open_virtual_dataset( return open_virtual_dataset_from_v3_store( storepath=filepath, drop_variables=drop_variables, indexes=indexes ) - if filetype == "dmr++": + if filetype == "dmrpp": from virtualizarr.readers.dmrpp import DMRParser fpath = _fsspec_openfile_from_filepath( filepath=filepath, reader_options=reader_options ) - parser = DMRParser(fpath.read()) + parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp")) return parser.parse_dataset() else: if reader_options is None: From 6e763f911bc6aad77ff3092dc5905fb6f0163ef9 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Sun, 14 Jul 2024 16:39:40 -0700 Subject: [PATCH 09/17] match main manifest dtypes --- virtualizarr/manifests/manifest.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py index 2b8296a..7f99927 100644 --- a/virtualizarr/manifests/manifest.py +++ b/virtualizarr/manifests/manifest.py @@ -71,8 +71,8 @@ class ChunkManifest: """ _paths: np.ndarray[Any, np.dtypes.StringDType] # type: ignore[name-defined] - _offsets: np.ndarray[Any, np.dtype[np.int64]] - _lengths: np.ndarray[Any, np.dtype[np.int64]] + _offsets: np.ndarray[Any, np.dtype[np.uint64]] + _lengths: np.ndarray[Any, np.dtype[np.uint64]] def __init__(self, entries: dict) -> None: """ @@ -100,8 +100,8 @@ def __init__(self, entries: dict) -> None: # Initializing to empty implies that entries with path='' are treated as missing chunks paths = np.empty(shape=shape, dtype=np.dtypes.StringDType()) # type: ignore[attr-defined] - offsets = np.empty(shape=shape, dtype=np.dtype("int64")) - lengths = np.empty(shape=shape, dtype=np.dtype("int64")) + offsets = np.empty(shape=shape, dtype=np.dtype("uint64")) + lengths = np.empty(shape=shape, dtype=np.dtype("uint64")) # populate the arrays for key, entry in entries.items(): @@ -128,8 +128,8 @@ def __init__(self, entries: dict) -> None: def from_arrays( cls, paths: np.ndarray[Any, np.dtype[np.dtypes.StringDType]], # type: ignore[name-defined] - offsets: np.ndarray[Any, np.dtype[np.int64]], - lengths: np.ndarray[Any, np.dtype[np.int64]], + offsets: np.ndarray[Any, np.dtype[np.uint64]], + lengths: np.ndarray[Any, np.dtype[np.uint64]], ) -> "ChunkManifest": """ Create manifest directly from numpy arrays containing the path and byte range information. 
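# Aside — why PATCH 09 moves the manifest offset/length arrays to uint64,
# shown with hypothetical values: byte offsets and byte counts are never
# negative, and the unsigned dtype covers the full 64-bit range of file
# offsets, matching how chunk byte ranges are stored on disk.
import numpy as np

offsets = np.array([[0, 4096]], dtype=np.uint64)   # a 1 x 2 chunk grid
lengths = np.array([[4096, 4096]], dtype=np.uint64)
assert offsets.dtype == np.dtype("uint64") and lengths.dtype == np.dtype("uint64")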
@@ -161,11 +161,11 @@ def from_arrays( raise ValueError( f"paths array must have a numpy variable-length string dtype, but got dtype {paths.dtype}" ) - if offsets.dtype != np.dtype("int64"): + if offsets.dtype != np.dtype("uint64"): raise ValueError( f"offsets array must have 32-bit integer dtype, but got dtype {offsets.dtype}" ) - if lengths.dtype != np.dtype("int64"): + if lengths.dtype != np.dtype("uint64"): raise ValueError( f"lengths array must have 32-bit integer dtype, but got dtype {lengths.dtype}" ) From ef8aa9c6e30590a28fef5414ac817e2656764675 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Sat, 3 Aug 2024 11:41:09 +0530 Subject: [PATCH 10/17] modularize dmrpp.py --- virtualizarr/readers/dmrpp.py | 392 +++++++++++++----- virtualizarr/tests/test_readers/test_dmrpp.py | 50 +-- virtualizarr/xarray.py | 19 +- 3 files changed, 316 insertions(+), 145 deletions(-) diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index 1ff66c8..0d6453f 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -16,6 +16,7 @@ class DMRParser: """ Parses a DMR++ file and creates a virtual xr.Dataset. Handles groups, dimensions, coordinates, data variables, encoding, chunk manifests, and attributes. + Highly modular to allow support for older dmrpp schema versions """ # DAP and DMRPP XML namespaces @@ -35,13 +36,13 @@ class DMRParser: "UInt32": "uint32", "Int64": "int64", "UInt64": "uint64", - "Url": "str", + "Url": "object", "Float32": "float32", "Float64": "float64", - "String": "str", + "String": "object", } - # Default zlib compression value (-1 means default, currently level 6 is default) - _default_zlib_value = -1 + # Default zlib compression value + _default_zlib_value = 6 # Encoding keys that should be cast to float _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} @@ -82,7 +83,7 @@ def parse_dataset(self, group=None) -> xr.Dataset: group = os.path.normpath(group).removeprefix( "/" ) # ensure group is in form "a/b/c" - if self.data_filepath.endswith(".h5"): + if self._is_hdf5(self.root): return self._parse_hdf5_dataset(self.root, group) if self.data_filepath.endswith(".nc"): return self._parse_netcdf4_dataset(self.root, group) @@ -103,6 +104,7 @@ def _parse_netcdf4_dataset( group : str The group to parse. If None, and no groups are present, the dataset is parsed. If None and groups are present, the first group is parsed. + Returns ------- xr.Dataset @@ -132,6 +134,11 @@ def _split_netcdf4(self, root: ET.Element) -> dict[str, ET.Element]: Split the input element into several ET.Elements by netcdf4 group E.g. {"left": , "right": } + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + Returns ------- dict[str, ET.Element] @@ -144,7 +151,17 @@ def _split_netcdf4(self, root: ET.Element) -> dict[str, ET.Element]: all_groups[os.path.normpath(group_tag.attrib["name"])] = group_tag return all_groups - def _parse_hdf5_dataset(self, root: ET.Element, group: str) -> xr.Dataset: + def _is_hdf5(self, root: ET.Element) -> bool: + """Check if the DMR file is HDF5 based.""" + if root.find(".//dap:Attribute[@name='fullnamepath']", self._ns) is not None: + return True + if root.find("./dap:Attribute[@name='HDF5_GLOBAL']", self._ns) is not None: + return True + return False + + def _parse_hdf5_dataset( + self, root: ET.Element, group: Optional[str] = None + ) -> xr.Dataset: """ Parse the dataset from the HDF5 based dmrpp with groups, starting at the given group. 
Set root to the given group. @@ -163,25 +180,43 @@ def _parse_hdf5_dataset(self, root: ET.Element, group: str) -> xr.Dataset: xr.Dataset """ all_groups = self._split_hdf5(root=root) + if len(all_groups) == 0: + raise ValueError("No groups found in HDF based DMR file") + if group is None: + # pick a random group if no group is specified + group = next(iter(all_groups)) + attrs = {} + for attr_tag in root.iterfind("dap:Attribute", self._ns): + if attr_tag.attrib["type"] != "Container": + attrs.update(self._parse_attribute(attr_tag)) if group in all_groups: # replace aliased variable names with original names: gt1r_heights -> heights - orignames = {} - vars_tags: list[ET.Element] = [] - for dap_dtype in self._dap_np_dtype: - vars_tags += all_groups[group].findall(f"dap:{dap_dtype}", self._ns) - for var_tag in vars_tags: - origname_tag = var_tag.find( - "./dap:Attribute[@name='origname']/dap:Value", self._ns - ) - if origname_tag is not None and origname_tag.text is not None: - orignames[var_tag.attrib["name"]] = origname_tag.text - return self._parse_dataset(all_groups[group]).rename(orignames) + orignames = self.find_original_names(all_groups[group]) + vds = self._parse_dataset(all_groups[group]) + # Only one group so found attrs are global attrs + if len(all_groups) == 1: + vds.attrs.update(attrs) + return vds.rename(orignames) raise ValueError(f"Group {group} not found in HDF5 DMR file") + def find_original_names(self, root: ET.Element) -> dict[str, str]: + orignames: dict[str, str] = {} + vars_tags: list[ET.Element] = [] + for dap_dtype in self._dap_np_dtype: + vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) + for var_tag in vars_tags: + origname_tag = var_tag.find( + "./dap:Attribute[@name='origname']/dap:Value", self._ns + ) + if origname_tag is not None and origname_tag.text is not None: + orignames[var_tag.attrib["name"]] = origname_tag.text + return orignames + def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: """ Split the input element into several ET.Elements by HDF5 group - E.g. {"gtr1/heights": , "gtr1/temperatures": } + E.g. {"gtr1/heights": , "gtr1/temperatures": }. Builds up new elements + each with dimensions, variables, and attributes. 
Parameters ---------- @@ -192,7 +227,8 @@ def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: ------- dict[str, ET.Element] """ - all_groups: dict[str, ET.Element] = defaultdict( + # Add all variable, dimension, and attribute tags to their respective groups + groups_roots: dict[str, ET.Element] = defaultdict( lambda: ET.Element(root.tag, root.attrib) ) group_dims: dict[str, set[str]] = defaultdict( @@ -209,80 +245,182 @@ def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: if fullname_tag is not None and fullname_tag.text is not None: # '/gt1r/heights/ph_id_pulse' -> 'gt1r/heights' group_name = os.path.dirname(fullname_tag.text).removeprefix("/") - all_groups[group_name].append(var_tag) - for dim_tag in var_tag.iterfind("dap:Dim", self._ns): - group_dims[group_name].add(dim_tag.attrib["name"].removeprefix("/")) + groups_roots[group_name].append(var_tag) + dim_tags = var_tag.findall("dap:Dim", self._ns) + dims = self._parse_multi_dims(dim_tags) + group_dims[group_name].update(dims.keys()) # Dimensions for dim_tag in root.iterfind("dap:Dimension", self._ns): - for group_name, dims in group_dims.items(): - if dim_tag.attrib["name"] in dims: - all_groups[group_name].append(dim_tag) + for g, d in group_dims.items(): + if dim_tag.attrib["name"] in d: + groups_roots[g].append(dim_tag) # Attributes - for attr_tag in root.iterfind("dap:Attribute", self._ns): - fullname_tag = attr_tag.find( - "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns - ) - if fullname_tag is not None and fullname_tag.text is not None: - group_name = fullname_tag.text - # Add all attributes to the new dataset; fullnamepath is generally excluded - if group_name in all_groups: - all_groups[group_name].extend( - [a for a in attr_tag if a.attrib["name"] != "fullnamepath"] - ) - return all_groups + container_attr_tag = root.find("dap:Attribute[@name='HDF5_GLOBAL']", self._ns) + if container_attr_tag is None: + attrs_tags = root.findall("dap:Attribute", self._ns) + for attr_tag in attrs_tags: + fullname_tag = attr_tag.find( + "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns + ) + if fullname_tag is not None and fullname_tag.text is not None: + group_name = os.path.dirname(fullname_tag.text).removeprefix("/") + # Add all attributes to the new dataset + groups_roots[group_name].extend(attr_tag) + else: + groups_roots[next(iter(groups_roots))].extend(container_attr_tag) + return groups_roots def _parse_dataset(self, root: ET.Element) -> xr.Dataset: """ Parse the dataset using the root element of the DMR file. + Parameters + ---------- + root : ET.Element + The root element of the DMR file. 
+ Returns ------- xr.Dataset """ # Dimension names and sizes - dataset_dims: dict[str, int] = {} - for dim_tag in root.iterfind("dap:Dimension", self._ns): - dataset_dims[dim_tag.attrib["name"]] = int(dim_tag.attrib["size"]) + dim_tags = root.findall("dap:Dimension", self._ns) + dataset_dims = self._parse_multi_dims(dim_tags) # Data variables and coordinates - vars_tags: list[ET.Element] = [] - for dap_dtype in self._dap_np_dtype: - vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) - # Coordinate names (using Map tags and coordinates attribute) - coord_names: set[str] = set() - for var_tag in vars_tags: - coord_tag = var_tag.find( - "./dap:Attribute[@name='coordinates']/dap:Value", self._ns - ) - if coord_tag is not None and coord_tag.text is not None: - coord_names.update(coord_tag.text.split(" ")) - for map_tag in var_tag.iterfind("dap:Map", self._ns): - coord_names.add(map_tag.attrib["name"].removeprefix("/")) + coord_names = self._find_coord_names(root) # if no coord_names are found or coords don't include dims, dims are used as coords if len(coord_names) == 0 or len(coord_names) < len(dataset_dims): coord_names = set(dataset_dims.keys()) # Seperate and parse coords + data variables - coords: dict[str, xr.Variable] = {} + coord_vars: dict[str, xr.Variable] = {} data_vars: dict[str, xr.Variable] = {} - for var_tag in vars_tags: + for var_tag in self._find_var_tags(root): + variable = self._parse_variable(var_tag, dataset_dims) if var_tag.attrib["name"] in coord_names: - coords[var_tag.attrib["name"]] = self._parse_variable( - var_tag, dataset_dims - ) + coord_vars[var_tag.attrib["name"]] = variable else: - data_vars[var_tag.attrib["name"]] = self._parse_variable( - var_tag, dataset_dims - ) + data_vars[var_tag.attrib["name"]] = variable # Attributes attrs: dict[str, str] = {} for attr_tag in self.root.iterfind("dap:Attribute", self._ns): - if attr_tag.attrib["type"] != "Container": # container = nested attributes - attrs.update(self._parse_attribute(attr_tag)) + attrs.update(self._parse_attribute(attr_tag)) return xr.Dataset( data_vars=data_vars, - coords=xr.Coordinates(coords=coords, indexes={}), + coords=xr.Coordinates(coords=coord_vars, indexes={}), attrs=attrs, ) + def _find_var_tags(self, root: ET.Element) -> list[ET.Element]: + """ + Find all variable tags in the DMR file. Also known as array tags. + Tags are labeled with the DAP data type. E.g. , , + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + Returns + ------- + list[ET.Element] + """ + vars_tags: list[ET.Element] = [] + for dap_dtype in self._dap_np_dtype: + vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) + return vars_tags + + def _find_coord_names(self, root: ET.Element) -> set[str]: + """ + Find the name of all coordinates in root. Checks inside all variables and global attributes. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + Returns + ------- + set[str] : The set of unique coordinate names. 
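# Aside — the coordinate fallback in _parse_dataset, run standalone with
# hypothetical sizes: when a file advertises fewer coordinate names than
# dimensions (or none at all), the dimension names themselves are promoted
# to coordinates.
dataset_dims = {"time": 1, "lat": 1447, "lon": 2895}
coord_names = {"lat"}   # incomplete -> fall back to the dimension names
if len(coord_names) == 0 or len(coord_names) < len(dataset_dims):
    coord_names = set(dataset_dims.keys())
assert coord_names == {"time", "lat", "lon"}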
+ """ + # Check for coordinate names within each variable attributes + coord_names: set[str] = set() + for var_tag in self._find_var_tags(root): + coord_tag = var_tag.find( + "./dap:Attribute[@name='coordinates']/dap:Value", self._ns + ) + if coord_tag is not None and coord_tag.text is not None: + coord_names.update(coord_tag.text.split(" ")) + for map_tag in var_tag.iterfind("dap:Map", self._ns): + coord_names.add(map_tag.attrib["name"].removeprefix("/")) + # Check for coordinate names in a global attribute + coord_tag = var_tag.find("./dap:Attribute[@name='coordinates']", self._ns) + if coord_tag is not None and coord_tag.text is not None: + coord_names.update(coord_tag.text.split(" ")) + return coord_names + + def _parse_dim(self, root: ET.Element) -> dict[str, int | None]: + """ + Parse single or tag + + If the tag has no name attribute, it is a phony dimension. E.g. --> {"phony_dim": 300} + If the tag has no size attribute, it is an unlimited dimension. E.g. --> {"time": None} + If the tag has both name and size attributes, it is a regular dimension. E.g. --> {"lat": 1447} + + Parameters + ---------- + root : ET.Element + The root element Dim/Dimension tag + + Returns + ------- + dict + E.g. {"time": 1, "lat": 1447, "lon": 2895}, {"phony_dim": 300}, {"time": None, "lat": None, "lon": None} + """ + if "name" not in root.attrib and "size" in root.attrib: + return {"phony_dim": int(root.attrib["size"])} + if "name" in root.attrib and "size" not in root.attrib: + return {os.path.basename(root.attrib["name"]): None} + if "name" in root.attrib and "size" in root.attrib: + return {os.path.basename(root.attrib["name"]): int(root.attrib["size"])} + raise ValueError("Not enough information to parse Dim/Dimension tag") + + def _parse_multi_dims( + self, dim_tags: list[ET.Element], global_dims: dict[str, int] = {} + ) -> dict: + """ + Parse multiple or tags. Generally tags are found within dmrpp variable tags. + + Returns best possible matching of {dimension: shape} present in the list and global_dims. E.g tags=(Dim("lat", None), Dim("lon", None)) and global_dims={"lat": 100, "lon": 100, "time": 5} --> {"lat": 100, "lon": 100} + + E.g. tags=(Dim("time", None), Dim("", 200)) and global_dims={"lat": 100, "lon": 100, "time": 5} --> {"time": 5, "phony_dim0": 200} + + This function is often used to fill in missing sizes from the global_dims. E.g. Variable tags may contain only dimension names and not sizes. If the {name: size} matching is known from the global_dims, it is used to fill in the missing sizes. + + Parameters + ---------- + dim_tags : tuple[ET.Element] + A tuple of ElementTree Elements representing dimensions in the DMR file. + + global_dims : dict + A dictionary of dimension names and sizes. E.g. {"time": 1, "lat": 1447, "lon": 2895} + + Returns + ------- + dict + E.g. {"time": 1, "lat": 1447, "lon": 2895} + """ + dims: dict[str, int | None] = {} + for dim_tag in dim_tags: + dim: dict[str, int | None] = self._parse_dim(dim_tag) + if "phony_dim" in dim: + dims["phony_dim_" + str(len(dims))] = dim["phony_dim"] + else: + dims.update(dim) + for name, size in list(dims.items()): + if name in global_dims and size is None: + dims[name] = global_dims[name] + return dims + def _parse_variable( self, var_tag: ET.Element, dataset_dims: dict[str, int] ) -> xr.Variable: @@ -296,52 +434,41 @@ def _parse_variable( dataset_dims : dict A dictionary of dimension names and sizes. E.g. {"time": 1, "lat": 1447, "lon": 2895} - Must contain at least all the dimensions used by the variable. 
+ Must contain at least all the dimensions used by the variable. Necessary since the variable + metadata only contains the dimension names and not the sizes. Returns ------- xr.Variable """ # Dimension names - dim_names: list[str] = [] - for dim_tag in var_tag.iterfind("dap:Dim", self._ns): - dim_names.append(os.path.basename(dim_tag.attrib["name"])) + dim_tags = var_tag.findall("dap:Dim", self._ns) + dim_shapes = self._parse_multi_dims(dim_tags, dataset_dims) # convert DAP dtype to numpy dtype dtype = np.dtype( self._dap_np_dtype[var_tag.tag.removeprefix("{" + self._ns["dap"] + "}")] ) # Chunks and Filters filters = None - fill_value = np.nan - shape = tuple([dataset_dims[d] for d in dim_names]) + shape = tuple(dim_shapes.values()) chunks_shape = shape chunks_tag = var_tag.find("dmr:chunks", self._ns) if chunks_tag is not None: # Chunks - chunk_dim_tag = chunks_tag.find("dmr:chunkDimensionSizes", self._ns) - if chunk_dim_tag is not None and chunk_dim_tag.text is not None: - # 1 1447 2895 -> (1, 1447, 2895) - chunks_shape = tuple(map(int, chunk_dim_tag.text.split())) + found_chunk_dims = self._parse_chunks_dimensions(chunks_tag) + chunks_shape = found_chunk_dims if found_chunk_dims is not None else shape chunkmanifest = self._parse_chunks(chunks_tag, chunks_shape) # Filters - if "compressionType" in chunks_tag.attrib: - filters = [] - # shuffle deflate --> ["shuffle", "deflate"] - compression_types = chunks_tag.attrib["compressionType"].split(" ") - for c in compression_types: - if c == "shuffle": - filters.append({"id": "shuffle", "elementsize": dtype.itemsize}) - elif c == "deflate": - filters.append( - {"id": "zlib", "level": self._default_zlib_value} - ) + filters = self._parse_filters(chunks_tag, dtype) # Attributes attrs: dict[str, Any] = {} for attr_tag in var_tag.iterfind("dap:Attribute", self._ns): attrs.update(self._parse_attribute(attr_tag)) - if "_FillValue" in attrs and attrs["_FillValue"] != "*": - fill_value = attrs["_FillValue"] - attrs.pop("_FillValue", None) + # Remove attributes only used for parsing logic + fill_value = attrs.pop("_FillValue", 0.0) + attrs.pop("fullnamepath", None) + attrs.pop("origname", None) + attrs.pop("coordinates", None) # create ManifestArray and ZArray zarray = ZArray( chunks=chunks_shape, @@ -353,11 +480,13 @@ def _parse_variable( ) marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) encoding = {k: attrs.get(k) for k in self._encoding_keys if k in attrs} - return xr.Variable(dims=dim_names, data=marr, attrs=attrs, encoding=encoding) + return xr.Variable( + dims=dim_shapes.keys(), data=marr, attrs=attrs, encoding=encoding + ) - def _parse_attribute(self, attr_tag: ET.Element) -> dict: + def _parse_attribute(self, attr_tag: ET.Element) -> dict[str, Any]: """ - Parse an attribute from a DMR attr tag. + Parse an attribute from a DMR attr tag. Converts the attribute value to a native python type. 
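# Aside — a worked example of the _parse_multi_dims matching described
# above, using the docstring's own inputs: named dims take their size from
# global_dims, and unnamed dims become phony dims (numbered by insertion
# position, so the second entry here becomes phony_dim_1).
global_dims = {"lat": 100, "lon": 100, "time": 5}
parsed_dim_tags = [{"time": None}, {"phony_dim": 200}]   # two parsed <Dim> tags
dims: dict = {}
for dim in parsed_dim_tags:
    if "phony_dim" in dim:
        dims["phony_dim_" + str(len(dims))] = dim["phony_dim"]
    else:
        dims.update(dim)
for name, size in list(dims.items()):
    if name in global_dims and size is None:
        dims[name] = global_dims[name]   # fill missing sizes from global_dims
assert dims == {"time": 5, "phony_dim_1": 200}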
Parameters ---------- @@ -368,16 +497,87 @@ def _parse_attribute(self, attr_tag: ET.Element) -> dict: ------- dict """ - attr = {} + attr: dict[str, Any] = {} values = [] + if "type" in attr_tag.attrib and attr_tag.attrib["type"] == "Container": + return attr dtype = np.dtype(self._dap_np_dtype[attr_tag.attrib["type"]]) # if multiple Value tags are present, store as "key": "[v1, v2, ...]" for value_tag in attr_tag: # cast attribute to native python type using dmr provided dtype - values.append(dtype.type(value_tag.text).item()) + val = ( + dtype.type(value_tag.text).item() + if dtype != np.object_ + else value_tag.text + ) + if val == "*": + val = np.nan + values.append(val) attr[attr_tag.attrib["name"]] = values[0] if len(values) == 1 else values return attr + def _parse_filters( + self, chunks_tag: ET.Element, dtype: np.dtype + ) -> list[dict] | None: + """ + Parse filters from a DMR chunks tag. + + Parameters + ---------- + chunks_tag : ET.Element + An ElementTree Element with a tag. + + dtype : np.dtype + The numpy dtype of the variable. + + Returns + ------- + list[dict] | None + E.g. [{"id": "shuffle", "elementsize": 4}, {"id": "zlib", "level": 4}] + """ + if "compressionType" in chunks_tag.attrib: + filters: list[dict] = [] + # shuffle deflate --> ["shuffle", "deflate"] + compression_types = chunks_tag.attrib["compressionType"].split(" ") + for c in compression_types: + if c == "shuffle": + filters.append({"id": "shuffle", "elementsize": dtype.itemsize}) + elif c == "deflate": + filters.append( + { + "id": "zlib", + "level": int( + chunks_tag.attrib.get( + "deflateLevel", self._default_zlib_value + ) + ), + } + ) + return filters + return None + + def _parse_chunks_dimensions( + self, chunks_tag: ET.Element + ) -> tuple[int, ...] | None: + """ + Parse the chunk dimensions from a DMR chunks tag. Returns None if no chunk dimensions are found. + + Parameters + ---------- + chunks_tag : ET.Element + An ElementTree Element with a tag. + + Returns + ------- + tuple[int, ...] | None + + """ + chunk_dim_tag = chunks_tag.find("dmr:chunkDimensionSizes", self._ns) + if chunk_dim_tag is not None and chunk_dim_tag.text is not None: + # 1 1447 2895 -> (1, 1447, 2895) + return tuple(map(int, chunk_dim_tag.text.split())) + return None + def _parse_chunks( self, chunks_tag: ET.Element, chunks_shape: tuple[int, ...] 
) -> ChunkManifest: @@ -397,8 +597,10 @@ def _parse_chunks( ChunkManifest """ chunkmanifest: dict[ChunkKey, object] = {} - default_num: list[int] = [0 for i in range(len(chunks_shape))] - chunk_key_template = ".".join(["{}" for i in range(len(chunks_shape))]) + default_num: list[int] = ( + [0 for i in range(len(chunks_shape))] if chunks_shape else [0] + ) + chunk_key_template = ".".join(["{}" for i in range(len(default_num))]) for chunk_tag in chunks_tag.iterfind("dmr:chunk", self._ns): chunk_num = default_num if "chunkPositionInArray" in chunk_tag.attrib: diff --git a/virtualizarr/tests/test_readers/test_dmrpp.py b/virtualizarr/tests/test_readers/test_dmrpp.py index b6d8d24..d2b19d6 100644 --- a/virtualizarr/tests/test_readers/test_dmrpp.py +++ b/virtualizarr/tests/test_readers/test_dmrpp.py @@ -2,53 +2,21 @@ import xarray as xr from virtualizarr import open_virtual_dataset -from virtualizarr.kerchunk import FileType +from virtualizarr.tests import network urls = [ ( - "https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20240713090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc.dmrpp", - "https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20240713090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc", "netcdf4", + "https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5", + "dmrpp", + "https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5.dmrpp", ) ] -def match_zlib_level(result: xr.Dataset, expected: xr.Dataset): - # Fix the zlib level in the result to match the expected - # Many dmrpp's currently do not have the zlib level in the metadata (so default of -1 is used) - # Usage of the virtual Dataset is not affected, but the comparison will fail - # Remove once NASA dmrpps are updated with new dmrpp version: https://github.com/OPENDAP/bes/issues/954 - for x in result.variables: - if result[x].data.zarray.dict()["filters"] is not None: - e_filters = [ - z - for z in expected[x].data.zarray.dict()["filters"] - if "id" in z and z["id"] == "zlib" - ][0] - r_filters = [ - z - for z in result[x].data.zarray.dict()["filters"] - if "id" in z and z["id"] == "zlib" - ][0] - r_filters["level"] = e_filters["level"] - - -@pytest.mark.parametrize("dmrpp_url, data_url, data_type", urls) -def test_dmrpp_reader(dmrpp_url: str, data_url: str, data_type: str): - import earthaccess - - fs = earthaccess.get_fsspec_https_session() - result = open_virtual_dataset( - dmrpp_url, - indexes={}, - filetype=FileType("dmrpp"), - reader_options={"storage_options": fs.storage_options}, - ) - expected = open_virtual_dataset( - data_url, - indexes={}, - filetype=FileType(data_type), - reader_options={"storage_options": fs.storage_options}, - ) - match_zlib_level(result, expected) +@network +@pytest.mark.parametrize("data_type, data_url, dmrpp_type, dmrpp_url", urls) +def test_dmrpp_reader(data_type, data_url, dmrpp_type, dmrpp_url): + result = open_virtual_dataset(dmrpp_url, indexes={}, filetype=dmrpp_type) + expected = open_virtual_dataset(data_url, indexes={}) xr.testing.assert_identical(result, expected) diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 3a493a7..3b3cb7f 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -43,9 +43,7 @@ def open_virtual_dataset( cftime_variables: Iterable[str] | None = None, indexes: Mapping[str, Index] | None = None, 
virtual_array_class=ManifestArray, - reader_options: Optional[dict] = { - "storage_options": {"key": "", "secret": "", "anon": True} - }, + reader_options: Optional[dict] = None, ) -> xr.Dataset: """ Open a file or store as an xarray Dataset wrapping virtualized zarr arrays. @@ -60,7 +58,7 @@ def open_virtual_dataset( File path to open as a set of virtualized zarr arrays. filetype : FileType, default None Type of file to be opened. Used to determine which kerchunk file format backend to use. - Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3', 'dmrpp'}. + Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3'}. If not provided will attempt to automatically infer the correct filetype from header bytes. drop_variables: list[str], default is None Variables in the file to drop before returning. @@ -343,13 +341,16 @@ def variable_from_kerchunk_refs( arr_refs = kerchunk.extract_array_refs(refs, var_name) chunk_dict, zarray, zattrs = kerchunk.parse_array_refs(arr_refs) - - manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict) - # we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs dims = zattrs.pop("_ARRAY_DIMENSIONS") - - varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) + if chunk_dict: + manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict) + varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) + else: + # This means we encountered a scalar variable of dimension 0, + # very likely that it actually has no numeric value and its only purpose + # is to communicate dataset attributes. + varr = zarray.fill_value return xr.Variable(data=varr, dims=dims, attrs=zattrs) From 76380929aa1c0fc67c52b24f2b12b8b77fef07fd Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Sun, 4 Aug 2024 12:50:57 +0530 Subject: [PATCH 11/17] add dmrpp api docs --- docs/api.rst | 11 ++++++++++ docs/releases.rst | 3 +++ virtualizarr/readers/dmrpp.py | 38 +++++++++++++++++++++++++++++------ 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 3dc1d14..3c9a87a 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -66,3 +66,14 @@ VirtualiZarr's :py:class:`~virtualizarr.ManifestArray` objects support a limited stack expand_dims broadcast_to + +Readers +======= + +.. currentmodule:: virtualizarr.readers.dmrpp +.. autosummary:: + :nosignatures: + :toctree: generated/ + + DMRParser + DMRParser.parse_dataset diff --git a/docs/releases.rst b/docs/releases.rst index f472db0..3025c0d 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -9,6 +9,9 @@ v1.0.1 (unreleased) New Features ~~~~~~~~~~~~ +- Add parser for the OPeNDAP DMR++ XML format and integration with open_virtual_dataset (:pull:`113`) + By `Ayush Nag `_. + Breaking changes ~~~~~~~~~~~~~~~~ diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index 0d6453f..2b1a5be 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -14,9 +14,12 @@ class DMRParser: """ - Parses a DMR++ file and creates a virtual xr.Dataset. - Handles groups, dimensions, coordinates, data variables, encoding, chunk manifests, and attributes. - Highly modular to allow support for older dmrpp schema versions + Parser for the OPeNDAP DMR++ XML format. + Reads groups, dimensions, coordinates, data variables, encoding, chunk manifests, and attributes. + Highly modular to allow support for older dmrpp schema versions. 
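# Aside — the scalar fallback added to variable_from_kerchunk_refs above,
# reduced to its core with hypothetical values: a 0-dimensional variable
# with no chunk entries carries no bytes on disk, so its fill value stands
# in as the data.
import xarray as xr

chunk_dict: dict = {}   # no chunk references recorded for this variable
if not chunk_dict:
    var = xr.Variable(data=0.0, dims=[], attrs={"comment": "attrs-only var"})
assert var.shape == () and float(var.values) == 0.0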
Includes many utility functions to extract + different information such as finding all variable tags, splitting hdf5 groups, parsing dimensions, and more. + + OPeNDAP DMR++ homepage: https://docs.opendap.org/index.php/DMR%2B%2B """ # DAP and DMRPP XML namespaces @@ -66,7 +69,7 @@ def __init__(self, dmr: str, data_filepath: Optional[str] = None): def parse_dataset(self, group=None) -> xr.Dataset: """ - Parse the dataset from the dmrpp file + Parses the given file and creates a virtual xr.Dataset with ManifestArrays. Parameters ---------- @@ -77,6 +80,29 @@ def parse_dataset(self, group=None) -> xr.Dataset: Returns ------- An xr.Dataset wrapping virtualized zarr arrays. + + Examples + -------- + Open a sample DMR++ file and parse the dataset + + >>> import requests + >>> r = requests.get("https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5.dmrpp") + >>> parser = DMRParser(r.text) + >>> vds = parser.parse_dataset() + >>> vds + Size: 4MB + Dimensions: (phony_dim_0: 100, phony_dim_1: 100, phony_dim_2: 100) + Dimensions without coordinates: phony_dim_0, phony_dim_1, phony_dim_2 + Data variables: + d_8_chunks (phony_dim_0, phony_dim_1, phony_dim_2) float32 4MB ManifestA... + + >>> vds2 = open_virtual_dataset("https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5.dmrpp", filetype="dmrpp", indexes={}) + >>> vds2 + Size: 4MB + Dimensions: (phony_dim_0: 100, phony_dim_1: 100, phony_dim_2: 100) + Dimensions without coordinates: phony_dim_0, phony_dim_1, phony_dim_2 + Data variables: + d_8_chunks (phony_dim_0, phony_dim_1, phony_dim_2) float32 4MB ManifestA... """ if group is not None: # group = "/" + group.strip("/") # ensure group is in form "/a/b" @@ -191,7 +217,7 @@ def _parse_hdf5_dataset( attrs.update(self._parse_attribute(attr_tag)) if group in all_groups: # replace aliased variable names with original names: gt1r_heights -> heights - orignames = self.find_original_names(all_groups[group]) + orignames = self._find_original_names(all_groups[group]) vds = self._parse_dataset(all_groups[group]) # Only one group so found attrs are global attrs if len(all_groups) == 1: @@ -199,7 +225,7 @@ def _parse_hdf5_dataset( return vds.rename(orignames) raise ValueError(f"Group {group} not found in HDF5 DMR file") - def find_original_names(self, root: ET.Element) -> dict[str, str]: + def _find_original_names(self, root: ET.Element) -> dict[str, str]: orignames: dict[str, str] = {} vars_tags: list[ET.Element] = [] for dap_dtype in self._dap_np_dtype: From 83cb5863e85cd57253e2cbf7cf06fff6474277f5 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Sun, 4 Aug 2024 12:57:28 +0530 Subject: [PATCH 12/17] resolve conflict --- docs/releases.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/releases.rst b/docs/releases.rst index 3025c0d..7d30e7a 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -12,6 +12,9 @@ New Features - Add parser for the OPeNDAP DMR++ XML format and integration with open_virtual_dataset (:pull:`113`) By `Ayush Nag `_. +- Load scalar variables by default. (:pull:`205`) + By `Gustavo Hidalgo `_. 
+ Breaking changes ~~~~~~~~~~~~~~~~ From 888ce3281d024519849018e7ec3c5f1dd2758a89 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Sun, 25 Aug 2024 23:27:00 +0530 Subject: [PATCH 13/17] indexes and docs fix --- docs/api.rst | 11 ------ virtualizarr/readers/dmrpp.py | 70 ++++++++++++++++++++++++++--------- virtualizarr/xarray.py | 9 ++++- 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 3c9a87a..3dc1d14 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -66,14 +66,3 @@ VirtualiZarr's :py:class:`~virtualizarr.ManifestArray` objects support a limited stack expand_dims broadcast_to - -Readers -======= - -.. currentmodule:: virtualizarr.readers.dmrpp -.. autosummary:: - :nosignatures: - :toctree: generated/ - - DMRParser - DMRParser.parse_dataset diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index 2b1a5be..dc34a61 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -1,11 +1,13 @@ import os import warnings from collections import defaultdict +from collections.abc import Mapping from typing import Any, Optional from xml.etree import ElementTree as ET import numpy as np import xarray as xr +from xarray.core.indexes import Index from virtualizarr.manifests import ChunkManifest, ManifestArray from virtualizarr.types import ChunkKey @@ -46,7 +48,7 @@ class DMRParser: } # Default zlib compression value _default_zlib_value = 6 - # Encoding keys that should be cast to float + # Encoding keys that should be removed from attributes and placed in xarray encoding dict _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} def __init__(self, dmr: str, data_filepath: Optional[str] = None): @@ -67,7 +69,9 @@ def __init__(self, dmr: str, data_filepath: Optional[str] = None): data_filepath if data_filepath is not None else self.root.attrib["name"] ) - def parse_dataset(self, group=None) -> xr.Dataset: + def parse_dataset( + self, group=None, indexes: Mapping[str, Index] = {} + ) -> xr.Dataset: """ Parses the given file and creates a virtual xr.Dataset with ManifestArrays. @@ -77,6 +81,10 @@ def parse_dataset(self, group=None) -> xr.Dataset: The group to parse. If None, and no groups are present, the dataset is parsed. If None and groups are present, the first group is parsed. + indexes : Mapping[str, Index], default is {} + Indexes to use on the returned xarray Dataset. + Default is {} which will avoid creating any indexes + Returns ------- An xr.Dataset wrapping virtualized zarr arrays. @@ -110,13 +118,16 @@ def parse_dataset(self, group=None) -> xr.Dataset: "/" ) # ensure group is in form "a/b/c" if self._is_hdf5(self.root): - return self._parse_hdf5_dataset(self.root, group) + return self._parse_hdf5_dataset(self.root, group, indexes) if self.data_filepath.endswith(".nc"): - return self._parse_netcdf4_dataset(self.root, group) + return self._parse_netcdf4_dataset(self.root, group, indexes) raise ValueError("DMR file must be HDF5 or netCDF4 based") def _parse_netcdf4_dataset( - self, root: ET.Element, group: Optional[str] = None + self, + root: ET.Element, + group: Optional[str] = None, + indexes: Mapping[str, Index] = {}, ) -> xr.Dataset: """ Parse the dataset from the netcdf4 based dmrpp with groups, starting at the given group. 
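# Aside — the index-free construction that the new indexes={} default
# preserves, shown standalone: xr.Coordinates with an empty indexes
# mapping wraps coordinate variables without building pandas indexes,
# which would otherwise read coordinate data eagerly.
import numpy as np
import xarray as xr

lat = xr.Variable(dims=["lat"], data=np.arange(3))
coords = xr.Coordinates(coords={"lat": lat}, indexes={})
ds = xr.Dataset(coords=coords)
assert len(ds.indexes) == 0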
@@ -143,14 +154,14 @@ def _parse_netcdf4_dataset( "No groups found in NetCDF4 DMR file; ignoring group parameter" ) # no groups found and no group specified -> parse dataset - return self._parse_dataset(root) + return self._parse_dataset(root, indexes) all_groups = self._split_netcdf4(root) if group is None: # groups found and no group specified -> parse first group - return self._parse_dataset(group_tags[0]) + return self._parse_dataset(group_tags[0], indexes) if group in all_groups: # groups found and group specified -> parse specified group - return self._parse_dataset(all_groups[group]) + return self._parse_dataset(all_groups[group], indexes) else: # groups found and specified group not found -> error raise ValueError(f"Group {group} not found in NetCDF4 DMR file") @@ -186,7 +197,10 @@ def _is_hdf5(self, root: ET.Element) -> bool: return False def _parse_hdf5_dataset( - self, root: ET.Element, group: Optional[str] = None + self, + root: ET.Element, + group: Optional[str] = None, + indexes: Mapping[str, Index] = {}, ) -> xr.Dataset: """ Parse the dataset from the HDF5 based dmrpp with groups, starting at the given group. @@ -201,13 +215,17 @@ def _parse_hdf5_dataset( The group to parse. If None, and no groups are present, the dataset is parsed. If None and groups are present, the first group is parsed. + indexes : Mapping[str, Index], default is {} + Indexes to use on the returned xarray Dataset. + Default is {} which will avoid creating any indexes + Returns ------- xr.Dataset """ all_groups = self._split_hdf5(root=root) if len(all_groups) == 0: - raise ValueError("No groups found in HDF based DMR file") + raise ValueError("No groups found in HDF based dmrpp file") if group is None: # pick a random group if no group is specified group = next(iter(all_groups)) @@ -218,14 +236,29 @@ def _parse_hdf5_dataset( if group in all_groups: # replace aliased variable names with original names: gt1r_heights -> heights orignames = self._find_original_names(all_groups[group]) - vds = self._parse_dataset(all_groups[group]) + vds = self._parse_dataset(all_groups[group], indexes) # Only one group so found attrs are global attrs if len(all_groups) == 1: vds.attrs.update(attrs) return vds.rename(orignames) - raise ValueError(f"Group {group} not found in HDF5 DMR file") + raise ValueError(f"Group {group} not found in HDF5 dmrpp file") def _find_original_names(self, root: ET.Element) -> dict[str, str]: + """ + Find the original variable names from the HDF based groups. E.g. gt1r_heights -> heights + + E.g. if the variable name is 'gt1r_heights', the original name is 'heights' from the group 'gt1r'. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + Returns + ------- + dict[str, str] + """ + orignames: dict[str, str] = {} vars_tags: list[ET.Element] = [] for dap_dtype in self._dap_np_dtype: @@ -296,7 +329,9 @@ def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: groups_roots[next(iter(groups_roots))].extend(container_attr_tag) return groups_roots - def _parse_dataset(self, root: ET.Element) -> xr.Dataset: + def _parse_dataset( + self, root: ET.Element, indexes: Mapping[str, Index] = {} + ) -> xr.Dataset: """ Parse the dataset using the root element of the DMR file. 
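# Aside — the chunk-key handling from PATCH 10's _parse_chunks, isolated:
# scalar variables have an empty chunks_shape, so a single "0" key is
# emitted; otherwise the template has one slot per dimension.
chunks_shape: tuple = ()   # scalar variable
default_num = [0 for _ in range(len(chunks_shape))] if chunks_shape else [0]
template = ".".join(["{}" for _ in range(len(default_num))])
assert template.format(*default_num) == "0"

chunks_shape = (1, 1447, 2895)   # from <chunkDimensionSizes>
default_num = [0 for _ in range(len(chunks_shape))] if chunks_shape else [0]
template = ".".join(["{}" for _ in range(len(default_num))])
assert template.format(*default_num) == "0.0.0"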
@@ -332,7 +367,7 @@ def _parse_dataset(self, root: ET.Element) -> xr.Dataset: attrs.update(self._parse_attribute(attr_tag)) return xr.Dataset( data_vars=data_vars, - coords=xr.Coordinates(coords=coord_vars, indexes={}), + coords=xr.Coordinates(coords=coord_vars, indexes=indexes), attrs=attrs, ) @@ -476,7 +511,7 @@ def _parse_variable( ) # Chunks and Filters filters = None - shape = tuple(dim_shapes.values()) + shape: tuple[int] = tuple(dim_shapes.values()) chunks_shape = shape chunks_tag = var_tag.find("dmr:chunks", self._ns) if chunks_tag is not None: @@ -490,8 +525,9 @@ def _parse_variable( attrs: dict[str, Any] = {} for attr_tag in var_tag.iterfind("dap:Attribute", self._ns): attrs.update(self._parse_attribute(attr_tag)) - # Remove attributes only used for parsing logic + # Fill value is placed in encoding and thus removed from attributes fill_value = attrs.pop("_FillValue", 0.0) + # Remove attributes only used for parsing logic attrs.pop("fullnamepath", None) attrs.pop("origname", None) attrs.pop("coordinates", None) @@ -615,7 +651,7 @@ def _parse_chunks( chunks_tag : ET.Element An ElementTree Element with a tag. - chunks : tuple + chunks_shape : tuple Chunk sizes for each dimension. E.g. (1, 1447, 2895) Returns diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 3b3cb7f..01e5095 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -127,11 +127,18 @@ def open_virtual_dataset( if filetype == "dmrpp": from virtualizarr.readers.dmrpp import DMRParser + if loadable_variables != [] or cftime_variables != [] or indexes is None: + raise NotImplementedError( + "Specifying `loadable_variables`, `cftime_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files." + ) + fpath = _fsspec_openfile_from_filepath( filepath=filepath, reader_options=reader_options ) parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp")) - return parser.parse_dataset() + vds = parser.parse_dataset() + vds.drop_vars(drop_variables) + return vds else: if reader_options is None: universal_filepath = UPath(filepath) From ee23ec082ef597eda3809522e867b6a6d4f42a3b Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Mon, 26 Aug 2024 08:30:44 -0600 Subject: [PATCH 14/17] Fix type hint for shape --- virtualizarr/readers/dmrpp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index dc34a61..fa66205 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -511,7 +511,7 @@ def _parse_variable( ) # Chunks and Filters filters = None - shape: tuple[int] = tuple(dim_shapes.values()) + shape: tuple[int, ...] 
= tuple(dim_shapes.values()) chunks_shape = shape chunks_tag = var_tag.find("dmr:chunks", self._ns) if chunks_tag is not None: From d9337ff48c54f2de740a40c939403c65878b9ed5 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Mon, 26 Aug 2024 08:53:34 -0600 Subject: [PATCH 15/17] change how FileType is used --- virtualizarr/xarray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 4deec32..5bef6f6 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -137,7 +137,7 @@ def open_virtual_dataset( return open_virtual_dataset_from_v3_store( storepath=filepath, drop_variables=drop_variables, indexes=indexes ) - if filetype == "dmrpp": + elif filetype.name.lower() == "dmrpp": from virtualizarr.readers.dmrpp import DMRParser if loadable_variables != [] or cftime_variables != [] or indexes is None: From 6bb9218f498bcc93e019df14fb63ce85e43591b7 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Mon, 26 Aug 2024 09:01:46 -0600 Subject: [PATCH 16/17] Change FileType check again --- virtualizarr/xarray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 5bef6f6..0fb3381 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -137,7 +137,7 @@ def open_virtual_dataset( return open_virtual_dataset_from_v3_store( storepath=filepath, drop_variables=drop_variables, indexes=indexes ) - elif filetype.name.lower() == "dmrpp": + elif filetype == FileType.dmrpp: from virtualizarr.readers.dmrpp import DMRParser if loadable_variables != [] or cftime_variables != [] or indexes is None: From d1948d42f9ab3b47ea098fb9b506b79f0f4e20ad Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 26 Aug 2024 11:28:00 -0400 Subject: [PATCH 17/17] fix storage_options bug --- virtualizarr/utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py index aa73e7f..092ddd2 100644 --- a/virtualizarr/utils.py +++ b/virtualizarr/utils.py @@ -16,7 +16,7 @@ def _fsspec_openfile_from_filepath( *, filepath: str, - reader_options: Optional[dict] = {}, + reader_options: Optional[dict] = None, ) -> OpenFileType: """Converts input filepath to fsspec openfile object. @@ -44,6 +44,9 @@ def _fsspec_openfile_from_filepath( universal_filepath = UPath(filepath) protocol = universal_filepath.protocol + if reader_options is None: + reader_options = {} + storage_options = reader_options.get("storage_options", {}) # type: ignore fpath = fsspec.filesystem(protocol, **storage_options).open(filepath)
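# Aside — end-to-end usage of the series as merged, mirroring the test and
# docstring example above (network access required; the URL is the sample
# file used by the test suite):
from virtualizarr import open_virtual_dataset

dmrpp_url = (
    "https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1"
    "/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5.dmrpp"
)
vds = open_virtual_dataset(dmrpp_url, indexes={}, filetype="dmrpp")
print(vds)   # virtual dataset whose variables wrap ManifestArrays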