diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index feeb3c4e..26fe2b91 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -1027,11 +1027,36 @@ def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) async def run_callback() -> None: with capture_exceptions(kiwi_future): - result = callback(*args, **kwargs) - while asyncio.isfuture(result): - result = await result + try: + result = callback(*args, **kwargs) + except Exception as exc: + import traceback + import inspect + # Get traceback as a string + tb_str = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__)) + + # Attempt to get file and line number where the callback is defined + # Note: This might fail for certain built-in or dynamically generated functions. + # If it fails, just skip that part. + try: + source_file = inspect.getfile(callback) + # getsourcelines returns a tuple (list_of_source_lines, starting_line_number) + _, start_line = inspect.getsourcelines(callback) + callback_location = f"{source_file}:{start_line}" + except Exception: + callback_location = "" + + # Include the callback name, file/line info, and the full traceback in the message + raise RuntimeError( + f"Error invoking callback '{callback.__name__}' at {callback_location}.\n" + f"Exception: {type(exc).__name__}: {exc}\n\n" + f"Full Traceback:\n{tb_str}" + ) from exc + else: + while asyncio.isfuture(result): + result = await result - kiwi_future.set_result(result) + kiwi_future.set_result(result) # Schedule the task and give back a kiwi future asyncio.run_coroutine_threadsafe(run_callback(), self.loop)