# AUTOGENERATED! DO NOT EDIT! File to edit: ../01_hyp3.ipynb.
# NOTE(review): recovered from a diff that deleted sar_asf_to_gee/hyp3.py
# (blob 1553321). The patch text had lost its line structure, so the nesting
# of the loop bodies below is reconstructed from the data flow -- confirm
# against the notebook before regenerating.

# %% auto 0
__all__ = ['Transfer']

# %% ../01_hyp3.ipynb 2
import asf_search
import datetime
from IPython.display import JSON
import ee
from fastcore.basics import patch
import gcsfs
from hyp3_sdk import HyP3
import logging
import os
import posixpath
from pprint import pprint
import re
from rio_cogeo import cogeo
import subprocess
import tempfile
import zipfile

from sar_asf_to_gee.core import (
    FORMAT_GEE_DATETIME_STRING,
    create_gee_image_collection
)


# %% ../01_hyp3.ipynb 11
class Transfer():
    "Holds the state needed to move the files of one HyP3 job to GCS and Earth Engine."

    def __init__(
        self,
        job_dict,  # HyP3 job dictionary
        gcs_bucket,  # GCS bucket
        gee_gcp_project,  # GCP project used by Earth Engine
        gee_image_collection=None,  # Name of the Earth Engine ImageCollection (optional)
        local_storage=None,  # Local working directory; a temporary one is created when falsy
    ):
        self.job_dict = job_dict
        self.gcs_bucket = gcs_bucket
        self.gee_gcp_project = gee_gcp_project
        self.gee_image_collection = gee_image_collection
        if local_storage:
            self.tempdir = None
            self.local_storage = local_storage
        else:
            # Keep a reference to the TemporaryDirectory object on the
            # instance: the directory is removed once that object is cleaned up.
            self.tempdir = tempfile.TemporaryDirectory()
            self.local_storage = self.tempdir.name
            logging.debug(f'created temporary directory: {self.tempdir.name}')


# %% ../01_hyp3.ipynb 13
@patch
def hpy3_results_to_local(
    self:Transfer,
):
    """Transfer HyP3 results to local system, unzip, and update the job dictionary.

    For every entry of `self.job_dict['files']` the archive is downloaded into
    `self.local_storage` and extracted, each extracted `.tif` is passed to
    `rio cogeo create` (output path == input path), and the entry gains an
    `'extracted'` dict mapping the name suffix of each TIFF to its path
    relative to `self.local_storage`.
    """
    logging.info('Starting hpy3_results_to_local()')
    for file in self.job_dict['files']:
        logging.info(f'file = {file}')
        asf_search.download_url(
            url=file['url'],
            path=self.local_storage,
            filename=file['filename'],
        )
        # Unzip the file
        archive_path = os.path.join(self.local_storage, file['filename'])
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            zip_ref.extractall(self.local_storage)

        # List the TIF files. The archive is extracted under
        # self.local_storage, so that is where the scene directory is looked up.
        scene_name = file['filename'].removesuffix('.zip')
        scene_dir = os.path.join(self.local_storage, scene_name)
        tifs = [x for x in os.listdir(scene_dir) if x.endswith('.tif')]

        for tif in tifs:
            logging.info(f'Converting to a Cloud Optimized GeoTIFF: {tif}')
            tif_path = os.path.join(scene_dir, tif)
            result = subprocess.run([
                "rio",
                "cogeo",
                "create",
                tif_path,
                tif_path
            ])
            if result.returncode != 0:
                # Surface the failure instead of ignoring the exit status.
                logging.error(
                    f'rio cogeo create exited with status {result.returncode}: {tif_path}'
                )

        # Map "<scene_name>_<suffix>.tif" -> suffix. The scene name is escaped
        # so that any regex metacharacters in it are matched literally.
        pattern = re.compile(rf'^({re.escape(scene_name)}_(.+)\.tif)$')
        tif_dict = {}
        for tif in tifs:
            match = pattern.search(tif)
            if match is None:
                logging.warning(f'Skipping TIFF with an unexpected name: {tif}')
                continue
            tif_name, suffix = match.groups()
            tif_dict[suffix] = os.path.join(scene_name, tif_name)

        file['extracted'] = tif_dict


# %% ../01_hyp3.ipynb 16
@patch
def to_gcs(
    self:Transfer,
):
    """Upload every extracted file to `self.gcs_bucket`, skipping objects that already exist.

    Reads the `'extracted'` dict of each entry in `self.job_dict['files']`.
    """
    logging.info('Starting _to_gcs()')

    fs = gcsfs.GCSFileSystem(token='google_default')

    for file in self.job_dict['files']:
        for filename in file['extracted'].values():
            gcs_path = f'{self.gcs_bucket}/{filename}'
            if fs.exists(gcs_path):
                logging.info(f'GCS file already exists: {gcs_path}')
                continue
            logging.info(f'Starting to transfer file to GCS: {gcs_path}')
            # Transfer the local file to GCS.
            fs.put_file(
                lpath=f"{self.local_storage}/{filename}",
                rpath=gcs_path
            )
            logging.info(f'Transferred file to GCS: {gcs_path}')


# %% ../01_hyp3.ipynb 19
@patch
def create_gee_asset(
    self:Transfer,
):
    """Create an Earth Engine asset.

    One asset is requested per extracted file (files ending in `_rdr.tif` are
    skipped), pointing at the file's `gs://` location. Start/end times are the
    earliest/latest `stopTime` of the job's granules.

    Raises `ValueError` when the granule search returns nothing and re-raises
    any `ee.EEException` from `ee.data.createAsset`.
    """
    logging.info('Starting create_gee_asset()')

    ee.Initialize(project=self.gee_gcp_project)

    create_gee_image_collection(self.gee_gcp_project, self.gee_image_collection)

    granule_names = self.job_dict['job_parameters']['granules']
    granules = asf_search.granule_search(granule_names)
    if not granules:
        # min()/max() below would fail with a far less helpful message.
        raise ValueError(f'No granules found for: {granule_names}')

    granule_times = [datetime.datetime.fromisoformat(x.properties['stopTime']) for x in granules]
    start_time = min(granule_times)
    end_time = max(granule_times)

    job_id = f"{self.job_dict['job_id']}"

    props = granules[0].properties
    description = (f"{props['platform']}"
                   f" - {props['processingLevel']}"
                   f" - {props['beamModeType']}")

    for file_dict in self.job_dict['files']:
        for band, filename in file_dict['extracted'].items():

            # Skip non-geocoded (native range-doppler coordinates) TIFFs.
            if filename.endswith('_rdr.tif'):
                continue

            gcs_path = f'{self.gcs_bucket}/{filename}'
            print(gcs_path)

            request = {
                'type': 'IMAGE',
                'bands': {  # TODO: Update this once multi-band COG assets are supported
                    'id': band
                },
                'gcs_location': {
                    'uris': [f'gs://{gcs_path}']
                },
                'properties': {
                    'source': file_dict['url'],
                    'band': band  # TODO: Remove this once multi-band COG assets are supported
                },
                'startTime': start_time.strftime(FORMAT_GEE_DATETIME_STRING),
                'endTime': end_time.strftime(FORMAT_GEE_DATETIME_STRING),
                'description': description
            }

            path_parts = [
                'projects',
                self.gee_gcp_project,
                'assets',
                self.gee_image_collection,
                # TODO: Remove the band suffix once multi-band COG assets are supported
                f'{job_id}_{band}'.replace(".", "_")
            ]
            # posixpath keeps the separator a forward slash on every platform;
            # a None collection name is simply left out of the path.
            assetname = posixpath.join(*[x for x in path_parts if x is not None])

            logging.debug(f'request = {request}')
            logging.debug(f'assetname = {assetname}')
            try:
                ee.data.createAsset(
                    value=request,
                    path=assetname
                )
            except ee.EEException as e:
                print(f'e = {e}')
                # TODO: Add logic to parse the EEException message so that an
                # "asset already exists" error can be logged and skipped.
                # Until then every EEException is propagated to the caller.
                raise
            else:
                logging.info('Finished creating a GEE asset.')