From f0a127993805a1bd38c32c4ca86353898194b215 Mon Sep 17 00:00:00 2001 From: Michael Kaye <1917473+michaelkaye@users.noreply.github.com> Date: Tue, 19 Sep 2023 15:50:16 +0100 Subject: [PATCH] Refactor to allow cancellation of sleeps, and react to shutdown. --- trafficlight/__init__.py | 1 + trafficlight/http/adapter.py | 27 ++++++++++++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/trafficlight/__init__.py b/trafficlight/__init__.py index 5092900..c2ac367 100644 --- a/trafficlight/__init__.py +++ b/trafficlight/__init__.py @@ -120,6 +120,7 @@ async def startup() -> None: @app.after_serving async def shutdown() -> None: trafficlight.http.adapter.stop_background_tasks = True + await trafficlight.http.adapter.interrupt_tasks() if kiwi.kiwi_client: await kiwi.kiwi_client.end_run() await adapter_shutdown() diff --git a/trafficlight/http/adapter.py b/trafficlight/http/adapter.py index c9f17a7..f8183fa 100644 --- a/trafficlight/http/adapter.py +++ b/trafficlight/http/adapter.py @@ -15,7 +15,7 @@ import asyncio import logging from datetime import datetime, timedelta -from typing import Any, Dict, cast +from typing import Any, Dict, Set, cast from quart import Blueprint, current_app, request from werkzeug.utils import secure_filename @@ -63,6 +63,7 @@ async def run() -> None: current_app.add_background_task(run) return + logger.debug( "Not enough client_types to run any test(have %s)", [str(item) for item in available_adapters], @@ -115,12 +116,26 @@ async def cleanup_unresponsive_adapters() -> None: ) +sleeping_tasks: Set[asyncio.Future[None]] = set() + + +async def interrupt_tasks() -> None: + for task in sleeping_tasks: + task.cancel() + + async def loop_cleanup_unresponsive_adapters() -> None: while not stop_background_tasks: logging.info("Running sweep for idle adapters") await cleanup_unresponsive_adapters() - await asyncio.sleep(30) + sleep_task: asyncio.Future[None] = asyncio.ensure_future(asyncio.sleep(30)) + try: + sleeping_tasks.add(sleep_task) + except asyncio.CancelledError: + pass # we don't mind this task being cancelled. + finally: + sleeping_tasks.remove(sleep_task) logging.info("Finished sweep task") @@ -128,7 +143,13 @@ async def loop_check_for_new_tests() -> None: while not stop_background_tasks: logging.info("Running sweep for new tests") await check_for_new_tests() - await asyncio.sleep(30) + sleep_task: asyncio.Future[None] = asyncio.ensure_future(asyncio.sleep(30)) + try: + sleeping_tasks.add(sleep_task) + except asyncio.CancelledError: + pass # we don't mind this task being cancelled. + finally: + sleeping_tasks.remove(sleep_task) logging.info("Finished new test task")