Skip to content

Commit

Permalink
2.18.0 - Add function
Browse files Browse the repository at this point in the history
  • Loading branch information
Someguy123 committed Jul 20, 2020
1 parent dcb53ca commit 0d3b81d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
run\_coro\_thread\_async
========================

.. currentmodule:: privex.helpers.asyncx

.. autofunction:: run_coro_thread_async
1 change: 1 addition & 0 deletions docs/source/helpers/privex.helpers.asyncx.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ AsyncIO Helpers / Wrappers
is_async_context
loop_run
run_coro_thread
run_coro_thread_async
run_coro_thread_base
run_sync

Expand Down
2 changes: 1 addition & 1 deletion privex/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _setup_logging(level=logging.WARNING):
log = _setup_logging()
name = 'helpers'

VERSION = '2.17.1'
VERSION = '2.18.0'



63 changes: 62 additions & 1 deletion privex/helpers/asyncx.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
__all__ = [
'awaitable', 'AWAITABLE_BLACKLIST_MODS', 'AWAITABLE_BLACKLIST', 'AWAITABLE_BLACKLIST_FUNCS', 'run_sync', 'aobject',
'call_sys_async', 'async_sync', 'awaitable_class', 'AwaitableMixin', 'loop_run', 'is_async_context', 'await_if_needed',
'get_async_type', 'run_coro_thread', 'run_coro_thread_base', 'coro_thread_func'
'get_async_type', 'run_coro_thread', 'run_coro_thread_async', 'run_coro_thread_base', 'coro_thread_func'
]


Expand Down Expand Up @@ -136,6 +136,15 @@ def run_coro_thread(func: callable, *args, **kwargs) -> Any:
Run a Python AsyncIO coroutine function within a new event loop using a thread, and return the result / raise any exceptions
as if it were ran normally within an AsyncIO function.
.. Caution:: If you're wanting to run a coroutine within a thread from an AsyncIO function/method, then you should
use :func:`.run_coro_thread_async` instead, which uses :func:`asyncio.sleep` while waiting for a result/exception
to be transmitted via a queue.
This allows you to run and wait for multiple coroutine threads simultaneously, as there's no synchronous blocking
wait - unlike this function.
This will usually allow you to run coroutines from a synchronous function without running into the dreaded "Event loop is already
running" error - since the coroutine will be ran inside of a thread with it's own dedicated event loop.
Expand Down Expand Up @@ -174,7 +183,59 @@ def run_coro_thread(func: callable, *args, **kwargs) -> Any:
if isinstance(res, (Exception, BaseException)):
raise res
return res


async def run_coro_thread_async(func: callable, *args, _queue_timeout=30.0, _queue_sleep=0.05, **kwargs) -> Any:
"""
AsyncIO version of :func:`.run_coro_thread` which uses :func:`asyncio.sleep` while waiting on a result from the queue,
allowing you to run multiple AsyncIO coroutines which call blocking synchronous code - simultaneously,
e.g. by using :func:`asyncio.gather`
Below is an example of running an example coroutine ``hello`` which runs the synchronous blocking ``time.sleep``.
Using :func:`.run_coro_thread_async` plus :func:`asyncio.gather` - we can run ``hello`` 4 times simultaneously,
despite the use of the blocking :func:`time.sleep`.
**Basic usage**::
>>> import asyncio
>>> from privex.helpers.asyncx import run_coro_thread_async
>>> async def hello(world):
... time.sleep(1)
... return world * 10
>>> await asyncio.gather(run_coro_thread_async(hello, 5), run_coro_thread_async(hello, 15),
... run_coro_thread_async(hello, 90), run_coro_thread_async(hello, 25))
[50, 150, 900, 250]
:param callable func: A reference to the ``async def`` coroutine function that you want to run
:param args: Positional arguments to pass-through to the coroutine function
:param kwargs: Keyword arguments to pass-through to the coroutine function
:param float|int _queue_timeout: (default: ``30``) Maximum amount of seconds to wait for a result or exception
from ``func`` before giving up.
:param _queue_sleep: (default: ``0.05``) Amount of time to AsyncIO sleep between each check of the result queue
:return Any coro_res: The result returned from the coroutine ``func``
"""
_queue_timeout, _queue_sleep = float(_queue_timeout), float(_queue_sleep)
thread_waited = 0.0
q = queue.Queue()
t_co = run_coro_thread_base(func, *args, **kwargs, _output_queue=q)

res = NO_RESULT
while res == NO_RESULT:
if thread_waited >= _queue_timeout:
raise TimeoutError(f"No thread result after waiting {thread_waited} seconds...")
try:
_res = q.get_nowait()
if isinstance(_res, (Exception, BaseException)):
raise _res
res = _res
except queue.Empty:
thread_waited += _queue_sleep
await asyncio.sleep(_queue_sleep)
t_co.join(5)

return res


def run_sync(func, *args, **kwargs):
"""
Expand Down
25 changes: 24 additions & 1 deletion tests/asyncx/test_async_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import inspect
import time
from datetime import datetime
from time import sleep
from typing import Union, Coroutine, Type, Callable

Expand Down Expand Up @@ -275,4 +277,25 @@ async def another_func(lorem: int, ipsum: int):
with self.assertRaisesRegex(AttributeError, 'ipsum over 100'):
helpers.run_coro_thread(another_func, 5, 900)


def test_run_coro_thread_async(self):
"""
Test :func:`.run_coro_thread_async` using :func:`asyncio.gather` to confirm coroutines with blocking synchronous code
are being ran simultaneously despite their blocking synchronous code.
"""
rcta = helpers.run_coro_thread_async

async def _hello(world):
# time.sleep is used in-place of a synchronous blocking function
time.sleep(1)
return world * 10

async def hello():
return await asyncio.gather(rcta(_hello, 5), rcta(_hello, 20), rcta(_hello, 2), rcta(_hello, 80))

start = datetime.utcnow()
res = helpers.loop_run(hello())
end = datetime.utcnow()
# After running 4 instances of _hello using run_coro_thread_async, check that no more than
# 2 seconds have passed - if they were ran synchronously, they would've taken 4 or more seconds.
self.assertLessEqual((end - start).total_seconds(), 2)
self.assertListEqual(res, [50, 200, 20, 800])

0 comments on commit 0d3b81d

Please sign in to comment.