Opening virtual datasets (dmr-adapter) #606

Open · wants to merge 9 commits into main · Changes from 1 commit
4 changes: 4 additions & 0 deletions earthaccess/__init__.py
@@ -22,6 +22,7 @@
from .search import DataCollections, DataGranules
from .store import Store
from .system import PROD, UAT
from .virtualizarr import open_virtual_dataset, open_virtual_mfdataset

logger = logging.getLogger(__name__)

@@ -49,6 +50,9 @@
"Store",
# kerchunk
"consolidate_metadata",
# virtualizarr
"open_virtual_dataset",
"open_virtual_mfdataset",
"PROD",
"UAT",
]
112 changes: 112 additions & 0 deletions earthaccess/virtualizarr.py
@@ -0,0 +1,112 @@
from __future__ import annotations

from typing import Callable

import fsspec
import xarray as xr

import earthaccess


def _parse_dmr(
    fs: fsspec.AbstractFileSystem,
    data_path: str,
    dmr_path: str | None = None,
) -> xr.Dataset:
    """Parse a granule's DMR++ file and return a virtual xarray dataset.

    Parameters
    ----------
    fs : fsspec.AbstractFileSystem
        The file system to use to open the DMR++ file
    data_path : str
        Path to the granule's data file that the virtual references will point to
    dmr_path : str, optional
        Path to the DMR++ file. Defaults to ``data_path + ".dmrpp"``

    Returns
    -------
    xr.Dataset
        The virtual dataset (with virtualizarr ManifestArrays)

    Raises
    ------
    Exception
        If the DMR++ file is not found or if there is an error parsing the DMR++
    """
    from virtualizarr.readers.dmrpp import DMRParser

    dmr_path = data_path + ".dmrpp" if dmr_path is None else dmr_path
    with fs.open(dmr_path) as f:
        parser = DMRParser(f.read(), data_filepath=data_path)
        return parser.parse()
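For context, a minimal sketch of what this helper does in isolation, assuming an authenticated Earthdata session; the granule URL below is hypothetical and only the DMR++ sidecar is read, not the data file itself:

```python
import earthaccess
from earthaccess.virtualizarr import _parse_dmr

earthaccess.login()
fs = earthaccess.get_fsspec_https_session()  # authenticated HTTPS filesystem

# Hypothetical granule URL; the helper reads "<data_url>.dmrpp" and returns an
# xarray.Dataset whose variables are virtualizarr ManifestArrays pointing back
# at data_url (no array bytes are downloaded).
data_url = "https://data.example.nasa.gov/granule.nc"
vds = _parse_dmr(fs=fs, data_path=data_url)
print(vds)
```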


def open_virtual_mfdataset(
    granules: list[earthaccess.results.DataGranule],
    access: str = "indirect",
    preprocess: Callable | None = None,
    parallel: bool = True,
    **xr_combine_nested_kwargs,
) -> xr.Dataset:
    """Open multiple granules as a single virtual xarray Dataset.

    Parameters
    ----------
    granules : list[earthaccess.results.DataGranule]
        The granules to open
    access : str
        The access method to use. One of "direct" or "indirect". Direct is for
        S3/cloud access, indirect is for HTTPS access.
    preprocess : Callable, optional
        A function applied to each virtual dataset before the datasets are combined
    parallel : bool
        Whether to open and preprocess the granules in parallel with dask.delayed
    xr_combine_nested_kwargs : dict
        Keyword arguments for xarray.combine_nested.
        See https://docs.xarray.dev/en/stable/generated/xarray.combine_nested.html

    Returns
    -------
    xr.Dataset
        The virtual dataset
    """
    if access == "direct":
        fs = earthaccess.get_s3fs_session(results=granules)
    else:
        fs = earthaccess.get_fsspec_https_session()
    if parallel:
        # wrap _parse_dmr and preprocess with dask.delayed
        import dask

        open_ = dask.delayed(_parse_dmr)
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)
    else:
        open_ = _parse_dmr
    vdatasets = [
        open_(fs=fs, data_path=g.data_links(access=access)[0]) for g in granules
    ]
    if preprocess is not None:
        vdatasets = [preprocess(ds) for ds in vdatasets]
    if parallel:
        vdatasets = dask.compute(vdatasets)[0]
    if len(vdatasets) == 1:
        vds = vdatasets[0]
    else:
        vds = xr.combine_nested(vdatasets, **xr_combine_nested_kwargs)
    return vds
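A hedged usage sketch for this new public entry point, reusing the MUR SST collection from the integration test below; the search parameters are illustrative, and `concat_dim` is simply forwarded to `xarray.combine_nested`:

```python
import earthaccess

earthaccess.login()
granules = earthaccess.search_data(
    short_name="MUR-JPL-L4-GLOB-v4.1",  # collection reused from the test below
    cloud_hosted=True,
    count=10,
)

# Build one virtual dataset over HTTPS; only DMR++ metadata is read, and the
# per-granule virtual datasets are concatenated along "time" via combine_nested.
vds = earthaccess.open_virtual_mfdataset(
    granules=granules,
    access="indirect",
    parallel=True,
    concat_dim="time",
)
print(vds)
```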


def open_virtual_dataset(
    granule: earthaccess.results.DataGranule, access: str = "indirect"
) -> xr.Dataset:
    """Open a granule as a single virtual xarray Dataset.

    Parameters
    ----------
    granule : earthaccess.results.DataGranule
        The granule to open
    access : str
        The access method to use. One of "direct" or "indirect". Direct is for
        S3/cloud access, indirect is for HTTPS access.

    Returns
    -------
    xr.Dataset
        The virtual dataset
    """
    return open_virtual_mfdataset(
        granules=[granule], access=access, parallel=False, preprocess=None
    )
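And the single-granule convenience wrapper, continuing the hypothetical search above:

```python
# Same search as above; open one granule without dask parallelism.
vds_single = earthaccess.open_virtual_dataset(granule=granules[0], access="indirect")
```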

49 changes: 49 additions & 0 deletions tests/integration/test_virtualizarr.py
@@ -0,0 +1,49 @@
import logging
import os
import unittest

import earthaccess
import pytest

pytest.importorskip("virtualizarr")
pytest.importorskip("dask")

logger = logging.getLogger(__name__)
assertions = unittest.TestCase("__init__")

assertions.assertTrue("EARTHDATA_USERNAME" in os.environ)
assertions.assertTrue("EARTHDATA_PASSWORD" in os.environ)

logger.info(f"Current username: {os.environ['EARTHDATA_USERNAME']}")
logger.info(f"earthaccess version: {earthaccess.__version__}")


@pytest.fixture(scope="module")
def granules():
granules = earthaccess.search_data(
count=2,
short_name="MUR-JPL-L4-GLOB-v4.1",
cloud_hosted=True
)
return granules


@pytest.mark.parametrize("output", "memory")
def test_open_virtual_mfdataset(tmp_path, granules, output):
xr = pytest.importorskip("xarray")
# Open directly with `earthaccess.open`
expected = xr.open_mfdataset(earthaccess.open(granules), concat_dim="time", combine="nested", combine_attrs="drop_conflicts")

result = earthaccess.open_virtual_mfdataset(granules=granules, access="indirect", concat_dime="time", parallel=True, preprocess=None)
# dimensions
assert result.sizes == expected.sizes
# variable names, variable dimensions
assert result.variables.keys() == expected.variables.keys()
# attributes
assert result.attrs == expected.attrs
# coordinates
assert result.coords.keys() == expected.coords.keys()
# chunks
assert result.chunks == expected.chunks
# encoding
assert result.encoding == expected.encoding