Skip to content

Commit

Permalink
Made changes like adding exception_behaviour and fail_fast parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
Sherwin-14 committed Aug 24, 2024
1 parent a492348 commit 980469a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
instead ([#766](https://github.com/nsidc/earthaccess/issues/766))([**@Sherwin-14**](https://github.com/Sherwin-14))
- Added Issue Templates([#281](https://github.com/nsidc/earthaccess/issues/281))([**@Sherwin-14**](https://github.com/Sherwin-14))

- Fixed earthaccess.download() ignoring errors ([#581](https://github.com/nsidc/earthaccess/issues/581))([**@Sherwin-14**](https://github.com/Sherwin-14))


## [v0.10.0] 2024-07-19
Expand Down
43 changes: 27 additions & 16 deletions earthaccess/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,20 @@ def _open_files(
url_mapping: Mapping[str, Union[DataGranule, None]],
fs: fsspec.AbstractFileSystem,
threads: Optional[int] = 8,
fail_fast: bool = True
fail_fast: bool = True,
) -> List[fsspec.AbstractFileSystem]:
def multi_thread_open(data: tuple) -> EarthAccessFile:
urls, granule = data
return EarthAccessFile(fs.open(urls), granule)

exception_behavior = "immediate" if fail_fast else "deferred"

fileset = pqdm(url_mapping.items(), multi_thread_open, n_jobs=threads,exception_behaviour=exception_behavior)
fileset = pqdm(
url_mapping.items(),
multi_thread_open,
n_jobs=threads,
exception_behaviour=exception_behavior,
)
return fileset


Expand Down Expand Up @@ -420,7 +425,7 @@ def _open_urls(
granules: List[str],
provider: Optional[str] = None,
threads: Optional[int] = 8,
fail_fast: bool = True
fail_fast: bool = True,
) -> List[Any]:
fileset: List = []

Expand All @@ -445,10 +450,7 @@ def _open_urls(
if s3_fs is not None:
try:
fileset = _open_files(
url_mapping,
fs=s3_fs,
threads=threads,
fail_fast = fail_fast
url_mapping, fs=s3_fs, threads=threads, fail_fast=fail_fast
)
except Exception as e:
raise RuntimeError(
Expand Down Expand Up @@ -546,7 +548,7 @@ def _get_urls(
local_path: Path,
provider: Optional[str] = None,
threads: int = 8,
fail_fast: bool = True
fail_fast: bool = True,
) -> List[str]:
data_links = granules
downloaded_files: List = []
Expand All @@ -568,7 +570,9 @@ def _get_urls(

else:
# if we are not in AWS
return self._download_onprem_granules(data_links, local_path, threads,fail_fast=fail_fast)
return self._download_onprem_granules(
data_links, local_path, threads, fail_fast=fail_fast
)

@_get.register
def _get_granules(
Expand All @@ -577,7 +581,7 @@ def _get_granules(
local_path: Path,
provider: Optional[str] = None,
threads: int = 8,
fail_fast: bool = True
fail_fast: bool = True,
) -> List[str]:
data_links: List = []
downloaded_files: List = []
Expand Down Expand Up @@ -618,7 +622,9 @@ def _get_granules(
else:
# if the data are cloud-based, but we are not in AWS,
# it will be downloaded as if it was on prem
return self._download_onprem_granules(data_links, local_path, threads,fail_fast=fail_fast)
return self._download_onprem_granules(
data_links, local_path, threads, fail_fast=fail_fast
)

def _download_file(self, url: str, directory: Path) -> str:
"""Download a single file from an on-prem location, a DAAC data center.
Expand Down Expand Up @@ -656,7 +662,7 @@ def _download_file(self, url: str, directory: Path) -> str:
return str(path)

def _download_onprem_granules(
self, urls: List[str], directory: Path, threads: int = 8,fail_fast: bool = True
self, urls: List[str], directory: Path, threads: int = 8, fail_fast: bool = True
) -> List[Any]:
"""Downloads a list of URLS into the data directory.
Expand All @@ -665,6 +671,9 @@ def _download_onprem_granules(
directory: local directory to store the downloaded files
threads: parallel number of threads to use to download the files;
adjust as necessary, default = 8
fail_fast: if set to True, the download process will stop immediately
upon encountering the first error. If set to False, errors will be
deferred, allowing the download of remaining files to continue.
Returns:
A list of local filepaths to which the files were downloaded.
Expand All @@ -686,20 +695,22 @@ def _download_onprem_granules(
self._download_file,
n_jobs=threads,
argument_type="args",
exception_behaviour=exception_behavior
exception_behaviour=exception_behavior,
)
return results

def _open_urls_https(
self,
url_mapping: Mapping[str, Union[DataGranule, None]],
threads: Optional[int] = 8,
fail_fast: bool = True
fail_fast: bool = True,
) -> List[fsspec.AbstractFileSystem]:
https_fs = self.get_fsspec_session()
if https_fs is not None:
try:
fileset = _open_files(url_mapping, https_fs, threads,fail_fast=fail_fast)
fileset = _open_files(
url_mapping, https_fs, threads, fail_fast=fail_fast
)
except Exception:
logger.exception(
"An exception occurred while trying to access remote files via HTTPS"
Expand Down

0 comments on commit 980469a

Please sign in to comment.