Skip to content

Commit

Permalink
Merge pull request #267 from phenobarbital/dev
Browse files Browse the repository at this point in the history
sample background executor (like starlette) for navigator
  • Loading branch information
phenobarbital authored Jul 11, 2024
2 parents c05554a + e0fc462 commit ce5cde6
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 7 deletions.
44 changes: 38 additions & 6 deletions navigator/background/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def on_startup(self, app: web.Application) -> None:

async def put(
self,
fn: Callable[P, Awaitable],
fn: Union[partial, Callable[P, Awaitable]],
*args: P.args,
**kwargs: P.kwargs
) -> None:
Expand Down Expand Up @@ -129,7 +129,6 @@ async def empty_queue(self):

async def process_queue(self):
loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=self.max_workers)
while True:
task = await self.queue.get()
if task is None:
Expand All @@ -140,16 +139,18 @@ async def process_queue(self):
result = None
try:
if isinstance(task, partial):
result = await loop.run_in_executor(executor, task)
with ThreadPoolExecutor(max_workers=1) as executor:
result = await loop.run_in_executor(executor, task)
else:
# Unpack the function and its arguments
func, args, kwargs = task
if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
elif callable(func):
result = await loop.run_in_executor(
executor, func, *args, **kwargs
)
with ThreadPoolExecutor(max_workers=1) as executor:
result = await loop.run_in_executor(
executor, func, *args, **kwargs
)
finally:
### Task Completed
self.queue.task_done()
Expand All @@ -167,3 +168,34 @@ async def fire_consumers(self):
self.process_queue()
)
self.consumers.append(task)


class BackgroundTask:
"""BackgroundTask.
Calling functions in the background.
"""
def __init__(self, fn: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs) -> None:
self.fn = fn
self.args = args
self.kwargs = kwargs
self.id = kwargs.get('id', None)

async def __call__(self):
return await self.fn(*self.args, **self.kwargs)

def __repr__(self):
return f'<BackgroundTask {self.fn.__name__} with ID {self.id}>'

async def run(self):
loop = asyncio.get_running_loop()
if isinstance(self.fn, partial):
with ThreadPoolExecutor(max_workers=1) as executor:
await loop.run_in_executor(executor, self.fn)
elif asyncio.iscoroutinefunction(self.fn):
await self.fn(*self.args, **self.kwargs)
elif callable(self.fn):
with ThreadPoolExecutor(max_workers=1) as executor:
await loop.run_in_executor(
executor, self.fn, *self.args, **self.kwargs
)
2 changes: 1 addition & 1 deletion navigator/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
__description__ = (
"Navigator Web Framework based on aiohttp, " "with batteries included."
)
__version__ = "2.9.1"
__version__ = "2.9.2"
__author__ = "Jesus Lara"
__author_email__ = "jesuslarag@gmail.com"
__license__ = "BSD"
38 changes: 38 additions & 0 deletions test_background.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
from aiohttp import web
from navigator import Application
from navigator.background import BackgroundTask
from app import Main

# define a new Application
app = Application(Main, enable_jinja2=True)

async def send_email(email, message):
# Simulate email sending
await asyncio.sleep(10)
print(f"Email sent to {email} with message: {message}")


# Using the Application Context
@app.post('/sample_background')
async def handler(request: web.Request) -> web.Response:
data = await request.json()
email = data.get('email')
message = data.get('message')

# Create a BackgroundTask
background_task = BackgroundTask(send_email, email, message)

# Schedule the background task to run
asyncio.create_task(background_task.run())

return web.Response(text="Background task scheduled")




if __name__ == '__main__':
try:
app.run()
except KeyboardInterrupt:
print('EXIT FROM APP =========')

0 comments on commit ce5cde6

Please sign in to comment.