diff --git a/.asf.yaml b/.asf.yaml index 50dea7951556..ea3c6a1497bc 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -122,7 +122,6 @@ github: # Max 10 collaborators allowed # https://github.com/apache/infrastructure-asfyaml/blob/main/README.md#assigning-the-github-triage-role-to-external-collaborators - aritra24 - - dirrao - omkar-foss - rawwar - nathadfield diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index c20fe916f92d..4d52cba5cdd1 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -11,7 +11,7 @@ # Kubernetes /airflow/kubernetes/ @dstandish @jedcunningham -/airflow/providers/cncf/kubernetes/executors/ @dstandish @jedcunningham +/providers/src/airflow/providers/cncf/kubernetes/executors/ @dstandish @jedcunningham # Helm Chart /chart/ @dstandish @jedcunningham @hussein-awala @@ -61,27 +61,28 @@ /airflow/secrets @dstandish @potiuk @ashb # Providers -/airflow/providers/amazon/ @eladkal @o-nikolas -/airflow/providers/celery/ @hussein-awala -/airflow/providers/cncf/kubernetes @jedcunningham @hussein-awala -/airflow/providers/common/sql/ @eladkal -/airflow/providers/dbt/cloud/ @josh-fell -/airflow/providers/edge @jscheffl -/airflow/providers/hashicorp/ @hussein-awala -/airflow/providers/openlineage/ @mobuchowski -/airflow/providers/slack/ @eladkal -/airflow/providers/smtp/ @hussein-awala -/airflow/providers/snowflake/ @potiuk @mik-laj -/airflow/providers/tabular/ @Fokko +/providers/src/airflow/providers/amazon/ @eladkal @o-nikolas +/providers/src/airflow/providers/celery/ @hussein-awala +/providers/src/airflow/providers/cncf/kubernetes @jedcunningham @hussein-awala +/providers/src/airflow/providers/common/sql/ @eladkal +/providers/src/airflow/providers/dbt/cloud/ @josh-fell +/providers/src/airflow/providers/edge @jscheffl +/providers/src/airflow/providers/hashicorp/ @hussein-awala +/providers/src/airflow/providers/openlineage/ @mobuchowski +/providers/src/airflow/providers/slack/ @eladkal +/providers/src/airflow/providers/smtp/ @hussein-awala +/providers/src/airflow/providers/snowflake/ @potiuk @mik-laj +/providers/src/airflow/providers/apache/iceberg/ @Fokko +/providers/tests/apache/iceberg/ @Fokko /docs/apache-airflow-providers-amazon/ @eladkal @o-nikolas /docs/apache-airflow-providers-cncf-kubernetes @jedcunningham /docs/apache-airflow-providers-common-sql/ @eladkal /docs/apache-airflow-providers-openlineage/ @mobuchowski /docs/apache-airflow-providers-slack/ @eladkal -/tests/providers/amazon/ @eladkal @o-nikolas -/tests/providers/common/sql/ @eladkal -/tests/providers/openlineage/ @mobuchowski -/tests/providers/slack/ @eladkal +/providers/tests/amazon/ @eladkal @o-nikolas +/providers/tests/common/sql/ @eladkal +/providers/tests/openlineage/ @mobuchowski +/providers/tests/slack/ @eladkal /tests/system/providers/amazon/ @eladkal @o-nikolas # Dev tools @@ -108,6 +109,6 @@ ISSUE_TRIAGE_PROCESS.rst @eladkal # AIP-58 - Object Storage /airflow/io/ @bolkedebruin -/airflow/providers/**/fs/ @bolkedebruin -/airflow/providers/common/io/ @bolkedebruin +/providers/src/airflow/providers/**/fs/ @bolkedebruin +/providers/src/airflow/providers/common/io/ @bolkedebruin /docs/apache-airflow/core-concepts/objectstorage.rst @bolkedebruin diff --git a/.github/workflows/additional-ci-image-checks.yml b/.github/workflows/additional-ci-image-checks.yml index 878800324b78..8a3b46e70d37 100644 --- a/.github/workflows/additional-ci-image-checks.yml +++ b/.github/workflows/additional-ci-image-checks.yml @@ -84,6 +84,10 @@ on: # yamllint disable-line rule:truthy description: "Whether to debug resources 
(true/false)" required: true type: string + use-uv: + description: "Whether to use uv to build the image (true/false)" + required: true + type: string jobs: # Push early BuildX cache to GitHub Registry in Apache repository, This cache does not wait for all the # tests to complete - it is run very early in the build process for "main" merges in order to refresh @@ -113,7 +117,7 @@ jobs: python-versions: ${{ inputs.python-versions }} branch: ${{ inputs.branch }} constraints-branch: ${{ inputs.constraints-branch }} - use-uv: "true" + use-uv: ${{ inputs.use-uv}} include-success-outputs: ${{ inputs.include-success-outputs }} docker-cache: ${{ inputs.docker-cache }} disable-airflow-repo-cache: ${{ inputs.disable-airflow-repo-cache }} @@ -170,7 +174,7 @@ jobs: # platform: "linux/arm64" # branch: ${{ inputs.branch }} # constraints-branch: ${{ inputs.constraints-branch }} -# use-uv: "true" +# use-uv: ${{ inputs.use-uv}} # upgrade-to-newer-dependencies: ${{ inputs.upgrade-to-newer-dependencies }} # docker-cache: ${{ inputs.docker-cache }} # disable-airflow-repo-cache: ${{ inputs.disable-airflow-repo-cache }} diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml index 943b01f8f891..6bb8a9d21930 100644 --- a/.github/workflows/build-images.yml +++ b/.github/workflows/build-images.yml @@ -72,6 +72,7 @@ jobs: docker-cache: ${{ steps.selective-checks.outputs.docker-cache }} default-branch: ${{ steps.selective-checks.outputs.default-branch }} disable-airflow-repo-cache: ${{ steps.selective-checks.outputs.disable-airflow-repo-cache }} + force-pip: ${{ steps.selective-checks.outputs.force-pip }} constraints-branch: ${{ steps.selective-checks.outputs.default-constraints-branch }} runs-on-as-json-default: ${{ steps.selective-checks.outputs.runs-on-as-json-default }} runs-on-as-json-public: ${{ steps.selective-checks.outputs.runs-on-as-json-public }} @@ -203,7 +204,7 @@ jobs: pull-request-target: "true" is-committer-build: ${{ needs.build-info.outputs.is-committer-build }} push-image: "true" - use-uv: "true" + use-uv: ${{ needs.build-info.outputs.force-pip && 'false' || 'true' }} image-tag: ${{ needs.build-info.outputs.image-tag }} platform: "linux/amd64" python-versions: ${{ needs.build-info.outputs.python-versions }} @@ -248,7 +249,7 @@ jobs: pull-request-target: "true" is-committer-build: ${{ needs.build-info.outputs.is-committer-build }} push-image: "true" - use-uv: "true" + use-uv: ${{ needs.build-info.outputs.force-pip && 'false' || 'true' }} image-tag: ${{ needs.build-info.outputs.image-tag }} platform: linux/amd64 python-versions: ${{ needs.build-info.outputs.python-versions }} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fdf63640af9a..956cb761f811 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,6 +77,7 @@ jobs: default-mysql-version: ${{ steps.selective-checks.outputs.default-mysql-version }} default-helm-version: ${{ steps.selective-checks.outputs.default-helm-version }} default-kind-version: ${{ steps.selective-checks.outputs.default-kind-version }} + force-pip: ${{ steps.selective-checks.outputs.force-pip }} full-tests-needed: ${{ steps.selective-checks.outputs.full-tests-needed }} parallel-test-types-list-as-string: >- ${{ steps.selective-checks.outputs.parallel-test-types-list-as-string }} @@ -205,7 +206,7 @@ jobs: platform: "linux/amd64" python-versions: ${{ needs.build-info.outputs.python-versions }} branch: ${{ needs.build-info.outputs.default-branch }} - use-uv: "true" + use-uv: ${{ 
needs.build-info.outputs.force-pip && 'false' || 'true' }} upgrade-to-newer-dependencies: ${{ needs.build-info.outputs.upgrade-to-newer-dependencies }} constraints-branch: ${{ needs.build-info.outputs.default-constraints-branch }} docker-cache: ${{ needs.build-info.outputs.docker-cache }} @@ -271,6 +272,7 @@ jobs: latest-versions-only: ${{ needs.build-info.outputs.latest-versions-only }} include-success-outputs: ${{ needs.build-info.outputs.include-success-outputs }} debug-resources: ${{ needs.build-info.outputs.debug-resources }} + use-uv: ${{ needs.build-info.outputs.force-pip && 'false' || 'true' }} generate-constraints: @@ -556,7 +558,7 @@ jobs: default-python-version: ${{ needs.build-info.outputs.default-python-version }} branch: ${{ needs.build-info.outputs.default-branch }} push-image: "true" - use-uv: "true" + use-uv: ${{ needs.build-info.outputs.force-pip && 'false' || 'true' }} build-provider-packages: ${{ needs.build-info.outputs.default-branch == 'main' }} upgrade-to-newer-dependencies: ${{ needs.build-info.outputs.upgrade-to-newer-dependencies }} chicken-egg-providers: ${{ needs.build-info.outputs.chicken-egg-providers }} diff --git a/.github/workflows/finalize-tests.yml b/.github/workflows/finalize-tests.yml index 6fae105e0a64..c948984ee10c 100644 --- a/.github/workflows/finalize-tests.yml +++ b/.github/workflows/finalize-tests.yml @@ -149,7 +149,7 @@ jobs: python-versions: ${{ inputs.python-versions }} branch: ${{ inputs.branch }} constraints-branch: ${{ inputs.constraints-branch }} - use-uv: "true" + use-uv: ${{ needs.build-info.outputs.force-pip && 'false' || 'true' }} include-success-outputs: ${{ inputs.include-success-outputs }} docker-cache: ${{ inputs.docker-cache }} disable-airflow-repo-cache: ${{ inputs.disable-airflow-repo-cache }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f6112cea4c20..90b519448770 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -234,7 +234,7 @@ repos: additional_dependencies: ["libcst>=1.1.0"] files: ^(providers/src/)?airflow/.*/(sensors|operators)/.*\.py$ - repo: https://github.com/asottile/blacken-docs - rev: 1.19.0 + rev: 1.19.1 hooks: - id: blacken-docs name: Run black on docs diff --git a/Dockerfile b/Dockerfile index 21e6af5e123c..ac3e810136f6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -49,8 +49,13 @@ ARG AIRFLOW_VERSION="2.10.2" ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm" -ARG AIRFLOW_PIP_VERSION=24.2 -ARG AIRFLOW_UV_VERSION=0.4.26 + +# You can swap comments between those two args to test pip from the main version +# When you attempt to test if the version of `pip` from specified branch works for our builds +# Also use `force pip` label on your PR to swap all places we use `uv` to `pip` +ARG AIRFLOW_PIP_VERSION=24.3.1 +# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main" +ARG AIRFLOW_UV_VERSION=0.4.27 ARG AIRFLOW_USE_UV="false" ARG UV_HTTP_TIMEOUT="300" ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow" @@ -615,7 +620,7 @@ function common::install_packaging_tools() { echo "${COLOR_BLUE}Installing latest pip version${COLOR_RESET}" echo pip install --root-user-action ignore --disable-pip-version-check --upgrade pip - elif [[ ! ${AIRFLOW_PIP_VERSION} =~ [0-9.]* ]]; then + elif [[ ! 
${AIRFLOW_PIP_VERSION} =~ ^[0-9].* ]]; then echo echo "${COLOR_BLUE}Installing pip version from spec ${AIRFLOW_PIP_VERSION}${COLOR_RESET}" echo @@ -628,7 +633,6 @@ function common::install_packaging_tools() { echo echo "${COLOR_BLUE}(Re)Installing pip version: ${AIRFLOW_PIP_VERSION}${COLOR_RESET}" echo - # shellcheck disable=SC2086 pip install --root-user-action ignore --disable-pip-version-check "pip==${AIRFLOW_PIP_VERSION}" fi fi @@ -637,7 +641,7 @@ function common::install_packaging_tools() { echo "${COLOR_BLUE}Installing latest uv version${COLOR_RESET}" echo pip install --root-user-action ignore --disable-pip-version-check --upgrade uv - elif [[ ! ${AIRFLOW_UV_VERSION} =~ [0-9.]* ]]; then + elif [[ ! ${AIRFLOW_UV_VERSION} =~ ^[0-9].* ]]; then echo echo "${COLOR_BLUE}Installing uv version from spec ${AIRFLOW_UV_VERSION}${COLOR_RESET}" echo diff --git a/Dockerfile.ci b/Dockerfile.ci index 357f0dd2a6fe..dadd6c827f57 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -561,7 +561,7 @@ function common::install_packaging_tools() { echo "${COLOR_BLUE}Installing latest pip version${COLOR_RESET}" echo pip install --root-user-action ignore --disable-pip-version-check --upgrade pip - elif [[ ! ${AIRFLOW_PIP_VERSION} =~ [0-9.]* ]]; then + elif [[ ! ${AIRFLOW_PIP_VERSION} =~ ^[0-9].* ]]; then echo echo "${COLOR_BLUE}Installing pip version from spec ${AIRFLOW_PIP_VERSION}${COLOR_RESET}" echo @@ -574,7 +574,6 @@ function common::install_packaging_tools() { echo echo "${COLOR_BLUE}(Re)Installing pip version: ${AIRFLOW_PIP_VERSION}${COLOR_RESET}" echo - # shellcheck disable=SC2086 pip install --root-user-action ignore --disable-pip-version-check "pip==${AIRFLOW_PIP_VERSION}" fi fi @@ -583,7 +582,7 @@ function common::install_packaging_tools() { echo "${COLOR_BLUE}Installing latest uv version${COLOR_RESET}" echo pip install --root-user-action ignore --disable-pip-version-check --upgrade uv - elif [[ ! ${AIRFLOW_UV_VERSION} =~ [0-9.]* ]]; then + elif [[ ! ${AIRFLOW_UV_VERSION} =~ ^[0-9].* ]]; then echo echo "${COLOR_BLUE}Installing uv version from spec ${AIRFLOW_UV_VERSION}${COLOR_RESET}" echo @@ -1362,8 +1361,12 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \ # Here we fix the versions so all subsequent commands will use the versions # from the sources -ARG AIRFLOW_PIP_VERSION=24.2 -ARG AIRFLOW_UV_VERSION=0.4.26 +# You can swap comments between those two args to test pip from the main version +# When you attempt to test if the version of `pip` from specified branch works for our builds +# Also use `force pip` label on your PR to swap all places we use `uv` to `pip` +ARG AIRFLOW_PIP_VERSION=24.3.1 +# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main" +ARG AIRFLOW_UV_VERSION=0.4.27 ENV AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION} \ AIRFLOW_UV_VERSION=${AIRFLOW_UV_VERSION} diff --git a/INTHEWILD.md b/INTHEWILD.md index 310d018b9832..bdcb4973d78c 100644 --- a/INTHEWILD.md +++ b/INTHEWILD.md @@ -158,6 +158,7 @@ Currently, **officially** using Airflow: 1. [Cryptalizer.com](https://www.cryptalizer.com/) 1. [Currency](https://www.gocurrency.com/) [[@FCLI](https://github.com/FCLI) & [@alexbegg](https://github.com/alexbegg)] 1. [Custom Ink](https://www.customink.com/) [[@david-dalisay](https://github.com/david-dalisay), [@dmartin11](https://github.com/dmartin11) & [@mpeteuil](https://github.com/mpeteuil)] +1. [Cyberdino](https://www.cyberdino.io) [[@cyberdino-io](https://github.com/cyberdino-io)] 1. [Cyscale](https://cyscale.com) [[@ocical](https://github.com/ocical)] 1. 
[Dailymotion](http://www.dailymotion.com/fr) [[@germaintanguy](https://github.com/germaintanguy) & [@hc](https://github.com/hc)] 1. [DANA](https://www.dana.id/) [[@imamdigmi](https://github.com/imamdigmi)] @@ -383,7 +384,7 @@ Currently, **officially** using Airflow: 1. [Paxful](https://paxful.com) [[@ne1r0n](https://github.com/ne1r0n)] 1. [PayFit](https://payfit.com) [[@pcorbel](https://github.com/pcorbel)] 1. [PAYMILL](https://www.paymill.com/) [[@paymill](https://github.com/paymill) & [@matthiashuschle](https://github.com/matthiashuschle)] -1. [PayPal](https://www.paypal.com/) [[@r39132](https://github.com/r39132) & [@jhsenjaliya](https://github.com/jhsenjaliya)] +1. [PayPal](https://www.paypal.com/) [[@kaddynator](https://github.com/kaddynator), [@r39132](https://github.com/r39132) & [@jhsenjaliya](https://github.com/jhsenjaliya)] 1. [Pecan](https://www.pecan.ai) [[@ohadmata](https://github.com/ohadmata)] 1. [Pernod-Ricard](https://www.pernod-ricard.com/) [[@romain-nio](https://github.com/romain-nio)] 1. [PEXA](https://www.pexa.com.au/) [[@andriyfedorov](https://github.com/andriyfedorov)] diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 0352297bfffd..352bf9cfd4c1 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -134,7 +134,7 @@ def get_dags( try: dags_collection_schema = ( - DAGCollectionSchema(only=[f"dags.{field}" for field in fields]) + DAGCollectionSchema(only=[f"dags.{field}" for field in fields] + ["total_entries"]) if fields else DAGCollectionSchema() ) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 0e98173f68a5..eff16f54066e 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -425,6 +425,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse: except _UnsupportedOrderBy as e: raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported") + ti_query = ti_query.offset(data["page_offset"]).limit(data["page_limit"]) task_instances = session.scalars(ti_query) return task_instance_collection_schema.dump( @@ -533,9 +534,11 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION detail=f"Task instance not found for task {task_id!r} on execution_date {execution_date}" ) - if run_id and not session.get( - TI, {"task_id": task_id, "dag_id": dag_id, "run_id": run_id, "map_index": -1} - ): + select_stmt = select(TI).where( + TI.dag_id == dag_id, TI.task_id == task_id, TI.run_id == run_id, TI.map_index == -1 + ) + + if run_id and not session.scalars(select_stmt).one_or_none(): error_message = f"Task instance not found for task {task_id!r} on DAG run with ID {run_id!r}" raise NotFound(detail=error_message) @@ -581,10 +584,12 @@ def patch_task_instance( if not dag.has_task(task_id): raise NotFound("Task not found", detail=f"Task {task_id!r} not found in DAG {dag_id!r}") - ti: TI | None = session.get( - TI, {"task_id": task_id, "dag_id": dag_id, "run_id": dag_run_id, "map_index": map_index} + select_stmt = select(TI).where( + TI.dag_id == dag_id, TI.task_id == task_id, TI.run_id == dag_run_id, TI.map_index == map_index ) + ti: TI | None = session.scalars(select_stmt).one_or_none() + if not ti: error_message = f"Task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}" raise NotFound(detail=error_message) 
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 62d42264073f..252d4cd6ed3e 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -5038,6 +5038,15 @@ components: ListTaskInstanceForm: type: object properties: + page_offset: + type: integer + minimum: 0 + description: The number of items to skip before starting to collect the result set. + page_limit: + type: integer + minimum: 1 + default: 100 + description: The numbers of items to return. dag_ids: type: array items: diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 76c28214100f..a6ac7ac79d28 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -73,6 +73,103 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /ui/dags/recent_dag_runs: + get: + tags: + - Dags + summary: Recent Dag Runs + description: Get recent DAG runs. + operationId: recent_dag_runs + parameters: + - name: dag_runs_limit + in: query + required: false + schema: + type: integer + default: 10 + title: Dag Runs Limit + - name: limit + in: query + required: false + schema: + type: integer + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + - name: tags + in: query + required: false + schema: + type: array + items: + type: string + title: Tags + - name: owners + in: query + required: false + schema: + type: array + items: + type: string + title: Owners + - name: dag_id_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Id Pattern + - name: dag_display_name_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Display Name Pattern + - name: only_active + in: query + required: false + schema: + type: boolean + default: true + title: Only Active + - name: paused + in: query + required: false + schema: + anyOf: + - type: boolean + - type: 'null' + title: Paused + - name: last_dag_run_state + in: query + required: false + schema: + anyOf: + - $ref: '#/components/schemas/DagRunState' + - type: 'null' + title: Last Dag Run State + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGWithLatestDagRunsCollectionResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/: get: tags: @@ -2093,6 +2190,181 @@ components: - total_entries title: DAGTagCollectionResponse description: DAG Tags Collection serializer for responses. + DAGWithLatestDagRunsCollectionResponse: + properties: + total_entries: + type: integer + title: Total Entries + dags: + items: + $ref: '#/components/schemas/DAGWithLatestDagRunsResponse' + type: array + title: Dags + type: object + required: + - total_entries + - dags + title: DAGWithLatestDagRunsCollectionResponse + description: DAG with latest dag runs collection response serializer. 
+ DAGWithLatestDagRunsResponse: + properties: + dag_id: + type: string + title: Dag Id + dag_display_name: + type: string + title: Dag Display Name + is_paused: + type: boolean + title: Is Paused + is_active: + type: boolean + title: Is Active + last_parsed_time: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Parsed Time + last_pickled: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Pickled + last_expired: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Expired + scheduler_lock: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Scheduler Lock + pickle_id: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Pickle Id + default_view: + anyOf: + - type: string + - type: 'null' + title: Default View + fileloc: + type: string + title: Fileloc + description: + anyOf: + - type: string + - type: 'null' + title: Description + timetable_summary: + anyOf: + - type: string + - type: 'null' + title: Timetable Summary + timetable_description: + anyOf: + - type: string + - type: 'null' + title: Timetable Description + tags: + items: + $ref: '#/components/schemas/DagTagPydantic' + type: array + title: Tags + max_active_tasks: + type: integer + title: Max Active Tasks + max_active_runs: + anyOf: + - type: integer + - type: 'null' + title: Max Active Runs + max_consecutive_failed_dag_runs: + type: integer + title: Max Consecutive Failed Dag Runs + has_task_concurrency_limits: + type: boolean + title: Has Task Concurrency Limits + has_import_errors: + type: boolean + title: Has Import Errors + next_dagrun: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun + next_dagrun_data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval Start + next_dagrun_data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval End + next_dagrun_create_after: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Create After + owners: + items: + type: string + type: array + title: Owners + latest_dag_runs: + items: + $ref: '#/components/schemas/DAGRunResponse' + type: array + title: Latest Dag Runs + file_token: + type: string + title: File Token + description: Return file token. + readOnly: true + type: object + required: + - dag_id + - dag_display_name + - is_paused + - is_active + - last_parsed_time + - last_pickled + - last_expired + - scheduler_lock + - pickle_id + - default_view + - fileloc + - description + - timetable_summary + - timetable_description + - tags + - max_active_tasks + - max_active_runs + - max_consecutive_failed_dag_runs + - has_task_concurrency_limits + - has_import_errors + - next_dagrun + - next_dagrun_data_interval_start + - next_dagrun_data_interval_end + - next_dagrun_create_after + - owners + - latest_dag_runs + - file_token + title: DAGWithLatestDagRunsResponse + description: DAG with latest dag runs response serializer. 
DagProcessorInfoSchema: properties: status: diff --git a/airflow/api_fastapi/core_api/routes/ui/__init__.py b/airflow/api_fastapi/core_api/routes/ui/__init__.py index 9cd16fcdd16b..b7ebf9c5c46f 100644 --- a/airflow/api_fastapi/core_api/routes/ui/__init__.py +++ b/airflow/api_fastapi/core_api/routes/ui/__init__.py @@ -18,9 +18,11 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.routes.ui.assets import assets_router +from airflow.api_fastapi.core_api.routes.ui.dags import dags_router from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router ui_router = AirflowRouter(prefix="/ui") ui_router.include_router(assets_router) ui_router.include_router(dashboard_router) +ui_router.include_router(dags_router) diff --git a/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow/api_fastapi/core_api/routes/ui/dags.py new file mode 100644 index 000000000000..665373734bb9 --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from fastapi import Depends +from sqlalchemy import and_, func, select +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.common.db.common import ( + get_session, + paginated_select, +) +from airflow.api_fastapi.common.parameters import ( + QueryDagDisplayNamePatternSearch, + QueryDagIdPatternSearch, + QueryLastDagRunStateFilter, + QueryLimit, + QueryOffset, + QueryOnlyActiveFilter, + QueryOwnersFilter, + QueryPausedFilter, + QueryTagsFilter, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse +from airflow.api_fastapi.core_api.serializers.dags import DAGResponse +from airflow.api_fastapi.core_api.serializers.ui.dags import ( + DAGWithLatestDagRunsCollectionResponse, + DAGWithLatestDagRunsResponse, +) +from airflow.models import DagModel, DagRun + +dags_router = AirflowRouter(prefix="/dags", tags=["Dags"]) + + +@dags_router.get("/recent_dag_runs", include_in_schema=False, response_model_exclude_none=True) +async def recent_dag_runs( + limit: QueryLimit, + offset: QueryOffset, + tags: QueryTagsFilter, + owners: QueryOwnersFilter, + dag_id_pattern: QueryDagIdPatternSearch, + dag_display_name_pattern: QueryDagDisplayNamePatternSearch, + only_active: QueryOnlyActiveFilter, + paused: QueryPausedFilter, + last_dag_run_state: QueryLastDagRunStateFilter, + session: Annotated[Session, Depends(get_session)], + dag_runs_limit: int = 10, +) -> DAGWithLatestDagRunsCollectionResponse: + """Get recent DAG runs.""" + recent_runs_subquery = ( + select( + DagRun.dag_id, + DagRun.execution_date, + func.rank() + .over( + partition_by=DagRun.dag_id, + order_by=DagRun.execution_date.desc(), + ) + .label("rank"), + ) + .order_by(DagRun.execution_date.desc()) + .subquery() + ) + dags_with_recent_dag_runs_select = ( + select( + DagRun, + DagModel, + recent_runs_subquery.c.execution_date, + ) + .join(DagModel, DagModel.dag_id == recent_runs_subquery.c.dag_id) + .join( + DagRun, + and_( + DagRun.dag_id == DagModel.dag_id, + DagRun.execution_date == recent_runs_subquery.c.execution_date, + ), + ) + .where(recent_runs_subquery.c.rank <= dag_runs_limit) + .group_by( + DagModel.dag_id, + recent_runs_subquery.c.execution_date, + DagRun.execution_date, + DagRun.id, + ) + .order_by(recent_runs_subquery.c.execution_date.desc()) + ) + dags_with_recent_dag_runs_select_filter, _ = paginated_select( + dags_with_recent_dag_runs_select, + [only_active, paused, dag_id_pattern, dag_display_name_pattern, tags, owners, last_dag_run_state], + None, + offset, + limit, + ) + dags_with_recent_dag_runs = session.execute(dags_with_recent_dag_runs_select_filter) + # aggregate rows by dag_id + dag_runs_by_dag_id: dict[str, DAGWithLatestDagRunsResponse] = {} + + for row in dags_with_recent_dag_runs: + dag_run, dag, *_ = row + dag_id = dag.dag_id + dag_run_response = DAGRunResponse.model_validate(dag_run, from_attributes=True) + if dag_id not in dag_runs_by_dag_id: + dag_response = DAGResponse.model_validate(dag, from_attributes=True) + dag_runs_by_dag_id[dag_id] = DAGWithLatestDagRunsResponse.model_validate( + { + **dag_response.dict(), + "latest_dag_runs": [dag_run_response], + } + ) + else: + dag_runs_by_dag_id[dag_id].latest_dag_runs.append(dag_run_response) + + return DAGWithLatestDagRunsCollectionResponse( + total_entries=len(dag_runs_by_dag_id), + dags=list(dag_runs_by_dag_id.values()), + ) diff --git 
a/airflow/api_fastapi/core_api/serializers/ui/__init__.py b/airflow/api_fastapi/core_api/serializers/ui/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/api_fastapi/core_api/serializers/ui/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/api_fastapi/core_api/serializers/ui/dags.py b/airflow/api_fastapi/core_api/serializers/ui/dags.py new file mode 100644 index 000000000000..f985ce99a972 --- /dev/null +++ b/airflow/api_fastapi/core_api/serializers/ui/dags.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from pydantic import BaseModel + +from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse +from airflow.api_fastapi.core_api.serializers.dags import DAGResponse + + +class DAGWithLatestDagRunsResponse(DAGResponse): + """DAG with latest dag runs response serializer.""" + + latest_dag_runs: list[DAGRunResponse] + + +class DAGWithLatestDagRunsCollectionResponse(BaseModel): + """DAG with latest dag runs collection response serializer.""" + + total_entries: int + dags: list[DAGWithLatestDagRunsResponse] diff --git a/airflow/dag_processing/collection.py b/airflow/dag_processing/collection.py index 034c9c054012..f27f45dda82e 100644 --- a/airflow/dag_processing/collection.py +++ b/airflow/dag_processing/collection.py @@ -37,7 +37,6 @@ from airflow.assets import Asset, AssetAlias from airflow.assets.manager import asset_manager from airflow.models.asset import ( - AssetActive, AssetAliasModel, AssetModel, DagScheduleAssetAliasReference, @@ -277,7 +276,7 @@ class AssetModelOperation(NamedTuple): schedule_asset_references: dict[str, list[Asset]] schedule_asset_alias_references: dict[str, list[AssetAlias]] outlet_references: dict[str, list[tuple[str, Asset]]] - assets: dict[str, Asset] + assets: dict[tuple[str, str], Asset] asset_aliases: dict[str, AssetAlias] @classmethod @@ -300,22 +299,25 @@ def collect(cls, dags: dict[str, DAG]) -> Self: ] for dag_id, dag in dags.items() }, - assets={asset.uri: asset for asset in _find_all_assets(dags.values())}, + assets={(asset.name, asset.uri): asset for asset in _find_all_assets(dags.values())}, asset_aliases={alias.name: alias for alias in _find_all_asset_aliases(dags.values())}, ) return coll - def add_assets(self, *, session: Session) -> dict[str, AssetModel]: + def add_assets(self, *, session: Session) -> dict[tuple[str, str], AssetModel]: # Optimization: skip all database calls if no assets were collected. 
if not self.assets: return {} - orm_assets: dict[str, AssetModel] = { - am.uri: am for am in session.scalars(select(AssetModel).where(AssetModel.uri.in_(self.assets))) + orm_assets: dict[tuple[str, str], AssetModel] = { + (am.name, am.uri): am + for am in session.scalars( + select(AssetModel).where(tuple_(AssetModel.name, AssetModel.uri).in_(self.assets)) + ) } orm_assets.update( - (model.uri, model) + ((model.name, model.uri), model) for model in asset_manager.create_assets( - [asset for uri, asset in self.assets.items() if uri not in orm_assets], + [asset for name_uri, asset in self.assets.items() if name_uri not in orm_assets], session=session, ) ) @@ -340,24 +342,10 @@ def add_asset_aliases(self, *, session: Session) -> dict[str, AssetAliasModel]: ) return orm_aliases - def add_asset_active_references(self, assets: Collection[AssetModel], *, session: Session) -> None: - existing_entries = set( - session.execute( - select(AssetActive.name, AssetActive.uri).where( - tuple_(AssetActive.name, AssetActive.uri).in_((asset.name, asset.uri) for asset in assets) - ) - ) - ) - session.add_all( - AssetActive.for_asset(asset) - for asset in assets - if (asset.name, asset.uri) not in existing_entries - ) - def add_dag_asset_references( self, dags: dict[str, DagModel], - assets: dict[str, AssetModel], + assets: dict[tuple[str, str], AssetModel], *, session: Session, ) -> None: @@ -369,7 +357,7 @@ def add_dag_asset_references( if not references: dags[dag_id].schedule_asset_references = [] continue - referenced_asset_ids = {asset.id for asset in (assets[r.uri] for r in references)} + referenced_asset_ids = {asset.id for asset in (assets[r.name, r.uri] for r in references)} orm_refs = {r.asset_id: r for r in dags[dag_id].schedule_asset_references} for asset_id, ref in orm_refs.items(): if asset_id not in referenced_asset_ids: @@ -409,7 +397,7 @@ def add_dag_asset_alias_references( def add_task_asset_references( self, dags: dict[str, DagModel], - assets: dict[str, AssetModel], + assets: dict[tuple[str, str], AssetModel], *, session: Session, ) -> None: @@ -423,7 +411,7 @@ def add_task_asset_references( continue referenced_outlets = { (task_id, asset.id) - for task_id, asset in ((task_id, assets[d.uri]) for task_id, d in references) + for task_id, asset in ((task_id, assets[d.name, d.uri]) for task_id, d in references) } orm_refs = {(r.task_id, r.asset_id): r for r in dags[dag_id].task_outlet_asset_references} for key, ref in orm_refs.items(): diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index f030cb75019e..8694f5890ccd 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -28,7 +28,7 @@ from typing import TYPE_CHECKING, Generator, Iterable from setproctitle import setproctitle -from sqlalchemy import delete, event +from sqlalchemy import delete, event, select from airflow import settings from airflow.api_internal.internal_api_call import internal_api_call @@ -533,7 +533,14 @@ def _validate_task_pools_and_update_dag_warnings( ) ) - stored_warnings = set(session.query(DagWarning).filter(DagWarning.dag_id.in_(dag_ids)).all()) + stored_warnings = set( + session.scalars( + select(DagWarning).where( + DagWarning.dag_id.in_(dag_ids), + DagWarning.warning_type == DagWarningType.NONEXISTENT_POOL, + ) + ) + ) for warning_to_delete in stored_warnings - warnings: session.delete(warning_to_delete) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 04ea8c5e6167..15042b0d3f17 100644 --- 
a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -19,6 +19,7 @@ import itertools import multiprocessing +import operator import os import signal import sys @@ -55,6 +56,7 @@ from airflow.models.dag import DAG, DagModel from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun +from airflow.models.dagwarning import DagWarning, DagWarningType from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance from airflow.stats import Stats @@ -1078,7 +1080,7 @@ def _run_scheduler_loop(self) -> None: timers.call_regular_interval( conf.getfloat("scheduler", "parsing_cleanup_interval"), - self._orphan_unreferenced_assets, + self._update_asset_orphanage, ) if self._standalone_dag_processor: @@ -2068,44 +2070,106 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None: SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session) session.flush() - def _get_orphaning_identifier(self, asset: AssetModel) -> tuple[str, str]: - self.log.info("Orphaning unreferenced %s", asset) - return asset.name, asset.uri - @provide_session - def _orphan_unreferenced_assets(self, session: Session = NEW_SESSION) -> None: + def _update_asset_orphanage(self, session: Session = NEW_SESSION) -> None: """ - Detect orphaned assets and remove their active entry. + Check assets orphanization and update their active entry. - An orphaned asset is no longer referenced in any DAG schedule parameters or task outlets. + An orphaned asset is no longer referenced in any DAG schedule parameters + or task outlets. Active assets (non-orphaned) have entries in AssetActive + and must have unique names and URIs. """ - orphaned_asset_query = session.scalars( - select(AssetModel) - .join( - DagScheduleAssetReference, - isouter=True, - ) - .join( - TaskOutletAssetReference, - isouter=True, - ) + # Group assets into orphaned=True and orphaned=False groups. 
+ orphaned = ( + (func.count(DagScheduleAssetReference.dag_id) + func.count(TaskOutletAssetReference.dag_id)) == 0 + ).label("orphaned") + asset_reference_query = session.execute( + select(orphaned, AssetModel) + .outerjoin(DagScheduleAssetReference) + .outerjoin(TaskOutletAssetReference) .group_by(AssetModel.id) - .where(AssetModel.active.has()) - .having( - and_( - func.count(DagScheduleAssetReference.dag_id) == 0, - func.count(TaskOutletAssetReference.dag_id) == 0, + .order_by(orphaned) + ) + asset_orphanation: dict[bool, Collection[AssetModel]] = { + orphaned: [asset for _, asset in group] + for orphaned, group in itertools.groupby(asset_reference_query, key=operator.itemgetter(0)) + } + self._orphan_unreferenced_assets(asset_orphanation.get(True, ()), session=session) + self._activate_referenced_assets(asset_orphanation.get(False, ()), session=session) + + @staticmethod + def _orphan_unreferenced_assets(assets: Collection[AssetModel], *, session: Session) -> None: + if assets: + session.execute( + delete(AssetActive).where( + tuple_in_condition((AssetActive.name, AssetActive.uri), ((a.name, a.uri) for a in assets)) + ) + ) + Stats.gauge("asset.orphaned", len(assets)) + + @staticmethod + def _activate_referenced_assets(assets: Collection[AssetModel], *, session: Session) -> None: + if not assets: + return + + active_assets = set( + session.execute( + select(AssetActive.name, AssetActive.uri).where( + tuple_in_condition((AssetActive.name, AssetActive.uri), ((a.name, a.uri) for a in assets)) ) ) ) - orphaning_identifiers = [self._get_orphaning_identifier(asset) for asset in orphaned_asset_query] + active_name_to_uri: dict[str, str] = {name: uri for name, uri in active_assets} + active_uri_to_name: dict[str, str] = {uri: name for name, uri in active_assets} + + def _generate_dag_warnings(offending: AssetModel, attr: str, value: str) -> Iterator[DagWarning]: + for ref in itertools.chain(offending.consuming_dags, offending.producing_tasks): + yield DagWarning( + dag_id=ref.dag_id, + error_type=DagWarningType.ASSET_CONFLICT, + message=f"Cannot activate asset {offending}; {attr} is already associated to {value!r}", + ) + + def _activate_assets_generate_warnings() -> Iterator[DagWarning]: + incoming_name_to_uri: dict[str, str] = {} + incoming_uri_to_name: dict[str, str] = {} + for asset in assets: + if (asset.name, asset.uri) in active_assets: + continue + existing_uri = active_name_to_uri.get(asset.name) or incoming_name_to_uri.get(asset.name) + if existing_uri is not None and existing_uri != asset.uri: + yield from _generate_dag_warnings(asset, "name", existing_uri) + continue + existing_name = active_uri_to_name.get(asset.uri) or incoming_uri_to_name.get(asset.uri) + if existing_name is not None and existing_name != asset.name: + yield from _generate_dag_warnings(asset, "uri", existing_name) + continue + incoming_name_to_uri[asset.name] = asset.uri + incoming_uri_to_name[asset.uri] = asset.name + session.add(AssetActive.for_asset(asset)) + + warnings_to_have = {w.dag_id: w for w in _activate_assets_generate_warnings()} session.execute( - delete(AssetActive).where( - tuple_in_condition((AssetActive.name, AssetActive.uri), orphaning_identifiers) + delete(DagWarning).where( + DagWarning.warning_type == DagWarningType.ASSET_CONFLICT, + DagWarning.dag_id.not_in(warnings_to_have), + ) + ) + existing_warned_dag_ids: set[str] = set( + session.scalars( + select(DagWarning.dag_id).where( + DagWarning.warning_type == DagWarningType.ASSET_CONFLICT, + DagWarning.dag_id.not_in(warnings_to_have), + ) ) ) 
- Stats.gauge("asset.orphaned", len(orphaning_identifiers)) + for dag_id, warning in warnings_to_have.items(): + if dag_id in existing_warned_dag_ids: + session.merge(warning) + continue + session.add(warning) + existing_warned_dag_ids.add(warning.dag_id) def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]: """Organize TIs into lists per their respective executor.""" diff --git a/airflow/migrations/versions/0026_2_10_0_dag_schedule_dataset_alias_reference.py b/airflow/migrations/versions/0026_2_10_0_dag_schedule_dataset_alias_reference.py index 6d2a25d49e3a..f4c11a7b006a 100644 --- a/airflow/migrations/versions/0026_2_10_0_dag_schedule_dataset_alias_reference.py +++ b/airflow/migrations/versions/0026_2_10_0_dag_schedule_dataset_alias_reference.py @@ -45,14 +45,14 @@ def upgrade(): """Add dag_schedule_dataset_alias_reference table.""" op.create_table( "dag_schedule_dataset_alias_reference", - sa.Column("alias_id", sa.Integer(), nullable=False), + sa.Column("alias_id", sa.Integer(), primary_key=True, nullable=False), sa.Column("dag_id", StringID(), primary_key=True, nullable=False), sa.Column("created_at", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False), sa.Column("updated_at", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False), sa.ForeignKeyConstraint( ("alias_id",), ["dataset_alias.id"], - name="dsdar_dataset_alias_fkey", + name="dsdar_dataset_fkey", ondelete="CASCADE", ), sa.ForeignKeyConstraint( diff --git a/airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py b/airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py new file mode 100644 index 000000000000..8fb02d3dcf19 --- /dev/null +++ b/airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py @@ -0,0 +1,129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Rename dag_schedule_dataset_alias_reference constraint names. + +Revision ID: 5f2621c13b39 +Revises: 22ed7efa9da2 +Create Date: 2024-10-25 04:03:33.002701 + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from alembic import op +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. 
+revision = "5f2621c13b39" +down_revision = "22ed7efa9da2" +branch_labels = None +depends_on = None +airflow_version = "2.10.3" + +if TYPE_CHECKING: + from alembic.operations.base import BatchOperations + from sqlalchemy.sql.elements import conv + + +def _rename_fk_constraint( + *, + batch_op: BatchOperations, + original_name: str | conv, + new_name: str | conv, + referent_table: str, + local_cols: list[str], + remote_cols: list[str], + ondelete: str, +) -> None: + batch_op.drop_constraint(original_name, type_="foreignkey") + batch_op.create_foreign_key( + constraint_name=new_name, + referent_table=referent_table, + local_cols=local_cols, + remote_cols=remote_cols, + ondelete=ondelete, + ) + + +def upgrade(): + """Rename dag_schedule_dataset_alias_reference constraint.""" + with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op: + bind = op.get_context().bind + insp = inspect(bind) + fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")] + + # "dsdar_dataset_alias_fkey" was the constraint name defined in the model while "dsdar_dataset_fkey" is the one + # defined in the previous migration. + # Rename this constraint name if user is using the name "dsdar_dataset_fkey". + if "dsdar_dataset_fkey" in fk_constraints: + _rename_fk_constraint( + batch_op=batch_op, + original_name="dsdar_dataset_fkey", + new_name="dsdar_dataset_alias_fkey", + referent_table="dataset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + # "dsdar_dag_fkey" was the constraint name defined in the model while "dsdar_dag_id_fkey" is the one + # defined in the previous migration. + # Rename this constraint name if user is using the name "dsdar_dag_fkey". + if "dsdar_dag_fkey" in fk_constraints: + _rename_fk_constraint( + batch_op=batch_op, + original_name="dsdar_dag_fkey", + new_name="dsdar_dag_id_fkey", + referent_table="dataset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + +def downgrade(): + """Undo dag_schedule_dataset_alias_reference constraint rename.""" + with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op: + bind = op.get_context().bind + insp = inspect(bind) + fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")] + if "dsdar_dataset_alias_fkey" in fk_constraints: + _rename_fk_constraint( + batch_op=batch_op, + original_name="dsdar_dataset_alias_fkey", + new_name="dsdar_dataset_fkey", + referent_table="dataset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + if "dsdar_dag_id_fkey" in fk_constraints: + _rename_fk_constraint( + batch_op=batch_op, + original_name="dsdar_dag_id_fkey", + new_name="dsdar_dag_fkey", + referent_table="dataset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) diff --git a/airflow/migrations/versions/0027_3_0_0_drop_ab_user_id_foreign_key.py b/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py similarity index 97% rename from airflow/migrations/versions/0027_3_0_0_drop_ab_user_id_foreign_key.py rename to airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py index 5c1955948852..f88aaa014bb3 100644 --- a/airflow/migrations/versions/0027_3_0_0_drop_ab_user_id_foreign_key.py +++ b/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py @@ -20,7 +20,7 @@ Drop ab_user.id foreign key. 
Revision ID: 044f740568ec -Revises: 22ed7efa9da2 +Revises: 5f2621c13b39 Create Date: 2024-08-02 07:18:29.830521 """ @@ -31,7 +31,7 @@ # revision identifiers, used by Alembic. revision = "044f740568ec" -down_revision = "22ed7efa9da2" +down_revision = "5f2621c13b39" branch_labels = None depends_on = None airflow_version = "3.0.0" diff --git a/airflow/migrations/versions/0028_3_0_0_remove_is_subdag.py b/airflow/migrations/versions/0029_3_0_0_remove_is_subdag.py similarity index 100% rename from airflow/migrations/versions/0028_3_0_0_remove_is_subdag.py rename to airflow/migrations/versions/0029_3_0_0_remove_is_subdag.py diff --git a/airflow/migrations/versions/0029_3_0_0_rename_schedule_interval_to_timetable_.py b/airflow/migrations/versions/0030_3_0_0_rename_schedule_interval_to_timetable_.py similarity index 100% rename from airflow/migrations/versions/0029_3_0_0_rename_schedule_interval_to_timetable_.py rename to airflow/migrations/versions/0030_3_0_0_rename_schedule_interval_to_timetable_.py diff --git a/airflow/migrations/versions/0030_3_0_0_add_triggered_by_field_to_dagrun.py b/airflow/migrations/versions/0031_3_0_0_add_triggered_by_field_to_dagrun.py similarity index 100% rename from airflow/migrations/versions/0030_3_0_0_add_triggered_by_field_to_dagrun.py rename to airflow/migrations/versions/0031_3_0_0_add_triggered_by_field_to_dagrun.py diff --git a/airflow/migrations/versions/0031_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py similarity index 100% rename from airflow/migrations/versions/0031_3_0_0_drop_execution_date_unique.py rename to airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py diff --git a/airflow/migrations/versions/0032_3_0_0_add_tables_for_backfill.py b/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py similarity index 100% rename from airflow/migrations/versions/0032_3_0_0_add_tables_for_backfill.py rename to airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py diff --git a/airflow/migrations/versions/0033_3_0_0_remove_redundant_index.py b/airflow/migrations/versions/0034_3_0_0_remove_redundant_index.py similarity index 100% rename from airflow/migrations/versions/0033_3_0_0_remove_redundant_index.py rename to airflow/migrations/versions/0034_3_0_0_remove_redundant_index.py diff --git a/airflow/migrations/versions/0034_3_0_0_update_user_id_type.py b/airflow/migrations/versions/0035_3_0_0_update_user_id_type.py similarity index 100% rename from airflow/migrations/versions/0034_3_0_0_update_user_id_type.py rename to airflow/migrations/versions/0035_3_0_0_update_user_id_type.py diff --git a/airflow/migrations/versions/0035_3_0_0_add_name_field_to_dataset_model.py b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py similarity index 100% rename from airflow/migrations/versions/0035_3_0_0_add_name_field_to_dataset_model.py rename to airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py diff --git a/airflow/migrations/versions/0036_3_0_0_add_backfill_to_dag_run_model.py b/airflow/migrations/versions/0037_3_0_0_add_backfill_to_dag_run_model.py similarity index 100% rename from airflow/migrations/versions/0036_3_0_0_add_backfill_to_dag_run_model.py rename to airflow/migrations/versions/0037_3_0_0_add_backfill_to_dag_run_model.py diff --git a/airflow/migrations/versions/0037_3_0_0_add_asset_active.py b/airflow/migrations/versions/0038_3_0_0_add_asset_active.py similarity index 100% rename from 
airflow/migrations/versions/0037_3_0_0_add_asset_active.py rename to airflow/migrations/versions/0038_3_0_0_add_asset_active.py diff --git a/airflow/migrations/versions/0038_3_0_0_tweak_assetaliasmodel_to_match_asset.py b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py similarity index 100% rename from airflow/migrations/versions/0038_3_0_0_tweak_assetaliasmodel_to_match_asset.py rename to airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py diff --git a/airflow/migrations/versions/0039_3_0_0_add_exception_reason_and_logical_date_.py b/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py similarity index 100% rename from airflow/migrations/versions/0039_3_0_0_add_exception_reason_and_logical_date_.py rename to airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py diff --git a/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py b/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py similarity index 98% rename from airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py rename to airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py index e400fd5301c5..03836503efe6 100644 --- a/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py +++ b/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py @@ -194,7 +194,8 @@ def upgrade(): with op.batch_alter_table("dag_schedule_asset_alias_reference", schema=None) as batch_op: batch_op.drop_constraint("dsdar_dataset_alias_fkey", type_="foreignkey") - batch_op.drop_constraint("dsdar_dag_fkey", type_="foreignkey") + if op.get_bind().dialect.name in ("postgresql", "mysql"): + batch_op.drop_constraint("dsdar_dag_id_fkey", type_="foreignkey") _rename_pk_constraint( batch_op=batch_op, @@ -218,7 +219,7 @@ def upgrade(): ondelete="CASCADE", ) batch_op.create_foreign_key( - constraint_name="dsaar_dag_fkey", + constraint_name="dsaar_dag_id_fkey", referent_table="dag", local_cols=["dag_id"], remote_cols=["dag_id"], @@ -495,7 +496,7 @@ def downgrade(): with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op: batch_op.drop_constraint("dsaar_asset_alias_fkey", type_="foreignkey") - batch_op.drop_constraint("dsaar_dag_fkey", type_="foreignkey") + batch_op.drop_constraint("dsaar_dag_id_fkey", type_="foreignkey") _rename_pk_constraint( batch_op=batch_op, @@ -519,7 +520,7 @@ def downgrade(): ondelete="CASCADE", ) batch_op.create_foreign_key( - constraint_name="dsdar_dag_fkey", + constraint_name="dsdar_dag_id_fkey", referent_table="dag", local_cols=["dag_id"], remote_cols=["dag_id"], diff --git a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py new file mode 100644 index 000000000000..2abd2116f989 --- /dev/null +++ b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py @@ -0,0 +1,281 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add UUID primary key to ``task_instance`` table. + +Revision ID: d59cbbef95eb +Revises: 05234396c6fc +Create Date: 2024-10-21 22:39:12.394079 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import text +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "d59cbbef95eb" +down_revision = "05234396c6fc" +branch_labels = "None" +depends_on = None +airflow_version = "3.0.0" + +###### +# The following functions to create UUID v7 are solely for the purpose of this migration. +# This is done for production databases that do not support UUID v7 natively (Postgres, MySQL) +# and used instead of uuids from +# python libraries like uuid6.uuid7() for performance reasons since the task_instance table +# can be very large. +###### + +# PostgreSQL-specific UUID v7 function +pg_uuid7_fn = """ +DO $$ +DECLARE + pgcrypto_installed BOOLEAN; +BEGIN + -- Check if pgcrypto is already installed + pgcrypto_installed := EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto'); + + -- Attempt to create pgcrypto if it is not installed + IF NOT pgcrypto_installed THEN + BEGIN + CREATE EXTENSION pgcrypto; + pgcrypto_installed := TRUE; + RAISE NOTICE 'pgcrypto extension successfully created.'; + EXCEPTION + WHEN insufficient_privilege THEN + RAISE NOTICE 'pgcrypto extension could not be installed due to insufficient privileges; using fallback'; + pgcrypto_installed := FALSE; + WHEN OTHERS THEN + RAISE NOTICE 'An unexpected error occurred while attempting to install pgcrypto; using fallback'; + pgcrypto_installed := FALSE; + END; + END IF; +END $$; + +CREATE OR REPLACE FUNCTION uuid_generate_v7(p_timestamp timestamp with time zone) +RETURNS uuid +LANGUAGE plpgsql +PARALLEL SAFE +AS $$ +DECLARE + unix_time_ms CONSTANT bytea NOT NULL DEFAULT substring(int8send((extract(epoch FROM p_timestamp) * 1000)::bigint) from 3); + buffer bytea; + pgcrypto_installed BOOLEAN := EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto'); +BEGIN + -- Use pgcrypto if available, otherwise use the fallback + -- fallback from https://brandur.org/fragments/secure-bytes-without-pgcrypto + IF pgcrypto_installed THEN + buffer := unix_time_ms || gen_random_bytes(10); + ELSE + buffer := unix_time_ms || substring(uuid_send(gen_random_uuid()) FROM 1 FOR 5) || + substring(uuid_send(gen_random_uuid()) FROM 12 FOR 5); + END IF; + + -- Set UUID version and variant bits + buffer := set_byte(buffer, 6, (b'0111' || get_byte(buffer, 6)::bit(4))::bit(8)::int); + buffer := set_byte(buffer, 8, (b'10' || get_byte(buffer, 8)::bit(6))::bit(8)::int); + RETURN encode(buffer, 'hex')::uuid; +END +$$; +""" + +pg_uuid7_fn_drop = """ +DROP FUNCTION IF EXISTS uuid_generate_v7(timestamp with time zone); +""" + +# MySQL-specific UUID v7 function +mysql_uuid7_fn = """ +DROP FUNCTION IF EXISTS uuid_generate_v7; +CREATE FUNCTION uuid_generate_v7(p_timestamp DATETIME(3)) +RETURNS CHAR(36) +DETERMINISTIC +BEGIN + DECLARE unix_time_ms BIGINT; + DECLARE time_hex CHAR(12); + DECLARE rand_hex CHAR(24); + DECLARE uuid CHAR(36); + + -- Convert the 
passed timestamp to milliseconds since epoch + SET unix_time_ms = UNIX_TIMESTAMP(p_timestamp) * 1000; + SET time_hex = LPAD(HEX(unix_time_ms), 12, '0'); + SET rand_hex = CONCAT( + LPAD(HEX(FLOOR(RAND() * POW(2,32))), 8, '0'), + LPAD(HEX(FLOOR(RAND() * POW(2,32))), 8, '0') + ); + SET rand_hex = CONCAT(SUBSTRING(rand_hex, 1, 4), '7', SUBSTRING(rand_hex, 6)); + SET rand_hex = CONCAT(SUBSTRING(rand_hex, 1, 12), '8', SUBSTRING(rand_hex, 14)); + + SET uuid = LOWER(CONCAT( + SUBSTRING(time_hex, 1, 8), '-', + SUBSTRING(time_hex, 9, 4), '-', + SUBSTRING(rand_hex, 1, 4), '-', + SUBSTRING(rand_hex, 5, 4), '-', + SUBSTRING(rand_hex, 9) + )); + + RETURN uuid; +END; +""" + +mysql_uuid7_fn_drop = """ +DROP FUNCTION IF EXISTS uuid_generate_v7; +""" + +ti_table = "task_instance" + +# Foreign key columns from task_instance +ti_fk_cols = ["dag_id", "task_id", "run_id", "map_index"] + +# Foreign key constraints from other tables to task_instance +ti_fk_constraints = [ + {"table": "rendered_task_instance_fields", "fk": "rtif_ti_fkey"}, + {"table": "task_fail", "fk": "task_fail_ti_fkey"}, + {"table": "task_instance_history", "fk": "task_instance_history_ti_fkey"}, + {"table": "task_instance_note", "fk": "task_instance_note_ti_fkey"}, + {"table": "task_map", "fk": "task_map_task_instance_fkey"}, + {"table": "task_reschedule", "fk": "task_reschedule_ti_fkey"}, + {"table": "xcom", "fk": "xcom_task_instance_fkey"}, +] + + +def _get_type_id_column(dialect_name: str) -> sa.types.TypeEngine: + # For PostgreSQL, use the UUID type directly as it is more efficient + if dialect_name == "postgresql": + return postgresql.UUID(as_uuid=False) + # For other databases, use String(36) to match UUID format + else: + return sa.String(36) + + +def upgrade(): + """Add UUID primary key to task instance table.""" + conn = op.get_bind() + dialect_name = conn.dialect.name + + op.add_column("task_instance", sa.Column("id", _get_type_id_column(dialect_name), nullable=True)) + + if dialect_name == "postgresql": + op.execute(pg_uuid7_fn) + + # TODO: Add batching to handle updates in smaller chunks for large tables to avoid locking + # Migrate existing rows with UUID v7 using a timestamp-based generation + op.execute( + "UPDATE task_instance SET id = uuid_generate_v7(coalesce(queued_dttm, start_date, clock_timestamp()))" + ) + + op.execute(pg_uuid7_fn_drop) + + # Drop existing primary key constraint to task_instance table + op.execute("ALTER TABLE IF EXISTS task_instance DROP CONSTRAINT task_instance_pkey CASCADE") + + elif dialect_name == "mysql": + op.execute(mysql_uuid7_fn) + + # Migrate existing rows with UUID v7 + op.execute(""" + UPDATE task_instance + SET id = uuid_generate_v7(coalesce(queued_dttm, start_date, NOW(3))) + WHERE id IS NULL + """) + + # Drop this function as it is no longer needed + op.execute(mysql_uuid7_fn_drop) + for fk in ti_fk_constraints: + op.drop_constraint(fk["fk"], fk["table"], type_="foreignkey") + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_pkey", type_="primary") + elif dialect_name == "sqlite": + from uuid6 import uuid7 + + stmt = text("SELECT COUNT(*) FROM task_instance WHERE id IS NULL") + conn = op.get_bind() + task_instances = conn.execute(stmt).scalar() + uuid_values = [str(uuid7()) for _ in range(task_instances)] + + # Ensure `uuid_values` is a list or iterable with the UUIDs for the update. 
+ stmt = text(""" + UPDATE task_instance + SET id = :uuid + WHERE id IS NULL + """) + + for uuid_value in uuid_values: + conn.execute(stmt.bindparams(uuid=uuid_value)) + + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_pkey", type_="primary") + + # Add primary key and unique constraint to task_instance table + with op.batch_alter_table("task_instance") as batch_op: + batch_op.alter_column("id", type_=_get_type_id_column(dialect_name), nullable=False) + batch_op.create_unique_constraint("task_instance_composite_key", ti_fk_cols) + batch_op.create_primary_key("task_instance_pkey", ["id"]) + + # Create foreign key constraints + for fk in ti_fk_constraints: + with op.batch_alter_table(fk["table"]) as batch_op: + batch_op.create_foreign_key( + constraint_name=fk["fk"], + referent_table=ti_table, + local_cols=ti_fk_cols, + remote_cols=ti_fk_cols, + ondelete="CASCADE", + ) + + +def downgrade(): + """Drop UUID primary key to task instance table.""" + conn = op.get_bind() + dialect_name = conn.dialect.name + + if dialect_name == "postgresql": + op.execute("ALTER TABLE IF EXISTS task_instance DROP CONSTRAINT task_instance_composite_key CASCADE") + op.execute(pg_uuid7_fn_drop) + + elif dialect_name == "mysql": + for fk in ti_fk_constraints: + op.drop_constraint(fk["fk"], fk["table"], type_="foreignkey") + + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_composite_key", type_="unique") + op.execute(mysql_uuid7_fn_drop) + + elif dialect_name == "sqlite": + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_composite_key", type_="unique") + + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_pkey", type_="primary") + batch_op.drop_column("id") + batch_op.create_primary_key("task_instance_pkey", ti_fk_cols) + + # Re-add foreign key constraints + for fk in ti_fk_constraints: + with op.batch_alter_table(fk["table"]) as batch_op: + batch_op.create_foreign_key( + constraint_name=fk["fk"], + referent_table=ti_table, + local_cols=ti_fk_cols, + remote_cols=ti_fk_cols, + ondelete="CASCADE", + ) diff --git a/airflow/models/asset.py b/airflow/models/asset.py index fc77cb7a31d6..fdfb55143cb3 100644 --- a/airflow/models/asset.py +++ b/airflow/models/asset.py @@ -181,7 +181,7 @@ class AssetModel(Base): created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) - active = relationship("AssetActive", uselist=False, viewonly=True) + active = relationship("AssetActive", uselist=False, viewonly=True, back_populates="asset") consuming_dags = relationship("DagScheduleAssetReference", back_populates="asset") producing_tasks = relationship("TaskOutletAssetReference", back_populates="asset") @@ -221,7 +221,7 @@ def __hash__(self): return hash((self.name, self.uri)) def __repr__(self): - return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})" + return f"{self.__class__.__name__}(name={self.name!r}, uri={self.uri!r}, extra={self.extra!r})" def to_public(self) -> Asset: return Asset(name=self.name, uri=self.uri, group=self.group, extra=self.extra) @@ -264,6 +264,8 @@ class AssetActive(Base): nullable=False, ) + asset = relationship("AssetModel", back_populates="active") + __tablename__ = "asset_active" __table_args__ = ( PrimaryKeyConstraint(name, uri, name="asset_active_pkey"), @@ -305,7 +307,7 @@ class 
DagScheduleAssetAliasReference(Base): ForeignKeyConstraint( columns=(dag_id,), refcolumns=["dag.dag_id"], - name="dsaar_dag_fkey", + name="dsaar_dag_id_fkey", ondelete="CASCADE", ), Index("idx_dag_schedule_asset_alias_reference_dag_id", dag_id), diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 00943ec2ee26..fd1c67debe24 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2571,7 +2571,6 @@ def bulk_write_to_db( orm_asset_aliases = asset_op.add_asset_aliases(session=session) session.flush() # This populates id so we can create fks in later calls. - asset_op.add_asset_active_references(orm_assets.values(), session=session) asset_op.add_dag_asset_references(orm_dags, orm_assets, session=session) asset_op.add_dag_asset_alias_references(orm_dags, orm_asset_aliases, session=session) asset_op.add_task_asset_references(orm_dags, orm_assets, session=session) diff --git a/airflow/models/dagwarning.py b/airflow/models/dagwarning.py index ffab515f8549..e0c271c4c8ec 100644 --- a/airflow/models/dagwarning.py +++ b/airflow/models/dagwarning.py @@ -104,4 +104,5 @@ class DagWarningType(str, Enum): in the DagWarning model. """ + ASSET_CONFLICT = "asset conflict" NONEXISTENT_POOL = "non-existent pool" diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 7cb61bcb61de..9fc4c3b032b2 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -39,6 +39,7 @@ import jinja2 import lazy_object_proxy import pendulum +import uuid6 from jinja2 import TemplateAssertionError, UndefinedError from sqlalchemy import ( Column, @@ -50,6 +51,7 @@ PrimaryKeyConstraint, String, Text, + UniqueConstraint, and_, delete, false, @@ -59,6 +61,7 @@ text, update, ) +from sqlalchemy.dialects import postgresql from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.ext.mutable import MutableDict @@ -794,6 +797,7 @@ def _execute_callable(context: Context, **execute_callable_kwargs): def _set_ti_attrs(target, source, include_dag_run=False): # Fields ordered per model definition + target.id = source.id target.start_date = source.start_date target.end_date = source.end_date target.duration = source.duration @@ -1793,6 +1797,11 @@ def _handle_reschedule( return ti +def uuid7() -> str: + """Generate a new UUID7 string.""" + return str(uuid6.uuid7()) + + class TaskInstance(Base, LoggingMixin): """ Task instances store the state of a task instance. 
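The hunk above introduces a `uuid7()` helper backed by the `uuid6` package. As a hedged aside (not part of the change itself), the sketch below illustrates the property the new `task_instance.id` primary key relies on: a UUID v7 stores the Unix timestamp in milliseconds in its first 48 bits, so later ids compare greater and the primary-key index grows roughly in insertion order.

```python
# Illustration only, not part of the change. Assumes the `uuid6` package,
# which taskinstance.py imports above, is installed.
import time

from uuid6 import uuid7

first = uuid7()
time.sleep(0.002)  # move to a later millisecond so the embedded timestamps differ
second = uuid7()

# Hex/string ordering follows creation order once the timestamps differ.
assert str(first) < str(second)

# The leading 6 bytes are the creation time in milliseconds since the Unix epoch.
created_ms = int.from_bytes(first.bytes[:6], "big")
print(created_ms)
```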
@@ -1813,10 +1822,16 @@ class TaskInstance(Base, LoggingMixin): """ __tablename__ = "task_instance" - task_id = Column(StringID(), primary_key=True, nullable=False) - dag_id = Column(StringID(), primary_key=True, nullable=False) - run_id = Column(StringID(), primary_key=True, nullable=False) - map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1")) + id = Column( + String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + primary_key=True, + default=uuid7, + nullable=False, + ) + task_id = Column(StringID(), nullable=False) + dag_id = Column(StringID(), nullable=False) + run_id = Column(StringID(), nullable=False) + map_index = Column(Integer, nullable=False, server_default=text("-1")) start_date = Column(UtcDateTime) end_date = Column(UtcDateTime) @@ -1869,7 +1884,8 @@ class TaskInstance(Base, LoggingMixin): Index("ti_pool", pool, state, priority_weight), Index("ti_job_id", job_id), Index("ti_trigger_id", trigger_id), - PrimaryKeyConstraint("dag_id", "task_id", "run_id", "map_index", name="task_instance_pkey"), + PrimaryKeyConstraint("id", name="task_instance_pkey"), + UniqueConstraint("dag_id", "task_id", "run_id", "map_index", name="task_instance_composite_key"), ForeignKeyConstraint( [trigger_id], ["trigger.id"], @@ -1938,6 +1954,8 @@ def __init__( self.run_id = run_id self.try_number = 0 self.max_tries = self.task.retries + if not self.id: + self.id = uuid7() self.unixname = getuser() if state: self.state = state diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py index a4666946fa7b..4488a7fdaf61 100644 --- a/airflow/timetables/trigger.py +++ b/airflow/timetables/trigger.py @@ -21,7 +21,7 @@ from airflow.timetables._cron import CronMixin from airflow.timetables.base import DagRunInfo, DataInterval, Timetable -from airflow.utils import timezone +from airflow.utils.timezone import coerce_datetime, utcnow if TYPE_CHECKING: from dateutil.relativedelta import relativedelta @@ -43,6 +43,24 @@ class CronTriggerTimetable(CronMixin, Timetable): for one data interval to pass. Don't pass ``@once`` in here; use ``OnceTimetable`` instead. + + :param cron: cron string that defines when to run + :param timezone: Which timezone to use to interpret the cron string + :param interval: timedelta that defines the data interval start. Default 0. + + *run_immediately* controls, if no *start_time* is given to the DAG, when + the first run of the DAG should be scheduled. It has no effect if there + already exist runs for this DAG. + + * If *True*, always run immediately the most recent possible DAG run. + * If *False*, wait to run until the next scheduled time in the future. + * If passed a ``timedelta``, will run the most recent possible DAG run + if that run's ``data_interval_end`` is within timedelta of now. + * If *None*, the timedelta is calculated as 10% of the time between the + most recent past scheduled time and the next scheduled time. E.g. if + running every hour, this would run the previous time if less than 6 + minutes had passed since the previous run time, otherwise it would wait + until the next hour.
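To make the docstring above concrete, here is a hedged usage sketch (not taken from the change itself); the DAG id and the 15-minute window are illustrative assumptions.

```python
# Sketch only: a DAG with no explicit start_date, scheduled hourly. With a
# timedelta, the most recent slot is run immediately if it ended within that
# window; True would always run it, False would wait for the next slot.
import datetime

from airflow import DAG
from airflow.timetables.trigger import CronTriggerTimetable

with DAG(
    dag_id="hourly_report",  # hypothetical DAG id
    schedule=CronTriggerTimetable(
        "0 * * * *",
        timezone="UTC",
        run_immediately=datetime.timedelta(minutes=15),
    ),
):
    ...
```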
""" def __init__( @@ -51,9 +69,11 @@ def __init__( *, timezone: str | Timezone | FixedTimezone, interval: datetime.timedelta | relativedelta = datetime.timedelta(), + run_immediately: bool | datetime.timedelta = False, ) -> None: super().__init__(cron, timezone) self._interval = interval + self.run_immediately = run_immediately @classmethod def deserialize(cls, data: dict[str, Any]) -> Timetable: @@ -64,7 +84,21 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable: interval = decode_relativedelta(data["interval"]) else: interval = datetime.timedelta(seconds=data["interval"]) - return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval) + + immediate: bool | datetime.timedelta + if "immediate" not in data: + immediate = False + elif isinstance(data["immediate"], float): + immediate = datetime.timedelta(seconds=data["interval"]) + else: + immediate = data["immediate"] + + return cls( + data["expression"], + timezone=decode_timezone(data["timezone"]), + interval=interval, + run_immediately=immediate, + ) def serialize(self) -> dict[str, Any]: from airflow.serialization.serialized_objects import encode_relativedelta, encode_timezone @@ -75,7 +109,17 @@ def serialize(self) -> dict[str, Any]: else: interval = encode_relativedelta(self._interval) timezone = encode_timezone(self._timezone) - return {"expression": self._expression, "timezone": timezone, "interval": interval} + immediate: bool | float + if isinstance(self.run_immediately, datetime.timedelta): + immediate = self.run_immediately.total_seconds() + else: + immediate = self.run_immediately + return { + "expression": self._expression, + "timezone": timezone, + "interval": interval, + "run_immediately": immediate, + } def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: return DataInterval( @@ -95,13 +139,16 @@ def next_dagrun_info( if last_automated_data_interval is not None: next_start_time = self._get_next(last_automated_data_interval.end) elif restriction.earliest is None: - return None # Don't know where to catch up from, give up. + next_start_time = self._calc_first_run() else: next_start_time = self._align_to_next(restriction.earliest) else: - start_time_candidates = [self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))] + start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))] if last_automated_data_interval is not None: start_time_candidates.append(self._get_next(last_automated_data_interval.end)) + elif restriction.earliest is None: + # Run immediately has no effect if there is restriction on earliest + start_time_candidates.append(self._calc_first_run()) if restriction.earliest is not None: start_time_candidates.append(self._align_to_next(restriction.earliest)) next_start_time = max(start_time_candidates) @@ -113,3 +160,27 @@ def next_dagrun_info( next_start_time - self._interval, # type: ignore[arg-type] next_start_time, ) + + def _calc_first_run(self): + """ + If no start_time is set, determine the start. + + If True, always prefer past run, if False, never. If None, if within 10% of next run, + if timedelta, if within that timedelta from past run. 
+ """ + now = coerce_datetime(utcnow()) + past_run_time = self._align_to_prev(now) + next_run_time = self._align_to_next(now) + if self.run_immediately is True: # not truthy, actually set to True + return past_run_time + + gap_between_runs = next_run_time - past_run_time + gap_to_past = now - past_run_time + if isinstance(self.run_immediately, datetime.timedelta): + buffer_between_runs = self.run_immediately + else: + buffer_between_runs = max(gap_between_runs / 10, datetime.timedelta(minutes=5)) + if gap_to_past <= buffer_between_runs: + return past_run_time + else: + return next_run_time diff --git a/airflow/ui/.prettierrc b/airflow/ui/.prettierrc new file mode 100644 index 000000000000..93ba8a38a47f --- /dev/null +++ b/airflow/ui/.prettierrc @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/prettierrc", + "endOfLine": "lf", + "importOrder": ["", "^(src|openapi)/", "^[./]"], + "importOrderSeparation": true, + "jsxSingleQuote": false, + "plugins": ["@trivago/prettier-plugin-sort-imports"], + "printWidth": 80, + "singleQuote": false, + "tabWidth": 2, + "trailingComma": "all", + "useTabs": false +} diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index b12c133b6c2e..5fa46e4e91f0 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagsService, DashboardService, MonitorService, PluginService, @@ -54,6 +55,56 @@ export const UseDashboardServiceHistoricalMetricsKeyFn = ( useDashboardServiceHistoricalMetricsKey, ...(queryKey ?? [{ endDate, startDate }]), ]; +export type DagsServiceRecentDagRunsDefaultResponse = Awaited< + ReturnType +>; +export type DagsServiceRecentDagRunsQueryResult< + TData = DagsServiceRecentDagRunsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagsServiceRecentDagRunsKey = "DagsServiceRecentDagRuns"; +export const UseDagsServiceRecentDagRunsKeyFn = ( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }: { + dagDisplayNamePattern?: string; + dagIdPattern?: string; + dagRunsLimit?: number; + lastDagRunState?: DagRunState; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: string[]; + paused?: boolean; + tags?: string[]; + } = {}, + queryKey?: Array, +) => [ + useDagsServiceRecentDagRunsKey, + ...(queryKey ?? [ + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }, + ]), +]; export type DagServiceGetDagsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 3f681a4a13b6..72b4376751f7 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagsService, DashboardService, MonitorService, PluginService, @@ -62,6 +63,76 @@ export const prefetchUseDashboardServiceHistoricalMetrics = ( }), queryFn: () => DashboardService.historicalMetrics({ endDate, startDate }), }); +/** + * Recent Dag Runs + * Get recent DAG runs. + * @param data The data for the request. 
+ * @param data.dagRunsLimit + * @param data.limit + * @param data.offset + * @param data.tags + * @param data.owners + * @param data.dagIdPattern + * @param data.dagDisplayNamePattern + * @param data.onlyActive + * @param data.paused + * @param data.lastDagRunState + * @returns DAGWithLatestDagRunsCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagsServiceRecentDagRuns = ( + queryClient: QueryClient, + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }: { + dagDisplayNamePattern?: string; + dagIdPattern?: string; + dagRunsLimit?: number; + lastDagRunState?: DagRunState; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: string[]; + paused?: boolean; + tags?: string[]; + } = {}, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagsServiceRecentDagRunsKeyFn({ + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }), + queryFn: () => + DagsService.recentDagRuns({ + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }), + }); /** * Get Dags * Get all DAGs. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e3942ad84e08..ce319bc6cd67 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -11,6 +11,7 @@ import { ConnectionService, DagRunService, DagService, + DagsService, DashboardService, MonitorService, PluginService, @@ -86,6 +87,85 @@ export const useDashboardServiceHistoricalMetrics = < DashboardService.historicalMetrics({ endDate, startDate }) as TData, ...options, }); +/** + * Recent Dag Runs + * Get recent DAG runs. + * @param data The data for the request. + * @param data.dagRunsLimit + * @param data.limit + * @param data.offset + * @param data.tags + * @param data.owners + * @param data.dagIdPattern + * @param data.dagDisplayNamePattern + * @param data.onlyActive + * @param data.paused + * @param data.lastDagRunState + * @returns DAGWithLatestDagRunsCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagsServiceRecentDagRuns = < + TData = Common.DagsServiceRecentDagRunsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }: { + dagDisplayNamePattern?: string; + dagIdPattern?: string; + dagRunsLimit?: number; + lastDagRunState?: DagRunState; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: string[]; + paused?: boolean; + tags?: string[]; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagsServiceRecentDagRunsKeyFn( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }, + queryKey, + ), + queryFn: () => + DagsService.recentDagRuns({ + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }) as TData, + ...options, + }); /** * Get Dags * Get all DAGs. 
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index eb91e8f1ba93..cd7bc95fa5b5 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -6,6 +6,7 @@ import { ConnectionService, DagRunService, DagService, + DagsService, DashboardService, MonitorService, PluginService, @@ -75,6 +76,85 @@ export const useDashboardServiceHistoricalMetricsSuspense = < DashboardService.historicalMetrics({ endDate, startDate }) as TData, ...options, }); +/** + * Recent Dag Runs + * Get recent DAG runs. + * @param data The data for the request. + * @param data.dagRunsLimit + * @param data.limit + * @param data.offset + * @param data.tags + * @param data.owners + * @param data.dagIdPattern + * @param data.dagDisplayNamePattern + * @param data.onlyActive + * @param data.paused + * @param data.lastDagRunState + * @returns DAGWithLatestDagRunsCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagsServiceRecentDagRunsSuspense = < + TData = Common.DagsServiceRecentDagRunsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }: { + dagDisplayNamePattern?: string; + dagIdPattern?: string; + dagRunsLimit?: number; + lastDagRunState?: DagRunState; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: string[]; + paused?: boolean; + tags?: string[]; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagsServiceRecentDagRunsKeyFn( + { + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }, + queryKey, + ), + queryFn: () => + DagsService.recentDagRuns({ + dagDisplayNamePattern, + dagIdPattern, + dagRunsLimit, + lastDagRunState, + limit, + offset, + onlyActive, + owners, + paused, + tags, + }) as TData, + ...options, + }); /** * Get Dags * Get all DAGs. 
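The generated hooks above all wrap the new `/ui/dags/recent_dag_runs` endpoint. For reference, a minimal Python sketch of the same call; the base URL, unauthenticated access, and parameter values are all assumptions for illustration.

```python
# Rough illustration of the endpoint the generated TypeScript client wraps.
# Base URL and lack of auth are assumptions; query keys mirror services.gen.ts.
import httpx

response = httpx.get(
    "http://localhost:8080/ui/dags/recent_dag_runs",
    params={"dag_runs_limit": 5, "limit": 20, "only_active": True},
)
response.raise_for_status()
body = response.json()

print(body["total_entries"])
for dag in body["dags"]:
    print(dag["dag_id"], len(dag["latest_dag_runs"]))
```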
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index b940646a69a7..a30712d02ace 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1122,6 +1122,289 @@ export const $DAGTagCollectionResponse = { description: "DAG Tags Collection serializer for responses.", } as const; +export const $DAGWithLatestDagRunsCollectionResponse = { + properties: { + total_entries: { + type: "integer", + title: "Total Entries", + }, + dags: { + items: { + $ref: "#/components/schemas/DAGWithLatestDagRunsResponse", + }, + type: "array", + title: "Dags", + }, + }, + type: "object", + required: ["total_entries", "dags"], + title: "DAGWithLatestDagRunsCollectionResponse", + description: "DAG with latest dag runs collection response serializer.", +} as const; + +export const $DAGWithLatestDagRunsResponse = { + properties: { + dag_id: { + type: "string", + title: "Dag Id", + }, + dag_display_name: { + type: "string", + title: "Dag Display Name", + }, + is_paused: { + type: "boolean", + title: "Is Paused", + }, + is_active: { + type: "boolean", + title: "Is Active", + }, + last_parsed_time: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Parsed Time", + }, + last_pickled: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Pickled", + }, + last_expired: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Last Expired", + }, + scheduler_lock: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Scheduler Lock", + }, + pickle_id: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Pickle Id", + }, + default_view: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Default View", + }, + fileloc: { + type: "string", + title: "Fileloc", + }, + description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Description", + }, + timetable_summary: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timetable Summary", + }, + timetable_description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Timetable Description", + }, + tags: { + items: { + $ref: "#/components/schemas/DagTagPydantic", + }, + type: "array", + title: "Tags", + }, + max_active_tasks: { + type: "integer", + title: "Max Active Tasks", + }, + max_active_runs: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Max Active Runs", + }, + max_consecutive_failed_dag_runs: { + type: "integer", + title: "Max Consecutive Failed Dag Runs", + }, + has_task_concurrency_limits: { + type: "boolean", + title: "Has Task Concurrency Limits", + }, + has_import_errors: { + type: "boolean", + title: "Has Import Errors", + }, + next_dagrun: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun", + }, + next_dagrun_data_interval_start: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Data Interval Start", + }, + next_dagrun_data_interval_end: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Data Interval End", + }, + next_dagrun_create_after: { + anyOf: [ + { + 
type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Next Dagrun Create After", + }, + owners: { + items: { + type: "string", + }, + type: "array", + title: "Owners", + }, + latest_dag_runs: { + items: { + $ref: "#/components/schemas/DAGRunResponse", + }, + type: "array", + title: "Latest Dag Runs", + }, + file_token: { + type: "string", + title: "File Token", + description: "Return file token.", + readOnly: true, + }, + }, + type: "object", + required: [ + "dag_id", + "dag_display_name", + "is_paused", + "is_active", + "last_parsed_time", + "last_pickled", + "last_expired", + "scheduler_lock", + "pickle_id", + "default_view", + "fileloc", + "description", + "timetable_summary", + "timetable_description", + "tags", + "max_active_tasks", + "max_active_runs", + "max_consecutive_failed_dag_runs", + "has_task_concurrency_limits", + "has_import_errors", + "next_dagrun", + "next_dagrun_data_interval_start", + "next_dagrun_data_interval_end", + "next_dagrun_create_after", + "owners", + "latest_dag_runs", + "file_token", + ], + title: "DAGWithLatestDagRunsResponse", + description: "DAG with latest dag runs response serializer.", +} as const; + export const $DagProcessorInfoSchema = { properties: { status: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index bfe1d2e39d36..08e93457c4f6 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -7,6 +7,8 @@ import type { NextRunAssetsResponse, HistoricalMetricsData, HistoricalMetricsResponse, + RecentDagRunsData, + RecentDagRunsResponse, GetDagsData, GetDagsResponse, PatchDagsData, @@ -111,6 +113,49 @@ export class DashboardService { } } +export class DagsService { + /** + * Recent Dag Runs + * Get recent DAG runs. + * @param data The data for the request. + * @param data.dagRunsLimit + * @param data.limit + * @param data.offset + * @param data.tags + * @param data.owners + * @param data.dagIdPattern + * @param data.dagDisplayNamePattern + * @param data.onlyActive + * @param data.paused + * @param data.lastDagRunState + * @returns DAGWithLatestDagRunsCollectionResponse Successful Response + * @throws ApiError + */ + public static recentDagRuns( + data: RecentDagRunsData = {}, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/ui/dags/recent_dag_runs", + query: { + dag_runs_limit: data.dagRunsLimit, + limit: data.limit, + offset: data.offset, + tags: data.tags, + owners: data.owners, + dag_id_pattern: data.dagIdPattern, + dag_display_name_pattern: data.dagDisplayNamePattern, + only_active: data.onlyActive, + paused: data.paused, + last_dag_run_state: data.lastDagRunState, + }, + errors: { + 422: "Validation Error", + }, + }); + } +} + export class DagService { /** * Get Dags diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1bca12c04e77..174190f9f493 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -206,6 +206,50 @@ export type DAGTagCollectionResponse = { total_entries: number; }; +/** + * DAG with latest dag runs collection response serializer. + */ +export type DAGWithLatestDagRunsCollectionResponse = { + total_entries: number; + dags: Array; +}; + +/** + * DAG with latest dag runs response serializer. 
+ */ +export type DAGWithLatestDagRunsResponse = { + dag_id: string; + dag_display_name: string; + is_paused: boolean; + is_active: boolean; + last_parsed_time: string | null; + last_pickled: string | null; + last_expired: string | null; + scheduler_lock: string | null; + pickle_id: string | null; + default_view: string | null; + fileloc: string; + description: string | null; + timetable_summary: string | null; + timetable_description: string | null; + tags: Array; + max_active_tasks: number; + max_active_runs: number | null; + max_consecutive_failed_dag_runs: number; + has_task_concurrency_limits: boolean; + has_import_errors: boolean; + next_dagrun: string | null; + next_dagrun_data_interval_start: string | null; + next_dagrun_data_interval_end: string | null; + next_dagrun_create_after: string | null; + owners: Array; + latest_dag_runs: Array; + /** + * Return file token. + */ + readonly file_token: string; +}; + /** * Schema for DagProcessor info. */ @@ -474,6 +518,21 @@ export type HistoricalMetricsData = { export type HistoricalMetricsResponse = HistoricalMetricDataResponse; +export type RecentDagRunsData = { + dagDisplayNamePattern?: string | null; + dagIdPattern?: string | null; + dagRunsLimit?: number; + lastDagRunState?: DagRunState | null; + limit?: number; + offset?: number; + onlyActive?: boolean; + owners?: Array; + paused?: boolean | null; + tags?: Array; +}; + +export type RecentDagRunsResponse = DAGWithLatestDagRunsCollectionResponse; + export type GetDagsData = { dagDisplayNamePattern?: string | null; dagIdPattern?: string | null; @@ -696,6 +755,21 @@ export type $OpenApiTs = { }; }; }; + "/ui/dags/recent_dag_runs": { + get: { + req: RecentDagRunsData; + res: { + /** + * Successful Response + */ + 200: DAGWithLatestDagRunsCollectionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/": { get: { req: GetDagsData; diff --git a/airflow/ui/package.json b/airflow/ui/package.json index 3ca8d1a06f41..97f3a2ee63e3 100644 --- a/airflow/ui/package.json +++ b/airflow/ui/package.json @@ -30,7 +30,8 @@ "react-dom": "^18.3.1", "react-icons": "^5.3.0", "react-router-dom": "^6.26.2", - "use-debounce": "^10.0.3" + "use-debounce": "^10.0.3", + "usehooks-ts": "^3.1.0" }, "devDependencies": { "@7nohe/openapi-react-query-codegen": "^1.6.0", diff --git a/airflow/ui/pnpm-lock.yaml b/airflow/ui/pnpm-lock.yaml index 3ceee513bb13..45858f664a62 100644 --- a/airflow/ui/pnpm-lock.yaml +++ b/airflow/ui/pnpm-lock.yaml @@ -53,6 +53,9 @@ importers: use-debounce: specifier: ^10.0.3 version: 10.0.3(react@18.3.1) + usehooks-ts: + specifier: ^3.1.0 + version: 3.1.0(react@18.3.1) devDependencies: '@7nohe/openapi-react-query-codegen': specifier: ^1.6.0 @@ -2464,6 +2467,9 @@ packages: resolution: {integrity: sha512-iPZK6eYjbxRu3uB4/WZ3EsEIMJFMqAoopl3R+zuq0UjcAm/MO6KCweDgPfP3elTztoKP3KtnVHxTn2NHBSDVUw==} engines: {node: '>=10'} + lodash.debounce@4.0.8: + resolution: {integrity: sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==} + lodash.merge@4.6.2: resolution: {integrity: sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==} @@ -3255,6 +3261,12 @@ packages: '@types/react': optional: true + usehooks-ts@3.1.0: + resolution: {integrity: sha512-bBIa7yUyPhE1BCc0GmR96VU/15l/9gP1Ch5mYdLcFBaFGQsdmXkvjV0TtOqW1yUd6VjIwDunm+flSciCQXujiw==} + engines: {node: '>=16.15.0'} + peerDependencies: + react: ^16.8.0 || ^17 || ^18 + validate-npm-package-license@3.0.4: resolution: {integrity: 
sha512-DpKm2Ui/xN7/HQKCtpZxoRWBhZ9Z0kqtygG8XCgNQ8ZlDnxuQmWhj566j8fN4Cu3/JmbhsDo7fcAJq4s9h27Ew==} @@ -6113,6 +6125,8 @@ snapshots: dependencies: p-locate: 5.0.0 + lodash.debounce@4.0.8: {} + lodash.merge@4.6.2: {} lodash.mergewith@4.6.2: {} @@ -6927,6 +6941,11 @@ snapshots: optionalDependencies: '@types/react': 18.3.5 + usehooks-ts@3.1.0(react@18.3.1): + dependencies: + lodash.debounce: 4.0.8 + react: 18.3.1 + validate-npm-package-license@3.0.4: dependencies: spdx-correct: 3.2.0 diff --git a/airflow/ui/src/App.test.tsx b/airflow/ui/src/App.test.tsx deleted file mode 100644 index 38b90d1c4983..000000000000 --- a/airflow/ui/src/App.test.tsx +++ /dev/null @@ -1,124 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import type { QueryObserverSuccessResult } from "@tanstack/react-query"; -import { render } from "@testing-library/react"; -import { afterEach, beforeEach, describe, it, vi } from "vitest"; - -import * as openapiQueriesModule from "openapi/queries"; -import type { DAGCollectionResponse } from "openapi/requests/types.gen"; - -import { App } from "./App"; -import { Wrapper } from "./utils/Wrapper"; - -// The null fields actually have to be null instead of undefined -/* eslint-disable unicorn/no-null */ - -const mockListDags: DAGCollectionResponse = { - dags: [ - { - dag_display_name: "nested_groups", - dag_id: "nested_groups", - default_view: "grid", - description: null, - file_token: - "Ii9maWxlcy9kYWdzL25lc3RlZF90YXNrX2dyb3Vwcy5weSI.G3EkdxmDUDQsVb7AIZww1TSGlFE", - fileloc: "/files/dags/nested_task_groups.py", - has_import_errors: false, - has_task_concurrency_limits: false, - is_active: true, - is_paused: false, - last_expired: null, - last_parsed_time: "2024-08-22T13:50:10.372238+00:00", - last_pickled: null, - max_active_runs: 16, - max_active_tasks: 16, - max_consecutive_failed_dag_runs: 0, - next_dagrun: "2024-08-22T00:00:00+00:00", - next_dagrun_create_after: "2024-08-23T00:00:00+00:00", - next_dagrun_data_interval_end: "2024-08-23T00:00:00+00:00", - next_dagrun_data_interval_start: "2024-08-22T00:00:00+00:00", - owners: ["airflow"], - pickle_id: null, - scheduler_lock: null, - tags: [], - timetable_description: "", - timetable_summary: "", - }, - { - dag_display_name: "simple_bash_operator", - dag_id: "simple_bash_operator", - default_view: "grid", - description: null, - file_token: - "Ii9maWxlcy9kYWdzL3NpbXBsZV9iYXNoX29wZXJhdG9yLnB5Ig.RteaxTC78ceHlgMkfU3lfznlcLI", - fileloc: "/files/dags/simple_bash_operator.py", - has_import_errors: false, - has_task_concurrency_limits: false, - is_active: true, - is_paused: false, - last_expired: null, - last_parsed_time: "2024-08-22T13:50:10.368561+00:00", - last_pickled: null, - max_active_runs: 16, - max_active_tasks: 16, - max_consecutive_failed_dag_runs: 0, - next_dagrun: 
"2024-08-22T00:00:00+00:00", - next_dagrun_create_after: "2024-08-23T00:00:00+00:00", - next_dagrun_data_interval_end: "2024-08-23T00:00:00+00:00", - next_dagrun_data_interval_start: "2024-08-22T00:00:00+00:00", - owners: ["airflow"], - pickle_id: null, - scheduler_lock: null, - tags: [ - { - dag_id: "dag", - name: "example2", - }, - { - dag_id: "dag", - name: "example", - }, - ], - timetable_description: "At 00:00", - timetable_summary: "sum", - }, - ], - total_entries: 2, -}; - -beforeEach(() => { - const returnValue = { - data: mockListDags, - isLoading: false, - } as QueryObserverSuccessResult; - - vi.spyOn(openapiQueriesModule, "useDagServiceGetDags").mockImplementation( - () => returnValue, - ); -}); - -afterEach(() => { - vi.restoreAllMocks(); -}); - -describe("App", () => { - it("App component should render", () => { - render(, { wrapper: Wrapper }); - }); -}); diff --git a/airflow/ui/src/components/DataTable/CardList.tsx b/airflow/ui/src/components/DataTable/CardList.tsx index ddebff81b249..e6397a94d1db 100644 --- a/airflow/ui/src/components/DataTable/CardList.tsx +++ b/airflow/ui/src/components/DataTable/CardList.tsx @@ -17,26 +17,19 @@ * under the License. */ import { Box, SimpleGrid, Skeleton } from "@chakra-ui/react"; -import { - type CoreRow, - flexRender, - type Table as TanStackTable, -} from "@tanstack/react-table"; -import type { SyntheticEvent } from "react"; +import { flexRender, type Table as TanStackTable } from "@tanstack/react-table"; import type { CardDef } from "./types"; type DataTableProps = { readonly cardDef: CardDef; readonly isLoading?: boolean; - readonly onRowClick?: (e: SyntheticEvent, row: CoreRow) => void; readonly table: TanStackTable; }; export const CardList = ({ cardDef, isLoading, - onRowClick, table, }: DataTableProps) => { const defaultGridProps = { column: { base: 1 }, spacing: 2 }; @@ -45,12 +38,7 @@ export const CardList = ({ {table.getRowModel().rows.map((row) => ( - onRowClick(event, row) : undefined} - title={onRowClick ? "View details" : undefined} - > + {Boolean(isLoading) && (cardDef.meta?.customSkeleton ?? ( ({ ({ colSpan, column, getContext, id, isPlaceholder }) => { const sort = column.getIsSorted(); const canSort = column.getCanSort(); + const text = flexRender(column.columnDef.header, getContext()); - return ( - - {isPlaceholder ? undefined : ( - <>{flexRender(column.columnDef.header, getContext())} - )} - {canSort && sort === false ? ( + let rightIcon; + + if (canSort) { + if (sort === "desc") { + rightIcon = ( + + ); + } else if (sort === "asc") { + rightIcon = ( + + ); + } else { + rightIcon = ( - ) : undefined} - {canSort && sort !== false ? ( - sort === "desc" ? ( - - ) : ( - - ) - ) : undefined} + ); + } + + return ( + + {isPlaceholder ? undefined : ( + + )} + + ); + } + + return ( + + {isPlaceholder ? undefined : text} ); }, diff --git a/airflow/ui/src/context/timezone/TimezoneProvider.tsx b/airflow/ui/src/context/timezone/TimezoneProvider.tsx index dfe40f697670..74c0c2462b39 100644 --- a/airflow/ui/src/context/timezone/TimezoneProvider.tsx +++ b/airflow/ui/src/context/timezone/TimezoneProvider.tsx @@ -16,12 +16,8 @@ * specific language governing permissions and limitations * under the License. 
*/ -import { - createContext, - useState, - useMemo, - type PropsWithChildren, -} from "react"; +import { createContext, useMemo, type PropsWithChildren } from "react"; +import { useLocalStorage } from "usehooks-ts"; export type TimezoneContextType = { selectedTimezone: string; @@ -35,20 +31,14 @@ export const TimezoneContext = createContext( const TIMEZONE_KEY = "timezone"; export const TimezoneProvider = ({ children }: PropsWithChildren) => { - const [selectedTimezone, setSelectedTimezone] = useState(() => { - const timezone = localStorage.getItem(TIMEZONE_KEY); - - return timezone ?? "UTC"; - }); - - const selectTimezone = (tz: string) => { - localStorage.setItem(TIMEZONE_KEY, tz); - setSelectedTimezone(tz); - }; + const [selectedTimezone, setSelectedTimezone] = useLocalStorage( + TIMEZONE_KEY, + "UTC", + ); const value = useMemo( - () => ({ selectedTimezone, setSelectedTimezone: selectTimezone }), - [selectedTimezone], + () => ({ selectedTimezone, setSelectedTimezone }), + [selectedTimezone, setSelectedTimezone], ); return ( diff --git a/airflow/ui/src/layouts/BaseLayout.tsx b/airflow/ui/src/layouts/BaseLayout.tsx index 4aa7a74de6fc..c0d1b6c869bd 100644 --- a/airflow/ui/src/layouts/BaseLayout.tsx +++ b/airflow/ui/src/layouts/BaseLayout.tsx @@ -17,15 +17,16 @@ * under the License. */ import { Box } from "@chakra-ui/react"; +import type { PropsWithChildren } from "react"; import { Outlet } from "react-router-dom"; import { Nav } from "./Nav"; -export const BaseLayout = () => ( +export const BaseLayout = ({ children }: PropsWithChildren) => ( <>