From 06592f1b0bc196c12a78845e1e9f8805a1f9aa29 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Tue, 18 Jun 2024 15:05:58 -0700 Subject: [PATCH 1/6] base features --- earthaccess/__init__.py | 4 + earthaccess/virtualizarr.py | 112 +++++++++++++++++++++++++ tests/integration/test_virtualizarr.py | 49 +++++++++++ 3 files changed, 165 insertions(+) create mode 100644 earthaccess/virtualizarr.py create mode 100644 tests/integration/test_virtualizarr.py diff --git a/earthaccess/__init__.py b/earthaccess/__init__.py index 6db81f92..f9c64116 100644 --- a/earthaccess/__init__.py +++ b/earthaccess/__init__.py @@ -22,6 +22,7 @@ from .search import DataCollections, DataGranules from .store import Store from .system import PROD, UAT +from .virtualizarr import open_virtual_dataset, open_virtual_mfdataset logger = logging.getLogger(__name__) @@ -49,6 +50,9 @@ "Store", # kerchunk "consolidate_metadata", + # virtualizarr + "open_virtual_dataset", + "open_virtual_mfdataset", "PROD", "UAT", ] diff --git a/earthaccess/virtualizarr.py b/earthaccess/virtualizarr.py new file mode 100644 index 00000000..d6d0afa0 --- /dev/null +++ b/earthaccess/virtualizarr.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +import fsspec +import xarray as xr + +import earthaccess + + +def _parse_dmr( + fs: fsspec.AbstractFileSystem, + data_path: str, + dmr_path: str = None +) -> xr.Dataset: + """ + Parse a granule's DMR++ file and return a virtual xarray dataset + + Parameters + ---------- + granule : earthaccess.results.DataGranule + The granule to parse + fs : fsspec.AbstractFileSystem + The file system to use to open the DMR++ + + Returns + ---------- + xr.Dataset + The virtual dataset (with virtualizarr ManifestArrays) + + Raises + ---------- + Exception + If the DMR++ file is not found or if there is an error parsing the DMR++ + """ + from virtualizarr.readers.dmrpp import DMRParser + + dmr_path = data_path + ".dmrpp" if dmr_path is None else dmr_path + with fs.open(dmr_path) as f: + parser = DMRParser(f.read(), data_filepath=data_path) + return parser.parse() + + +def open_virtual_mfdataset( + granules: list[earthaccess.results.DataGranule], + access: str = "indirect", + preprocess: callable | None = None, + parallel: bool = True, + **xr_combine_nested_kwargs, +) -> xr.Dataset: + """ + Open multiple granules as a single virtual xarray Dataset + + Parameters + ---------- + granules : list[earthaccess.results.DataGranule] + The granules to open + access : str + The access method to use. One of "direct" or "indirect". Direct is for S3/cloud access, indirect is for HTTPS access. + xr_combine_nested_kwargs : dict + Keyword arguments for xarray.combine_nested. 
+ See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html + + Returns + ---------- + xr.Dataset + The virtual dataset + """ + if access == "direct": + fs = earthaccess.get_s3fs_session(results=granules) + else: + fs = earthaccess.get_fsspec_https_session() + if parallel: + # wrap _parse_dmr and preprocess with delayed + import dask + open_ = dask.delayed(_parse_dmr) + if preprocess is not None: + preprocess = dask.delayed(preprocess) + else: + open_ = _parse_dmr + vdatasets = [open_(fs=fs, data_path=g.data_links(access=access)[0]) for g in granules] + if preprocess is not None: + vdatasets = [preprocess(ds) for ds in vdatasets] + if parallel: + vdatasets = dask.compute(vdatasets)[0] + if len(vdatasets) == 1: + vds = vdatasets[0] + else: + vds = xr.combine_nested(vdatasets, **xr_combine_nested_kwargs) + return vds + + +def open_virtual_dataset( + granule: earthaccess.results.DataGranule, access: str = "indirect" +) -> xr.Dataset: + """ + Open a granule as a single virtual xarray Dataset + + Parameters + ---------- + granule : earthaccess.results.DataGranule + The granule to open + access : str + The access method to use. One of "direct" or "indirect". Direct is for S3/cloud access, indirect is for HTTPS access. + + Returns + ---------- + xr.Dataset + The virtual dataset + """ + return open_virtual_mfdataset( + granules=[granule], access=access, parallel=False, preprocess=None + ) + diff --git a/tests/integration/test_virtualizarr.py b/tests/integration/test_virtualizarr.py new file mode 100644 index 00000000..09bf14a7 --- /dev/null +++ b/tests/integration/test_virtualizarr.py @@ -0,0 +1,49 @@ +import logging +import os +import unittest + +import earthaccess +import pytest + +pytest.importorskip("virtualizarr") +pytest.importorskip("dask") + +logger = logging.getLogger(__name__) +assertions = unittest.TestCase("__init__") + +assertions.assertTrue("EARTHDATA_USERNAME" in os.environ) +assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ) + +logger.info(f"Current username: {os.environ['EARTHDATA_USERNAME']}") +logger.info(f"earthaccess version: {earthaccess.__version__}") + + +@pytest.fixture(scope="module") +def granules(): + granules = earthaccess.search_data( + count=2, + short_name="MUR-JPL-L4-GLOB-v4.1", + cloud_hosted=True + ) + return granules + + +@pytest.mark.parametrize("output", "memory") +def test_open_virtual_mfdataset(tmp_path, granules, output): + xr = pytest.importorskip("xarray") + # Open directly with `earthaccess.open` + expected = xr.open_mfdataset(earthaccess.open(granules), concat_dim="time", combine="nested", combine_attrs="drop_conflicts") + + result = earthaccess.open_virtual_mfdataset(granules=granules, access="indirect", concat_dime="time", parallel=True, preprocess=None) + # dimensions + assert result.sizes == expected.sizes + # variable names, variable dimensions + assert result.variables.keys() == expected.variables.keys() + # attributes + assert result.attrs == expected.attrs + # coordinates + assert result.coords.keys() == expected.coords.keys() + # chunks + assert result.chunks == expected.chunks + # encoding + assert result.encoding == expected.encoding From 90f296a40406aa8a8c01488a19d0023b6a83c986 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Tue, 6 Aug 2024 22:30:14 +0530 Subject: [PATCH 2/6] added load param --- earthaccess/virtualizarr.py | 93 +++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 41 deletions(-) diff --git a/earthaccess/virtualizarr.py 
b/earthaccess/virtualizarr.py index d6d0afa0..4f9a6765 100644 --- a/earthaccess/virtualizarr.py +++ b/earthaccess/virtualizarr.py @@ -1,47 +1,17 @@ from __future__ import annotations -import fsspec -import xarray as xr +from typing import TYPE_CHECKING import earthaccess - -def _parse_dmr( - fs: fsspec.AbstractFileSystem, - data_path: str, - dmr_path: str = None -) -> xr.Dataset: - """ - Parse a granule's DMR++ file and return a virtual xarray dataset - - Parameters - ---------- - granule : earthaccess.results.DataGranule - The granule to parse - fs : fsspec.AbstractFileSystem - The file system to use to open the DMR++ - - Returns - ---------- - xr.Dataset - The virtual dataset (with virtualizarr ManifestArrays) - - Raises - ---------- - Exception - If the DMR++ file is not found or if there is an error parsing the DMR++ - """ - from virtualizarr.readers.dmrpp import DMRParser - - dmr_path = data_path + ".dmrpp" if dmr_path is None else dmr_path - with fs.open(dmr_path) as f: - parser = DMRParser(f.read(), data_filepath=data_path) - return parser.parse() +if TYPE_CHECKING: + import xarray as xr def open_virtual_mfdataset( granules: list[earthaccess.results.DataGranule], access: str = "indirect", + load: bool = True, preprocess: callable | None = None, parallel: bool = True, **xr_combine_nested_kwargs, @@ -55,6 +25,12 @@ def open_virtual_mfdataset( The granules to open access : str The access method to use. One of "direct" or "indirect". Direct is for S3/cloud access, indirect is for HTTPS access. + load: bool + When true, creates a lazy loaded, numpy/dask backed xarray dataset with indexes. When false a virtual xarray dataset is created with ManifestArrays. This virtual dataset cannot load data and is used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets + preprocess : callable + A function to apply to each virtual dataset before combining + parallel : bool + Whether to open the virtual datasets in parallel (using dask.delayed) xr_combine_nested_kwargs : dict Keyword arguments for xarray.combine_nested. 
See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html @@ -64,19 +40,39 @@ def open_virtual_mfdataset( xr.Dataset The virtual dataset """ + import xarray as xr + + import virtualizarr as vz + if access == "direct": fs = earthaccess.get_s3fs_session(results=granules) + fs.storage_options["anon"] = False else: fs = earthaccess.get_fsspec_https_session() if parallel: - # wrap _parse_dmr and preprocess with delayed + # wrap _open_virtual_dataset and preprocess with delayed import dask - open_ = dask.delayed(_parse_dmr) + + open_ = dask.delayed(vz.open_virtual_dataset) if preprocess is not None: preprocess = dask.delayed(preprocess) else: - open_ = _parse_dmr - vdatasets = [open_(fs=fs, data_path=g.data_links(access=access)[0]) for g in granules] + open_ = vz.open_virtual_dataset + data_paths: list[str] = [] + vdatasets = [] + for g in granules: + data_paths.append(g.data_links(access=access)[0]) + vdatasets.append( + open_( + filepath=g.data_links(access=access)[0] + ".dmrpp", + filetype="dmrpp", + reader_options={"storage_options": fs.storage_options}, + ) + ) + # Rename paths to match granule s3/https paths + vdatasets = [ + vds.virtualize.rename_paths(data_paths[i]) for i, vds in enumerate(vdatasets) + ] if preprocess is not None: vdatasets = [preprocess(ds) for ds in vdatasets] if parallel: @@ -85,11 +81,23 @@ def open_virtual_mfdataset( vds = vdatasets[0] else: vds = xr.combine_nested(vdatasets, **xr_combine_nested_kwargs) + if load: + options = fs.storage_options.copy() + refs = vds.virtualize.to_kerchunk(filepath=None, format="dict") + options["fo"] = refs + return xr.open_dataset( + "reference://", + engine="zarr", + chunks={}, + backend_kwargs={"storage_options": options, "consolidated": False}, + ) return vds def open_virtual_dataset( - granule: earthaccess.results.DataGranule, access: str = "indirect" + granule: earthaccess.results.DataGranule, + access: str = "indirect", + load: bool = True, ) -> xr.Dataset: """ Open a granule as a single virtual xarray Dataset @@ -100,6 +108,10 @@ def open_virtual_dataset( The granule to open access : str The access method to use. One of "direct" or "indirect". Direct is for S3/cloud access, indirect is for HTTPS access. + load: bool + When true, creates a numpy/dask backed xarray dataset. When false a virtual xarray dataset is created with ManifestArrays + This virtual dataset cannot load data and is used to create zarr reference files. 
See https://virtualizarr.readthedocs.io/en/latest/ + for more information on virtual xarray datasets Returns ---------- @@ -107,6 +119,5 @@ def open_virtual_dataset( The virtual dataset """ return open_virtual_mfdataset( - granules=[granule], access=access, parallel=False, preprocess=None + granules=[granule], access=access, load=load, parallel=False, preprocess=None ) - From b626a4139727568ffccbf927a53489a07c46961e Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Sat, 17 Aug 2024 00:03:17 +0530 Subject: [PATCH 3/6] add dmrpp parser and test --- earthaccess/dmrpp.py | 632 +++++++++++++++++++++++++ earthaccess/virtualizarr.py | 145 +++++- poetry.lock | 61 ++- pyproject.toml | 1 + tests/integration/test_virtualizarr.py | 49 -- tests/unit/test_dmrpp.py | 48 ++ 6 files changed, 850 insertions(+), 86 deletions(-) create mode 100644 earthaccess/dmrpp.py delete mode 100644 tests/integration/test_virtualizarr.py create mode 100644 tests/unit/test_dmrpp.py diff --git a/earthaccess/dmrpp.py b/earthaccess/dmrpp.py new file mode 100644 index 00000000..05a3f4cc --- /dev/null +++ b/earthaccess/dmrpp.py @@ -0,0 +1,632 @@ +import os +import warnings +from collections import defaultdict +from typing import Any, Optional +from xml.etree import ElementTree as ET + +import numpy as np +import xarray as xr + +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.types import ChunkKey +from virtualizarr.zarr import ZArray + + +class DMRParser: + """Parser for the OPeNDAP DMR++ XML format. + + Reads groups, dimensions, coordinates, data variables, encoding, chunk manifests, and attributes. + Highly modular to allow support for older dmrpp schema versions. Includes many utility functions to extract + different information such as finding all variable tags, splitting hdf5 groups, parsing dimensions, and more. + + OPeNDAP DMR++ homepage: https://docs.opendap.org/index.php/DMR%2B%2B + """ + + # DAP and DMRPP XML namespaces + _ns = { + "dap": "http://xml.opendap.org/ns/DAP/4.0#", + "dmr": "http://xml.opendap.org/dap/dmrpp/1.0.0#", + } + # DAP data types to numpy data types + _dap_np_dtype = { + "Byte": "uint8", + "UByte": "uint8", + "Int8": "int8", + "UInt8": "uint8", + "Int16": "int16", + "UInt16": "uint16", + "Int32": "int32", + "UInt32": "uint32", + "Int64": "int64", + "UInt64": "uint64", + "Url": "object", + "Float32": "float32", + "Float64": "float64", + "String": "object", + } + # Default zlib compression value + _default_zlib_value = 6 + # Encoding keys that should be cast to float + _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} + + def __init__(self, dmr: str, data_filepath: Optional[str] = None): + """Initialize the DMRParser with the given DMR data and data file path. + + Parameters + ---------- + dmr : str + The DMR file contents as a string. + + data_filepath : str, optional + The path to the actual data file that will be set in the chunk manifests. + If None, the data file path is taken from the DMR file. + """ + self.root = ET.fromstring(dmr) + self.data_filepath = ( + data_filepath if data_filepath is not None else self.root.attrib["name"] + ) + + def parse_dataset(self, group=None) -> xr.Dataset: + """Parses the given file and creates a virtual xr.Dataset with ManifestArrays. + + Parameters + ---------- + group : str + The group to parse. If None, and no groups are present, the dataset is parsed. + If None and groups are present, the first group is parsed. 
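+            A group may be given with or without a leading slash; it is normalized
+            internally to the form "a/b/c" before lookup.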
+ + Returns: + ------- + An xr.Dataset wrapping virtualized zarr arrays. + + Examples: + -------- + Open a sample DMR++ file and parse the dataset + + >>> import requests + >>> r = requests.get("https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5.dmrpp") + >>> parser = DMRParser(r.text) + >>> vds = parser.parse_dataset() + >>> vds + Size: 4MB + Dimensions: (phony_dim_0: 100, phony_dim_1: 100, phony_dim_2: 100) + Dimensions without coordinates: phony_dim_0, phony_dim_1, phony_dim_2 + Data variables: + d_8_chunks (phony_dim_0, phony_dim_1, phony_dim_2) float32 4MB ManifestA... + + >>> vds2 = open_virtual_dataset("https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5.dmrpp", filetype="dmrpp", indexes={}) + >>> vds2 + Size: 4MB + Dimensions: (phony_dim_0: 100, phony_dim_1: 100, phony_dim_2: 100) + Dimensions without coordinates: phony_dim_0, phony_dim_1, phony_dim_2 + Data variables: + d_8_chunks (phony_dim_0, phony_dim_1, phony_dim_2) float32 4MB ManifestA... + """ + if group is not None: + # group = "/" + group.strip("/") # ensure group is in form "/a/b" + group = os.path.normpath(group).removeprefix( + "/" + ) # ensure group is in form "a/b/c" + if self._is_hdf5(self.root): + return self._parse_hdf5_dataset(self.root, group) + if self.data_filepath.endswith(".nc"): + return self._parse_netcdf4_dataset(self.root, group) + raise ValueError("dmrpp file must be HDF5 or netCDF4 based") + + def _parse_netcdf4_dataset( + self, root: ET.Element, group: Optional[str] = None + ) -> xr.Dataset: + """Parse the dataset from the netcdf4 based dmrpp with groups, starting at the given group. Set root to the given group. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + group : str + The group to parse. If None, and no groups are present, the dataset is parsed. + If None and groups are present, the first group is parsed. + + Returns: + ------- + xr.Dataset + """ + group_tags = root.findall("dap:Group", self._ns) + if len(group_tags) == 0: + if group is not None: + # no groups found and group specified -> warning + warnings.warn( + "No groups found in NetCDF4 dmrpp file; ignoring group parameter" + ) + # no groups found and no group specified -> parse dataset + return self._parse_dataset(root) + all_groups = self._split_netcdf4(root) + if group is None: + # groups found and no group specified -> parse first group + return self._parse_dataset(group_tags[0]) + if group in all_groups: + # groups found and group specified -> parse specified group + return self._parse_dataset(all_groups[group]) + else: + # groups found and specified group not found -> error + raise ValueError(f"Group {group} not found in NetCDF4 dmrpp file") + + def _split_netcdf4(self, root: ET.Element) -> dict[str, ET.Element]: + """Split the input element into several ET.Elements by netcdf4 group. E.g. {"left": , "right": }. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. 
+ + Returns: + ------- + dict[str, ET.Element] + """ + group_tags = root.findall("dap:Group", self._ns) + all_groups: dict[str, ET.Element] = defaultdict( + lambda: ET.Element(root.tag, root.attrib) + ) + for group_tag in group_tags: + all_groups[os.path.normpath(group_tag.attrib["name"])] = group_tag + return all_groups + + def _is_hdf5(self, root: ET.Element) -> bool: + """Check if the DMR file is HDF5 based.""" + if root.find(".//dap:Attribute[@name='fullnamepath']", self._ns) is not None: + return True + if root.find("./dap:Attribute[@name='HDF5_GLOBAL']", self._ns) is not None: + return True + return False + + def _parse_hdf5_dataset( + self, root: ET.Element, group: Optional[str] = None + ) -> xr.Dataset: + """Parse the dataset from the HDF5 based dmrpp with groups, starting at the given group. Set root to the given group. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + group : str + The group to parse. If None, and no groups are present, the dataset is parsed. + If None and groups are present, the first group is parsed. + + Returns: + ------- + xr.Dataset + """ + all_groups = self._split_hdf5(root=root) + if len(all_groups) == 0: + raise ValueError("No groups found in HDF based dmrpp file") + if group is None: + # pick a random group if no group is specified + group = next(iter(all_groups)) + attrs = {} + for attr_tag in root.iterfind("dap:Attribute", self._ns): + if attr_tag.attrib["type"] != "Container": + attrs.update(self._parse_attribute(attr_tag)) + if group in all_groups: + # replace aliased variable names with original names: gt1r_heights -> heights + orignames = self._find_original_names(all_groups[group]) + vds = self._parse_dataset(all_groups[group]) + # Only one group so found attrs are global attrs + if len(all_groups) == 1: + vds.attrs.update(attrs) + return vds.rename(orignames) + raise ValueError(f"Group {group} not found in HDF5 dmrpp file") + + def _find_original_names(self, root: ET.Element) -> dict[str, str]: + """Find the original names of variables in the DMR file. E.g. {"gt1r_heights": "heights"}. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + Returns: + ------- + dict[str, str] + """ + orignames: dict[str, str] = {} + vars_tags: list[ET.Element] = [] + for dap_dtype in self._dap_np_dtype: + vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) + for var_tag in vars_tags: + origname_tag = var_tag.find( + "./dap:Attribute[@name='origname']/dap:Value", self._ns + ) + if origname_tag is not None and origname_tag.text is not None: + orignames[var_tag.attrib["name"]] = origname_tag.text + return orignames + + def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: + """Split the input element into several ET.Elements by HDF5 group. + + E.g. {"gtr1/heights": , "gtr1/temperatures": }. Builds up new elements + each with dimensions, variables, and attributes. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. 
+ + Returns: + ------- + dict[str, ET.Element] + """ + # Add all variable, dimension, and attribute tags to their respective groups + groups_roots: dict[str, ET.Element] = defaultdict( + lambda: ET.Element(root.tag, root.attrib) + ) + group_dims: dict[str, set[str]] = defaultdict( + set + ) # {"gt1r/heights": {"dim1", "dim2", ...}} + vars_tags: list[ET.Element] = [] + for dap_dtype in self._dap_np_dtype: + vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) + # Variables + for var_tag in vars_tags: + fullname_tag = var_tag.find( + "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns + ) + if fullname_tag is not None and fullname_tag.text is not None: + # '/gt1r/heights/ph_id_pulse' -> 'gt1r/heights' + group_name = os.path.dirname(fullname_tag.text).removeprefix("/") + groups_roots[group_name].append(var_tag) + dim_tags = var_tag.findall("dap:Dim", self._ns) + dims = self._parse_multi_dims(dim_tags) + group_dims[group_name].update(dims.keys()) + # Dimensions + for dim_tag in root.iterfind("dap:Dimension", self._ns): + for g, d in group_dims.items(): + if dim_tag.attrib["name"] in d: + groups_roots[g].append(dim_tag) + # Attributes + container_attr_tag = root.find("dap:Attribute[@name='HDF5_GLOBAL']", self._ns) + if container_attr_tag is None: + attrs_tags = root.findall("dap:Attribute", self._ns) + for attr_tag in attrs_tags: + fullname_tag = attr_tag.find( + "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns + ) + if fullname_tag is not None and fullname_tag.text is not None: + group_name = os.path.dirname(fullname_tag.text).removeprefix("/") + # Add all attributes to the new dataset + groups_roots[group_name].extend(attr_tag) + else: + groups_roots[next(iter(groups_roots))].extend(container_attr_tag) + return groups_roots + + def _parse_dataset(self, root: ET.Element) -> xr.Dataset: + """Parse the dataset using the root element of the DMR file. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + Returns: + ------- + xr.Dataset + """ + # Dimension names and sizes + dim_tags = root.findall("dap:Dimension", self._ns) + dataset_dims = self._parse_multi_dims(dim_tags) + # Data variables and coordinates + coord_names = self._find_coord_names(root) + # if no coord_names are found or coords don't include dims, dims are used as coords + if len(coord_names) == 0 or len(coord_names) < len(dataset_dims): + coord_names = set(dataset_dims.keys()) + # Seperate and parse coords + data variables + coord_vars: dict[str, xr.Variable] = {} + data_vars: dict[str, xr.Variable] = {} + for var_tag in self._find_var_tags(root): + variable = self._parse_variable(var_tag, dataset_dims) + if var_tag.attrib["name"] in coord_names: + coord_vars[var_tag.attrib["name"]] = variable + else: + data_vars[var_tag.attrib["name"]] = variable + # Attributes + attrs: dict[str, str] = {} + for attr_tag in self.root.iterfind("dap:Attribute", self._ns): + attrs.update(self._parse_attribute(attr_tag)) + return xr.Dataset( + data_vars=data_vars, + coords=xr.Coordinates(coords=coord_vars, indexes={}), + attrs=attrs, + ) + + def _find_var_tags(self, root: ET.Element) -> list[ET.Element]: + """Find all variable tags in the DMR file. Also known as array tags. Tags are labeled with the DAP data type. E.g. , , . + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. 
+ + Returns: + ------- + list[ET.Element] + """ + vars_tags: list[ET.Element] = [] + for dap_dtype in self._dap_np_dtype: + vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) + return vars_tags + + def _find_coord_names(self, root: ET.Element) -> set[str]: + """Find the name of all coordinates in root. Checks inside all variables and global attributes. + + Parameters + ---------- + root : ET.Element + The root element of the DMR file. + + Returns: + ------- + set[str] : The set of unique coordinate names. + """ + # Check for coordinate names within each variable attributes + coord_names: set[str] = set() + for var_tag in self._find_var_tags(root): + coord_tag = var_tag.find( + "./dap:Attribute[@name='coordinates']/dap:Value", self._ns + ) + if coord_tag is not None and coord_tag.text is not None: + coord_names.update(coord_tag.text.split(" ")) + for map_tag in var_tag.iterfind("dap:Map", self._ns): + coord_names.add(map_tag.attrib["name"].removeprefix("/")) + # Check for coordinate names in a global attribute + coord_tag = var_tag.find("./dap:Attribute[@name='coordinates']", self._ns) + if coord_tag is not None and coord_tag.text is not None: + coord_names.update(coord_tag.text.split(" ")) + return coord_names + + def _parse_dim(self, root: ET.Element) -> dict[str, int | None]: + """Parse single or tag. + + If the tag has no name attribute, it is a phony dimension. E.g. --> {"phony_dim": 300} + If the tag has no size attribute, it is an unlimited dimension. E.g. --> {"time": None} + If the tag has both name and size attributes, it is a regular dimension. E.g. --> {"lat": 1447} + + Parameters + ---------- + root : ET.Element + The root element Dim/Dimension tag + + Returns: + ------- + dict + E.g. {"time": 1, "lat": 1447, "lon": 2895}, {"phony_dim": 300}, {"time": None, "lat": None, "lon": None} + """ + if "name" not in root.attrib and "size" in root.attrib: + return {"phony_dim": int(root.attrib["size"])} + if "name" in root.attrib and "size" not in root.attrib: + return {os.path.basename(root.attrib["name"]): None} + if "name" in root.attrib and "size" in root.attrib: + return {os.path.basename(root.attrib["name"]): int(root.attrib["size"])} + raise ValueError("Not enough information to parse Dim/Dimension tag") + + def _parse_multi_dims( + self, dim_tags: list[ET.Element], global_dims: dict[str, int] = {} + ) -> dict: + """Parse multiple or tags. Generally tags are found within dmrpp variable tags. + + Returns best possible matching of {dimension: shape} present in the list and global_dims. E.g tags=(Dim("lat", None), Dim("lon", None)) and global_dims={"lat": 100, "lon": 100, "time": 5} --> {"lat": 100, "lon": 100} + + E.g. tags=(Dim("time", None), Dim("", 200)) and global_dims={"lat": 100, "lon": 100, "time": 5} --> {"time": 5, "phony_dim0": 200} + + This function is often used to fill in missing sizes from the global_dims. E.g. Variable tags may contain only dimension names and not sizes. If the {name: size} matching is known from the global_dims, it is used to fill in the missing sizes. + + Parameters + ---------- + dim_tags : tuple[ET.Element] + A tuple of ElementTree Elements representing dimensions in the DMR file. + + global_dims : dict + A dictionary of dimension names and sizes. E.g. {"time": 1, "lat": 1447, "lon": 2895} + + Returns: + ------- + dict + E.g. 
{"time": 1, "lat": 1447, "lon": 2895} + """ + dims: dict[str, int | None] = {} + for dim_tag in dim_tags: + dim: dict[str, int | None] = self._parse_dim(dim_tag) + if "phony_dim" in dim: + dims["phony_dim_" + str(len(dims))] = dim["phony_dim"] + else: + dims.update(dim) + for name, size in list(dims.items()): + if name in global_dims and size is None: + dims[name] = global_dims[name] + return dims + + def _parse_variable( + self, var_tag: ET.Element, dataset_dims: dict[str, int] + ) -> xr.Variable: + """Parse a variable from a DMR tag. + + Parameters + ---------- + var_tag : ET.Element + An ElementTree Element representing a variable in the DMR file. Will have DAP dtype as tag. + + dataset_dims : dict + A dictionary of dimension names and sizes. E.g. {"time": 1, "lat": 1447, "lon": 2895} + Must contain at least all the dimensions used by the variable. Necessary since the variable + metadata only contains the dimension names and not the sizes. + + Returns: + ------- + xr.Variable + """ + # Dimension names + dim_tags = var_tag.findall("dap:Dim", self._ns) + dim_shapes = self._parse_multi_dims(dim_tags, dataset_dims) + # convert DAP dtype to numpy dtype + dtype = np.dtype( + self._dap_np_dtype[var_tag.tag.removeprefix("{" + self._ns["dap"] + "}")] + ) + # Chunks and Filters + filters = None + shape = tuple(dim_shapes.values()) + chunks_shape = shape + chunks_tag = var_tag.find("dmr:chunks", self._ns) + if chunks_tag is not None: + # Chunks + found_chunk_dims = self._parse_chunks_dimensions(chunks_tag) + chunks_shape = found_chunk_dims if found_chunk_dims is not None else shape + chunkmanifest = self._parse_chunks(chunks_tag, chunks_shape) + # Filters + filters = self._parse_filters(chunks_tag, dtype) + # Attributes + attrs: dict[str, Any] = {} + for attr_tag in var_tag.iterfind("dap:Attribute", self._ns): + attrs.update(self._parse_attribute(attr_tag)) + # Remove attributes only used for parsing logic + fill_value = attrs.pop("_FillValue", np.nan) + attrs.pop("fullnamepath", None) + attrs.pop("origname", None) + # attrs.pop("coordinates", None) + # create ManifestArray and ZArray + zarray = ZArray( + chunks=chunks_shape, + dtype=dtype, + fill_value=fill_value, + filters=filters, + order="C", + shape=shape, + ) + marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) + encoding = {k: attrs.get(k) for k in self._encoding_keys if k in attrs} + return xr.Variable( + dims=dim_shapes.keys(), data=marr, attrs=attrs, encoding=encoding + ) + + def _parse_attribute(self, attr_tag: ET.Element) -> dict[str, Any]: + """Parse an attribute from a DMR attr tag. Converts the attribute value to a native python type. + + Parameters + ---------- + attr_tag : ET.Element + An ElementTree Element with an tag. + + Returns: + ------- + dict + """ + attr: dict[str, Any] = {} + values = [] + if "type" in attr_tag.attrib and attr_tag.attrib["type"] == "Container": + return attr + dtype = np.dtype(self._dap_np_dtype[attr_tag.attrib["type"]]) + # if multiple Value tags are present, store as "key": "[v1, v2, ...]" + for value_tag in attr_tag: + # cast attribute to native python type using dmr provided dtype + val = ( + dtype.type(value_tag.text).item() + if dtype != np.object_ + else value_tag.text + ) + if val == "*": + val = np.nan + values.append(val) + attr[attr_tag.attrib["name"]] = values[0] if len(values) == 1 else values + return attr + + def _parse_filters( + self, chunks_tag: ET.Element, dtype: np.dtype + ) -> list[dict] | None: + """Parse filters from a DMR chunks tag. 
+ + Parameters + ---------- + chunks_tag : ET.Element + An ElementTree Element with a tag. + + dtype : np.dtype + The numpy dtype of the variable. + + Returns: + ------- + list[dict] | None + E.g. [{"id": "shuffle", "elementsize": 4}, {"id": "zlib", "level": 4}] + """ + if "compressionType" in chunks_tag.attrib: + filters: list[dict] = [] + # shuffle deflate --> ["shuffle", "deflate"] + compression_types = chunks_tag.attrib["compressionType"].split(" ") + if "shuffle" in compression_types: + filters.append({"id": "shuffle", "elementsize": dtype.itemsize}) + if "deflate" in compression_types: + level = int( + chunks_tag.attrib.get("deflateLevel", self._default_zlib_value) + ) + filters.append({"id": "zlib", "level": level}) + return filters + return None + + def _parse_chunks_dimensions( + self, chunks_tag: ET.Element + ) -> tuple[int, ...] | None: + """Parse the chunk dimensions from a DMR chunks tag. Returns None if no chunk dimensions are found. + + Parameters + ---------- + chunks_tag : ET.Element + An ElementTree Element with a tag. + + Returns: + ------- + tuple[int, ...] | None + + """ + chunk_dim_tag = chunks_tag.find("dmr:chunkDimensionSizes", self._ns) + if chunk_dim_tag is not None and chunk_dim_tag.text is not None: + # 1 1447 2895 -> (1, 1447, 2895) + return tuple(map(int, chunk_dim_tag.text.split())) + return None + + def _parse_chunks( + self, chunks_tag: ET.Element, chunks_shape: tuple[int, ...] + ) -> ChunkManifest: + """Parse the chunk manifest from a DMR chunks tag. + + Parameters + ---------- + chunks_tag : ET.Element + An ElementTree Element with a tag. + + chunks : tuple + Chunk sizes for each dimension. E.g. (1, 1447, 2895) + + Returns: + ------- + ChunkManifest + """ + chunkmanifest: dict[ChunkKey, object] = {} + default_num: list[int] = ( + [0 for i in range(len(chunks_shape))] if chunks_shape else [0] + ) + chunk_key_template = ".".join(["{}" for i in range(len(default_num))]) + for chunk_tag in chunks_tag.iterfind("dmr:chunk", self._ns): + chunk_num = default_num + if "chunkPositionInArray" in chunk_tag.attrib: + # "[0,1023,10235]" -> ["0","1023","10235"] + chunk_pos = chunk_tag.attrib["chunkPositionInArray"][1:-1].split(",") + # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] + chunk_num = [ + int(chunk_pos[i]) // chunks_shape[i] + for i in range(len(chunks_shape)) + ] + # [0,1,5] -> "0.1.5" + chunk_key = ChunkKey(chunk_key_template.format(*chunk_num)) + chunkmanifest[chunk_key] = { + "path": self.data_filepath, + "offset": int(chunk_tag.attrib["offset"]), + "length": int(chunk_tag.attrib["nBytes"]), + } + return ChunkManifest(entries=chunkmanifest) diff --git a/earthaccess/virtualizarr.py b/earthaccess/virtualizarr.py index 4f9a6765..7885b4f4 100644 --- a/earthaccess/virtualizarr.py +++ b/earthaccess/virtualizarr.py @@ -5,28 +5,70 @@ import earthaccess if TYPE_CHECKING: + import fsspec import xarray as xr +def _parse_dmr( + fs: fsspec.AbstractFileSystem, + data_path: str, + dmrpp_path: str = None, + group: str | None = None, +) -> xr.Dataset: + """Parse a granule's DMR++ file and return a virtual xarray dataset. + + Parameters + ---------- + fs : fsspec.AbstractFileSystem + The file system to use to open the DMR++ + data_path : str + The path to the data file + dmrpp_path : str + The path to the DMR++ file. If None, the DMR++ file is assumed to be at data_path + ".dmrpp" + group : str + The group to open in the DMR++. 
+ + Returns: + ---------- + xr.Dataset + The virtual dataset (with virtualizarr ManifestArrays) + + Raises: + ---------- + Exception if the DMR++ file is not found or if there is an error parsing the DMR++ + """ + from earthaccess.dmrpp import DMRParser + + dmrpp_path = data_path + ".dmrpp" if dmrpp_path is None else dmrpp_path + with fs.open(dmrpp_path) as f: + parser = DMRParser(f.read(), data_filepath=data_path) + return parser.parse_dataset(group=group) + + def open_virtual_mfdataset( granules: list[earthaccess.results.DataGranule], + group: str | None = None, access: str = "indirect", - load: bool = True, + load: bool = False, preprocess: callable | None = None, parallel: bool = True, **xr_combine_nested_kwargs, ) -> xr.Dataset: - """ - Open multiple granules as a single virtual xarray Dataset + """Open multiple granules as a single virtual xarray Dataset. Parameters ---------- granules : list[earthaccess.results.DataGranule] The granules to open + group : str + The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group. If None, the first parsed group will be opened. + If the DMR++ file does not have groups, this parameter is ignored. access : str The access method to use. One of "direct" or "indirect". Direct is for S3/cloud access, indirect is for HTTPS access. load: bool - When true, creates a lazy loaded, numpy/dask backed xarray dataset with indexes. When false a virtual xarray dataset is created with ManifestArrays. This virtual dataset cannot load data and is used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets + Create an xarray dataset with indexes and data available to load. + + When true, creates a lazy loaded, numpy/dask backed xarray dataset with indexes. Note that when load=True all the data is now available to access but not loaded into memory. When false a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks and allows creation and concatenation of zarr reference files. This virtual dataset cannot load data on it's own and see https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets. Also note that load=True will only work in the cloud since ranged reads of chunks are supported by cloud storage but not by HTTPS preprocess : callable A function to apply to each virtual dataset before combining parallel : bool @@ -35,17 +77,37 @@ def open_virtual_mfdataset( Keyword arguments for xarray.combine_nested. See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html - Returns + Returns: ---------- xr.Dataset - The virtual dataset + + Example: + ---------- + >>> results = earthaccess.search_data(count=5, short_name="MUR-JPL-L4-GLOB-v4.1") + >>> vds = open_virtual_mfdataset(results, access="indirect", load=False, concat_dim="time", coords='minimal', compat='override', combine_attrs="drop_conflicts") + >>> vds + Size: 19GB + Dimensions: (time: 5, lat: 17999, lon: 36000) + Coordinates: + time (time) int32 20B ManifestArray xr.Dataset: - """ - Open a granule as a single virtual xarray Dataset + """Open a granule as a single virtual xarray Dataset. Parameters ---------- granule : earthaccess.results.DataGranule The granule to open + group : str + The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group. If None, the first parsed group will be opened. 
+ If the DMR++ file does not have groups, this parameter is ignored. access : str The access method to use. One of "direct" or "indirect". Direct is for S3/cloud access, indirect is for HTTPS access. load: bool @@ -113,11 +174,45 @@ def open_virtual_dataset( This virtual dataset cannot load data and is used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets - Returns + Returns: ---------- xr.Dataset - The virtual dataset + + Example: + ---------- + >>> results = earthaccess.search_data(count=2, temporal=("2023"), short_name="SWOT_L2_LR_SSH_Expert_2.0") + >>> open_virtual_dataset(results[0], access="indirect", load=False) + Size: 149MB + Dimensions: (num_lines: 9866, num_pixels: 69, + num_sides: 2) + Coordinates: + longitude (num_lines, num_pixels) int32 3MB ... + latitude (num_lines, num_pixels) int32 3MB ... + latitude_nadir (num_lines) int32 39kB ManifestArr... + longitude_nadir (num_lines) int32 39kB ManifestArr... + Dimensions without coordinates: num_lines, num_pixels, num_sides + Data variables: (12/98) + height_cor_xover_qual (num_lines, num_pixels) uint8 681kB ManifestArray>> results = earthaccess.search_data(count=2, short_name="ATL03") + >>> open_virtual_dataset(results[0], group=""/gt1r/geolocation"" access="indirect", load=False) + Size: 27MB + Dimensions: (delta_time: 149696, ds_surf_type: 5, ds_xyz: 3) + Coordinates: + delta_time (delta_time) float64 1MB ManifestArray=2.1.0)", "coverage", "coverage-enable-subprocess", "ipyth name = "fasteners" version = "0.19" description = "A python package that provides useful locks" -optional = true +optional = false python-versions = ">=3.6" files = [ {file = "fasteners-0.19-py3-none-any.whl", hash = "sha256:758819cb5d94cdedf4e836988b74de396ceacb8e2794d21f82d131fd9ee77237"}, @@ -1898,7 +1898,7 @@ test-ui = ["calysto-bash"] name = "kerchunk" version = "0.2.6" description = "Functions to make reference descriptions for ReferenceFileSystem" -optional = true +optional = false python-versions = ">=3.7" files = [ {file = "kerchunk-0.2.6-py3-none-any.whl", hash = "sha256:dc55fcea6560688ffc2390ff5882847fdf736982c1817553632a2bd7eb59de73"}, @@ -2370,29 +2370,24 @@ files = [ {file = "matplotlib-3.9.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dd2a59ff4b83d33bca3b5ec58203cc65985367812cb8c257f3e101632be86d92"}, {file = "matplotlib-3.9.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0fc001516ffcf1a221beb51198b194d9230199d6842c540108e4ce109ac05cc0"}, {file = "matplotlib-3.9.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:83c6a792f1465d174c86d06f3ae85a8fe36e6f5964633ae8106312ec0921fdf5"}, - {file = "matplotlib-3.9.1-cp310-cp310-win_amd64.whl", hash = "sha256:421851f4f57350bcf0811edd754a708d2275533e84f52f6760b740766c6747a7"}, {file = "matplotlib-3.9.1-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:b3fce58971b465e01b5c538f9d44915640c20ec5ff31346e963c9e1cd66fa812"}, {file = "matplotlib-3.9.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a973c53ad0668c53e0ed76b27d2eeeae8799836fd0d0caaa4ecc66bf4e6676c0"}, {file = "matplotlib-3.9.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:82cd5acf8f3ef43f7532c2f230249720f5dc5dd40ecafaf1c60ac8200d46d7eb"}, {file = "matplotlib-3.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ab38a4f3772523179b2f772103d8030215b318fef6360cb40558f585bf3d017f"}, {file = 
"matplotlib-3.9.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:2315837485ca6188a4b632c5199900e28d33b481eb083663f6a44cfc8987ded3"}, - {file = "matplotlib-3.9.1-cp311-cp311-win_amd64.whl", hash = "sha256:a0c977c5c382f6696caf0bd277ef4f936da7e2aa202ff66cad5f0ac1428ee15b"}, {file = "matplotlib-3.9.1-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:565d572efea2b94f264dd86ef27919515aa6d629252a169b42ce5f570db7f37b"}, {file = "matplotlib-3.9.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6d397fd8ccc64af2ec0af1f0efc3bacd745ebfb9d507f3f552e8adb689ed730a"}, {file = "matplotlib-3.9.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26040c8f5121cd1ad712abffcd4b5222a8aec3a0fe40bc8542c94331deb8780d"}, {file = "matplotlib-3.9.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d12cb1837cffaac087ad6b44399d5e22b78c729de3cdae4629e252067b705e2b"}, {file = "matplotlib-3.9.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:0e835c6988edc3d2d08794f73c323cc62483e13df0194719ecb0723b564e0b5c"}, - {file = "matplotlib-3.9.1-cp312-cp312-win_amd64.whl", hash = "sha256:44a21d922f78ce40435cb35b43dd7d573cf2a30138d5c4b709d19f00e3907fd7"}, {file = "matplotlib-3.9.1-cp39-cp39-macosx_10_12_x86_64.whl", hash = "sha256:0c584210c755ae921283d21d01f03a49ef46d1afa184134dd0f95b0202ee6f03"}, {file = "matplotlib-3.9.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:11fed08f34fa682c2b792942f8902e7aefeed400da71f9e5816bea40a7ce28fe"}, {file = "matplotlib-3.9.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0000354e32efcfd86bda75729716b92f5c2edd5b947200be9881f0a671565c33"}, {file = "matplotlib-3.9.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4db17fea0ae3aceb8e9ac69c7e3051bae0b3d083bfec932240f9bf5d0197a049"}, {file = "matplotlib-3.9.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:208cbce658b72bf6a8e675058fbbf59f67814057ae78165d8a2f87c45b48d0ff"}, - {file = "matplotlib-3.9.1-cp39-cp39-win_amd64.whl", hash = "sha256:dc23f48ab630474264276be156d0d7710ac6c5a09648ccdf49fef9200d8cbe80"}, {file = "matplotlib-3.9.1-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:3fda72d4d472e2ccd1be0e9ccb6bf0d2eaf635e7f8f51d737ed7e465ac020cb3"}, {file = "matplotlib-3.9.1-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:84b3ba8429935a444f1fdc80ed930babbe06725bcf09fbeb5c8757a2cd74af04"}, {file = "matplotlib-3.9.1-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b918770bf3e07845408716e5bbda17eadfc3fcbd9307dc67f37d6cf834bb3d98"}, - {file = "matplotlib-3.9.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:f1f2e5d29e9435c97ad4c36fb6668e89aee13d48c75893e25cef064675038ac9"}, {file = "matplotlib-3.9.1.tar.gz", hash = "sha256:de06b19b8db95dd33d0dc17c926c7c9ebed9f572074b6fac4f65068a6814d010"}, ] @@ -2938,7 +2933,7 @@ test = ["pytest", "pytest-console-scripts", "pytest-jupyter", "pytest-tornasync" name = "numcodecs" version = "0.12.1" description = "A Python package providing buffer compression and transformation codecs for use in data storage and communication applications." 
-optional = true +optional = false python-versions = ">=3.8" files = [ {file = "numcodecs-0.12.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d37f628fe92b3699e65831d5733feca74d2e33b50ef29118ffd41c13c677210e"}, @@ -4695,7 +4690,7 @@ files = [ name = "ujson" version = "5.10.0" description = "Ultra fast JSON encoder and decoder for Python" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "ujson-5.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:2601aa9ecdbee1118a1c2065323bda35e2c5a2cf0797ef4522d485f9d3ef65bd"}, @@ -4778,6 +4773,24 @@ files = [ {file = "ujson-5.10.0.tar.gz", hash = "sha256:b3cd8f3c5d8c7738257f1018880444f7b7d9b66232c64649f562d7ba86ad4bc1"}, ] +[[package]] +name = "universal-pathlib" +version = "0.2.2" +description = "pathlib api extended to use fsspec backends" +optional = false +python-versions = ">=3.8" +files = [ + {file = "universal_pathlib-0.2.2-py3-none-any.whl", hash = "sha256:9bc176112d593348bb29806a47e409eda78dff8d95391d66dd6f85e443aaa75d"}, + {file = "universal_pathlib-0.2.2.tar.gz", hash = "sha256:6bc215548792ad5db3553708b1c19bafd9e2fa1667dc925ed404c95e52ae2f13"}, +] + +[package.dependencies] +fsspec = ">=2022.1.0" + +[package.extras] +dev = ["adlfs", "aiohttp", "cheroot", "gcsfs", "moto[s3,server] (<5)", "mypy (==1.8.0)", "packaging", "pydantic", "pydantic-settings", "pylint (==2.17.4)", "pytest (==8.0.0)", "pytest-cov (==4.1.0)", "pytest-mock (==3.12.0)", "pytest-sugar (==0.9.7)", "requests", "s3fs", "webdav4[fsspec]", "wsgidav"] +tests = ["mypy (==1.8.0)", "packaging", "pylint (==2.17.4)", "pytest (==8.0.0)", "pytest-cov (==4.1.0)", "pytest-mock (==3.12.0)", "pytest-sugar (==0.9.7)"] + [[package]] name = "uri-template" version = "1.3.0" @@ -4865,6 +4878,30 @@ platformdirs = ">=3.9.1,<5" docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.2,!=7.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"] test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8)", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10)"] +[[package]] +name = "virtualizarr" +version = "1.0.0" +description = "Create virtual Zarr stores from archival data using xarray API" +optional = false +python-versions = ">=3.10" +files = [ + {file = "virtualizarr-1.0.0-py3-none-any.whl", hash = "sha256:eef675dfc9c7599d9b8164eabff34f274562c62a0624c758148822af430ada50"}, + {file = "virtualizarr-1.0.0.tar.gz", hash = "sha256:6d78d6b94e0341fe728783debfbbeb64cbca986b134ee7014885640379e6e47b"}, +] + +[package.dependencies] +h5netcdf = "*" +kerchunk = ">=0.2.5" +numpy = ">=2.0.0" +packaging = "*" +pydantic = "*" +ujson = "*" +universal-pathlib = "*" +xarray = ">=2024.06.0" + +[package.extras] +test = ["codecov", "fastparquet", "fsspec", "netcdf4", "pooch", "pre-commit", "pytest", "pytest-cov", "pytest-mypy", "ruff", "s3fs", "scipy"] + [[package]] name = "watchdog" version = "4.0.1" @@ -5197,7 +5234,7 @@ multidict = ">=4.0" name = "zarr" version = "2.18.2" description = "An implementation of chunked, compressed, N-dimensional arrays for Python" -optional = true +optional = false python-versions = ">=3.9" files = [ {file = "zarr-2.18.2-py3-none-any.whl", hash = "sha256:a638754902f97efa99b406083fdc807a0e2ccf12a949117389d2a4ba9b05df38"}, @@ -5235,4 +5272,4 @@ kerchunk = ["dask", "kerchunk"] [metadata] lock-version 
= "2.0" python-versions = ">=3.9,<4.0" -content-hash = "053e27d57acc91d68b6898389ddb15c20b78bb983490d0da7d83c528e6a0215e" +content-hash = "6089537ab5327425f71e63b27004a9dc05a5a43ae294e24219a1113d30dd43c9" diff --git a/pyproject.toml b/pyproject.toml index 54f73e0f..c44b6990 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ numpy = [ dask = { version = ">=2022.1.0", optional = true } importlib-resources = ">=6.3.2" typing_extensions = ">=4.10.0" +virtualizarr = {version = "^1.0.0", python = ">=3.10,<4.0"} [tool.poetry.extras] kerchunk = ["kerchunk", "dask"] diff --git a/tests/integration/test_virtualizarr.py b/tests/integration/test_virtualizarr.py deleted file mode 100644 index 09bf14a7..00000000 --- a/tests/integration/test_virtualizarr.py +++ /dev/null @@ -1,49 +0,0 @@ -import logging -import os -import unittest - -import earthaccess -import pytest - -pytest.importorskip("virtualizarr") -pytest.importorskip("dask") - -logger = logging.getLogger(__name__) -assertions = unittest.TestCase("__init__") - -assertions.assertTrue("EARTHDATA_USERNAME" in os.environ) -assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ) - -logger.info(f"Current username: {os.environ['EARTHDATA_USERNAME']}") -logger.info(f"earthaccess version: {earthaccess.__version__}") - - -@pytest.fixture(scope="module") -def granules(): - granules = earthaccess.search_data( - count=2, - short_name="MUR-JPL-L4-GLOB-v4.1", - cloud_hosted=True - ) - return granules - - -@pytest.mark.parametrize("output", "memory") -def test_open_virtual_mfdataset(tmp_path, granules, output): - xr = pytest.importorskip("xarray") - # Open directly with `earthaccess.open` - expected = xr.open_mfdataset(earthaccess.open(granules), concat_dim="time", combine="nested", combine_attrs="drop_conflicts") - - result = earthaccess.open_virtual_mfdataset(granules=granules, access="indirect", concat_dime="time", parallel=True, preprocess=None) - # dimensions - assert result.sizes == expected.sizes - # variable names, variable dimensions - assert result.variables.keys() == expected.variables.keys() - # attributes - assert result.attrs == expected.attrs - # coordinates - assert result.coords.keys() == expected.coords.keys() - # chunks - assert result.chunks == expected.chunks - # encoding - assert result.encoding == expected.encoding diff --git a/tests/unit/test_dmrpp.py b/tests/unit/test_dmrpp.py new file mode 100644 index 00000000..5c4abadc --- /dev/null +++ b/tests/unit/test_dmrpp.py @@ -0,0 +1,48 @@ +import logging +import os +import unittest + +import earthaccess +import pytest + +logger = logging.getLogger(__name__) +assertions = unittest.TestCase("__init__") + +assertions.assertTrue("EARTHDATA_USERNAME" in os.environ) +assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ) + +logger.info(f"Current username: {os.environ['EARTHDATA_USERNAME']}") +logger.info(f"earthaccess version: {earthaccess.__version__}") + + +@pytest.fixture(scope="module") +def granules(): + granules = earthaccess.search_data( + count=2, temporal=("2024-08-01"), short_name="MUR25-JPL-L4-GLOB-v04.2" + ) + return granules + + +def test_dmrpp(granules): + xr = pytest.importorskip("xarray") + virtualizarr = pytest.importorskip("virtualizarr") + earthaccess = pytest.importorskip("earthaccess") + fs = pytest.importorskip("fsspec") + + from earthaccess.dmrpp import DMRParser + + data_path = granules[0].data_links(access="indirect")[0] + dmrpp_path = data_path + ".dmrpp" + + fs = earthaccess.get_fsspec_https_session() + with fs.open(dmrpp_path) as f: + parser = 
DMRParser(f.read(), data_filepath=data_path) + result = parser.parse_dataset() + + from virtualizarr import open_virtual_dataset + + expected = open_virtual_dataset( + data_path, indexes={}, reader_options={"storage_options": fs.storage_options} + ) + + xr.testing.assert_identical(result, expected) From ed4a98b656ac7ae5392cde8700c8de2cf81aa22d Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Wed, 13 Nov 2024 22:31:54 -0800 Subject: [PATCH 4/6] add indirect access support and update docs --- earthaccess/dmrpp.py | 632 ------------------------- earthaccess/virtualizarr.py | 182 +++---- tests/integration/test_virtualizarr.py | 57 +++ tests/unit/test_dmrpp.py | 48 -- 4 files changed, 135 insertions(+), 784 deletions(-) delete mode 100644 earthaccess/dmrpp.py create mode 100644 tests/integration/test_virtualizarr.py delete mode 100644 tests/unit/test_dmrpp.py diff --git a/earthaccess/dmrpp.py b/earthaccess/dmrpp.py deleted file mode 100644 index 05a3f4cc..00000000 --- a/earthaccess/dmrpp.py +++ /dev/null @@ -1,632 +0,0 @@ -import os -import warnings -from collections import defaultdict -from typing import Any, Optional -from xml.etree import ElementTree as ET - -import numpy as np -import xarray as xr - -from virtualizarr.manifests import ChunkManifest, ManifestArray -from virtualizarr.types import ChunkKey -from virtualizarr.zarr import ZArray - - -class DMRParser: - """Parser for the OPeNDAP DMR++ XML format. - - Reads groups, dimensions, coordinates, data variables, encoding, chunk manifests, and attributes. - Highly modular to allow support for older dmrpp schema versions. Includes many utility functions to extract - different information such as finding all variable tags, splitting hdf5 groups, parsing dimensions, and more. - - OPeNDAP DMR++ homepage: https://docs.opendap.org/index.php/DMR%2B%2B - """ - - # DAP and DMRPP XML namespaces - _ns = { - "dap": "http://xml.opendap.org/ns/DAP/4.0#", - "dmr": "http://xml.opendap.org/dap/dmrpp/1.0.0#", - } - # DAP data types to numpy data types - _dap_np_dtype = { - "Byte": "uint8", - "UByte": "uint8", - "Int8": "int8", - "UInt8": "uint8", - "Int16": "int16", - "UInt16": "uint16", - "Int32": "int32", - "UInt32": "uint32", - "Int64": "int64", - "UInt64": "uint64", - "Url": "object", - "Float32": "float32", - "Float64": "float64", - "String": "object", - } - # Default zlib compression value - _default_zlib_value = 6 - # Encoding keys that should be cast to float - _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} - - def __init__(self, dmr: str, data_filepath: Optional[str] = None): - """Initialize the DMRParser with the given DMR data and data file path. - - Parameters - ---------- - dmr : str - The DMR file contents as a string. - - data_filepath : str, optional - The path to the actual data file that will be set in the chunk manifests. - If None, the data file path is taken from the DMR file. - """ - self.root = ET.fromstring(dmr) - self.data_filepath = ( - data_filepath if data_filepath is not None else self.root.attrib["name"] - ) - - def parse_dataset(self, group=None) -> xr.Dataset: - """Parses the given file and creates a virtual xr.Dataset with ManifestArrays. - - Parameters - ---------- - group : str - The group to parse. If None, and no groups are present, the dataset is parsed. - If None and groups are present, the first group is parsed. - - Returns: - ------- - An xr.Dataset wrapping virtualized zarr arrays. 
- - Examples: - -------- - Open a sample DMR++ file and parse the dataset - - >>> import requests - >>> r = requests.get("https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5.dmrpp") - >>> parser = DMRParser(r.text) - >>> vds = parser.parse_dataset() - >>> vds - Size: 4MB - Dimensions: (phony_dim_0: 100, phony_dim_1: 100, phony_dim_2: 100) - Dimensions without coordinates: phony_dim_0, phony_dim_1, phony_dim_2 - Data variables: - d_8_chunks (phony_dim_0, phony_dim_1, phony_dim_2) float32 4MB ManifestA... - - >>> vds2 = open_virtual_dataset("https://github.com/OPENDAP/bes/raw/3e518f6dc2f625b0b83cfb6e6fd5275e4d6dcef1/modules/dmrpp_module/data/dmrpp/chunked_threeD.h5.dmrpp", filetype="dmrpp", indexes={}) - >>> vds2 - Size: 4MB - Dimensions: (phony_dim_0: 100, phony_dim_1: 100, phony_dim_2: 100) - Dimensions without coordinates: phony_dim_0, phony_dim_1, phony_dim_2 - Data variables: - d_8_chunks (phony_dim_0, phony_dim_1, phony_dim_2) float32 4MB ManifestA... - """ - if group is not None: - # group = "/" + group.strip("/") # ensure group is in form "/a/b" - group = os.path.normpath(group).removeprefix( - "/" - ) # ensure group is in form "a/b/c" - if self._is_hdf5(self.root): - return self._parse_hdf5_dataset(self.root, group) - if self.data_filepath.endswith(".nc"): - return self._parse_netcdf4_dataset(self.root, group) - raise ValueError("dmrpp file must be HDF5 or netCDF4 based") - - def _parse_netcdf4_dataset( - self, root: ET.Element, group: Optional[str] = None - ) -> xr.Dataset: - """Parse the dataset from the netcdf4 based dmrpp with groups, starting at the given group. Set root to the given group. - - Parameters - ---------- - root : ET.Element - The root element of the DMR file. - - group : str - The group to parse. If None, and no groups are present, the dataset is parsed. - If None and groups are present, the first group is parsed. - - Returns: - ------- - xr.Dataset - """ - group_tags = root.findall("dap:Group", self._ns) - if len(group_tags) == 0: - if group is not None: - # no groups found and group specified -> warning - warnings.warn( - "No groups found in NetCDF4 dmrpp file; ignoring group parameter" - ) - # no groups found and no group specified -> parse dataset - return self._parse_dataset(root) - all_groups = self._split_netcdf4(root) - if group is None: - # groups found and no group specified -> parse first group - return self._parse_dataset(group_tags[0]) - if group in all_groups: - # groups found and group specified -> parse specified group - return self._parse_dataset(all_groups[group]) - else: - # groups found and specified group not found -> error - raise ValueError(f"Group {group} not found in NetCDF4 dmrpp file") - - def _split_netcdf4(self, root: ET.Element) -> dict[str, ET.Element]: - """Split the input element into several ET.Elements by netcdf4 group. E.g. {"left": , "right": }. - - Parameters - ---------- - root : ET.Element - The root element of the DMR file. 
- - Returns: - ------- - dict[str, ET.Element] - """ - group_tags = root.findall("dap:Group", self._ns) - all_groups: dict[str, ET.Element] = defaultdict( - lambda: ET.Element(root.tag, root.attrib) - ) - for group_tag in group_tags: - all_groups[os.path.normpath(group_tag.attrib["name"])] = group_tag - return all_groups - - def _is_hdf5(self, root: ET.Element) -> bool: - """Check if the DMR file is HDF5 based.""" - if root.find(".//dap:Attribute[@name='fullnamepath']", self._ns) is not None: - return True - if root.find("./dap:Attribute[@name='HDF5_GLOBAL']", self._ns) is not None: - return True - return False - - def _parse_hdf5_dataset( - self, root: ET.Element, group: Optional[str] = None - ) -> xr.Dataset: - """Parse the dataset from the HDF5 based dmrpp with groups, starting at the given group. Set root to the given group. - - Parameters - ---------- - root : ET.Element - The root element of the DMR file. - - group : str - The group to parse. If None, and no groups are present, the dataset is parsed. - If None and groups are present, the first group is parsed. - - Returns: - ------- - xr.Dataset - """ - all_groups = self._split_hdf5(root=root) - if len(all_groups) == 0: - raise ValueError("No groups found in HDF based dmrpp file") - if group is None: - # pick a random group if no group is specified - group = next(iter(all_groups)) - attrs = {} - for attr_tag in root.iterfind("dap:Attribute", self._ns): - if attr_tag.attrib["type"] != "Container": - attrs.update(self._parse_attribute(attr_tag)) - if group in all_groups: - # replace aliased variable names with original names: gt1r_heights -> heights - orignames = self._find_original_names(all_groups[group]) - vds = self._parse_dataset(all_groups[group]) - # Only one group so found attrs are global attrs - if len(all_groups) == 1: - vds.attrs.update(attrs) - return vds.rename(orignames) - raise ValueError(f"Group {group} not found in HDF5 dmrpp file") - - def _find_original_names(self, root: ET.Element) -> dict[str, str]: - """Find the original names of variables in the DMR file. E.g. {"gt1r_heights": "heights"}. - - Parameters - ---------- - root : ET.Element - The root element of the DMR file. - - Returns: - ------- - dict[str, str] - """ - orignames: dict[str, str] = {} - vars_tags: list[ET.Element] = [] - for dap_dtype in self._dap_np_dtype: - vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) - for var_tag in vars_tags: - origname_tag = var_tag.find( - "./dap:Attribute[@name='origname']/dap:Value", self._ns - ) - if origname_tag is not None and origname_tag.text is not None: - orignames[var_tag.attrib["name"]] = origname_tag.text - return orignames - - def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: - """Split the input element into several ET.Elements by HDF5 group. - - E.g. {"gtr1/heights": , "gtr1/temperatures": }. Builds up new elements - each with dimensions, variables, and attributes. - - Parameters - ---------- - root : ET.Element - The root element of the DMR file. 
- - Returns: - ------- - dict[str, ET.Element] - """ - # Add all variable, dimension, and attribute tags to their respective groups - groups_roots: dict[str, ET.Element] = defaultdict( - lambda: ET.Element(root.tag, root.attrib) - ) - group_dims: dict[str, set[str]] = defaultdict( - set - ) # {"gt1r/heights": {"dim1", "dim2", ...}} - vars_tags: list[ET.Element] = [] - for dap_dtype in self._dap_np_dtype: - vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) - # Variables - for var_tag in vars_tags: - fullname_tag = var_tag.find( - "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns - ) - if fullname_tag is not None and fullname_tag.text is not None: - # '/gt1r/heights/ph_id_pulse' -> 'gt1r/heights' - group_name = os.path.dirname(fullname_tag.text).removeprefix("/") - groups_roots[group_name].append(var_tag) - dim_tags = var_tag.findall("dap:Dim", self._ns) - dims = self._parse_multi_dims(dim_tags) - group_dims[group_name].update(dims.keys()) - # Dimensions - for dim_tag in root.iterfind("dap:Dimension", self._ns): - for g, d in group_dims.items(): - if dim_tag.attrib["name"] in d: - groups_roots[g].append(dim_tag) - # Attributes - container_attr_tag = root.find("dap:Attribute[@name='HDF5_GLOBAL']", self._ns) - if container_attr_tag is None: - attrs_tags = root.findall("dap:Attribute", self._ns) - for attr_tag in attrs_tags: - fullname_tag = attr_tag.find( - "./dap:Attribute[@name='fullnamepath']/dap:Value", self._ns - ) - if fullname_tag is not None and fullname_tag.text is not None: - group_name = os.path.dirname(fullname_tag.text).removeprefix("/") - # Add all attributes to the new dataset - groups_roots[group_name].extend(attr_tag) - else: - groups_roots[next(iter(groups_roots))].extend(container_attr_tag) - return groups_roots - - def _parse_dataset(self, root: ET.Element) -> xr.Dataset: - """Parse the dataset using the root element of the DMR file. - - Parameters - ---------- - root : ET.Element - The root element of the DMR file. - - Returns: - ------- - xr.Dataset - """ - # Dimension names and sizes - dim_tags = root.findall("dap:Dimension", self._ns) - dataset_dims = self._parse_multi_dims(dim_tags) - # Data variables and coordinates - coord_names = self._find_coord_names(root) - # if no coord_names are found or coords don't include dims, dims are used as coords - if len(coord_names) == 0 or len(coord_names) < len(dataset_dims): - coord_names = set(dataset_dims.keys()) - # Seperate and parse coords + data variables - coord_vars: dict[str, xr.Variable] = {} - data_vars: dict[str, xr.Variable] = {} - for var_tag in self._find_var_tags(root): - variable = self._parse_variable(var_tag, dataset_dims) - if var_tag.attrib["name"] in coord_names: - coord_vars[var_tag.attrib["name"]] = variable - else: - data_vars[var_tag.attrib["name"]] = variable - # Attributes - attrs: dict[str, str] = {} - for attr_tag in self.root.iterfind("dap:Attribute", self._ns): - attrs.update(self._parse_attribute(attr_tag)) - return xr.Dataset( - data_vars=data_vars, - coords=xr.Coordinates(coords=coord_vars, indexes={}), - attrs=attrs, - ) - - def _find_var_tags(self, root: ET.Element) -> list[ET.Element]: - """Find all variable tags in the DMR file. Also known as array tags. Tags are labeled with the DAP data type. E.g. , , . - - Parameters - ---------- - root : ET.Element - The root element of the DMR file. 
- - Returns: - ------- - list[ET.Element] - """ - vars_tags: list[ET.Element] = [] - for dap_dtype in self._dap_np_dtype: - vars_tags += root.findall(f"dap:{dap_dtype}", self._ns) - return vars_tags - - def _find_coord_names(self, root: ET.Element) -> set[str]: - """Find the name of all coordinates in root. Checks inside all variables and global attributes. - - Parameters - ---------- - root : ET.Element - The root element of the DMR file. - - Returns: - ------- - set[str] : The set of unique coordinate names. - """ - # Check for coordinate names within each variable attributes - coord_names: set[str] = set() - for var_tag in self._find_var_tags(root): - coord_tag = var_tag.find( - "./dap:Attribute[@name='coordinates']/dap:Value", self._ns - ) - if coord_tag is not None and coord_tag.text is not None: - coord_names.update(coord_tag.text.split(" ")) - for map_tag in var_tag.iterfind("dap:Map", self._ns): - coord_names.add(map_tag.attrib["name"].removeprefix("/")) - # Check for coordinate names in a global attribute - coord_tag = var_tag.find("./dap:Attribute[@name='coordinates']", self._ns) - if coord_tag is not None and coord_tag.text is not None: - coord_names.update(coord_tag.text.split(" ")) - return coord_names - - def _parse_dim(self, root: ET.Element) -> dict[str, int | None]: - """Parse single or tag. - - If the tag has no name attribute, it is a phony dimension. E.g. --> {"phony_dim": 300} - If the tag has no size attribute, it is an unlimited dimension. E.g. --> {"time": None} - If the tag has both name and size attributes, it is a regular dimension. E.g. --> {"lat": 1447} - - Parameters - ---------- - root : ET.Element - The root element Dim/Dimension tag - - Returns: - ------- - dict - E.g. {"time": 1, "lat": 1447, "lon": 2895}, {"phony_dim": 300}, {"time": None, "lat": None, "lon": None} - """ - if "name" not in root.attrib and "size" in root.attrib: - return {"phony_dim": int(root.attrib["size"])} - if "name" in root.attrib and "size" not in root.attrib: - return {os.path.basename(root.attrib["name"]): None} - if "name" in root.attrib and "size" in root.attrib: - return {os.path.basename(root.attrib["name"]): int(root.attrib["size"])} - raise ValueError("Not enough information to parse Dim/Dimension tag") - - def _parse_multi_dims( - self, dim_tags: list[ET.Element], global_dims: dict[str, int] = {} - ) -> dict: - """Parse multiple or tags. Generally tags are found within dmrpp variable tags. - - Returns best possible matching of {dimension: shape} present in the list and global_dims. E.g tags=(Dim("lat", None), Dim("lon", None)) and global_dims={"lat": 100, "lon": 100, "time": 5} --> {"lat": 100, "lon": 100} - - E.g. tags=(Dim("time", None), Dim("", 200)) and global_dims={"lat": 100, "lon": 100, "time": 5} --> {"time": 5, "phony_dim0": 200} - - This function is often used to fill in missing sizes from the global_dims. E.g. Variable tags may contain only dimension names and not sizes. If the {name: size} matching is known from the global_dims, it is used to fill in the missing sizes. - - Parameters - ---------- - dim_tags : tuple[ET.Element] - A tuple of ElementTree Elements representing dimensions in the DMR file. - - global_dims : dict - A dictionary of dimension names and sizes. E.g. {"time": 1, "lat": 1447, "lon": 2895} - - Returns: - ------- - dict - E.g. 
{"time": 1, "lat": 1447, "lon": 2895} - """ - dims: dict[str, int | None] = {} - for dim_tag in dim_tags: - dim: dict[str, int | None] = self._parse_dim(dim_tag) - if "phony_dim" in dim: - dims["phony_dim_" + str(len(dims))] = dim["phony_dim"] - else: - dims.update(dim) - for name, size in list(dims.items()): - if name in global_dims and size is None: - dims[name] = global_dims[name] - return dims - - def _parse_variable( - self, var_tag: ET.Element, dataset_dims: dict[str, int] - ) -> xr.Variable: - """Parse a variable from a DMR tag. - - Parameters - ---------- - var_tag : ET.Element - An ElementTree Element representing a variable in the DMR file. Will have DAP dtype as tag. - - dataset_dims : dict - A dictionary of dimension names and sizes. E.g. {"time": 1, "lat": 1447, "lon": 2895} - Must contain at least all the dimensions used by the variable. Necessary since the variable - metadata only contains the dimension names and not the sizes. - - Returns: - ------- - xr.Variable - """ - # Dimension names - dim_tags = var_tag.findall("dap:Dim", self._ns) - dim_shapes = self._parse_multi_dims(dim_tags, dataset_dims) - # convert DAP dtype to numpy dtype - dtype = np.dtype( - self._dap_np_dtype[var_tag.tag.removeprefix("{" + self._ns["dap"] + "}")] - ) - # Chunks and Filters - filters = None - shape = tuple(dim_shapes.values()) - chunks_shape = shape - chunks_tag = var_tag.find("dmr:chunks", self._ns) - if chunks_tag is not None: - # Chunks - found_chunk_dims = self._parse_chunks_dimensions(chunks_tag) - chunks_shape = found_chunk_dims if found_chunk_dims is not None else shape - chunkmanifest = self._parse_chunks(chunks_tag, chunks_shape) - # Filters - filters = self._parse_filters(chunks_tag, dtype) - # Attributes - attrs: dict[str, Any] = {} - for attr_tag in var_tag.iterfind("dap:Attribute", self._ns): - attrs.update(self._parse_attribute(attr_tag)) - # Remove attributes only used for parsing logic - fill_value = attrs.pop("_FillValue", np.nan) - attrs.pop("fullnamepath", None) - attrs.pop("origname", None) - # attrs.pop("coordinates", None) - # create ManifestArray and ZArray - zarray = ZArray( - chunks=chunks_shape, - dtype=dtype, - fill_value=fill_value, - filters=filters, - order="C", - shape=shape, - ) - marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) - encoding = {k: attrs.get(k) for k in self._encoding_keys if k in attrs} - return xr.Variable( - dims=dim_shapes.keys(), data=marr, attrs=attrs, encoding=encoding - ) - - def _parse_attribute(self, attr_tag: ET.Element) -> dict[str, Any]: - """Parse an attribute from a DMR attr tag. Converts the attribute value to a native python type. - - Parameters - ---------- - attr_tag : ET.Element - An ElementTree Element with an tag. - - Returns: - ------- - dict - """ - attr: dict[str, Any] = {} - values = [] - if "type" in attr_tag.attrib and attr_tag.attrib["type"] == "Container": - return attr - dtype = np.dtype(self._dap_np_dtype[attr_tag.attrib["type"]]) - # if multiple Value tags are present, store as "key": "[v1, v2, ...]" - for value_tag in attr_tag: - # cast attribute to native python type using dmr provided dtype - val = ( - dtype.type(value_tag.text).item() - if dtype != np.object_ - else value_tag.text - ) - if val == "*": - val = np.nan - values.append(val) - attr[attr_tag.attrib["name"]] = values[0] if len(values) == 1 else values - return attr - - def _parse_filters( - self, chunks_tag: ET.Element, dtype: np.dtype - ) -> list[dict] | None: - """Parse filters from a DMR chunks tag. 
- - Parameters - ---------- - chunks_tag : ET.Element - An ElementTree Element with a tag. - - dtype : np.dtype - The numpy dtype of the variable. - - Returns: - ------- - list[dict] | None - E.g. [{"id": "shuffle", "elementsize": 4}, {"id": "zlib", "level": 4}] - """ - if "compressionType" in chunks_tag.attrib: - filters: list[dict] = [] - # shuffle deflate --> ["shuffle", "deflate"] - compression_types = chunks_tag.attrib["compressionType"].split(" ") - if "shuffle" in compression_types: - filters.append({"id": "shuffle", "elementsize": dtype.itemsize}) - if "deflate" in compression_types: - level = int( - chunks_tag.attrib.get("deflateLevel", self._default_zlib_value) - ) - filters.append({"id": "zlib", "level": level}) - return filters - return None - - def _parse_chunks_dimensions( - self, chunks_tag: ET.Element - ) -> tuple[int, ...] | None: - """Parse the chunk dimensions from a DMR chunks tag. Returns None if no chunk dimensions are found. - - Parameters - ---------- - chunks_tag : ET.Element - An ElementTree Element with a tag. - - Returns: - ------- - tuple[int, ...] | None - - """ - chunk_dim_tag = chunks_tag.find("dmr:chunkDimensionSizes", self._ns) - if chunk_dim_tag is not None and chunk_dim_tag.text is not None: - # 1 1447 2895 -> (1, 1447, 2895) - return tuple(map(int, chunk_dim_tag.text.split())) - return None - - def _parse_chunks( - self, chunks_tag: ET.Element, chunks_shape: tuple[int, ...] - ) -> ChunkManifest: - """Parse the chunk manifest from a DMR chunks tag. - - Parameters - ---------- - chunks_tag : ET.Element - An ElementTree Element with a tag. - - chunks : tuple - Chunk sizes for each dimension. E.g. (1, 1447, 2895) - - Returns: - ------- - ChunkManifest - """ - chunkmanifest: dict[ChunkKey, object] = {} - default_num: list[int] = ( - [0 for i in range(len(chunks_shape))] if chunks_shape else [0] - ) - chunk_key_template = ".".join(["{}" for i in range(len(default_num))]) - for chunk_tag in chunks_tag.iterfind("dmr:chunk", self._ns): - chunk_num = default_num - if "chunkPositionInArray" in chunk_tag.attrib: - # "[0,1023,10235]" -> ["0","1023","10235"] - chunk_pos = chunk_tag.attrib["chunkPositionInArray"][1:-1].split(",") - # [0,1023,10235] // [1, 1023, 2047] -> [0,1,5] - chunk_num = [ - int(chunk_pos[i]) // chunks_shape[i] - for i in range(len(chunks_shape)) - ] - # [0,1,5] -> "0.1.5" - chunk_key = ChunkKey(chunk_key_template.format(*chunk_num)) - chunkmanifest[chunk_key] = { - "path": self.data_filepath, - "offset": int(chunk_tag.attrib["offset"]), - "length": int(chunk_tag.attrib["nBytes"]), - } - return ChunkManifest(entries=chunkmanifest) diff --git a/earthaccess/virtualizarr.py b/earthaccess/virtualizarr.py index 7885b4f4..0bcbb655 100644 --- a/earthaccess/virtualizarr.py +++ b/earthaccess/virtualizarr.py @@ -5,74 +5,38 @@ import earthaccess if TYPE_CHECKING: - import fsspec import xarray as xr - -def _parse_dmr( - fs: fsspec.AbstractFileSystem, - data_path: str, - dmrpp_path: str = None, - group: str | None = None, -) -> xr.Dataset: - """Parse a granule's DMR++ file and return a virtual xarray dataset. - - Parameters - ---------- - fs : fsspec.AbstractFileSystem - The file system to use to open the DMR++ - data_path : str - The path to the data file - dmrpp_path : str - The path to the DMR++ file. If None, the DMR++ file is assumed to be at data_path + ".dmrpp" - group : str - The group to open in the DMR++. 
- - Returns: - ---------- - xr.Dataset - The virtual dataset (with virtualizarr ManifestArrays) - - Raises: - ---------- - Exception if the DMR++ file is not found or if there is an error parsing the DMR++ - """ - from earthaccess.dmrpp import DMRParser - - dmrpp_path = data_path + ".dmrpp" if dmrpp_path is None else dmrpp_path - with fs.open(dmrpp_path) as f: - parser = DMRParser(f.read(), data_filepath=data_path) - return parser.parse_dataset(group=group) - - def open_virtual_mfdataset( - granules: list[earthaccess.results.DataGranule], + granules: list[earthaccess.DataGranule], group: str | None = None, access: str = "indirect", load: bool = False, - preprocess: callable | None = None, + preprocess: callable | None = None, # type: ignore parallel: bool = True, **xr_combine_nested_kwargs, ) -> xr.Dataset: """Open multiple granules as a single virtual xarray Dataset. + Uses DMR++ metadata files to create a virtual xarray dataset with ManifestArrays. This virtual dataset can be used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets. + Parameters ---------- - granules : list[earthaccess.results.DataGranule] + granules : list[earthaccess.DataGranule] The granules to open - group : str - The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group. If None, the first parsed group will be opened. + group : str or None (default=None) + The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group. If None, the root group will be opened. If the DMR++ file does not have groups, this parameter is ignored. - access : str - The access method to use. One of "direct" or "indirect". Direct is for S3/cloud access, indirect is for HTTPS access. - load: bool - Create an xarray dataset with indexes and data available to load. + access : str (default="indirect") + The access method to use. One of "direct" or "indirect". Use direct when running on AWS, use indirect when running on a local machine. + load : bool (default=False) + Create an xarray dataset with indexes and lazy loaded data. - When true, creates a lazy loaded, numpy/dask backed xarray dataset with indexes. Note that when load=True all the data is now available to access but not loaded into memory. When false a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks and allows creation and concatenation of zarr reference files. This virtual dataset cannot load data on it's own and see https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets. Also note that load=True will only work in the cloud since ranged reads of chunks are supported by cloud storage but not by HTTPS - preprocess : callable + When true, creates a lazy loaded, numpy/dask backed xarray dataset with indexes. Note that when `load=True` all the data is now available to access but not loaded into memory. When `load=False` a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks and allows creation and concatenation of zarr reference files. This virtual dataset cannot load data on it's own and see https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets. 
+ preprocess : callable (default=None) A function to apply to each virtual dataset before combining - parallel : bool - Whether to open the virtual datasets in parallel (using dask.delayed) + parallel : bool (default=True) + Open the virtual datasets in parallel (using dask.delayed) xr_combine_nested_kwargs : dict Keyword arguments for xarray.combine_nested. See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html @@ -81,12 +45,12 @@ def open_virtual_mfdataset( ---------- xr.Dataset - Example: + Examplea: ---------- - >>> results = earthaccess.search_data(count=5, short_name="MUR-JPL-L4-GLOB-v4.1") + >>> results = earthaccess.search_data(count=5, temporal=("2024"), short_name="MUR-JPL-L4-GLOB-v4.1") >>> vds = open_virtual_mfdataset(results, access="indirect", load=False, concat_dim="time", coords='minimal', compat='override', combine_attrs="drop_conflicts") >>> vds - Size: 19GB + Size: 29GB Dimensions: (time: 5, lat: 17999, lon: 36000) Coordinates: time (time) int32 20B ManifestArray>> vds = open_virtual_mfdataset(results, access="indirect", load=True, concat_dim="time", coords='minimal', compat='override', combine_attrs="drop_conflicts") + >>> vds + Size: 143GB + Dimensions: (time: 5, lat: 17999, lon: 36000) + Coordinates: + * lat (lat) float32 72kB -89.99 -89.98 -89.97 ... 89.98 89.99 + * lon (lon) float32 144kB -180.0 -180.0 -180.0 ... 180.0 180.0 + * time (time) datetime64[ns] 40B 2024-01-01T09:00:00 ... 2024-... + Data variables: + analysed_sst (time, lat, lon) float64 26GB dask.array + analysis_error (time, lat, lon) float64 26GB dask.array + dt_1km_data (time, lat, lon) timedelta64[ns] 26GB dask.array + mask (time, lat, lon) float32 13GB dask.array + sea_ice_fraction (time, lat, lon) float64 26GB dask.array + sst_anomaly (time, lat, lon) float64 26GB dask.array + Attributes: (12/42) + Conventions: CF-1.7 title: Daily MUR SST, Final product """ import xarray as xr - if access == "indirect" and load: - raise ValueError("load=True is not supported with access='indirect'") + import virtualizarr as vz if access == "direct": - fs = earthaccess.get_s3_filesystem(results=granules) - fs.storage_options["anon"] = False + fs = earthaccess.get_s3_filesystem(results=granules[0]) + fs.storage_options["anon"] = False # type: ignore else: fs = earthaccess.get_fsspec_https_session() if parallel: - # wrap _open_virtual_dataset and preprocess with delayed import dask - open_ = dask.delayed(_parse_dmr) + # wrap _open_virtual_dataset and preprocess with delayed + open_ = dask.delayed(vz.open_virtual_dataset) # type: ignore if preprocess is not None: - preprocess = dask.delayed(preprocess) + preprocess = dask.delayed(preprocess) # type: ignore else: - open_ = _parse_dmr - # data_paths: list[str] = [] + open_ = vz.open_virtual_dataset vdatasets = [] + # Get list of virtual datasets (or dask delayed objects) for g in granules: - # data_paths.append(g.data_links(access=access)[0]) vdatasets.append( - open_(fs=fs, group=group, data_path=g.data_links(access=access)[0]) + open_( + filepath=g.data_links(access=access)[0] + ".dmrpp", + filetype="dmrpp", # type: ignore + group=group, + indexes={}, + reader_options={"storage_options": fs.storage_options}, # type: ignore + ) ) - # Rename paths to match granule s3/https paths - # vdatasets = [ - # vds.virtualize.rename_paths(data_paths[i]) for i, vds in enumerate(vdatasets) - # ] if preprocess is not None: vdatasets = [preprocess(ds) for ds in vdatasets] if parallel: - vdatasets = dask.compute(vdatasets)[0] + vdatasets = dask.compute(vdatasets)[0] 
# type: ignore if len(vdatasets) == 1: vds = vdatasets[0] else: vds = xr.combine_nested(vdatasets, **xr_combine_nested_kwargs) if load: - options = fs.storage_options.copy() refs = vds.virtualize.to_kerchunk(filepath=None, format="dict") - options["fo"] = refs return xr.open_dataset( "reference://", engine="zarr", chunks={}, - backend_kwargs={"storage_options": options, "consolidated": False}, + backend_kwargs={ + "consolidated": False, + "storage_options": { + "fo": refs, + "remote_protocol": fs.protocol, + "remote_options": fs.storage_options, # type: ignore + }, + }, ) return vds def open_virtual_dataset( - granule: earthaccess.results.DataGranule, - group: str | None = None, + granule: earthaccess.DataGranule, access: str = "indirect", - load: bool = True, + load: bool = False, ) -> xr.Dataset: """Open a granule as a single virtual xarray Dataset. + Uses DMR++ metadata files to create a virtual xarray dataset with ManifestArrays. This virtual dataset can be used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets. + Parameters ---------- - granule : earthaccess.results.DataGranule + granule : earthaccess.DataGranule The granule to open - group : str - The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group. If None, the first parsed group will be opened. - If the DMR++ file does not have groups, this parameter is ignored. - access : str - The access method to use. One of "direct" or "indirect". Direct is for S3/cloud access, indirect is for HTTPS access. - load: bool - When true, creates a numpy/dask backed xarray dataset. When false a virtual xarray dataset is created with ManifestArrays - This virtual dataset cannot load data and is used to create zarr reference files. See https://virtualizarr.readthedocs.io/en/latest/ - for more information on virtual xarray datasets + access : str (default="indirect") + The access method to use. One of "direct" or "indirect". Use direct when running on AWS, use indirect when running on a local machine. + load: bool (default=False) + Create an xarray dataset with indexes and lazy loaded data. + + When true, creates a lazy loaded, numpy/dask backed xarray dataset with indexes. Note that when `load=True` all the data is now available to access but not loaded into memory. When `load=False` a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks and allows creation and concatenation of zarr reference files. This virtual dataset cannot load data on it's own and see https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets. 
+ Returns: ---------- xr.Dataset - Example: + Examples: ---------- >>> results = earthaccess.search_data(count=2, temporal=("2023"), short_name="SWOT_L2_LR_SSH_Expert_2.0") >>> open_virtual_dataset(results[0], access="indirect", load=False) @@ -193,24 +182,9 @@ def open_virtual_dataset( Dimensions without coordinates: num_lines, num_pixels, num_sides Data variables: (12/98) height_cor_xover_qual (num_lines, num_pixels) uint8 681kB ManifestArray>> results = earthaccess.search_data(count=2, short_name="ATL03") - >>> open_virtual_dataset(results[0], group=""/gt1r/geolocation"" access="indirect", load=False) - Size: 27MB - Dimensions: (delta_time: 149696, ds_surf_type: 5, ds_xyz: 3) - Coordinates: - delta_time (delta_time) float64 1MB ManifestArray Date: Thu, 14 Nov 2024 10:57:37 -0800 Subject: [PATCH 5/6] pyproject and formatting --- earthaccess/virtualizarr.py | 3 ++- pyproject.toml | 7 +++++++ tests/integration/test_virtualizarr.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/earthaccess/virtualizarr.py b/earthaccess/virtualizarr.py index 0bcbb655..69122163 100644 --- a/earthaccess/virtualizarr.py +++ b/earthaccess/virtualizarr.py @@ -7,6 +7,7 @@ if TYPE_CHECKING: import xarray as xr + def open_virtual_mfdataset( granules: list[earthaccess.DataGranule], group: str | None = None, @@ -161,7 +162,7 @@ def open_virtual_dataset( Create an xarray dataset with indexes and lazy loaded data. When true, creates a lazy loaded, numpy/dask backed xarray dataset with indexes. Note that when `load=True` all the data is now available to access but not loaded into memory. When `load=False` a virtual xarray dataset is created with ManifestArrays. This virtual dataset is a view over the underlying metadata and chunks and allows creation and concatenation of zarr reference files. This virtual dataset cannot load data on it's own and see https://virtualizarr.readthedocs.io/en/latest/ for more information on virtual xarray datasets. 
- + Returns: ---------- diff --git a/pyproject.toml b/pyproject.toml index 1485125c..c1a55ae2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,9 @@ kerchunk = [ "h5netcdf", "xarray", ] +virtualizarr = [ + "virtualizarr @ git+https://github.com/zarr-developers/VirtualiZarr.git" +] dev = [ "bump-my-version >=0.10.0", "nox", @@ -82,6 +85,7 @@ test = [ "types-setuptools >=0.1", "vcrpy >=6.0.1", "earthaccess[kerchunk]", + "earthaccess[virtualizarr]", ] docs = [ "jupyterlab >=3", @@ -122,6 +126,9 @@ docs = [ [tool.pytest] filterwarnings = ["error::UserWarning"] +[tool.hatch.metadata] +allow-direct-references = true + [tool.mypy] python_version = "3.10" files = ["earthaccess", "tests"] diff --git a/tests/integration/test_virtualizarr.py b/tests/integration/test_virtualizarr.py index 05281042..aaf93260 100644 --- a/tests/integration/test_virtualizarr.py +++ b/tests/integration/test_virtualizarr.py @@ -16,7 +16,7 @@ @pytest.fixture( - scope="module", params=["MUR25-JPL-L4-GLOB-v04.2", "SWOT_L2_LR_SSH_Basic_2.0"] + scope="module", params=["MUR25-JPL-L4-GLOB-v04.2"] ) def granule(request): granules = earthaccess.search_data( From dde9c579a8c58d1a7abf6a60f27d982f3d5090f0 Mon Sep 17 00:00:00 2001 From: ayushnag <35325113+ayushnag@users.noreply.github.com> Date: Thu, 14 Nov 2024 11:16:50 -0800 Subject: [PATCH 6/6] add group param to open_virtual_dataset --- earthaccess/virtualizarr.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/earthaccess/virtualizarr.py b/earthaccess/virtualizarr.py index 69122163..8bca29cd 100644 --- a/earthaccess/virtualizarr.py +++ b/earthaccess/virtualizarr.py @@ -145,6 +145,7 @@ def open_virtual_mfdataset( def open_virtual_dataset( granule: earthaccess.DataGranule, + group: str | None = None, access: str = "indirect", load: bool = False, ) -> xr.Dataset: @@ -156,6 +157,9 @@ def open_virtual_dataset( ---------- granule : earthaccess.DataGranule The granule to open + group : str or None (default=None) + The group to open in the DMR++. If groups are present in the DMR++ files, this will open the specified group. If None, the root group will be opened. + If the DMR++ file does not have groups, this parameter is ignored. access : str (default="indirect") The access method to use. One of "direct" or "indirect". Use direct when running on AWS, use indirect when running on a local machine. load: bool (default=False) @@ -186,6 +190,7 @@ def open_virtual_dataset( """ return open_virtual_mfdataset( granules=[granule], + group=group, access=access, load=load, parallel=False,
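
For reference, a minimal end-to-end sketch of the workflow the patches above enable. This is not part of the patch series itself: it assumes the functions are importable from earthaccess.virtualizarr as laid out in the diffs, that the virtualizarr and dask extras are installed, and that Earthdata credentials are available; the collection short name and the combine keywords are copied from the docstring examples and are illustrative only.

import earthaccess
from earthaccess.virtualizarr import open_virtual_dataset, open_virtual_mfdataset

earthaccess.login()  # expects EARTHDATA_USERNAME / EARTHDATA_PASSWORD to be set

# Collection short name taken from the docstring examples; count is illustrative.
results = earthaccess.search_data(
    count=5, short_name="MUR-JPL-L4-GLOB-v4.1", cloud_hosted=True
)

# Virtual dataset: a ManifestArray-backed view assembled from each granule's
# DMR++ file. No chunk data is read; useful for writing zarr/kerchunk references.
vds = open_virtual_mfdataset(
    results,
    access="indirect",             # HTTPS paths; use "direct" when running on AWS
    load=False,
    parallel=True,                 # each DMR++ is opened via dask.delayed
    concat_dim="time",             # remaining kwargs pass through to xarray.combine_nested
    coords="minimal",
    compat="override",
    combine_attrs="drop_conflicts",
)

# Lazily loaded dataset: load=True serializes the virtual dataset to an in-memory
# kerchunk reference and reopens it with the zarr engine, so variables come back
# as dask arrays with real indexes.
ds = open_virtual_mfdataset(
    results,
    access="indirect",
    load=True,
    concat_dim="time",
    coords="minimal",
    compat="override",
    combine_attrs="drop_conflicts",
)

# Single granule; the group parameter (added in the final patch) selects a DMR++
# group when the file is hierarchical and is ignored otherwise.
single = open_virtual_dataset(results[0], group=None, access="indirect", load=False)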