Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
Browse files Browse the repository at this point in the history
…AIP-84/modify_dag_run
  • Loading branch information
rawwar committed Oct 15, 2024
2 parents c668f3d + c9c4ca5 commit e1fa0d5
Show file tree
Hide file tree
Showing 870 changed files with 6,283 additions and 6,545 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

# Add tests and kubernetes_tests to context.
!tests
!tests_common
!kubernetes_tests
!helm_tests
!docker_tests
Expand Down
12 changes: 7 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ repos:
args: []
require_serial: true
additional_dependencies: ["ruff==0.5.5"]
exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py|^airflow/contrib/
exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py$
- id: replace-bad-characters
name: Replace bad characters
entry: ./scripts/ci/pre_commit/replace_bad_characters.py
Expand Down Expand Up @@ -736,7 +736,7 @@ repos:
name: Verify usage of Airflow deprecation classes in core
entry: category=DeprecationWarning|category=PendingDeprecationWarning
files: \.py$
exclude: ^airflow/configuration\.py$|^providers/src/airflow/providers/|^scripts/in_container/verify_providers\.py$|^(providers/)?tests/.*$|^dev/tests_common/
exclude: ^airflow/configuration\.py$|^providers/src/airflow/providers/|^scripts/in_container/verify_providers\.py$|^(providers/)?tests/.*$|^tests_common/
pass_filenames: true
- id: check-provide-create-sessions-imports
language: pygrep
Expand Down Expand Up @@ -966,8 +966,8 @@ repos:
name: Lint JSON Schema files
entry: ./scripts/ci/pre_commit/json_schema.py
args:
- --spec-url
- https://json-schema.org/draft-07/schema
- --spec-file
- scripts/ci/pre_commit/draft7_schema.json
language: python
pass_filenames: true
files: .*\.schema\.json$
Expand Down Expand Up @@ -1184,7 +1184,9 @@ repos:
^(providers/)?tests/ |
^dev/.*\.py$ |
^scripts/.*\.py$ |
^\w+_tests/ |
^docker_tests/.*$ |
^helm_tests/.*$ |
^tests_common/.*$ |
^docs/.*\.py$ |
^hatch_build.py$
- id: check-provider-docs-valid
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ ARG AIRFLOW_VERSION="2.10.2"
ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.4.17
ARG AIRFLOW_UV_VERSION=0.4.21
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
8 changes: 6 additions & 2 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,9 @@ function environment_initialization() {
# Added to have run-tests on path
export PATH=${PATH}:${AIRFLOW_SOURCES}

# Directory where simple auth manager store generated passwords
export AIRFLOW_AUTH_MANAGER_CREDENTIAL_DIRECTORY="/files"

mkdir -pv "${AIRFLOW_HOME}/logs/"

# Change the default worker_concurrency for tests
Expand Down Expand Up @@ -1275,7 +1278,7 @@ ARG DEFAULT_CONSTRAINTS_BRANCH="constraints-main"
ARG AIRFLOW_CI_BUILD_EPOCH="10"
ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true"
ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.4.17
ARG AIRFLOW_UV_VERSION=0.4.21
ARG AIRFLOW_USE_UV="true"
# Setup PIP
# By default PIP install run without cache to make image smaller
Expand All @@ -1299,7 +1302,7 @@ ARG AIRFLOW_VERSION=""
ARG ADDITIONAL_PIP_INSTALL_FLAGS=""

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.4.7
ARG AIRFLOW_UV_VERSION=0.4.21
ARG AIRFLOW_USE_UV="true"

ENV AIRFLOW_REPO=${AIRFLOW_REPO}\
Expand Down Expand Up @@ -1366,6 +1369,7 @@ COPY pyproject.toml ${AIRFLOW_SOURCES}/pyproject.toml
COPY providers/pyproject.toml ${AIRFLOW_SOURCES}/providers/pyproject.toml
COPY task_sdk/pyproject.toml ${AIRFLOW_SOURCES}/task_sdk/pyproject.toml
COPY airflow/__init__.py ${AIRFLOW_SOURCES}/airflow/
COPY tests_common/ ${AIRFLOW_SOURCES}/tests_common/
COPY generated/* ${AIRFLOW_SOURCES}/generated/
COPY constraints/* ${AIRFLOW_SOURCES}/constraints/
COPY LICENSE ${AIRFLOW_SOURCES}/LICENSE
Expand Down
31 changes: 11 additions & 20 deletions airflow/api/auth/backend/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,21 @@

from __future__ import annotations

from functools import wraps
from typing import Any, Callable, TypeVar, cast
import warnings
from typing import Any

from flask import Response

from airflow.www.extensions.init_auth_manager import get_auth_manager
import airflow.providers.fab.auth_manager.api.auth.backend.session as fab_session
from airflow.exceptions import RemovedInAirflow3Warning

CLIENT_AUTH: tuple[str, str] | Any | None = None


def init_app(_):
"""Initialize authentication backend."""


T = TypeVar("T", bound=Callable)


def requires_authentication(function: T):
"""Decorate functions that require authentication."""
warnings.warn(
"This module is deprecated. Please use `airflow.providers.fab.auth_manager.api.auth.backend.session` instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)

@wraps(function)
def decorated(*args, **kwargs):
if not get_auth_manager().is_logged_in():
return Response("Unauthorized", 401, {})
return function(*args, **kwargs)

return cast(T, decorated)
init_app = fab_session.init_app
requires_authentication = fab_session.requires_authentication
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def get_connection(*, connection_id: str, session: Session = NEW_SESSION) -> API
@security.requires_access_connection("GET")
@format_parameters({"limit": check_limit})
@provide_session
@mark_fastapi_migration_done
def get_connections(
*,
limit: int,
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from airflow.api_connexion.types import APIResponse, UpdateMask


@mark_fastapi_migration_done
@security.requires_access_dag("GET")
@provide_session
def get_dag(
Expand Down Expand Up @@ -215,6 +216,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat
return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_dag("DELETE")
@action_logging
@provide_session
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
from airflow.api_connexion.types import APIResponse


@mark_fastapi_migration_done
@security.requires_access_dag("DELETE", DagAccessEntity.RUN)
@provide_session
@action_logging
Expand Down
40 changes: 28 additions & 12 deletions airflow/api_connexion/endpoints/dag_stats_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,40 @@

@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_stats(*, dag_ids: str, session: Session = NEW_SESSION) -> APIResponse:
def get_dag_stats(
*,
dag_ids: str | None = None,
limit: int | None = None,
offset: int | None = None,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get Dag statistics."""
allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(methods=["GET"], user=g.user)
dags_list = set(dag_ids.split(","))
filter_dag_ids = dags_list.intersection(allowed_dag_ids)
if dag_ids:
dags_list = set(dag_ids.split(","))
filter_dag_ids = dags_list.intersection(allowed_dag_ids)
else:
filter_dag_ids = allowed_dag_ids
query_dag_ids = sorted(list(filter_dag_ids))
if offset is not None:
query_dag_ids = query_dag_ids[offset:]
if limit is not None:
query_dag_ids = query_dag_ids[:limit]

query = (
select(DagRun.dag_id, DagRun.state, func.count(DagRun.state))
.group_by(DagRun.dag_id, DagRun.state)
.where(DagRun.dag_id.in_(filter_dag_ids))
.where(DagRun.dag_id.in_(query_dag_ids))
)
dag_state_stats = session.execute(query)

dag_state_data = {(dag_id, state): count for dag_id, state, count in dag_state_stats}
dag_stats = {
dag_id: [{"state": state, "count": dag_state_data.get((dag_id, state), 0)} for state in DagRunState]
for dag_id in filter_dag_ids
}

dags = [{"dag_id": stat, "stats": dag_stats[stat]} for stat in dag_stats]
return dag_stats_collection_schema.dump({"dags": dags, "total_entries": len(dag_stats)})
dags = [
{
"dag_id": dag_id,
"stats": [
{"state": state, "count": dag_state_data.get((dag_id, state), 0)} for state in DagRunState
],
}
for dag_id in query_dag_ids
]
return dag_stats_collection_schema.dump({"dags": dags, "total_entries": len(dags)})
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/health_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

from airflow.api.common.airflow_health import get_airflow_health
from airflow.api_connexion.schemas.health_schema import health_schema
from airflow.utils.api_migration import mark_fastapi_migration_done

if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse


@mark_fastapi_migration_done
def get_health() -> APIResponse:
"""Return the health of the airflow scheduler, metadatabase and triggerer."""
airflow_health_status = get_airflow_health()
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/variable_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def get_variables(
)


@mark_fastapi_migration_done
@security.requires_access_variable("PUT")
@provide_session
@action_logging(
Expand Down Expand Up @@ -129,6 +130,7 @@ def patch_variable(
return variable_schema.dump(variable)


@mark_fastapi_migration_done
@security.requires_access_variable("POST")
@action_logging(
event=action_event_from_permission(
Expand Down
4 changes: 3 additions & 1 deletion airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2384,11 +2384,13 @@ paths:
operationId: get_dag_stats
tags: [DagStats]
parameters:
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- name: dag_ids
in: query
schema:
type: string
required: true
required: false
description: |
One or more DAG IDs separated by commas to filter relevant Dags.
responses:
Expand Down
Loading

0 comments on commit e1fa0d5

Please sign in to comment.