Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] Allow for opening with known file sizes #315

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion earthaccess/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def download(
def open(
granules: Union[List[str], List[earthaccess.results.DataGranule]],
provider: Optional[str] = None,
sizes: Optional[List[int]] = None,
) -> List[AbstractFileSystem]:
"""Returns a list of fsspec file-like objects that can be used to access files
hosted on S3 or HTTPS by third party libraries like xarray.
Expand All @@ -194,7 +195,9 @@ def open(
Returns:
a list of s3fs "file pointers" to s3 files.
"""
results = earthaccess.__store__.open(granules=granules, provider=provider)
results = earthaccess.__store__.open(
granules=granules, provider=provider, sizes=sizes
)
return results


Expand Down
50 changes: 40 additions & 10 deletions earthaccess/store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import datetime
import os
import shutil
Expand Down Expand Up @@ -37,6 +39,7 @@ def __reduce__(self) -> Any:
self.granule,
earthaccess.__auth__,
dumps(self.f),
self.f.size,
)

def __repr__(self) -> str:
Expand All @@ -48,24 +51,37 @@ def _open_files(
granules: Union[List[str], List[DataGranule]],
fs: fsspec.AbstractFileSystem,
threads: Optional[int] = 8,
sizes: Optional[List[int]] = None,
) -> List[fsspec.AbstractFileSystem]:
file_sizes: Union[List[int], List[None]]
if sizes is None:
file_sizes = [None] * len(data_links)
else:
file_sizes = sizes

def multi_thread_open(data: tuple) -> EarthAccessFile:
urls, granule = data
urls, granule, size = data
if type(granule) is not str:
if len(granule.data_links()) > 1:
print(
"Warning: This collection contains more than one file per granule. "
"earthaccess will only open the first data link, "
"try filtering the links before opening them."
)
return EarthAccessFile(fs.open(urls), granule)
return EarthAccessFile(fs.open(urls, size=size), granule)

fileset = pqdm(zip(data_links, granules), multi_thread_open, n_jobs=threads)
fileset = pqdm(
zip(data_links, granules, file_sizes), multi_thread_open, n_jobs=threads
)
return fileset


def make_instance(
cls: Any, granule: DataGranule, auth: Auth, data: Any
cls: Any,
granule: DataGranule,
auth: Auth,
data: Any,
size: int | None,
) -> EarthAccessFile:
# Attempt to re-authenticate
if not earthaccess.__auth__.authenticated:
Expand All @@ -79,7 +95,8 @@ def make_instance(
):
# NOTE: This uses the first data_link listed in the granule. That's not
# guaranteed to be the right one.
return EarthAccessFile(earthaccess.open([granule])[0], granule)
sizes = [size] if size is not None else None
return EarthAccessFile(earthaccess.open([granule], sizes=sizes)[0], granule)
else:
return EarthAccessFile(loads(data), granule)

Expand Down Expand Up @@ -269,6 +286,7 @@ def open(
self,
granules: Union[List[str], List[DataGranule]],
provider: Optional[str] = None,
sizes: Optional[List[int]] = None,
) -> Union[List[Any], None]:
"""Returns a list of fsspec file-like objects that can be used to access files
hosted on S3 or HTTPS by third party libraries like xarray.
Expand All @@ -279,7 +297,7 @@ def open(
a list of s3fs "file pointers" to s3 files.
"""
if len(granules):
return self._open(granules, provider)
return self._open(granules, provider, sizes=sizes)
print("The granules list is empty, moving on...")
return None

Expand All @@ -288,6 +306,7 @@ def _open(
self,
granules: Union[List[str], List[DataGranule]],
provider: Optional[str] = None,
sizes: Optional[List[int]] = None,
) -> Union[List[Any], None]:
"""Returns a list of fsspec file-like objects that can be used to access files
hosted on S3 or HTTPS by third party libraries like xarray.
Expand All @@ -305,6 +324,7 @@ def _open_granules(
granules: List[DataGranule],
provider: Optional[str] = None,
threads: Optional[int] = 8,
sizes: Optional[List[int]] = None,
) -> Union[List[Any], None]:
fileset: List = []
data_links: List = []
Expand Down Expand Up @@ -346,6 +366,7 @@ def _open_granules(
granules=granules,
fs=s3_fs,
threads=threads,
sizes=sizes,
)
except Exception:
print(
Expand All @@ -355,7 +376,9 @@ def _open_granules(
)
return None
else:
fileset = self._open_urls_https(data_links, granules, threads=threads)
fileset = self._open_urls_https(
data_links, granules, threads=threads, sizes=sizes
)
return fileset
else:
access_method = "on_prem"
Expand All @@ -364,7 +387,9 @@ def _open_granules(
granule.data_links(access=access_method) for granule in granules
)
)
fileset = self._open_urls_https(data_links, granules, threads=threads)
fileset = self._open_urls_https(
data_links, granules, threads=threads, sizes=sizes
)
return fileset

@_open.register
Expand All @@ -373,6 +398,7 @@ def _open_urls(
granules: List[str],
provider: Optional[str] = None,
threads: Optional[int] = 8,
sizes: Optional[List[int]] = None,
) -> Union[List[Any], None]:
fileset: List = []
data_links: List = []
Expand Down Expand Up @@ -404,6 +430,7 @@ def _open_urls(
granules=granules,
fs=s3_fs,
threads=threads,
sizes=sizes,
)
except Exception:
print(
Expand All @@ -426,7 +453,9 @@ def _open_urls(
"We cannot open S3 links when we are not in-region, try using HTTPS links"
)
return None
fileset = self._open_urls_https(data_links, granules, threads)

fileset = self._open_urls_https(data_links, granules, 8, sizes)

return fileset

def get(
Expand Down Expand Up @@ -639,11 +668,12 @@ def _open_urls_https(
urls: List[str],
granules: Union[List[str], List[DataGranule]],
threads: Optional[int] = 8,
sizes: Optional[List[int]] = None,
) -> List[fsspec.AbstractFileSystem]:
https_fs = self.get_fsspec_session()
if https_fs is not None:
try:
fileset = _open_files(urls, granules, https_fs, threads)
fileset = _open_files(urls, granules, https_fs, threads, sizes)
except Exception:
print(
"An exception occurred while trying to access remote files via HTTPS: "
Expand Down
Loading