Skip to content

Commit

Permalink
More explicit exception from rpc call wind up
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 20, 2024
1 parent 582787e commit d953f06
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,11 +1027,36 @@ def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any)

async def run_callback() -> None:
with capture_exceptions(kiwi_future):
result = callback(*args, **kwargs)
while asyncio.isfuture(result):
result = await result
try:
result = callback(*args, **kwargs)
except Exception as exc:
import traceback
import inspect
# Get traceback as a string
tb_str = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))

# Attempt to get file and line number where the callback is defined
# Note: This might fail for certain built-in or dynamically generated functions.
# If it fails, just skip that part.
try:
source_file = inspect.getfile(callback)
# getsourcelines returns a tuple (list_of_source_lines, starting_line_number)
_, start_line = inspect.getsourcelines(callback)
callback_location = f"{source_file}:{start_line}"
except Exception:
callback_location = "<unknown location>"

# Include the callback name, file/line info, and the full traceback in the message
raise RuntimeError(
f"Error invoking callback '{callback.__name__}' at {callback_location}.\n"
f"Exception: {type(exc).__name__}: {exc}\n\n"
f"Full Traceback:\n{tb_str}"
) from exc
else:
while asyncio.isfuture(result):
result = await result

kiwi_future.set_result(result)
kiwi_future.set_result(result)

# Schedule the task and give back a kiwi future
asyncio.run_coroutine_threadsafe(run_callback(), self.loop)
Expand Down

0 comments on commit d953f06

Please sign in to comment.