-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix for event loops in defer.py
- Loading branch information
Showing
1 changed file
with
38 additions
and
47 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,70 +1,61 @@ | ||
import asyncio | ||
import threading | ||
from concurrent.futures import Future | ||
from typing import Any, Callable, Optional, Coroutine | ||
|
||
class DeferredTask: | ||
def __init__(self, func, *args, **kwargs): | ||
self._loop: asyncio.AbstractEventLoop = None # type: ignore | ||
self._task = None | ||
self._future = Future() | ||
self._task_initialized = threading.Event() | ||
self._start_task(func, *args, **kwargs) | ||
class EventLoopThread: | ||
_instance = None | ||
_lock = threading.Lock() | ||
|
||
def _start_task(self, func, *args, **kwargs): | ||
def run_in_thread(): | ||
self._loop = asyncio.new_event_loop() | ||
asyncio.set_event_loop(self._loop) | ||
self._task = self._loop.create_task(self._run(func, *args, **kwargs)) | ||
self._task_initialized.set() | ||
self._loop.run_forever() | ||
def __init__(self) -> None: | ||
self.loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() | ||
self.thread: threading.Thread = threading.Thread(target=self._run_event_loop, daemon=True) | ||
self.thread.start() | ||
|
||
self._thread = threading.Thread(target=run_in_thread) | ||
self._thread.start() | ||
def __new__(cls) -> 'EventLoopThread': | ||
with cls._lock: | ||
if cls._instance is None: | ||
cls._instance = super().__new__(cls) | ||
cls._instance.__init__() | ||
return cls._instance | ||
|
||
async def _run(self, func, *args, **kwargs): | ||
try: | ||
result = await func(*args, **kwargs) | ||
self._future.set_result(result) | ||
except Exception as e: | ||
self._future.set_exception(e) | ||
finally: | ||
self._loop.call_soon_threadsafe(self._cleanup) | ||
def _run_event_loop(self): | ||
asyncio.set_event_loop(self.loop) | ||
self.loop.run_forever() | ||
|
||
def _cleanup(self): | ||
self._loop.stop() | ||
def run_coroutine(self, coro): | ||
return asyncio.run_coroutine_threadsafe(coro, self.loop) | ||
|
||
def is_ready(self): | ||
return self._future.done() | ||
class DeferredTask: | ||
def __init__(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any) -> None: | ||
self._event_loop_thread = EventLoopThread() | ||
self._future: Future[Any] = self._event_loop_thread.run_coroutine(self._run(func, *args, **kwargs)) | ||
|
||
async def _run(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any) -> Any: | ||
return await func(*args, **kwargs) | ||
|
||
async def result(self, timeout=None): | ||
if not self._task_initialized.wait(timeout): | ||
raise RuntimeError("Task was not initialized properly.") | ||
def is_ready(self) -> bool: | ||
return self._future.done() | ||
|
||
async def result(self, timeout: Optional[float] = None) -> Any: | ||
try: | ||
return await asyncio.wait_for(asyncio.wrap_future(self._future), timeout) | ||
except asyncio.TimeoutError: | ||
raise TimeoutError("The task did not complete within the specified timeout.") | ||
|
||
def result_sync(self, timeout=None): | ||
if not self._task_initialized.wait(timeout): | ||
raise RuntimeError("Task was not initialized properly.") | ||
|
||
def result_sync(self, timeout: Optional[float] = None) -> Any: | ||
try: | ||
return self._future.result(timeout) | ||
except TimeoutError: | ||
raise TimeoutError("The task did not complete within the specified timeout.") | ||
|
||
def kill(self): | ||
if self._task and not self._task.done(): | ||
self._loop.call_soon_threadsafe(self._task.cancel) | ||
def kill(self) -> None: | ||
if not self._future.done(): | ||
self._future.cancel() | ||
|
||
def is_alive(self): | ||
return self._thread.is_alive() and not self._future.done() | ||
def is_alive(self) -> bool: | ||
return not self._future.done() | ||
|
||
def __del__(self): | ||
if self._loop and self._loop.is_running(): | ||
self._loop.call_soon_threadsafe(self._cleanup) | ||
if self._thread and self._thread.is_alive(): | ||
self._thread.join() | ||
if self._loop: | ||
self._loop.close() | ||
# Helper function to run async code | ||
async def run_async(func, *args, **kwargs): | ||
return await func(*args, **kwargs) |