Skip to content

Commit

Permalink
Ensure lifespans of mounted FastAPI sub-apps are called (#43817)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianbuss authored Nov 8, 2024
1 parent 36e716a commit cd323e2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
15 changes: 15 additions & 0 deletions airflow/api_fastapi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from __future__ import annotations

import logging
from contextlib import AsyncExitStack, asynccontextmanager

from fastapi import FastAPI
from starlette.routing import Mount

from airflow.api_fastapi.core_api.app import init_config, init_dag_bag, init_plugins, init_views
from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
Expand All @@ -28,6 +30,18 @@
app: FastAPI | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
async with AsyncExitStack() as stack:
for route in app.routes:
if isinstance(route, Mount) and isinstance(route.app, FastAPI):
await stack.enter_async_context(
route.app.router.lifespan_context(route.app),
)
app.state.lifespan_called = True
yield


def create_app(apps: str = "all") -> FastAPI:
apps_list = apps.split(",") if apps else ["all"]

Expand All @@ -36,6 +50,7 @@ def create_app(apps: str = "all") -> FastAPI:
description="Airflow API. All endpoints located under ``/public`` can be used safely, are stable and backward compatible. "
"Endpoints located under ``/ui`` are dedicated to the UI and are subject to breaking change "
"depending on the need of the frontend. Users should not rely on those but use the public ones instead.",
lifespan=lifespan,
)

if "core" in apps_list or "all" in apps_list:
Expand Down
10 changes: 10 additions & 0 deletions airflow/api_fastapi/execution_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@

from __future__ import annotations

from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
"""Context manager for the lifespan of the FastAPI app. For now does nothing."""
app.state.lifespan_called = True
yield


def create_task_execution_api_app(app: FastAPI) -> FastAPI:
"""Create FastAPI app for task execution API."""
from airflow.api_fastapi.execution_api.routes import execution_api_router
Expand All @@ -28,6 +37,7 @@ def create_task_execution_api_app(app: FastAPI) -> FastAPI:
task_exec_api_app = FastAPI(
title="Airflow Task Execution API",
description="The private Airflow Task Execution API.",
lifespan=lifespan,
)

task_exec_api_app.include_router(execution_api_router)
Expand Down
19 changes: 19 additions & 0 deletions tests/api_fastapi/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
from unittest import mock


def test_main_app_lifespan(client):
with client() as test_client:
test_app = test_client.app

# assert the app was created and lifespan was called
assert test_app
assert test_app.state.lifespan_called, "Lifespan not called on Execution API app."


@mock.patch("airflow.api_fastapi.app.init_dag_bag")
@mock.patch("airflow.api_fastapi.app.init_views")
@mock.patch("airflow.api_fastapi.app.init_plugins")
Expand Down Expand Up @@ -55,6 +64,16 @@ def test_execution_api_app(
mock_init_plugins.assert_not_called()


def test_execution_api_app_lifespan(client):
with client(apps="execution") as test_client:
test_app = test_client.app

# assert the execution app was created and lifespan was called
execution_app = [route.app for route in test_app.router.routes if route.path == "/execution"]
assert execution_app, "Execution API app not found in FastAPI app."
assert execution_app[0].state.lifespan_called, "Lifespan not called on Execution API app."


@mock.patch("airflow.api_fastapi.app.init_dag_bag")
@mock.patch("airflow.api_fastapi.app.init_views")
@mock.patch("airflow.api_fastapi.app.init_plugins")
Expand Down

0 comments on commit cd323e2

Please sign in to comment.