diff --git a/python/helpers/defer.py b/python/helpers/defer.py index 8ef474493..58fd44050 100644 --- a/python/helpers/defer.py +++ b/python/helpers/defer.py @@ -1,70 +1,61 @@ import asyncio import threading from concurrent.futures import Future +from typing import Any, Callable, Optional, Coroutine -class DeferredTask: - def __init__(self, func, *args, **kwargs): - self._loop: asyncio.AbstractEventLoop = None # type: ignore - self._task = None - self._future = Future() - self._task_initialized = threading.Event() - self._start_task(func, *args, **kwargs) +class EventLoopThread: + _instance = None + _lock = threading.Lock() - def _start_task(self, func, *args, **kwargs): - def run_in_thread(): - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) - self._task = self._loop.create_task(self._run(func, *args, **kwargs)) - self._task_initialized.set() - self._loop.run_forever() + def __init__(self) -> None: + self.loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() + self.thread: threading.Thread = threading.Thread(target=self._run_event_loop, daemon=True) + self.thread.start() - self._thread = threading.Thread(target=run_in_thread) - self._thread.start() + def __new__(cls) -> 'EventLoopThread': + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance.__init__() + return cls._instance - async def _run(self, func, *args, **kwargs): - try: - result = await func(*args, **kwargs) - self._future.set_result(result) - except Exception as e: - self._future.set_exception(e) - finally: - self._loop.call_soon_threadsafe(self._cleanup) + def _run_event_loop(self): + asyncio.set_event_loop(self.loop) + self.loop.run_forever() - def _cleanup(self): - self._loop.stop() + def run_coroutine(self, coro): + return asyncio.run_coroutine_threadsafe(coro, self.loop) - def is_ready(self): - return self._future.done() +class DeferredTask: + def __init__(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any) -> None: + self._event_loop_thread = EventLoopThread() + self._future: Future[Any] = self._event_loop_thread.run_coroutine(self._run(func, *args, **kwargs)) + + async def _run(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any) -> Any: + return await func(*args, **kwargs) - async def result(self, timeout=None): - if not self._task_initialized.wait(timeout): - raise RuntimeError("Task was not initialized properly.") + def is_ready(self) -> bool: + return self._future.done() + async def result(self, timeout: Optional[float] = None) -> Any: try: return await asyncio.wait_for(asyncio.wrap_future(self._future), timeout) except asyncio.TimeoutError: raise TimeoutError("The task did not complete within the specified timeout.") - def result_sync(self, timeout=None): - if not self._task_initialized.wait(timeout): - raise RuntimeError("Task was not initialized properly.") - + def result_sync(self, timeout: Optional[float] = None) -> Any: try: return self._future.result(timeout) except TimeoutError: raise TimeoutError("The task did not complete within the specified timeout.") - def kill(self): - if self._task and not self._task.done(): - self._loop.call_soon_threadsafe(self._task.cancel) + def kill(self) -> None: + if not self._future.done(): + self._future.cancel() - def is_alive(self): - return self._thread.is_alive() and not self._future.done() + def is_alive(self) -> bool: + return not self._future.done() - def __del__(self): - if self._loop and self._loop.is_running(): - self._loop.call_soon_threadsafe(self._cleanup) - if self._thread and self._thread.is_alive(): - self._thread.join() - if self._loop: - self._loop.close() \ No newline at end of file +# Helper function to run async code +async def run_async(func, *args, **kwargs): + return await func(*args, **kwargs) \ No newline at end of file