Skip to content

Commit

Permalink
Small tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrejeambrun committed Oct 25, 2024
1 parent fd004ac commit 5127bff
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 30 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_source_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity, DagDetails
from airflow.models.dag import DagModel
from airflow.models.dagcode import DagCode
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager

Expand All @@ -37,6 +38,7 @@
from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.CODE)
@provide_session
def get_dag_source(*, file_token: str, session: Session = NEW_SESSION) -> Response:
Expand Down
36 changes: 33 additions & 3 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -831,12 +831,24 @@ paths:
schema:
type: string
title: File Token
- name: accept
in: header
required: false
schema:
type: string
default: '*/*'
title: Accept
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
schema:
$ref: '#/components/schemas/DAGSourceResponse'
text/plain:
schema:
type: string
example: dag code
'400':
content:
application/json:
Expand All @@ -861,12 +873,18 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
'406':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
description: Not Acceptable
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down Expand Up @@ -1869,6 +1887,18 @@ components:
- asset_triggered
title: DAGRunTypes
description: DAG Run Types for responses.
DAGSourceResponse:
properties:
content:
anyOf:
- type: string
- type: 'null'
title: Content
type: object
required:
- content
title: DAGSourceResponse
description: DAG Source serializer for responses.
DAGTagCollectionResponse:
properties:
tags:
Expand Down
25 changes: 18 additions & 7 deletions airflow/api_fastapi/core_api/routes/public/dag_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,56 @@
# under the License.
from __future__ import annotations

from fastapi import Depends, HTTPException, Request, Response
from fastapi import Depends, Header, HTTPException, Request, Response
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_sources import DAGSourceModel
from airflow.api_fastapi.core_api.serializers.dag_sources import DAGSourceResponse
from airflow.models.dagcode import DagCode

dag_sources_router = AirflowRouter(tags=["DagSource"], prefix="/dagSources")

mime_type_text = "text/plain"
mime_type_json = "application/json"
mime_type_any = "*/*"


@dag_sources_router.get(
"/{file_token}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]),
responses={
**create_openapi_http_exception_doc([400, 401, 403, 404, 406]),
"200": {
"description": "Successful Response",
"content": {
mime_type_text: {"schema": {"type": "string", "example": "dag code"}},
},
},
},
response_model=DAGSourceResponse,
)
async def get_dag_source(
file_token: str,
session: Annotated[Session, Depends(get_session)],
request: Request,
accept: Annotated[str, Header()] = mime_type_any,
):
"""Get source code using file token."""
auth_s = URLSafeSerializer(request.app.state.secret_key)

try:
path = auth_s.loads(file_token)
dag_source_model = DAGSourceModel(
dag_source_model = DAGSourceResponse(
content=DagCode.code(path, session=session),
)
except (BadSignature, FileNotFoundError):
raise HTTPException(404, "DAG source not found")

accept_header = request.headers.get("Accept", "").lower()
if accept_header.startswith(mime_type_text):
if accept.startswith(mime_type_text):
return Response(dag_source_model.content, media_type=mime_type_text)
if accept_header.startswith(mime_type_json):
if accept.startswith(mime_type_json) or accept.startswith(mime_type_any):
return dag_source_model
raise HTTPException(406, "Content not available for Accept header")
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/serializers/dag_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pydantic import BaseModel


class DAGSourceModel(BaseModel):
"""DAG Source Model class."""
class DAGSourceResponse(BaseModel):
"""DAG Source serializer for responses."""

content: str | None
7 changes: 6 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,17 @@ export const useDagSourceServiceGetDagSourceKey =
"DagSourceServiceGetDagSource";
export const UseDagSourceServiceGetDagSourceKeyFn = (
{
accept,
fileToken,
}: {
accept?: string;
fileToken: string;
},
queryKey?: Array<unknown>,
) => [useDagSourceServiceGetDagSourceKey, ...(queryKey ?? [{ fileToken }])];
) => [
useDagSourceServiceGetDagSourceKey,
...(queryKey ?? [{ accept, fileToken }]),
];
export type MonitorServiceGetHealthDefaultResponse = Awaited<
ReturnType<typeof MonitorService.getHealth>
>;
Expand Down
12 changes: 9 additions & 3 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,20 +283,26 @@ export const prefetchUseDagRunServiceGetDagRun = (
* Get source code using file token.
* @param data The data for the request.
* @param data.fileToken
* @returns unknown Successful Response
* @param data.accept
* @returns DAGSourceResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagSourceServiceGetDagSource = (
queryClient: QueryClient,
{
accept,
fileToken,
}: {
accept?: string;
fileToken: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn({ fileToken }),
queryFn: () => DagSourceService.getDagSource({ fileToken }),
queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn({
accept,
fileToken,
}),
queryFn: () => DagSourceService.getDagSource({ accept, fileToken }),
});
/**
* Get Health
Expand Down
10 changes: 7 additions & 3 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ export const useDagRunServiceGetDagRun = <
* Get source code using file token.
* @param data The data for the request.
* @param data.fileToken
* @returns unknown Successful Response
* @param data.accept
* @returns DAGSourceResponse Successful Response
* @throws ApiError
*/
export const useDagSourceServiceGetDagSource = <
Expand All @@ -364,19 +365,22 @@ export const useDagSourceServiceGetDagSource = <
TQueryKey extends Array<unknown> = unknown[],
>(
{
accept,
fileToken,
}: {
accept?: string;
fileToken: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn(
{ fileToken },
{ accept, fileToken },
queryKey,
),
queryFn: () => DagSourceService.getDagSource({ fileToken }) as TData,
queryFn: () =>
DagSourceService.getDagSource({ accept, fileToken }) as TData,
...options,
});
/**
Expand Down
10 changes: 7 additions & 3 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ export const useDagRunServiceGetDagRunSuspense = <
* Get source code using file token.
* @param data The data for the request.
* @param data.fileToken
* @returns unknown Successful Response
* @param data.accept
* @returns DAGSourceResponse Successful Response
* @throws ApiError
*/
export const useDagSourceServiceGetDagSourceSuspense = <
Expand All @@ -359,19 +360,22 @@ export const useDagSourceServiceGetDagSourceSuspense = <
TQueryKey extends Array<unknown> = unknown[],
>(
{
accept,
fileToken,
}: {
accept?: string;
fileToken: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn(
{ fileToken },
{ accept, fileToken },
queryKey,
),
queryFn: () => DagSourceService.getDagSource({ fileToken }) as TData,
queryFn: () =>
DagSourceService.getDagSource({ accept, fileToken }) as TData,
...options,
});
/**
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,26 @@ export const $DAGRunTypes = {
description: "DAG Run Types for responses.",
} as const;

export const $DAGSourceResponse = {
properties: {
content: {
anyOf: [
{
type: "string",
},
{
type: "null",
},
],
title: "Content",
},
},
type: "object",
required: ["content"],
title: "DAGSourceResponse",
description: "DAG Source serializer for responses.",
} as const;

export const $DAGTagCollectionResponse = {
properties: {
tags: {
Expand Down
9 changes: 7 additions & 2 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ export class DagSourceService {
* Get source code using file token.
* @param data The data for the request.
* @param data.fileToken
* @returns unknown Successful Response
* @param data.accept
* @returns DAGSourceResponse Successful Response
* @throws ApiError
*/
public static getDagSource(
Expand All @@ -497,12 +498,16 @@ export class DagSourceService {
path: {
file_token: data.fileToken,
},
headers: {
accept: data.accept,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Unprocessable Entity",
406: "Not Acceptable",
422: "Validation Error",
},
});
}
Expand Down
20 changes: 16 additions & 4 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ export type DAGRunTypes = {
asset_triggered: number;
};

/**
* DAG Source serializer for responses.
*/
export type DAGSourceResponse = {
content: string | null;
};

/**
* DAG Tags Collection serializer for responses.
*/
Expand Down Expand Up @@ -481,10 +488,11 @@ export type DeleteDagRunData = {
export type DeleteDagRunResponse = void;

export type GetDagSourceData = {
accept?: string;
fileToken: string;
};

export type GetDagSourceResponse = unknown;
export type GetDagSourceResponse = DAGSourceResponse;

export type GetHealthResponse = HealthInfoSchema;

Expand Down Expand Up @@ -907,7 +915,7 @@ export type $OpenApiTs = {
/**
* Successful Response
*/
200: unknown;
200: DAGSourceResponse;
/**
* Bad Request
*/
Expand All @@ -925,9 +933,13 @@ export type $OpenApiTs = {
*/
404: HTTPExceptionResponse;
/**
* Unprocessable Entity
* Not Acceptable
*/
422: HTTPExceptionResponse;
406: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
Expand Down
5 changes: 3 additions & 2 deletions tests/api_fastapi/core_api/routes/public/test_dag_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ def test_should_respond_200_text(self, test_client):
json.loads(response.content.decode())
assert response.headers["Content-Type"].startswith("text/plain")

def test_should_respond_200_json(self, test_client):
@pytest.mark.parametrize("headers", [{"Accept": "application/json"}, {}])
def test_should_respond_200_json(self, test_client, headers):
response: Response = test_client.get(
self.dag_sources_url,
headers={"Accept": "application/json"},
headers=headers,
)
assert isinstance(response, Response)
assert 200 == response.status_code
Expand Down

0 comments on commit 5127bff

Please sign in to comment.