diff --git a/navigator/background/__init__.py b/navigator/background/__init__.py new file mode 100644 index 00000000..55d5d5bd --- /dev/null +++ b/navigator/background/__init__.py @@ -0,0 +1,166 @@ +import sys +from typing import Union, Optional, Any +from collections.abc import Awaitable, Callable +if sys.version_info >= (3, 10): # pragma: no cover + from typing import ParamSpec +else: # pragma: no cover + from typing_extensions import ParamSpec + +from importlib import import_module +from functools import partial +from concurrent.futures import ThreadPoolExecutor +import asyncio +from aiohttp import web +from navconfig.logging import logging +from navigator.conf import QUEUE_CALLBACK + + +P = ParamSpec("P") + +SERVICE_NAME: str = 'service_queue' + + +class BackgroundQueue: + """BackgroundQueue. + + Asyncio Queue with for background processing. + + TODO: + - Add Task Timeout + - Add Task Retry + - Adding Wrapper Support + """ + service_name: str = SERVICE_NAME + + def __init__(self, max_workers: int = 5, **kwargs: P.kwargs) -> None: + self.logger = logging.getLogger('NAV.Queue') + self.max_workers = max_workers + self.queue_size = kwargs.get('queue_size', 10) + self.queue = asyncio.Queue( + maxsize=self.queue_size + ) + self.consumers: list = [] + self.logger.notice( + f'Started Queue Manager with size: {self.queue_size}' + ) + ### Getting Queue Callback (called when queue object is consumed) + self._callback: Union[Callable, Awaitable] = self.get_callback( + QUEUE_CALLBACK + ) + self.logger.notice( + f'Callback Queue: {self._callback!r}' + ) + self.service_name: str = kwargs.get('service_name', SERVICE_NAME) + + def setup(self, app: Optional[web.Application]) -> None: + if isinstance(app, web.Application): + self.app = app # register the app into the Extension + else: + self.app = app.get_app() # Nav Application + # Add Manager to main Application: + self.app[self.service_name] = self + self.app.on_startup.append(self.on_startup) + self.app.on_cleanup.append(self.on_cleanup) + + async def on_cleanup(self, app: web.Application) -> None: + """Application On cleanup.""" + await self.empty_queue() + self.logger.info('Background Queue Processor Stopped.') + + async def on_startup(self, app: web.Application) -> None: + """Application On startup.""" + await self.fire_consumers() + self.logger.info('Background Queue Processor Started.') + + async def put( + self, + fn: Callable[P, Awaitable], + *args: P.args, + **kwargs: P.kwargs + ) -> None: + try: + if isinstance(fn, partial): + await self.queue.put(fn) + elif callable(fn): + task = (fn, args, kwargs) + await self.queue.put(task) + else: + self.queue.put_nowait(task) + await asyncio.sleep(.1) + return True + except asyncio.queues.QueueFull: + self.logger.error( + f"Task Queue is Full, discarding Task {fn!r}" + ) + raise + + async def task_callback(self, task: Any, **kwargs: P.kwargs): + self.logger.info( + f'Task Consumed: {task!r} with ID {task.id}' + ) + + def get_callback(self, done_callback: str) -> Union[Callable, Awaitable]: + if not done_callback: + ## returns a simple logger: + return self.task_callback + try: + parts = done_callback.split(".") + bkname = parts.pop() + classpath = ".".join(parts) + module = import_module(classpath, package=bkname) + return getattr(module, bkname) + except ImportError as ex: + raise RuntimeError( + f"Error loading Queue Callback {done_callback}: {ex}" + ) from ex + + async def empty_queue(self): + """Processing and shutting down the Queue.""" + while not self.queue.empty(): + self.queue.get_nowait() + self.queue.task_done() + await self.queue.join() + # also: cancel the idle consumers: + for c in self.consumers: + try: + c.cancel() + except asyncio.CancelledError: + pass + + async def process_queue(self): + loop = asyncio.get_running_loop() + executor = ThreadPoolExecutor(max_workers=self.max_workers) + while True: + task = await self.queue.get() + if task is None: + break # Exit signal + self.logger.info( + f"Task started {task}" + ) + result = None + try: + if isinstance(task, partial): + result = await loop.run_in_executor(executor, task) + else: + # Unpack the function and its arguments + func, args, kwargs = task + if asyncio.iscoroutinefunction(func): + result = await func(*args, **kwargs) + elif callable(func): + result = await loop.run_in_executor( + executor, func, *args, **kwargs + ) + finally: + ### Task Completed + self.queue.task_done() + await self._callback( + task, result=result + ) + + async def fire_consumers(self): + """Fire up the Task consumers.""" + for _ in range(self.max_workers - 1): + task = asyncio.create_task( + self.process_queue() + ) + self.consumers.append(task) diff --git a/navigator/conf.py b/navigator/conf.py index 26cd0819..8bddda9b 100644 --- a/navigator/conf.py +++ b/navigator/conf.py @@ -200,6 +200,11 @@ MEMCACHE_HOST = config.get("MEMCACHE_HOST", "localhost") MEMCACHE_PORT = config.get("MEMCACHE_PORT", 11211) +""" +Background Tasks +""" +QUEUE_CALLBACK = config.get('QUEUE_CALLBACK', fallback=None) + # get configuration settings (user can override settings). try: from navconfig.conf import * # pylint: disable=W0401,W0614 diff --git a/navigator/version.py b/navigator/version.py index df6c88ea..962d47c3 100644 --- a/navigator/version.py +++ b/navigator/version.py @@ -4,7 +4,7 @@ __description__ = ( "Navigator Web Framework based on aiohttp, " "with batteries included." ) -__version__ = "2.8.55" +__version__ = "2.9.0" __author__ = "Jesus Lara" __author_email__ = "jesuslarag@gmail.com" __license__ = "BSD" diff --git a/navigator/views/abstract.py b/navigator/views/abstract.py index 1794a57c..8c9871ab 100644 --- a/navigator/views/abstract.py +++ b/navigator/views/abstract.py @@ -195,7 +195,7 @@ def configure(cls, app: WebApp, path: str = None) -> WebApp: r"{url}{{meta:(:.*)?}}".format(url=url), cls ) - async def validate_payload(self, data: Optional[dict] = None): + async def validate_payload(self, data: Optional[Union[dict, list]] = None): """Get information for usage in Form.""" if not data: data = await self.json_data() diff --git a/navigator/views/base.py b/navigator/views/base.py index 2684a4da..4e57220f 100644 --- a/navigator/views/base.py +++ b/navigator/views/base.py @@ -38,7 +38,6 @@ class BaseHandler(ABC): _allowed = ["get", "post", "put", "patch", "delete", "options", "head"] _allowed_methods = ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"] - def __init__(self, *args, **kwargs): self._now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") self._loop = asyncio.get_event_loop() @@ -464,8 +463,8 @@ class BaseView(CorsViewMixin, web.View, BaseHandler, AbstractView): "*": aiohttp_cors.ResourceOptions( allow_credentials=True, expose_headers="*", - allow_methods="*", allow_headers="*", + allow_methods="*", max_age=7200, ) }