Skip to content

Commit

Permalink
Fix dataspace multidownload (#53)
Browse files Browse the repository at this point in the history
* change username/pass to have underscores

* fix the mis-copied query result parsing for Dataspace

SDS only needs one orbit, which is why this was in there

adds new tests for downloading multiple

* bump version for patches
  • Loading branch information
scottstanie authored Nov 6, 2023
1 parent 3e4afcd commit 96ae4a6
Show file tree
Hide file tree
Showing 14 changed files with 265,247 additions and 26,981 deletions.
39 changes: 20 additions & 19 deletions eof/asf_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import os
from datetime import timedelta
from pathlib import Path
from typing import Optional
from zipfile import ZipFile

Expand Down Expand Up @@ -38,14 +39,14 @@ def __init__(
):
self._cache_dir = cache_dir
if username and password:
self.username = username
self.password = password
self._username = username
self._password = password
else:
logger.debug("Get credentials form netrc")
self.username = ""
self.password = ""
self._username = ""
self._password = ""
try:
self.username, self.password = get_netrc_credentials(NASA_HOST)
self._username, self._password = get_netrc_credentials(NASA_HOST)
except FileNotFoundError:
logger.warning("No netrc file found.")
except ValueError as e:
Expand All @@ -56,7 +57,7 @@ def __init__(
)

self.session: Optional[requests.Session] = None
if self.username and self.password:
if self._username and self._password:
self.session = self.get_authenticated_session()

def get_full_eof_list(self, orbit_type="precise", max_dt=None):
Expand Down Expand Up @@ -180,20 +181,20 @@ def get_cache_dir(self):
os.makedirs(path)
return path

def _download_and_write(self, url, save_dir="."):
def _download_and_write(self, url, save_dir=".") -> Path:
"""Wrapper function to run the link downloading in parallel
Args:
url (str): url of orbit file to download
save_dir (str): directory to save the EOF files into
Returns:
list[str]: Filenames to which the orbit files have been saved
Path: Filename to saved orbit file
"""
fname = os.path.join(save_dir, url.split("/")[-1])
fname = Path(save_dir) / url.split("/")[-1]
if os.path.isfile(fname):
logger.info("%s already exists, skipping download.", url)
return [fname]
return fname

logger.info("Downloading %s", url)
get_function = self.session.get if self.session is not None else requests.get
Expand All @@ -208,22 +209,22 @@ def _download_and_write(self, url, save_dir="."):
"Failed to download %s. Trying URS login url: %s", url, login_url
)
# Add credentials
response = get_function(login_url, auth=(self.username, self.password))
response = get_function(login_url, auth=(self._username, self._password))
response.raise_for_status()

logger.info("Saving to %s", fname)
with open(fname, "wb") as f:
f.write(response.content)
if fname.endswith(".zip"):
if fname.suffix == ".zip":
ASFClient._extract_zip(fname, save_dir=save_dir)
# Pass the unzipped file ending in ".EOF", not the ".zip"
fname = fname.replace(".zip", "")
fname = fname.with_suffix("")
return fname

@staticmethod
def _extract_zip(fname_zipped, save_dir=None, delete=True):
def _extract_zip(fname_zipped: Path, save_dir=None, delete=True):
if save_dir is None:
save_dir = os.path.dirname(fname_zipped)
save_dir = fname_zipped.parent
with ZipFile(fname_zipped, "r") as zip_ref:
# Extract the .EOF to the same directory as the .zip
zip_ref.extractall(path=save_dir)
Expand All @@ -232,9 +233,9 @@ def _extract_zip(fname_zipped, save_dir=None, delete=True):
zipped = zip_ref.namelist()[0]
zipped_dir = os.path.dirname(zipped)
if zipped_dir:
no_subdir = os.path.join(save_dir, os.path.split(zipped)[1])
os.rename(os.path.join(save_dir, zipped), no_subdir)
os.rmdir(os.path.join(save_dir, zipped_dir))
no_subdir = save_dir / os.path.split(zipped)[1]
os.rename((save_dir / zipped), no_subdir)
os.rmdir((save_dir / zipped_dir))
if delete:
os.remove(fname_zipped)

Expand All @@ -250,6 +251,6 @@ def get_authenticated_session(self) -> requests.Session:
Authenticated session
"""
s = requests.Session()
response = s.get(self.auth_url, auth=(self.username, self.password))
response = s.get(self.auth_url, auth=(self._username, self._password))
response.raise_for_status()
return s
1 change: 0 additions & 1 deletion eof/dataspace_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ def download_all(
# Obtain an access token for the download request from the provided credentials
access_token = get_access_token(username, password)
for query_result in query_results:
query_result = query_results[0]
orbit_file_name = query_result["Name"]
orbit_file_request_id = query_result["Id"]

Expand Down
16 changes: 10 additions & 6 deletions eof/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import itertools
import os
from multiprocessing.pool import ThreadPool
from pathlib import Path

from dateutil.parser import parse

Expand All @@ -49,7 +50,8 @@ def download_eofs(
asf_password: str = "",
cdse_user: str = "",
cdse_password: str = "",
):
max_workers: int = MAX_WORKERS,
) -> list[Path]:
"""Downloads and saves EOF files for specific dates
Args:
Expand Down Expand Up @@ -112,7 +114,7 @@ def download_eofs(
asf_client = ASFClient(username=asf_user, password=asf_password)
urls = asf_client.get_download_urls(orbit_dts, missions, orbit_type=orbit_type)
# Download and save all links in parallel
pool = ThreadPool(processes=MAX_WORKERS)
pool = ThreadPool(processes=max_workers)
result_url_dict = {
pool.apply_async(
asf_client._download_and_write,
Expand All @@ -122,12 +124,12 @@ def download_eofs(
}

for result, url in result_url_dict.items():
cur_filenames = result.get()
if cur_filenames is None:
cur_filename = result.get()
if cur_filename is None:
logger.error("Failed to download orbit for %s", url)
else:
logger.info("Finished %s, saved to %s", url, cur_filenames)
filenames.append(cur_filenames)
logger.info("Finished %s, saved to %s", url, cur_filename)
filenames.append(cur_filename)

return filenames

Expand Down Expand Up @@ -197,6 +199,7 @@ def main(
asf_password: str = "",
cdse_user: str = "",
cdse_password: str = "",
max_workers: int = MAX_WORKERS,
):
"""Function used for entry point to download eofs"""

Expand Down Expand Up @@ -240,4 +243,5 @@ def main(
asf_password=asf_password,
cdse_user=cdse_user,
cdse_password=cdse_password,
max_workers=max_workers,
)
Loading

0 comments on commit 96ae4a6

Please sign in to comment.