Skip to content

Commit

Permalink
Enhance compute_reframe() to process data by chunks to prevent joblib…
Browse files Browse the repository at this point in the history
… deadlocks on Apple Silicon
  • Loading branch information
Alexey Pechnikov committed May 21, 2024
1 parent 70ead3a commit 15e5c6f
Showing 1 changed file with 36 additions and 11 deletions.
47 changes: 36 additions & 11 deletions pygmtsar/pygmtsar/Stack_reframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _reframe_subswath(self, subswath, date, geometry, debug=False):
if geom.intersects(geometry)])
return out

def compute_reframe(self, geometry=None, n_jobs=-1, **kwargs):
def compute_reframe(self, geometry=None, n_jobs=-1, queue=16, caption='Reframing', **kwargs):
"""
Reorder bursts from sequential scenes to cover the full orbit area or some bursts only.
Expand Down Expand Up @@ -174,21 +174,46 @@ def compute_reframe(self, geometry=None, n_jobs=-1, **kwargs):
import joblib
import pandas as pd

if n_jobs is None or ('debug' in kwargs and kwargs['debug'] == True):
print ('Note: sequential joblib processing is applied when "n_jobs" is None or "debug" is True.')
joblib_backend = 'sequential'
else:
joblib_backend = 'loky'

dates = self.df.index.unique().values
subswaths = self.get_subswaths()
# approximate subswath geometries from GCP
geometries = {subswath: self.df[self.df.subswath==subswath].geometry.unary_union for subswath in subswaths}

# process all the scenes
if n_jobs is None or ('debug' in kwargs and kwargs['debug'] == True):
print ('Note: sequential joblib processing is applied when "n_jobs" is None or "debug" is True.')
joblib_backend = 'sequential'
records = []
# Applying iterative processing to prevent Dask scheduler deadlocks.
stacksize = len(dates)
counter = 0
digits = len(str(stacksize))
# Splitting all the pairs into chunks, each containing approximately queue pairs.
#n_chunks = stacksize // queue if stacksize > queue else 1
if stacksize > queue:
chunks = [dates[i:i + queue] for i in range(0, stacksize, queue)]
n_chunks = len(chunks)
else:
joblib_backend = None
with self.tqdm_joblib(tqdm(desc='Reframing', total=len(dates)*len(subswaths))) as progress_bar:
records = joblib.Parallel(n_jobs=n_jobs, backend=joblib_backend)(joblib.delayed(self._reframe_subswath)\
(subswath, date,
geometry.intersection(geometries[subswath]) if geometry is not None else geometries[subswath],
**kwargs) for date in dates for subswath in subswaths)
chunks = [pairs]
n_chunks = 1
for chunk in chunks:
if n_chunks > 1:
chunk_caption = f'{caption}: {(counter+1):0{digits}}...{(counter+len(chunk)):0{digits}} from {stacksize}'
else:
chunk_caption = caption

if joblib_backend == 'loky':
# can be missed on some systems
from joblib.externals import loky
loky.get_reusable_executor(kill_workers=True).shutdown(wait=True)
with self.tqdm_joblib(tqdm(desc=chunk_caption, total=len(chunk)*len(subswaths))) as progress_bar:
chunk_records = joblib.Parallel(n_jobs=n_jobs, backend=joblib_backend)(joblib.delayed(self._reframe_subswath)\
(subswath, date,
geometry.intersection(geometries[subswath]) if geometry is not None else geometries[subswath],
**kwargs) for date in chunk for subswath in subswaths)
records.extend(chunk_records)
counter += len(chunk)

self.df = pd.concat(records)

0 comments on commit 15e5c6f

Please sign in to comment.