From 068c6e638f1896fbdaca350ccd5c7528bb8786b2 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Sep 2023 11:19:06 -0500 Subject: [PATCH 1/4] Use file size when available --- earthaccess/api.py | 5 +++- earthaccess/store.py | 69 +++++++++++++++++++++++++++++++++++++------- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/earthaccess/api.py b/earthaccess/api.py index f0264454..1cf9302f 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -181,6 +181,7 @@ def download( def open( granules: Union[List[str], List[earthaccess.results.DataGranule]], provider: Optional[str] = None, + sizes=None, ) -> List[AbstractFileSystem]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -191,7 +192,9 @@ def open( Returns: a list of s3fs "file pointers" to s3 files. """ - results = earthaccess.__store__.open(granules=granules, provider=provider) + results = earthaccess.__store__.open( + granules=granules, provider=provider, sizes=sizes + ) return results diff --git a/earthaccess/store.py b/earthaccess/store.py index 4905be82..f88e51c0 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import os import shutil @@ -37,6 +39,7 @@ def __reduce__(self) -> Any: self.granule, earthaccess.__auth__, dumps(self.f), + self.f.size, ) def __repr__(self) -> str: @@ -48,9 +51,13 @@ def _open_files( granules: Union[List[str], List[DataGranule]], fs: fsspec.AbstractFileSystem, threads: Optional[int] = 8, + sizes=None, ) -> List[fsspec.AbstractFileSystem]: + if sizes is None: + sizes = [None] * len(data_links) + def multi_thread_open(data: tuple) -> EarthAccessFile: - urls, granule = data + urls, granule, size = data if type(granule) is not str: if len(granule.data_links()) > 1: print( @@ -58,14 +65,42 @@ def multi_thread_open(data: tuple) -> EarthAccessFile: "earthaccess will only open the first data link, " "try filtering the links before opening them." ) - return EarthAccessFile(fs.open(urls), granule) - - fileset = pqdm(zip(data_links, granules), multi_thread_open, n_jobs=threads) + if "s3" in str(type(fs)).lower() and size is not None: + # populate dircache + # {'Key': 'oss-scratch-space/jrbourbeau/arraylakef7947862d0a794abb85c0c4544fcf931acfbc21d5b02fec075fea05cfa3184ac', + # 'LastModified': datetime.datetime(2023, 9, 7, 1, 44, 9, tzinfo=tzutc()), + # 'ETag': '"4c2dd9323fd2bfca326e0032926a87e6"', + # 'Size': 298161, + # 'StorageClass': 'STANDARD', + # 'type': 'file', + # 'size': 298161, + # 'name': 'oss-scratch-space/jrbourbeau/arraylakef7947862d0a794abb85c0c4544fcf931acfbc21d5b02fec075fea05cfa3184ac'} + for link in data_links: + name = fs._strip_protocol(link) + bucket, _ = os.path.split(name) + if bucket not in fs.dircache: + fs.dircache[bucket] = [] + file_info = { + "name": name, + "Key": name, + "Size": size, + "size": size, + "StorageClass": "STANDARD", + "type": "file", + } + fs.dircache[bucket].append(file_info) + return EarthAccessFile(fs.open(urls, size=size), granule) + + fileset = pqdm(zip(data_links, granules, sizes), multi_thread_open, n_jobs=threads) return fileset def make_instance( - cls: Any, granule: DataGranule, auth: Auth, data: Any + cls: Any, + granule: DataGranule, + auth: Auth, + data: Any, + size: int | None, ) -> EarthAccessFile: # Attempt to re-authenticate if not earthaccess.__auth__.authenticated: @@ -79,7 +114,8 @@ def make_instance( ): # NOTE: This uses the first data_link listed in the granule. That's not # guaranteed to be the right one. - return EarthAccessFile(earthaccess.open([granule])[0], granule) + sizes = [size] if size is not None else None + return EarthAccessFile(earthaccess.open([granule], sizes=sizes)[0], granule) else: return EarthAccessFile(loads(data), granule) @@ -269,6 +305,7 @@ def open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, + sizes=None, ) -> Union[List[Any], None]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -279,7 +316,7 @@ def open( a list of s3fs "file pointers" to s3 files. """ if len(granules): - return self._open(granules, provider) + return self._open(granules, provider, sizes=sizes) print("The granules list is empty, moving on...") return None @@ -288,6 +325,7 @@ def _open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, + sizes=None, ) -> Union[List[Any], None]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -305,6 +343,7 @@ def _open_granules( granules: List[DataGranule], provider: Optional[str] = None, threads: Optional[int] = 8, + sizes=None, ) -> Union[List[Any], None]: fileset: List = [] data_links: List = [] @@ -346,6 +385,7 @@ def _open_granules( granules=granules, fs=s3_fs, threads=threads, + sizes=sizes, ) except Exception: print( @@ -355,7 +395,9 @@ def _open_granules( ) return None else: - fileset = self._open_urls_https(data_links, granules, threads=threads) + fileset = self._open_urls_https( + data_links, granules, threads=threads, sizes=sizes + ) return fileset else: access_method = "on_prem" @@ -364,7 +406,9 @@ def _open_granules( granule.data_links(access=access_method) for granule in granules ) ) - fileset = self._open_urls_https(data_links, granules, threads=threads) + fileset = self._open_urls_https( + data_links, granules, threads=threads, sizes=sizes + ) return fileset @_open.register @@ -373,6 +417,7 @@ def _open_urls( granules: List[str], provider: Optional[str] = None, threads: Optional[int] = 8, + sizes=None, ) -> Union[List[Any], None]: fileset: List = [] data_links: List = [] @@ -404,6 +449,7 @@ def _open_urls( granules=granules, fs=s3_fs, threads=threads, + sizes=sizes, ) except Exception: print( @@ -426,7 +472,7 @@ def _open_urls( "We cannot open S3 links when we are not in-region, try using HTTPS links" ) return None - fileset = self._open_urls_https(data_links, granules, 8) + fileset = self._open_urls_https(data_links, granules, 8, sizes) return fileset def get( @@ -639,11 +685,12 @@ def _open_urls_https( urls: List[str], granules: Union[List[str], List[DataGranule]], threads: Optional[int] = 8, + sizes=None, ) -> List[fsspec.AbstractFileSystem]: https_fs = self.get_fsspec_session() if https_fs is not None: try: - fileset = _open_files(urls, granules, https_fs, threads) + fileset = _open_files(urls, granules, https_fs, threads, sizes) except Exception: print( "An exception occurred while trying to access remote files via HTTPS: " From 262589d9159a7d70586d53eebaac8df83eb999e8 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 21 Sep 2023 14:11:52 -0500 Subject: [PATCH 2/4] Hold off on dircache --- earthaccess/store.py | 48 ++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index f88e51c0..a189b542 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -65,30 +65,30 @@ def multi_thread_open(data: tuple) -> EarthAccessFile: "earthaccess will only open the first data link, " "try filtering the links before opening them." ) - if "s3" in str(type(fs)).lower() and size is not None: - # populate dircache - # {'Key': 'oss-scratch-space/jrbourbeau/arraylakef7947862d0a794abb85c0c4544fcf931acfbc21d5b02fec075fea05cfa3184ac', - # 'LastModified': datetime.datetime(2023, 9, 7, 1, 44, 9, tzinfo=tzutc()), - # 'ETag': '"4c2dd9323fd2bfca326e0032926a87e6"', - # 'Size': 298161, - # 'StorageClass': 'STANDARD', - # 'type': 'file', - # 'size': 298161, - # 'name': 'oss-scratch-space/jrbourbeau/arraylakef7947862d0a794abb85c0c4544fcf931acfbc21d5b02fec075fea05cfa3184ac'} - for link in data_links: - name = fs._strip_protocol(link) - bucket, _ = os.path.split(name) - if bucket not in fs.dircache: - fs.dircache[bucket] = [] - file_info = { - "name": name, - "Key": name, - "Size": size, - "size": size, - "StorageClass": "STANDARD", - "type": "file", - } - fs.dircache[bucket].append(file_info) + # if "s3" in str(type(fs)).lower() and size is not None: + # # populate dircache + # # {'Key': 'oss-scratch-space/jrbourbeau/arraylakef7947862d0a794abb85c0c4544fcf931acfbc21d5b02fec075fea05cfa3184ac', + # # 'LastModified': datetime.datetime(2023, 9, 7, 1, 44, 9, tzinfo=tzutc()), + # # 'ETag': '"4c2dd9323fd2bfca326e0032926a87e6"', + # # 'Size': 298161, + # # 'StorageClass': 'STANDARD', + # # 'type': 'file', + # # 'size': 298161, + # # 'name': 'oss-scratch-space/jrbourbeau/arraylakef7947862d0a794abb85c0c4544fcf931acfbc21d5b02fec075fea05cfa3184ac'} + # for link in data_links: + # name = fs._strip_protocol(link) + # bucket, _ = os.path.split(name) + # if bucket not in fs.dircache: + # fs.dircache[bucket] = [] + # file_info = { + # "name": name, + # "Key": name, + # "Size": size, + # "size": size, + # "StorageClass": "STANDARD", + # "type": "file", + # } + # fs.dircache[bucket].append(file_info) return EarthAccessFile(fs.open(urls, size=size), granule) fileset = pqdm(zip(data_links, granules, sizes), multi_thread_open, n_jobs=threads) From 3400fe4fb818dd3338dafeb3a8befcea6afbdccb Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 11 Oct 2023 16:33:55 -0500 Subject: [PATCH 3/4] Remove comments --- earthaccess/store.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index a189b542..cb246f82 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -65,30 +65,6 @@ def multi_thread_open(data: tuple) -> EarthAccessFile: "earthaccess will only open the first data link, " "try filtering the links before opening them." ) - # if "s3" in str(type(fs)).lower() and size is not None: - # # populate dircache - # # {'Key': 'oss-scratch-space/jrbourbeau/arraylakef7947862d0a794abb85c0c4544fcf931acfbc21d5b02fec075fea05cfa3184ac', - # # 'LastModified': datetime.datetime(2023, 9, 7, 1, 44, 9, tzinfo=tzutc()), - # # 'ETag': '"4c2dd9323fd2bfca326e0032926a87e6"', - # # 'Size': 298161, - # # 'StorageClass': 'STANDARD', - # # 'type': 'file', - # # 'size': 298161, - # # 'name': 'oss-scratch-space/jrbourbeau/arraylakef7947862d0a794abb85c0c4544fcf931acfbc21d5b02fec075fea05cfa3184ac'} - # for link in data_links: - # name = fs._strip_protocol(link) - # bucket, _ = os.path.split(name) - # if bucket not in fs.dircache: - # fs.dircache[bucket] = [] - # file_info = { - # "name": name, - # "Key": name, - # "Size": size, - # "size": size, - # "StorageClass": "STANDARD", - # "type": "file", - # } - # fs.dircache[bucket].append(file_info) return EarthAccessFile(fs.open(urls, size=size), granule) fileset = pqdm(zip(data_links, granules, sizes), multi_thread_open, n_jobs=threads) From c2bb1cdb092a6f34e97bb5dbe8827f4ea38581b6 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 11 Oct 2023 16:43:08 -0500 Subject: [PATCH 4/4] Lint --- earthaccess/api.py | 2 +- earthaccess/store.py | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/earthaccess/api.py b/earthaccess/api.py index 1cf9302f..5432bff7 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -181,7 +181,7 @@ def download( def open( granules: Union[List[str], List[earthaccess.results.DataGranule]], provider: Optional[str] = None, - sizes=None, + sizes: Optional[List[int]] = None, ) -> List[AbstractFileSystem]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. diff --git a/earthaccess/store.py b/earthaccess/store.py index cb246f82..ef1ee72d 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -51,10 +51,13 @@ def _open_files( granules: Union[List[str], List[DataGranule]], fs: fsspec.AbstractFileSystem, threads: Optional[int] = 8, - sizes=None, + sizes: Optional[List[int]] = None, ) -> List[fsspec.AbstractFileSystem]: + file_sizes: Union[List[int], List[None]] if sizes is None: - sizes = [None] * len(data_links) + file_sizes = [None] * len(data_links) + else: + file_sizes = sizes def multi_thread_open(data: tuple) -> EarthAccessFile: urls, granule, size = data @@ -67,7 +70,9 @@ def multi_thread_open(data: tuple) -> EarthAccessFile: ) return EarthAccessFile(fs.open(urls, size=size), granule) - fileset = pqdm(zip(data_links, granules, sizes), multi_thread_open, n_jobs=threads) + fileset = pqdm( + zip(data_links, granules, file_sizes), multi_thread_open, n_jobs=threads + ) return fileset @@ -281,7 +286,7 @@ def open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, - sizes=None, + sizes: Optional[List[int]] = None, ) -> Union[List[Any], None]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -301,7 +306,7 @@ def _open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, - sizes=None, + sizes: Optional[List[int]] = None, ) -> Union[List[Any], None]: """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -319,7 +324,7 @@ def _open_granules( granules: List[DataGranule], provider: Optional[str] = None, threads: Optional[int] = 8, - sizes=None, + sizes: Optional[List[int]] = None, ) -> Union[List[Any], None]: fileset: List = [] data_links: List = [] @@ -393,7 +398,7 @@ def _open_urls( granules: List[str], provider: Optional[str] = None, threads: Optional[int] = 8, - sizes=None, + sizes: Optional[List[int]] = None, ) -> Union[List[Any], None]: fileset: List = [] data_links: List = [] @@ -661,7 +666,7 @@ def _open_urls_https( urls: List[str], granules: Union[List[str], List[DataGranule]], threads: Optional[int] = 8, - sizes=None, + sizes: Optional[List[int]] = None, ) -> List[fsspec.AbstractFileSystem]: https_fs = self.get_fsspec_session() if https_fs is not None: