Skip to content

Commit

Permalink
Migrate public endpoint Get Task to FastAPI, with main resynced
Browse files Browse the repository at this point in the history
  • Loading branch information
omkar-foss committed Nov 8, 2024
1 parent f57db71 commit 7467a22
Show file tree
Hide file tree
Showing 14 changed files with 1,316 additions and 1 deletion.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.exceptions import TaskNotFound
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.api_migration import mark_fastapi_migration_done

if TYPE_CHECKING:
from airflow import DAG
from airflow.api_connexion.types import APIResponse


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK)
def get_task(*, dag_id: str, task_id: str) -> APIResponse:
"""Get simplified representation of a task."""
Expand Down
64 changes: 63 additions & 1 deletion airflow/api_fastapi/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,72 @@
# under the License.
from __future__ import annotations

from pydantic import AfterValidator, AwareDatetime
import inspect
from datetime import timedelta

from pydantic import AfterValidator, AliasGenerator, AwareDatetime, BaseModel, BeforeValidator, ConfigDict
from typing_extensions import Annotated

from airflow.models.mappedoperator import MappedOperator
from airflow.models.operator import Operator
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils import timezone

UtcDateTime = Annotated[AwareDatetime, AfterValidator(lambda d: d.astimezone(timezone.utc))]
"""UTCDateTime is a datetime with timezone information"""


def _validate_timedelta_field(td: timedelta | None) -> TimeDelta | None:
"""Validate the execution_timeout property."""
if td is None:
return None
return TimeDelta(
days=td.days,
seconds=td.seconds,
microseconds=td.microseconds,
)


class TimeDelta(BaseModel):
"""TimeDelta can be used to interact with datetime.timedelta objects."""

object_type: str = "TimeDelta"
days: int
seconds: int
microseconds: int

model_config = ConfigDict(
alias_generator=AliasGenerator(
serialization_alias=lambda field_name: {
"object_type": "__type",
}.get(field_name, field_name),
)
)


TimeDeltaWithValidation = Annotated[TimeDelta, BeforeValidator(_validate_timedelta_field)]


def get_class_ref(obj: Operator) -> dict[str, str | None]:
"""Return the class_ref dict for obj."""
is_mapped_or_serialized = isinstance(obj, (MappedOperator, SerializedBaseOperator))

module_path = None
if is_mapped_or_serialized:
module_path = obj._task_module
else:
module_type = inspect.getmodule(obj)
module_path = module_type.__name__ if module_type else None

class_name = None
if is_mapped_or_serialized:
class_name = obj._task_type
elif obj.__class__ is type:
class_name = obj.__name__
else:
class_name = type(obj).__name__

return {
"module_path": module_path,
"class_name": class_name,
}
246 changes: 246 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,62 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/tasks/{task_id}:
get:
tags:
- Task
summary: Get Task
description: Get simplified representation of a task.
operationId: get_task
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: task_id
in: path
required: true
schema:
title: Task Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
AppBuilderMenuItemResponse:
Expand Down Expand Up @@ -4913,6 +4969,196 @@ components:
- triggerer_job
title: TaskInstanceResponse
description: TaskInstance serializer for responses.
TaskResponse:
properties:
task_id:
anyOf:
- type: string
- type: 'null'
title: Task Id
task_display_name:
anyOf:
- type: string
- type: 'null'
title: Task Display Name
owner:
anyOf:
- type: string
- type: 'null'
title: Owner
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
trigger_rule:
anyOf:
- type: string
- type: 'null'
title: Trigger Rule
depends_on_past:
type: boolean
title: Depends On Past
wait_for_downstream:
type: boolean
title: Wait For Downstream
retries:
anyOf:
- type: number
- type: 'null'
title: Retries
queue:
anyOf:
- type: string
- type: 'null'
title: Queue
pool:
anyOf:
- type: string
- type: 'null'
title: Pool
pool_slots:
anyOf:
- type: number
- type: 'null'
title: Pool Slots
execution_timeout:
anyOf:
- $ref: '#/components/schemas/TimeDelta'
- type: 'null'
retry_delay:
anyOf:
- $ref: '#/components/schemas/TimeDelta'
- type: 'null'
retry_exponential_backoff:
type: boolean
title: Retry Exponential Backoff
priority_weight:
anyOf:
- type: number
- type: 'null'
title: Priority Weight
weight_rule:
anyOf:
- type: string
- type: 'null'
title: Weight Rule
ui_color:
anyOf:
- type: string
- type: 'null'
title: Ui Color
ui_fgcolor:
anyOf:
- type: string
- type: 'null'
title: Ui Fgcolor
template_fields:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Template Fields
downstream_task_ids:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Downstream Task Ids
doc_md:
anyOf:
- type: string
- type: 'null'
title: Doc Md
operator_name:
anyOf:
- type: string
- type: 'null'
title: Operator Name
params:
anyOf:
- type: object
- type: 'null'
title: Params
class_ref:
anyOf:
- type: object
- type: 'null'
title: Class Ref
is_mapped:
anyOf:
- type: boolean
- type: 'null'
title: Is Mapped
extra_links:
items:
type: string
type: array
title: Extra Links
description: Extract and return extra_links.
readOnly: true
type: object
required:
- task_id
- task_display_name
- owner
- start_date
- end_date
- trigger_rule
- depends_on_past
- wait_for_downstream
- retries
- queue
- pool
- pool_slots
- execution_timeout
- retry_delay
- retry_exponential_backoff
- priority_weight
- weight_rule
- ui_color
- ui_fgcolor
- template_fields
- downstream_task_ids
- doc_md
- operator_name
- params
- class_ref
- is_mapped
- extra_links
title: TaskResponse
description: Task serializer for responses.
TimeDelta:
properties:
__type:
type: string
title: ' Type'
default: TimeDelta
days:
type: integer
title: Days
seconds:
type: integer
title: Seconds
microseconds:
type: integer
title: Microseconds
type: object
required:
- days
- seconds
- microseconds
title: TimeDelta
description: TimeDelta can be used to interact with datetime.timedelta objects.
TriggerResponse:
properties:
id:
Expand Down
3 changes: 3 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.api_fastapi.core_api.routes.public.pools import pools_router
from airflow.api_fastapi.core_api.routes.public.providers import providers_router
from airflow.api_fastapi.core_api.routes.public.task_instances import task_instances_router
from airflow.api_fastapi.core_api.routes.public.tasks import tasks_router
from airflow.api_fastapi.core_api.routes.public.variables import variables_router
from airflow.api_fastapi.core_api.routes.public.version import version_router

Expand All @@ -56,3 +57,5 @@
public_router.include_router(variables_router)
public_router.include_router(version_router)
public_router.include_router(dag_stats_router)
public_router.include_router(plugins_router)
public_router.include_router(tasks_router)
Loading

0 comments on commit 7467a22

Please sign in to comment.