Skip to content

Commit

Permalink
AIP-84 Migrate test a connection to FastAPI API (apache#43766)
Browse files Browse the repository at this point in the history
* Migrate test a connection to fastapi

* Remove NotFound error from docs

* Remove Validation error which is handled by fastapi

* Remove async for complying convention for methods

* Fix pre-commit ruff
  • Loading branch information
bugraoz93 authored Nov 12, 2024
1 parent ed31b02 commit f924ecb
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 13 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def post_connection(*, session: Session = NEW_SESSION) -> APIResponse:
raise AlreadyExists(detail=f"Connection already exist. ID: {conn_id}")


@mark_fastapi_migration_done
@security.requires_access_connection("POST")
def test_connection() -> APIResponse:
"""
Expand Down
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ class ConnectionCollectionResponse(BaseModel):
total_entries: int


class ConnectionTestResponse(BaseModel):
"""Connection Test serializer for responses."""

status: bool
message: str


# Request Models
class ConnectionBody(BaseModel):
"""Connection Serializer for requests body."""
Expand Down
61 changes: 61 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,53 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/connections/test:
post:
tags:
- Connection
summary: Test Connection
description: 'Test an API connection.
This method first creates an in-memory transient conn_id & exports that to
an env var,
as some hook classes tries to find out the `conn` from their __init__ method
& errors out if not found.
It also deletes the conn id env variable after the test.'
operationId: test_connection
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionBody'
required: true
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionTestResponse'
'401':
description: Unauthorized
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
'403':
description: Forbidden
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}:
get:
tags:
Expand Down Expand Up @@ -3568,6 +3615,20 @@ components:
- extra
title: ConnectionResponse
description: Connection serializer for responses.
ConnectionTestResponse:
properties:
status:
type: boolean
title: Status
message:
type: string
title: Message
type: object
required:
- status
- message
title: ConnectionTestResponse
description: Connection Test serializer for responses.
DAGCollectionResponse:
properties:
dags:
Expand Down
46 changes: 46 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import os
from typing import Annotated

from fastapi import Depends, HTTPException, Query, status
Expand All @@ -29,10 +30,14 @@
ConnectionBody,
ConnectionCollectionResponse,
ConnectionResponse,
ConnectionTestResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.configuration import conf
from airflow.models import Connection
from airflow.secrets.environment_variables import CONN_ENV_PREFIX
from airflow.utils import helpers
from airflow.utils.strings import get_random_string

connections_router = AirflowRouter(tags=["Connection"], prefix="/connections")

Expand Down Expand Up @@ -181,3 +186,44 @@ def patch_connection(
for key, val in data.items():
setattr(connection, key, val)
return ConnectionResponse.model_validate(connection, from_attributes=True)


@connections_router.post(
"/test",
responses=create_openapi_http_exception_doc(
[
status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN,
]
),
)
def test_connection(
test_body: ConnectionBody,
) -> ConnectionTestResponse:
"""
Test an API connection.
This method first creates an in-memory transient conn_id & exports that to an env var,
as some hook classes tries to find out the `conn` from their __init__ method & errors out if not found.
It also deletes the conn id env variable after the test.
"""
if conf.get("core", "test_connection", fallback="Disabled").lower().strip() != "enabled":
raise HTTPException(
403,
"Testing connections is disabled in Airflow configuration. "
"Contact your deployment admin to enable it.",
)

transient_conn_id = get_random_string()
conn_env_var = f"{CONN_ENV_PREFIX}{transient_conn_id.upper()}"
try:
data = test_body.model_dump(by_alias=True)
data["conn_id"] = transient_conn_id
conn = Connection(**data)
os.environ[conn_env_var] = conn.get_uri()
test_status, test_message = conn.test_connection()
return ConnectionTestResponse.model_validate(
{"status": test_status, "message": test_message}, from_attributes=True
)
finally:
os.environ.pop(conn_env_var, None)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,9 @@ export type BackfillServiceCreateBackfillMutationResult = Awaited<
export type ConnectionServicePostConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.postConnection>
>;
export type ConnectionServiceTestConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.testConnection>
>;
export type PoolServicePostPoolMutationResult = Awaited<
ReturnType<typeof PoolService.postPool>
>;
Expand Down
43 changes: 43 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,49 @@ export const useConnectionServicePostConnection = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Test Connection
* Test an API connection.
*
* This method first creates an in-memory transient conn_id & exports that to an env var,
* as some hook classes tries to find out the `conn` from their __init__ method & errors out if not found.
* It also deletes the conn id env variable after the test.
* @param data The data for the request.
* @param data.requestBody
* @returns ConnectionTestResponse Successful Response
* @throws ApiError
*/
export const useConnectionServiceTestConnection = <
TData = Common.ConnectionServiceTestConnectionMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: ConnectionBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: ConnectionBody;
},
TContext
>({
mutationFn: ({ requestBody }) =>
ConnectionService.testConnection({
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Post Pool
* Create a Pool.
Expand Down
17 changes: 17 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,23 @@ export const $ConnectionResponse = {
description: "Connection serializer for responses.",
} as const;

export const $ConnectionTestResponse = {
properties: {
status: {
type: "boolean",
title: "Status",
},
message: {
type: "string",
title: "Message",
},
},
type: "object",
required: ["status", "message"],
title: "ConnectionTestResponse",
description: "Connection Test serializer for responses.",
} as const;

export const $DAGCollectionResponse = {
properties: {
dags: {
Expand Down
30 changes: 30 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import type {
GetConnectionsResponse,
PostConnectionData,
PostConnectionResponse,
TestConnectionData,
TestConnectionResponse,
GetDagRunData,
GetDagRunResponse,
DeleteDagRunData,
Expand Down Expand Up @@ -772,6 +774,34 @@ export class ConnectionService {
},
});
}

/**
* Test Connection
* Test an API connection.
*
* This method first creates an in-memory transient conn_id & exports that to an env var,
* as some hook classes tries to find out the `conn` from their __init__ method & errors out if not found.
* It also deletes the conn id env variable after the test.
* @param data The data for the request.
* @param data.requestBody
* @returns ConnectionTestResponse Successful Response
* @throws ApiError
*/
public static testConnection(
data: TestConnectionData,
): CancelablePromise<TestConnectionResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/connections/test",
body: data.requestBody,
mediaType: "application/json",
errors: {
401: "Unauthorized",
403: "Forbidden",
422: "Validation Error",
},
});
}
}

export class DagRunService {
Expand Down
37 changes: 37 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ export type ConnectionResponse = {
extra: string | null;
};

/**
* Connection Test serializer for responses.
*/
export type ConnectionTestResponse = {
status: boolean;
message: string;
};

/**
* DAG Collection serializer for responses.
*/
Expand Down Expand Up @@ -1005,6 +1013,12 @@ export type PostConnectionData = {

export type PostConnectionResponse = ConnectionResponse;

export type TestConnectionData = {
requestBody: ConnectionBody;
};

export type TestConnectionResponse = ConnectionTestResponse;

export type GetDagRunData = {
dagId: string;
dagRunId: string;
Expand Down Expand Up @@ -1845,6 +1859,29 @@ export type $OpenApiTs = {
};
};
};
"/public/connections/test": {
post: {
req: TestConnectionData;
res: {
/**
* Successful Response
*/
200: ConnectionTestResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/dags/{dag_id}/dagRuns/{dag_run_id}": {
get: {
req: GetDagRunData;
Expand Down
Loading

0 comments on commit f924ecb

Please sign in to comment.