Refactor burst downloading code to make it more robust
Alexey Pechnikov committed Aug 24, 2024
1 parent 2c8d5d5 commit 98337a6
Showing 1 changed file with 86 additions and 64 deletions.
pygmtsar/pygmtsar/ASF.py: 150 changes (86 additions, 64 deletions)
@@ -50,7 +50,7 @@ def download(self, basedir, scenes_or_bursts, subswaths=None, polarization='VV',
session : asf_search.ASFSession, optional
The session object for authentication. If None, a new session is created.
n_jobs : int, optional
The number of concurrent download jobs. Default is 8.
The number of concurrent download jobs. Default is 4 for scenes and 8 for bursts.
joblib_backend : str, optional
The backend for parallel processing. Default is 'loky'.
skip_exist : bool, optional
@@ -244,7 +244,7 @@ def download_scene(scene, subswaths, polarization, basedir, session):
def download_bursts(self, basedir, bursts, session=None, n_jobs=8, joblib_backend='loky', skip_exist=True, debug=False):
"""
Downloads the specified bursts extracted from Sentinel-1 SLC scenes.
Parameters
----------
basedir : str
@@ -261,7 +261,7 @@ def download_bursts(self, basedir, bursts, session=None, n_jobs=8, joblib_backend='loky', skip_exist=True, debug=False):
If True, skips downloading bursts that already exist. Default is True.
debug : bool, optional
If True, prints debugging information. Default is False.
Returns
-------
pandas.DataFrame
@@ -270,6 +270,7 @@ def download_bursts(self, basedir, bursts, session=None, n_jobs=8, joblib_backend='loky', skip_exist=True, debug=False):
import rioxarray as rio
from tifffile import TiffFile
import xmltodict
from xml.etree import ElementTree
import pandas as pd
import asf_search
import joblib
@@ -281,18 +282,15 @@ def download_bursts(self, basedir, bursts, session=None, n_jobs=8, joblib_backend='loky', skip_exist=True, debug=False):
import warnings
# suppress asf_search 'UserWarning: File already exists, skipping download'
warnings.filterwarnings("ignore", category=UserWarning)
# repeat failed downloads
retries = 300
retries_timeout = 3

# keep only the annotation items whose azimuthTime falls within the burst interval, padded by +/- delta seconds
def filter_azimuth_time(items, start_utc_dt, stop_utc_dt, delta=3):
return [item for item in items if
datetime.strptime(item['azimuthTime'], '%Y-%m-%dT%H:%M:%S.%f') >= start_utc_dt - timedelta(seconds=delta) and
datetime.strptime(item['azimuthTime'], '%Y-%m-%dT%H:%M:%S.%f') <= stop_utc_dt + timedelta(seconds=delta)]

# create the directory if needed
os.makedirs(basedir, exist_ok=True)

# skip existing bursts
if skip_exist:
bursts_missed = []
@@ -322,7 +320,7 @@ def filter_azimuth_time(items, start_utc_dt, stop_utc_dt, delta=3):
# do not use an internet connection; work offline when all the scenes are already available
if len(bursts_missed) == 0:
return

def download_burst(result, basedir, session):
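# download one burst GeoTIFF and reconstruct its annotation XML
# inside a SAFE-like scene directory (measurement/ and annotation/)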
properties = result.geojson()['properties']
#print (properties)
@@ -358,7 +356,7 @@ def download_burst(result, basedir, session):
burst = '-'.join([f's{platform.lower()}-{subswath.lower()}-slc-{polarization.lower()}'] + scene_parts[5:-1] + ['001']).lower()
# s1a-iw2-slc-vv-20240314t130744-20240314t130747-052978-0669c6-001
#print ('burst', burst)

# create the directories if needed
tif_dir = os.path.join(scene_dir, 'measurement')
xml_dir = os.path.join(scene_dir, 'annotation')
@@ -369,7 +367,7 @@ def download_burst(result, basedir, session):
#print ('tif_file', tif_file)
for dirname in [scene_dir, tif_dir, xml_dir]:
os.makedirs(dirname, exist_ok=True)

# download tif
# properties['bytes'] is not an accurate file size; it appears to be about 40 kB smaller
if os.path.exists(tif_file) and os.path.getsize(tif_file) >= int(properties['bytes']):
@@ -384,27 +382,35 @@ def download_burst(result, basedir, session):
tmp_file = os.path.join(scene_dir, os.path.basename(tif_file))
# download burst tif file and save using the burst and scene names
#result.download(os.path.dirname(tif_file), filename=os.path.basename(tif_file))
for retry in range(retries):
try:
# remove potentially incomplete data file if needed
if os.path.exists(tmp_file):
os.remove(tmp_file)
result.download(scene_dir, filename=os.path.basename(tif_file), session=session)
if os.path.exists(tmp_file):
# check TIFF file validity by opening it
with rio.open_rasterio(tmp_file) as raster:
raster.load()
# TIFF file loaded successfully
break
except Exception as e:
print(f"Failed attempt {retry+1} to download {tmp_file}: {e}")
# wait before the next attempt
time.sleep(retries_timeout)
try:
# remove potentially incomplete data file if needed
if os.path.exists(tmp_file):
os.remove(tmp_file)
result.download(scene_dir, filename=os.path.basename(tif_file), session=session)
if not os.path.exists(tmp_file):
raise Exception(f'ERROR: TIFF file was not downloaded: {tmp_file}')
if os.path.getsize(tmp_file) == 0:
raise Exception(f'ERROR: TIFF file is empty: {tmp_file}')
# check TIFF file validity by opening it
with TiffFile(tmp_file) as tif:
# read the page metadata and decode the raster to validate the file
page = tif.pages[0]
tags = page.tags
data = page.asarray()
# rasterio may cause the interpreter to crash when attempting to open a corrupted TIFF file
#with rio.open_rasterio(tmp_file) as raster:
# raster.load()
# TIFF file loaded successfully
except Exception as e:
print(f'ERROR: TIFF file download failed: {tmp_file}: {e}')
# check if the file is really downloaded
assert os.path.exists(tmp_file), f'ERROR: TIFF file {tmp_file} was not downloaded and validated in {retries} retries'
#assert os.path.exists(tmp_file), f'ERROR: TIFF file {tmp_file} was not downloaded and validated in {retries} retries'
if not os.path.exists(tmp_file):
raise Exception(f'ERROR: TIFF file is missing: {tmp_file}')
# move to persistent name
os.rename(tmp_file, tif_file)

if os.path.exists(tmp_file):
os.rename(tmp_file, tif_file)

# download xml
if os.path.exists(xml_file) and os.path.getsize(xml_file) > 0:
#print (f'pass {xml_file}')
@@ -415,38 +421,40 @@ def download_burst(result, basedir, session):
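# the data offset of the first TIFF page is recorded below as 'byteOffset'
# in the reconstructed annotation's swathTiming burst record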
page = tif.pages[0]
offset = page.dataoffsets[0]
#print ('offset', offset)

# get the file name
basename = os.path.basename(properties['additionalUrls'][0])
#print ('basename', '=>', basename)
manifest_file = os.path.join(scene_dir, basename)
# download and process the manifest file even when it exists but has not been processed into an annotation xml
for retry in range(retries):
try:
# remove potentially incomplete manifest file if needed
if os.path.exists(manifest_file):
os.remove(manifest_file)
asf_search.download_urls(urls=properties['additionalUrls'], path=scene_dir, session=session)
if os.path.exists(manifest_file):
# check XML file validity by parsing it
with open(manifest_file, 'r') as file:
_ = xmltodict.parse(file.read())['burst']['metadata']['product'][0]
# XML file parsed successfully
break
except Exception as e:
print(f"Failed attempt {retry+1} to download and parse {manifest_file}: {e}")
# wait before the next attempt
time.sleep(retries_timeout)
try:
# remove potentially incomplete manifest file if needed
if os.path.exists(manifest_file):
os.remove(manifest_file)
asf_search.download_urls(urls=properties['additionalUrls'], path=scene_dir, session=session)
if not os.path.exists(manifest_file):
raise Exception(f'ERROR: manifest file was not downloaded: {manifest_file}')
if os.path.getsize(manifest_file) == 0:
raise Exception(f'ERROR: manifest file is empty: {manifest_file}')
# check XML file validity by parsing it
with open(manifest_file, 'r') as file:
xml_content = file.read()
_ = ElementTree.fromstring(xml_content)
# XML file parsed successfully
except Exception as e:
print(f'ERROR: manifest file download failed: {manifest_file}: {e}')
# check if the file is really downloaded
assert os.path.exists(manifest_file), f'ERROR: manifest file {manifest_file} is not downloaded and validated in {retries} retries'
#assert os.path.exists(manifest_file), f'ERROR: manifest file {manifest_file} is not downloaded and validated in {retries} retries'
if not os.path.exists(manifest_file):
raise Exception(f'ERROR: manifest file is missing: {manifest_file}')
# parse xml
with open(manifest_file, 'r') as file:
xml_content = file.read()
# remove manifest file
if os.path.exists(manifest_file):
os.remove(manifest_file)

# zero-based subswath index, e.g. 'IW2' -> 1
subswathidx = int(subswath[-1:]) - 1
content = xmltodict.parse(xml_content)['burst']['metadata']['product'][subswathidx]
assert polarization == content['polarisation'], 'ERROR: XML polarization differs from burst polarization'
annotation = content['content']
@@ -455,31 +463,31 @@ def download_burst(result, basedir, session):
start_utc = annotation_burst['azimuthTime']
start_utc_dt = datetime.strptime(start_utc, '%Y-%m-%dT%H:%M:%S.%f')
#print ('start_utc', start_utc, start_utc_dt)

length = annotation['swathTiming']['linesPerBurst']
azimuth_time_interval = annotation['imageAnnotation']['imageInformation']['azimuthTimeInterval']
burst_time_interval = timedelta(seconds=(int(length) - 1) * float(azimuth_time_interval))
stop_utc_dt = start_utc_dt + burst_time_interval
stop_utc = stop_utc_dt.strftime('%Y-%m-%dT%H:%M:%S.%f')
#print ('stop_utc', stop_utc, stop_utc_dt)

# output xml
product = {}

adsHeader = annotation['adsHeader']
adsHeader['startTime'] = start_utc
adsHeader['stopTime'] = stop_utc
adsHeader['imageNumber'] = '001'
product = product | {'adsHeader': adsHeader}

qualityInformation = {'productQualityIndex': annotation['qualityInformation']['productQualityIndex']} |\
{'qualityDataList': annotation['qualityInformation']['qualityDataList']}
product = product | {'qualityInformation': qualityInformation}

generalAnnotation = annotation['generalAnnotation']
# filter annotation['generalAnnotation']['replicaInformationList'] by azimuthTime
product = product | {'generalAnnotation': generalAnnotation}

imageAnnotation = annotation['imageAnnotation']
imageAnnotation['imageInformation']['productFirstLineUtcTime'] = start_utc
imageAnnotation['imageInformation']['productLastLineUtcTime'] = stop_utc
@@ -489,55 +497,69 @@ def download_burst(result, basedir, session):
imageAnnotation['imageInformation']['numberOfLines'] = str(length)
# imageStatistics and inputDimensionsList are not updated
product = product | {'imageAnnotation': imageAnnotation}

dopplerCentroid = annotation['dopplerCentroid']
items = filter_azimuth_time(dopplerCentroid['dcEstimateList']['dcEstimate'], start_utc_dt, stop_utc_dt)
dopplerCentroid['dcEstimateList'] = {'@count': len(items), 'dcEstimate': items}
product = product | {'dopplerCentroid': dopplerCentroid}

antennaPattern = annotation['antennaPattern']
items = filter_azimuth_time(antennaPattern['antennaPatternList']['antennaPattern'], start_utc_dt, stop_utc_dt)
antennaPattern['antennaPatternList'] = {'@count': len(items), 'antennaPattern': items}
product = product | {'antennaPattern': antennaPattern}

swathTiming = annotation['swathTiming']
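# select exactly one burst: pass start_utc_dt as both bounds with delta=1
# so only the burst whose azimuthTime matches the start time is kept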
items = filter_azimuth_time(swathTiming['burstList']['burst'], start_utc_dt, start_utc_dt, 1)
assert len(items) == 1, 'ERROR: unexpected burst count, should be 1'
# add TIFF file information (byte offset of the burst raster)
items[0]['byteOffset'] = offset
swathTiming['burstList'] = {'@count': len(items), 'burst': items}
product = product | {'swathTiming': swathTiming}

geolocationGrid = annotation['geolocationGrid']
items = filter_azimuth_time(geolocationGrid['geolocationGridPointList']['geolocationGridPoint'], start_utc_dt, stop_utc_dt, 1)
# renumber the line numbers for the burst
for item in items: item['line'] = str(int(item['line']) - (int(length) * burstIndex))
geolocationGrid['geolocationGridPointList'] = {'@count': len(items), 'geolocationGridPoint': items}
product = product | {'geolocationGrid': geolocationGrid}

product = product | {'coordinateConversion': annotation['coordinateConversion']}
product = product | {'swathMerging': annotation['swathMerging']}

with open(xml_file, 'w') as file:
file.write(xmltodict.unparse({'product': product}, pretty=True, indent=' '))

# prepare authorized connection
if session is None:
session = self._get_asf_session()

with tqdm(desc=f'ASF Downloading Bursts Catalog', total=1) as pbar:
results = asf_search.granule_search(bursts_missed)
pbar.update(1)

if n_jobs is None or debug == True:
print ('Note: sequential joblib processing is applied when "n_jobs" is None or "debug" is True.')
joblib_backend = 'sequential'

def download_burst_with_retry(result, basedir, session, retries=30, timeout_second=3):
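# retry the whole download-and-validate cycle; each attempt removes
# potentially incomplete files first and waits timeout_second between attempts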
for retry in range(retries):
try:
download_burst(result, basedir, session)
return True
except Exception as e:
print(f'ERROR: download attempt {retry+1} failed for {result}: {e}')
if retry + 1 == retries:
return False
time.sleep(timeout_second)

# download bursts
with self.tqdm_joblib(tqdm(desc='ASF Downloading Sentinel-1 Bursts', total=len(bursts_missed))) as progress_bar:
joblib.Parallel(n_jobs=n_jobs, backend=joblib_backend)(joblib.delayed(download_burst)\
statuses = joblib.Parallel(n_jobs=n_jobs, backend=joblib_backend)(joblib.delayed(download_burst_with_retry)\
(result, basedir, session) for result in results)


failed_count = statuses.count(False)
if failed_count > 0:
raise Exception(f'Burst downloading failed for {failed_count} bursts.')
# parse processed bursts and convert to dataframe
bursts_downloaded = pd.DataFrame(bursts_missed, columns=['burst'])
# return the results in a user-friendly dataframe
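For orientation, here is a minimal usage sketch of the refactored method. The credentials and the burst identifier are hypothetical placeholders, and it assumes the ASF class accepts Earthdata login credentials in its constructor:

from pygmtsar import ASF

# hypothetical Earthdata credentials and burst ID, for illustration only
asf = ASF('myusername', 'mypassword')
bursts = ['S1_249605_IW2_20240314T130744_VV_0669-BURST']
# downloads into the base directory, skipping bursts that already exist;
# raises an exception when any burst fails after all retry attempts
downloaded = asf.download_bursts('raw_bursts', bursts, n_jobs=8, skip_exist=True)
print(downloaded)

Passing n_jobs=None or debug=True switches to sequential joblib processing, as noted in the code above.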
