diff --git a/src/pp5/collect.py b/src/pp5/collect.py index 7c75cac..231786a 100644 --- a/src/pp5/collect.py +++ b/src/pp5/collect.py @@ -167,9 +167,10 @@ def collect(self) -> dict: step_status, step_message = "SUCCESS", None except Exception as e: LOGGER.error( - f"Unexpected exception in top-level " f"collect", exc_info=e + f"Unexpected exception in top-level collect", exc_info=e ) step_status, step_message = "FAIL", f"{e}" + break # don't move on to the next step finally: step_elapsed = time.time() - step_start_time step_elapsed = elapsed_seconds_to_dhms(step_elapsed) diff --git a/src/pp5/parallel.py b/src/pp5/parallel.py index 0383b61..84ef8bd 100644 --- a/src/pp5/parallel.py +++ b/src/pp5/parallel.py @@ -143,17 +143,16 @@ def yield_async_results( failed_results: Set[_T] = set() def _yield_result(_res_name: _T, _res: AsyncResult): + result = None try: + result = _res.get() success_results.add(_res_name) - yield _res_name, _res.get() except Exception as e: if re_raise: raise e - LOGGER.error( - f"AsyncResult {_res_name} raised {type(e)}: " f"{e}", exc_info=e - ) + LOGGER.error(f"AsyncResult {_res_name} raised {type(e)}: {e}", exc_info=e) failed_results.add(_res_name) - yield _res_name, None + yield _res_name, result while True: # Split by whether the result is ready, so we can get these without waiting. @@ -175,7 +174,10 @@ def _yield_result(_res_name: _T, _res: AsyncResult): # Break if there's nothing left to do if not len(not_ready_results): - LOGGER.info(f"Finished processing {len(async_results)} async results") + LOGGER.info( + f"Finished processing {len(async_results)} async results " + f"(#success={len(success_results)}, #failed={len(failed_results)})" + ) break # Get next not-ready result @@ -194,8 +196,9 @@ def _yield_result(_res_name: _T, _res: AsyncResult): failed_results.add(res_name) LOGGER.error(f"*** MAX RETRIES REACHED FOR {res_name}") - # Sanity: Make sure we processed all AsyncResults - assert len(success_results) + len(failed_results) == len(async_results) + # Sanity: Make sure we processed all AsyncResults (should never happen) + if len(success_results) + len(failed_results) != len(async_results): + LOGGER.warning("AsyncResult mismatch: not all async results were processed") def _cleanup():