
Commit

Merge branch 'collect-robustness'
avivrosenberg committed Mar 1, 2024
2 parents d521d52 + b7db15b commit 4becb18
Showing 2 changed files with 13 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/pp5/collect.py
@@ -167,9 +167,10 @@ def collect(self) -> dict:
                 step_status, step_message = "SUCCESS", None
             except Exception as e:
                 LOGGER.error(
-                    f"Unexpected exception in top-level " f"collect", exc_info=e
+                    f"Unexpected exception in top-level collect", exc_info=e
                 )
                 step_status, step_message = "FAIL", f"{e}"
+                break  # don't move on to the next step
             finally:
                 step_elapsed = time.time() - step_start_time
                 step_elapsed = elapsed_seconds_to_dhms(step_elapsed)
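For context on the added `break`: `collect` runs its steps in sequence inside a loop, and once a step fails the later steps cannot be trusted to run on valid inputs. Below is a minimal sketch of that pattern, using a hypothetical list of (name, callable) steps rather than the actual pp5 collector internals:

    import logging
    import time

    LOGGER = logging.getLogger(__name__)


    def run_steps(steps) -> dict:
        # steps: iterable of (name, callable) pairs; a stand-in for the real
        # collection steps, which this diff does not show.
        report = {}
        for step_name, step_fn in steps:
            step_start_time = time.time()
            try:
                step_fn()
                step_status, step_message = "SUCCESS", None
            except Exception as e:
                LOGGER.error(f"Unexpected exception in step {step_name}", exc_info=e)
                step_status, step_message = "FAIL", f"{e}"
                break  # don't move on to the next step
            finally:
                # The finally block still runs before the break takes effect,
                # so the failing step's timing and status are recorded.
                step_elapsed = time.time() - step_start_time
                report[step_name] = (step_status, step_message, step_elapsed)
        return report

Without the break, a failed step would be logged but the loop would continue into steps that likely depend on its output, which is the gap this part of the commit closes.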
19 changes: 11 additions & 8 deletions src/pp5/parallel.py
@@ -143,17 +143,16 @@ def yield_async_results(
     failed_results: Set[_T] = set()

     def _yield_result(_res_name: _T, _res: AsyncResult):
+        result = None
         try:
+            result = _res.get()
             success_results.add(_res_name)
-            yield _res_name, _res.get()
         except Exception as e:
             if re_raise:
                 raise e
-            LOGGER.error(
-                f"AsyncResult {_res_name} raised {type(e)}: " f"{e}", exc_info=e
-            )
+            LOGGER.error(f"AsyncResult {_res_name} raised {type(e)}: {e}", exc_info=e)
             failed_results.add(_res_name)
-            yield _res_name, None
+        yield _res_name, result

     while True:
         # Split by whether the result is ready, so we can get these without waiting.
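The effect of this hunk is that a job is only counted as successful after `get()` returns, and the single `yield` outside the `try` reports each name exactly once, with `None` standing in for a handled failure. A rough usage sketch of how such a generator might be consumed; the exact signature of `yield_async_results` and the default of `re_raise` are not shown in this diff, so both are assumptions:

    # Hypothetical consumer of yield_async_results; the call signature and the
    # re_raise default are assumptions, not taken from this diff.
    from multiprocessing import Pool

    from pp5.parallel import yield_async_results


    def square(x: int) -> int:
        if x == 3:
            raise ValueError("boom")  # simulate one failing job
        return x * x


    if __name__ == "__main__":
        with Pool(processes=4) as pool:
            async_results = {f"job-{i}": pool.apply_async(square, (i,)) for i in range(5)}
            for name, result in yield_async_results(async_results):
                if result is None:
                    print(f"{name}: failed (see log for the exception)")
                else:
                    print(f"{name}: {result}")

One caveat of the `(name, None)` convention: a job whose real return value is `None` looks the same as a failure to the caller.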
@@ -175,7 +174,10 @@ def _yield_result(_res_name: _T, _res: AsyncResult):

         # Break if there's nothing left to do
         if not len(not_ready_results):
-            LOGGER.info(f"Finished processing {len(async_results)} async results")
+            LOGGER.info(
+                f"Finished processing {len(async_results)} async results "
+                f"(#success={len(success_results)}, #failed={len(failed_results)})"
+            )
             break

         # Get next not-ready result
@@ -194,8 +196,9 @@ def _yield_result(_res_name: _T, _res: AsyncResult):
             failed_results.add(res_name)
             LOGGER.error(f"*** MAX RETRIES REACHED FOR {res_name}")

-    # Sanity: Make sure we processed all AsyncResults
-    assert len(success_results) + len(failed_results) == len(async_results)
+    # Sanity: Make sure we processed all AsyncResults (should never happen)
+    if len(success_results) + len(failed_results) != len(async_results):
+        LOGGER.warning("AsyncResult mismatch: not all async results were processed")


 def _cleanup():
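On the last hunk: the previous `assert` would abort the caller with an `AssertionError` if the bookkeeping ever disagreed, and it is stripped entirely under `python -O`, so it was neither a reliable check nor a graceful one. Logging a warning keeps a long collection run alive while still leaving a trace. A minimal sketch of that trade-off, with a hypothetical helper name:

    import logging

    LOGGER = logging.getLogger(__name__)


    def check_all_processed(success_results, failed_results, async_results) -> bool:
        # Log and return False instead of raising, so an inconsistency in the
        # success/failure bookkeeping does not kill the surrounding pipeline.
        processed = len(success_results) + len(failed_results)
        if processed != len(async_results):
            LOGGER.warning(
                f"AsyncResult mismatch: processed {processed} of {len(async_results)}"
            )
            return False
        return True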
