Skip to content

Commit

Permalink
Merge pull request #265 from phenobarbital/dev
Browse files Browse the repository at this point in the history
basic background queue for running tasks in background
  • Loading branch information
phenobarbital authored Jul 10, 2024
2 parents 9fe6f50 + 5aa4001 commit 641d958
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 4 deletions.
166 changes: 166 additions & 0 deletions navigator/background/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import sys
from typing import Union, Optional, Any
from collections.abc import Awaitable, Callable
if sys.version_info >= (3, 10): # pragma: no cover
from typing import ParamSpec
else: # pragma: no cover
from typing_extensions import ParamSpec

from importlib import import_module
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import asyncio
from aiohttp import web
from navconfig.logging import logging
from navigator.conf import QUEUE_CALLBACK


P = ParamSpec("P")

SERVICE_NAME: str = 'service_queue'


class BackgroundQueue:
"""BackgroundQueue.
Asyncio Queue with for background processing.
TODO:
- Add Task Timeout
- Add Task Retry
- Adding Wrapper Support
"""
service_name: str = SERVICE_NAME

def __init__(self, max_workers: int = 5, **kwargs: P.kwargs) -> None:
self.logger = logging.getLogger('NAV.Queue')
self.max_workers = max_workers
self.queue_size = kwargs.get('queue_size', 10)
self.queue = asyncio.Queue(
maxsize=self.queue_size
)
self.consumers: list = []
self.logger.notice(
f'Started Queue Manager with size: {self.queue_size}'
)
### Getting Queue Callback (called when queue object is consumed)
self._callback: Union[Callable, Awaitable] = self.get_callback(
QUEUE_CALLBACK
)
self.logger.notice(
f'Callback Queue: {self._callback!r}'
)
self.service_name: str = kwargs.get('service_name', SERVICE_NAME)

def setup(self, app: Optional[web.Application]) -> None:
if isinstance(app, web.Application):
self.app = app # register the app into the Extension
else:
self.app = app.get_app() # Nav Application
# Add Manager to main Application:
self.app[self.service_name] = self
self.app.on_startup.append(self.on_startup)
self.app.on_cleanup.append(self.on_cleanup)

async def on_cleanup(self, app: web.Application) -> None:
"""Application On cleanup."""
await self.empty_queue()
self.logger.info('Background Queue Processor Stopped.')

async def on_startup(self, app: web.Application) -> None:
"""Application On startup."""
await self.fire_consumers()
self.logger.info('Background Queue Processor Started.')

async def put(
self,
fn: Callable[P, Awaitable],
*args: P.args,
**kwargs: P.kwargs
) -> None:
try:
if isinstance(fn, partial):
await self.queue.put(fn)
elif callable(fn):
task = (fn, args, kwargs)
await self.queue.put(task)
else:
self.queue.put_nowait(task)
await asyncio.sleep(.1)
return True
except asyncio.queues.QueueFull:
self.logger.error(
f"Task Queue is Full, discarding Task {fn!r}"
)
raise

async def task_callback(self, task: Any, **kwargs: P.kwargs):
self.logger.info(
f'Task Consumed: {task!r} with ID {task.id}'
)

def get_callback(self, done_callback: str) -> Union[Callable, Awaitable]:
if not done_callback:
## returns a simple logger:
return self.task_callback
try:
parts = done_callback.split(".")
bkname = parts.pop()
classpath = ".".join(parts)
module = import_module(classpath, package=bkname)
return getattr(module, bkname)
except ImportError as ex:
raise RuntimeError(
f"Error loading Queue Callback {done_callback}: {ex}"
) from ex

async def empty_queue(self):
"""Processing and shutting down the Queue."""
while not self.queue.empty():
self.queue.get_nowait()
self.queue.task_done()
await self.queue.join()
# also: cancel the idle consumers:
for c in self.consumers:
try:
c.cancel()
except asyncio.CancelledError:
pass

async def process_queue(self):
loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=self.max_workers)
while True:
task = await self.queue.get()
if task is None:
break # Exit signal
self.logger.info(
f"Task started {task}"
)
result = None
try:
if isinstance(task, partial):
result = await loop.run_in_executor(executor, task)
else:
# Unpack the function and its arguments
func, args, kwargs = task
if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
elif callable(func):
result = await loop.run_in_executor(
executor, func, *args, **kwargs
)
finally:
### Task Completed
self.queue.task_done()
await self._callback(
task, result=result
)

async def fire_consumers(self):
"""Fire up the Task consumers."""
for _ in range(self.max_workers - 1):
task = asyncio.create_task(
self.process_queue()
)
self.consumers.append(task)
5 changes: 5 additions & 0 deletions navigator/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@
MEMCACHE_HOST = config.get("MEMCACHE_HOST", "localhost")
MEMCACHE_PORT = config.get("MEMCACHE_PORT", 11211)

"""
Background Tasks
"""
QUEUE_CALLBACK = config.get('QUEUE_CALLBACK', fallback=None)

# get configuration settings (user can override settings).
try:
from navconfig.conf import * # pylint: disable=W0401,W0614
Expand Down
2 changes: 1 addition & 1 deletion navigator/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
__description__ = (
"Navigator Web Framework based on aiohttp, " "with batteries included."
)
__version__ = "2.8.55"
__version__ = "2.9.0"
__author__ = "Jesus Lara"
__author_email__ = "jesuslarag@gmail.com"
__license__ = "BSD"
2 changes: 1 addition & 1 deletion navigator/views/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def configure(cls, app: WebApp, path: str = None) -> WebApp:
r"{url}{{meta:(:.*)?}}".format(url=url), cls
)

async def validate_payload(self, data: Optional[dict] = None):
async def validate_payload(self, data: Optional[Union[dict, list]] = None):
"""Get information for usage in Form."""
if not data:
data = await self.json_data()
Expand Down
3 changes: 1 addition & 2 deletions navigator/views/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class BaseHandler(ABC):
_allowed = ["get", "post", "put", "patch", "delete", "options", "head"]
_allowed_methods = ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"]


def __init__(self, *args, **kwargs):
self._now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
self._loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -464,8 +463,8 @@ class BaseView(CorsViewMixin, web.View, BaseHandler, AbstractView):
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_methods="*",
allow_headers="*",
allow_methods="*",
max_age=7200,
)
}
Expand Down

0 comments on commit 641d958

Please sign in to comment.