From 15e5c6f8ce8e56e17aaeeec87f83fab103cb7748 Mon Sep 17 00:00:00 2001
From: Alexey Pechnikov
Date: Tue, 21 May 2024 14:03:59 +0700
Subject: [PATCH] Enhance compute_reframe() to process data by chunks to prevent joblib deadlocks on Apple Silicon

---
Review notes (this section is ignored when the patch is applied):
  1. The single-chunk branch referenced an undefined name `pairs`, raising
     NameError whenever len(dates) <= queue; it now uses `dates`.
  2. Added comments now say "joblib" and "dates" to match what the code does.
  3. Line breaks and indentation were reconstructed from a whitespace-collapsed
     copy of this patch. Line counts match the hunk headers, but leading
     whitespace of context/continuation lines is a best guess: verify against
     the target file or apply with --ignore-whitespace.

 pygmtsar/pygmtsar/Stack_reframe.py | 47 +++++++++++++++++++++++-------
 1 file changed, 36 insertions(+), 11 deletions(-)

diff --git a/pygmtsar/pygmtsar/Stack_reframe.py b/pygmtsar/pygmtsar/Stack_reframe.py
index 3bccda60..92c780f7 100644
--- a/pygmtsar/pygmtsar/Stack_reframe.py
+++ b/pygmtsar/pygmtsar/Stack_reframe.py
@@ -140,7 +140,7 @@ def _reframe_subswath(self, subswath, date, geometry, debug=False):
                                    if geom.intersects(geometry)])
         return out
 
-    def compute_reframe(self, geometry=None, n_jobs=-1, **kwargs):
+    def compute_reframe(self, geometry=None, n_jobs=-1, queue=16, caption='Reframing', **kwargs):
         """
         Reorder bursts from sequential scenes to cover the full orbit area or some bursts only.
 
@@ -174,21 +174,46 @@ def compute_reframe(self, geometry=None, n_jobs=-1, **kwargs):
         import joblib
         import pandas as pd
 
+        if n_jobs is None or ('debug' in kwargs and kwargs['debug'] == True):
+            print ('Note: sequential joblib processing is applied when "n_jobs" is None or "debug" is True.')
+            joblib_backend = 'sequential'
+        else:
+            joblib_backend = 'loky'
+
         dates = self.df.index.unique().values
         subswaths = self.get_subswaths()
         # approximate subswath geometries from GCP
         geometries = {subswath: self.df[self.df.subswath==subswath].geometry.unary_union for subswath in subswaths}
 
-        # process all the scenes
-        if n_jobs is None or ('debug' in kwargs and kwargs['debug'] == True):
-            print ('Note: sequential joblib processing is applied when "n_jobs" is None or "debug" is True.')
-            joblib_backend = 'sequential'
+        records = []
+        # Applying iterative (chunked) processing to prevent joblib deadlocks.
+        stacksize = len(dates)
+        counter = 0
+        digits = len(str(stacksize))
+        # Splitting all the dates into chunks, each containing at most queue dates.
+        #n_chunks = stacksize // queue if stacksize > queue else 1
+        if stacksize > queue:
+            chunks = [dates[i:i + queue] for i in range(0, stacksize, queue)]
+            n_chunks = len(chunks)
         else:
-            joblib_backend = None
-        with self.tqdm_joblib(tqdm(desc='Reframing', total=len(dates)*len(subswaths))) as progress_bar:
-            records = joblib.Parallel(n_jobs=n_jobs, backend=joblib_backend)(joblib.delayed(self._reframe_subswath)\
-                                    (subswath, date,
-                                     geometry.intersection(geometries[subswath]) if geometry is not None else geometries[subswath],
-                                     **kwargs) for date in dates for subswath in subswaths)
+            chunks = [dates]
+            n_chunks = 1
+        for chunk in chunks:
+            if n_chunks > 1:
+                chunk_caption = f'{caption}: {(counter+1):0{digits}}...{(counter+len(chunk)):0{digits}} from {stacksize}'
+            else:
+                chunk_caption = caption
+
+            if joblib_backend == 'loky':
+                # can be missed on some systems
+                from joblib.externals import loky
+                loky.get_reusable_executor(kill_workers=True).shutdown(wait=True)
+            with self.tqdm_joblib(tqdm(desc=chunk_caption, total=len(chunk)*len(subswaths))) as progress_bar:
+                chunk_records = joblib.Parallel(n_jobs=n_jobs, backend=joblib_backend)(joblib.delayed(self._reframe_subswath)\
+                                        (subswath, date,
+                                         geometry.intersection(geometries[subswath]) if geometry is not None else geometries[subswath],
+                                         **kwargs) for date in chunk for subswath in subswaths)
+            records.extend(chunk_records)
+            counter += len(chunk)
 
         self.df = pd.concat(records)