Skip to content

Commit

Permalink
Reorganize healthz router definition.
Browse files Browse the repository at this point in the history
Add healthz route to main server.
  • Loading branch information
tcjennings committed Jan 6, 2025
1 parent d1891c5 commit 2c602ad
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 41 deletions.
53 changes: 14 additions & 39 deletions src/lsst/cmservice/daemon.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,21 @@
from asyncio import Task, create_task
from asyncio import create_task
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from datetime import UTC, datetime, timedelta

import structlog
import uvicorn
from anyio import sleep_until
from fastapi import APIRouter, FastAPI, HTTPException, Request
from anyio import current_time, sleep_until
from fastapi import FastAPI
from safir.database import create_async_session, create_database_engine
from safir.logging import configure_logging, configure_uvicorn_logging

from . import __version__
from .common.daemon import daemon_iteration
from .config import config
from .routers.healthz import health_router

health_router = APIRouter()
"""An API Router for a health endpoint"""


@health_router.get("/healthz", tags=["internal", "health"])
async def get_healthz(request: Request) -> dict:
"""A healthz route for application health or liveliness checking.
For any tasks running in the event loop and registered in the FastAPI app's
state, check whether that task is still running; return a 500 along with
relevant exception information if the task has ended.
"""
server_ok = True
healthz_response = {}

task: Task
for task in request.app.state.tasks:
task_response: dict[str, bool | str | None] = {"task_running": True, "task_exception": None}
if task.done():
server_ok = False
task_response["task_running"] = False
task_response["task_exception"] = str(task.exception())
healthz_response[task.get_name()] = task_response

if not server_ok:
raise HTTPException(status_code=500, detail=healthz_response)
else:
return healthz_response
configure_uvicorn_logging(config.logging.level)
configure_logging(profile=config.logging.profile, log_level=config.logging.level, name=__name__)


@asynccontextmanager
Expand All @@ -61,9 +36,7 @@ async def main_loop() -> None:
"""
logger = structlog.get_logger(__name__)
engine = create_database_engine(config.db.url, config.db.password)

startup_time = datetime.now(UTC)
sleep_time = timedelta(seconds=config.daemon.processing_interval)
sleep_time = config.daemon.processing_interval

async with engine.begin():
session = await create_async_session(engine, logger)
Expand All @@ -74,9 +47,11 @@ async def main_loop() -> None:
_iteration_count += 1
logger.info("Daemon starting iteration.")
await daemon_iteration(session)
logger.info(f"Daemon completed {_iteration_count} iterations.")
_next_wakeup = startup_time + (sleep_time * _iteration_count)
await sleep_until(_next_wakeup.timestamp())
_iteration_time = current_time()
logger.info(f"Daemon completed {_iteration_count} iterations at {_iteration_time}.")
_next_wakeup = _iteration_time + sleep_time
logger.info(f"Daemon next iteration at {_next_wakeup}.")
await sleep_until(_next_wakeup)


def main() -> None:
Expand All @@ -87,7 +62,7 @@ def main() -> None:

app.include_router(health_router)

uvicorn.run(app, host=config.asgi.host, port=config.asgi.port)
uvicorn.run(app, host=config.asgi.host, port=config.asgi.port, reload=config.asgi.reload)


if __name__ == "__main__":
Expand Down
7 changes: 5 additions & 2 deletions src/lsst/cmservice/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
actions,
campaigns,
groups,
healthz,
index,
jobs,
loaders,
Expand All @@ -36,7 +37,7 @@
from .web_app import web_app

configure_uvicorn_logging(config.logging.level)
configure_logging(profile=config.logging.profile, log_level=config.logging.level, name="cmservice")
configure_logging(profile=config.logging.profile, log_level=config.logging.level, name=config.asgi.title)

tags_metadata = [
{
Expand Down Expand Up @@ -116,8 +117,9 @@


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncGenerator:
async def lifespan(app: FastAPI) -> AsyncGenerator:
"""Hook FastAPI init/cleanups."""
app.state.tasks = set()
# Dependency inits before app starts running
await db_session_dependency.initialize(config.db.url, config.db.password)
assert db_session_dependency._engine is not None
Expand All @@ -143,6 +145,7 @@ async def lifespan(_: FastAPI) -> AsyncGenerator:

app.add_middleware(XForwardedMiddleware)

app.include_router(healthz.health_router)
app.include_router(index.router)
app.include_router(loaders.router, prefix=config.asgi.prefix)
app.include_router(actions.router, prefix=config.asgi.prefix)
Expand Down
42 changes: 42 additions & 0 deletions src/lsst/cmservice/routers/healthz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from asyncio import Task
from typing import Any

from fastapi import APIRouter, HTTPException, Request

from .. import __version__
from ..config import config

health_router = APIRouter()
"""An API Router for a health endpoint"""


@health_router.get(
"/healthz",
description=("Return health information about the running application and its tasks, if any."),
include_in_schema=True,
response_model=dict[str, Any],
tags=["internal", "health"],
)
async def get_healthz(request: Request) -> dict:
"""A healthz route for application health or liveliness checking.
For any tasks running in the event loop and registered in the FastAPI app's
state, check whether that task is still running; return a 500 along with
relevant exception information if the task has ended.
"""
server_ok = True
healthz_response: dict[str, Any] = dict(name=config.asgi.title, version=__version__)

task: Task
for task in request.app.state.tasks:
task_response: dict[str, bool | str | None] = {"task_running": True, "task_exception": None}
if task.done():
server_ok = False
task_response["task_running"] = False
task_response["task_exception"] = str(task.exception())
healthz_response[task.get_name()] = task_response

if not server_ok:
raise HTTPException(status_code=500, detail=healthz_response)
else:
return healthz_response

0 comments on commit 2c602ad

Please sign in to comment.