From ce0c1c0b56ccd5cf58b9ee858b60ce56a0b2ec31 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Mon, 28 Oct 2024 12:40:35 +0000
Subject: [PATCH] Migrate `TaskInstance` to UUID v7 primary key (#43243)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Migrate `TaskInstance` to UUID v7 primary key

closes https://github.com/apache/airflow/issues/43161
part of [AIP-72](https://github.com/orgs/apache/projects/405).

As part of the ongoing work on [AIP-72: Task Execution Interface](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-72+Task+Execution+Interface+aka+Task+SDK), we are migrating the `task_instance` table to use a UUID primary key. This simplifies task instance identification, especially when the executor and workers communicate about a specific task instance.

Currently, the primary key of `task_instance` is a composite key consisting of `dag_id`, `task_id`, `run_id`, and `map_index`, as shown below. This migration introduces a **UUID v7** column (`id`) as the new primary key.

https://github.com/apache/airflow/blob/b4269f33c7151e6d61e07333003ec1e219285b07/airflow/models/taskinstance.py#L1815-L1819

The UUID v7 format was chosen for its temporal sorting properties: the most significant bits encode the creation timestamp, so later ids sort after earlier ones and index efficiently. For existing records, the UUID v7 value is generated from `queued_dttm`, `start_date`, or the current timestamp, whichever is set first; a short sketch of this time-ordering follows the summary below.

(Figure from [this blog post](https://www.toomanyafterthoughts.com/uuids-are-bad-for-database-index-performance-uuid7) on UUID index performance.)

1. **Migrated Primary Key to UUID v7**
   - Replaced the composite primary key (`dag_id`, `task_id`, `run_id`, `map_index`) with a UUID v7 `id` field, giving temporally sortable ids and simpler task instance identification.
2. **Database-Specific UUID v7 Functions**
   - Added a UUID v7 generation function per database for the data migration:
     - **PostgreSQL**: uses `pgcrypto` when available, with a fallback otherwise.
     - **MySQL**: custom deterministic UUID v7 function.
     - **SQLite**: uses the `uuid6` Python package.
3. **Updated Constraints and Indexes**
   - Added a `UniqueConstraint` on (`dag_id`, `task_id`, `run_id`, `map_index`) so the previous composite key stays unique and existing lookups keep working.
   - Re-created the foreign key constraints from dependent tables, with the downgrade restoring the previous constraints.
4. **Model and API Adjustments**
   - Updated the `TaskInstance` model to use UUID v7 as the primary key via the [`uuid6`](https://pypi.org/project/uuid6/) library, which provides `uuid7()` 😄.
   - Adjusted the REST API, views, and queries to support UUID-based lookups.
   - Modified tests for compatibility with the new primary key.
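Why UUID v7 keeps the index happy: its most significant 48 bits are a Unix-millisecond timestamp, so ids are ordered roughly by creation time. Below is a minimal sketch of that property using the same `uuid6` package this PR adds as a dependency. The `uuid7_from_timestamp` helper is a hypothetical illustration of what the migration's SQL functions do for existing rows (timestamp bits + version/variant bits + random tail); it is not code from this PR.

```python
import os
import time
import uuid

from uuid6 import uuid7  # the library this PR adds to Airflow's core dependencies


def uuid7_from_timestamp(ts: float) -> uuid.UUID:
    """Hypothetical helper (not in the PR): build a UUID v7 whose time bits come from ``ts`` seconds."""
    unix_ms = int(ts * 1000) & ((1 << 48) - 1)
    value = (unix_ms << 80) | int.from_bytes(os.urandom(10), "big")  # timestamp in the top 48 bits
    value = (value & ~(0xF << 76)) | (0x7 << 76)  # version nibble = 7
    value = (value & ~(0x3 << 62)) | (0x2 << 62)  # RFC 4122 variant bits = 10
    return uuid.UUID(int=value)


# New rows: ids from uuid7() embed the creation time, so later rows sort after earlier ones.
first = uuid7()
time.sleep(0.002)  # ensure a later millisecond
second = uuid7()
assert str(first) < str(second)

# Existing rows: an id derived from the old queued_dttm/start_date sorts before newly created ids.
backfilled = uuid7_from_timestamp(time.time() - 3600)
assert str(backfilled) < str(uuid7())
```

In the migration itself this timestamp-seeded generation is done in SQL (`uuid_generate_v7(...)` for PostgreSQL and MySQL) rather than in Python, so very large `task_instance` tables can be backfilled without round-tripping every row through the application.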
--- .../endpoints/task_instance_endpoint.py | 14 +- ..._add_uuid_primary_key_to_task_instance_.py | 281 ++ airflow/models/taskinstance.py | 28 +- airflow/utils/db.py | 2 +- airflow/www/security_manager.py | 11 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 3027 +++++++++-------- docs/apache-airflow/migrations-ref.rst | 4 +- hatch_build.py | 1 + .../openlineage/extractors/test_manager.py | 1 + .../endpoints/test_task_instance_endpoint.py | 36 +- tests/callbacks/test_callback_requests.py | 1 + tests/jobs/test_local_task_job.py | 22 +- tests/jobs/test_scheduler_job.py | 4 +- tests/models/test_cleartasks.py | 16 +- tests/models/test_dag.py | 10 +- tests/models/test_dagrun.py | 4 + tests/models/test_pool.py | 15 +- tests/models/test_taskinstance.py | 34 +- tests/models/test_timestamp.py | 5 +- tests/www/views/test_views_tasks.py | 11 +- 21 files changed, 1935 insertions(+), 1594 deletions(-) create mode 100644 airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 2c32bad9f347..eff16f54066e 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -534,9 +534,11 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION detail=f"Task instance not found for task {task_id!r} on execution_date {execution_date}" ) - if run_id and not session.get( - TI, {"task_id": task_id, "dag_id": dag_id, "run_id": run_id, "map_index": -1} - ): + select_stmt = select(TI).where( + TI.dag_id == dag_id, TI.task_id == task_id, TI.run_id == run_id, TI.map_index == -1 + ) + + if run_id and not session.scalars(select_stmt).one_or_none(): error_message = f"Task instance not found for task {task_id!r} on DAG run with ID {run_id!r}" raise NotFound(detail=error_message) @@ -582,10 +584,12 @@ def patch_task_instance( if not dag.has_task(task_id): raise NotFound("Task not found", detail=f"Task {task_id!r} not found in DAG {dag_id!r}") - ti: TI | None = session.get( - TI, {"task_id": task_id, "dag_id": dag_id, "run_id": dag_run_id, "map_index": map_index} + select_stmt = select(TI).where( + TI.dag_id == dag_id, TI.task_id == task_id, TI.run_id == dag_run_id, TI.map_index == map_index ) + ti: TI | None = session.scalars(select_stmt).one_or_none() + if not ti: error_message = f"Task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}" raise NotFound(detail=error_message) diff --git a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py new file mode 100644 index 000000000000..2abd2116f989 --- /dev/null +++ b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py @@ -0,0 +1,281 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add UUID primary key to ``task_instance`` table. + +Revision ID: d59cbbef95eb +Revises: 05234396c6fc +Create Date: 2024-10-21 22:39:12.394079 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import text +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "d59cbbef95eb" +down_revision = "05234396c6fc" +branch_labels = "None" +depends_on = None +airflow_version = "3.0.0" + +###### +# The following functions to create UUID v7 are solely for the purpose of this migration. +# This is done for production databases that do not support UUID v7 natively (Postgres, MySQL) +# and used instead of uuids from +# python libraries like uuid6.uuid7() for performance reasons since the task_instance table +# can be very large. +###### + +# PostgreSQL-specific UUID v7 function +pg_uuid7_fn = """ +DO $$ +DECLARE + pgcrypto_installed BOOLEAN; +BEGIN + -- Check if pgcrypto is already installed + pgcrypto_installed := EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto'); + + -- Attempt to create pgcrypto if it is not installed + IF NOT pgcrypto_installed THEN + BEGIN + CREATE EXTENSION pgcrypto; + pgcrypto_installed := TRUE; + RAISE NOTICE 'pgcrypto extension successfully created.'; + EXCEPTION + WHEN insufficient_privilege THEN + RAISE NOTICE 'pgcrypto extension could not be installed due to insufficient privileges; using fallback'; + pgcrypto_installed := FALSE; + WHEN OTHERS THEN + RAISE NOTICE 'An unexpected error occurred while attempting to install pgcrypto; using fallback'; + pgcrypto_installed := FALSE; + END; + END IF; +END $$; + +CREATE OR REPLACE FUNCTION uuid_generate_v7(p_timestamp timestamp with time zone) +RETURNS uuid +LANGUAGE plpgsql +PARALLEL SAFE +AS $$ +DECLARE + unix_time_ms CONSTANT bytea NOT NULL DEFAULT substring(int8send((extract(epoch FROM p_timestamp) * 1000)::bigint) from 3); + buffer bytea; + pgcrypto_installed BOOLEAN := EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto'); +BEGIN + -- Use pgcrypto if available, otherwise use the fallback + -- fallback from https://brandur.org/fragments/secure-bytes-without-pgcrypto + IF pgcrypto_installed THEN + buffer := unix_time_ms || gen_random_bytes(10); + ELSE + buffer := unix_time_ms || substring(uuid_send(gen_random_uuid()) FROM 1 FOR 5) || + substring(uuid_send(gen_random_uuid()) FROM 12 FOR 5); + END IF; + + -- Set UUID version and variant bits + buffer := set_byte(buffer, 6, (b'0111' || get_byte(buffer, 6)::bit(4))::bit(8)::int); + buffer := set_byte(buffer, 8, (b'10' || get_byte(buffer, 8)::bit(6))::bit(8)::int); + RETURN encode(buffer, 'hex')::uuid; +END +$$; +""" + +pg_uuid7_fn_drop = """ +DROP FUNCTION IF EXISTS uuid_generate_v7(timestamp with time zone); +""" + +# MySQL-specific UUID v7 function +mysql_uuid7_fn = """ +DROP FUNCTION IF EXISTS uuid_generate_v7; +CREATE FUNCTION uuid_generate_v7(p_timestamp DATETIME(3)) +RETURNS CHAR(36) +DETERMINISTIC +BEGIN + DECLARE unix_time_ms BIGINT; + DECLARE time_hex CHAR(12); + DECLARE rand_hex CHAR(24); + DECLARE uuid CHAR(36); + + -- Convert the 
passed timestamp to milliseconds since epoch + SET unix_time_ms = UNIX_TIMESTAMP(p_timestamp) * 1000; + SET time_hex = LPAD(HEX(unix_time_ms), 12, '0'); + SET rand_hex = CONCAT( + LPAD(HEX(FLOOR(RAND() * POW(2,32))), 8, '0'), + LPAD(HEX(FLOOR(RAND() * POW(2,32))), 8, '0') + ); + SET rand_hex = CONCAT(SUBSTRING(rand_hex, 1, 4), '7', SUBSTRING(rand_hex, 6)); + SET rand_hex = CONCAT(SUBSTRING(rand_hex, 1, 12), '8', SUBSTRING(rand_hex, 14)); + + SET uuid = LOWER(CONCAT( + SUBSTRING(time_hex, 1, 8), '-', + SUBSTRING(time_hex, 9, 4), '-', + SUBSTRING(rand_hex, 1, 4), '-', + SUBSTRING(rand_hex, 5, 4), '-', + SUBSTRING(rand_hex, 9) + )); + + RETURN uuid; +END; +""" + +mysql_uuid7_fn_drop = """ +DROP FUNCTION IF EXISTS uuid_generate_v7; +""" + +ti_table = "task_instance" + +# Foreign key columns from task_instance +ti_fk_cols = ["dag_id", "task_id", "run_id", "map_index"] + +# Foreign key constraints from other tables to task_instance +ti_fk_constraints = [ + {"table": "rendered_task_instance_fields", "fk": "rtif_ti_fkey"}, + {"table": "task_fail", "fk": "task_fail_ti_fkey"}, + {"table": "task_instance_history", "fk": "task_instance_history_ti_fkey"}, + {"table": "task_instance_note", "fk": "task_instance_note_ti_fkey"}, + {"table": "task_map", "fk": "task_map_task_instance_fkey"}, + {"table": "task_reschedule", "fk": "task_reschedule_ti_fkey"}, + {"table": "xcom", "fk": "xcom_task_instance_fkey"}, +] + + +def _get_type_id_column(dialect_name: str) -> sa.types.TypeEngine: + # For PostgreSQL, use the UUID type directly as it is more efficient + if dialect_name == "postgresql": + return postgresql.UUID(as_uuid=False) + # For other databases, use String(36) to match UUID format + else: + return sa.String(36) + + +def upgrade(): + """Add UUID primary key to task instance table.""" + conn = op.get_bind() + dialect_name = conn.dialect.name + + op.add_column("task_instance", sa.Column("id", _get_type_id_column(dialect_name), nullable=True)) + + if dialect_name == "postgresql": + op.execute(pg_uuid7_fn) + + # TODO: Add batching to handle updates in smaller chunks for large tables to avoid locking + # Migrate existing rows with UUID v7 using a timestamp-based generation + op.execute( + "UPDATE task_instance SET id = uuid_generate_v7(coalesce(queued_dttm, start_date, clock_timestamp()))" + ) + + op.execute(pg_uuid7_fn_drop) + + # Drop existing primary key constraint to task_instance table + op.execute("ALTER TABLE IF EXISTS task_instance DROP CONSTRAINT task_instance_pkey CASCADE") + + elif dialect_name == "mysql": + op.execute(mysql_uuid7_fn) + + # Migrate existing rows with UUID v7 + op.execute(""" + UPDATE task_instance + SET id = uuid_generate_v7(coalesce(queued_dttm, start_date, NOW(3))) + WHERE id IS NULL + """) + + # Drop this function as it is no longer needed + op.execute(mysql_uuid7_fn_drop) + for fk in ti_fk_constraints: + op.drop_constraint(fk["fk"], fk["table"], type_="foreignkey") + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_pkey", type_="primary") + elif dialect_name == "sqlite": + from uuid6 import uuid7 + + stmt = text("SELECT COUNT(*) FROM task_instance WHERE id IS NULL") + conn = op.get_bind() + task_instances = conn.execute(stmt).scalar() + uuid_values = [str(uuid7()) for _ in range(task_instances)] + + # Ensure `uuid_values` is a list or iterable with the UUIDs for the update. 
+ stmt = text(""" + UPDATE task_instance + SET id = :uuid + WHERE id IS NULL + """) + + for uuid_value in uuid_values: + conn.execute(stmt.bindparams(uuid=uuid_value)) + + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_pkey", type_="primary") + + # Add primary key and unique constraint to task_instance table + with op.batch_alter_table("task_instance") as batch_op: + batch_op.alter_column("id", type_=_get_type_id_column(dialect_name), nullable=False) + batch_op.create_unique_constraint("task_instance_composite_key", ti_fk_cols) + batch_op.create_primary_key("task_instance_pkey", ["id"]) + + # Create foreign key constraints + for fk in ti_fk_constraints: + with op.batch_alter_table(fk["table"]) as batch_op: + batch_op.create_foreign_key( + constraint_name=fk["fk"], + referent_table=ti_table, + local_cols=ti_fk_cols, + remote_cols=ti_fk_cols, + ondelete="CASCADE", + ) + + +def downgrade(): + """Drop UUID primary key to task instance table.""" + conn = op.get_bind() + dialect_name = conn.dialect.name + + if dialect_name == "postgresql": + op.execute("ALTER TABLE IF EXISTS task_instance DROP CONSTRAINT task_instance_composite_key CASCADE") + op.execute(pg_uuid7_fn_drop) + + elif dialect_name == "mysql": + for fk in ti_fk_constraints: + op.drop_constraint(fk["fk"], fk["table"], type_="foreignkey") + + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_composite_key", type_="unique") + op.execute(mysql_uuid7_fn_drop) + + elif dialect_name == "sqlite": + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_composite_key", type_="unique") + + with op.batch_alter_table("task_instance") as batch_op: + batch_op.drop_constraint("task_instance_pkey", type_="primary") + batch_op.drop_column("id") + batch_op.create_primary_key("task_instance_pkey", ti_fk_cols) + + # Re-add foreign key constraints + for fk in ti_fk_constraints: + with op.batch_alter_table(fk["table"]) as batch_op: + batch_op.create_foreign_key( + constraint_name=fk["fk"], + referent_table=ti_table, + local_cols=ti_fk_cols, + remote_cols=ti_fk_cols, + ondelete="CASCADE", + ) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 7cb61bcb61de..9fc4c3b032b2 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -39,6 +39,7 @@ import jinja2 import lazy_object_proxy import pendulum +import uuid6 from jinja2 import TemplateAssertionError, UndefinedError from sqlalchemy import ( Column, @@ -50,6 +51,7 @@ PrimaryKeyConstraint, String, Text, + UniqueConstraint, and_, delete, false, @@ -59,6 +61,7 @@ text, update, ) +from sqlalchemy.dialects import postgresql from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.ext.mutable import MutableDict @@ -794,6 +797,7 @@ def _execute_callable(context: Context, **execute_callable_kwargs): def _set_ti_attrs(target, source, include_dag_run=False): # Fields ordered per model definition + target.id = source.id target.start_date = source.start_date target.end_date = source.end_date target.duration = source.duration @@ -1793,6 +1797,11 @@ def _handle_reschedule( return ti +def uuid7() -> str: + """Generate a new UUID7 string.""" + return str(uuid6.uuid7()) + + class TaskInstance(Base, LoggingMixin): """ Task instances store the state of a task instance. 
@@ -1813,10 +1822,16 @@ class TaskInstance(Base, LoggingMixin): """ __tablename__ = "task_instance" - task_id = Column(StringID(), primary_key=True, nullable=False) - dag_id = Column(StringID(), primary_key=True, nullable=False) - run_id = Column(StringID(), primary_key=True, nullable=False) - map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1")) + id = Column( + String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + primary_key=True, + default=uuid7, + nullable=False, + ) + task_id = Column(StringID(), nullable=False) + dag_id = Column(StringID(), nullable=False) + run_id = Column(StringID(), nullable=False) + map_index = Column(Integer, nullable=False, server_default=text("-1")) start_date = Column(UtcDateTime) end_date = Column(UtcDateTime) @@ -1869,7 +1884,8 @@ class TaskInstance(Base, LoggingMixin): Index("ti_pool", pool, state, priority_weight), Index("ti_job_id", job_id), Index("ti_trigger_id", trigger_id), - PrimaryKeyConstraint("dag_id", "task_id", "run_id", "map_index", name="task_instance_pkey"), + PrimaryKeyConstraint("id", name="task_instance_pkey"), + UniqueConstraint("dag_id", "task_id", "run_id", "map_index", name="task_instance_composite_key"), ForeignKeyConstraint( [trigger_id], ["trigger.id"], @@ -1938,6 +1954,8 @@ def __init__( self.run_id = run_id self.try_number = 0 self.max_tries = self.task.retries + if not self.id: + self.id = uuid7() self.unixname = getuser() if state: self.state = state diff --git a/airflow/utils/db.py b/airflow/utils/db.py index d2d8ac8b6007..2cae52db8e4c 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -97,7 +97,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "05234396c6fc", + "3.0.0": "d59cbbef95eb", } diff --git a/airflow/www/security_manager.py b/airflow/www/security_manager.py index bad5a6b28bfa..6d782410c3db 100644 --- a/airflow/www/security_manager.py +++ b/airflow/www/security_manager.py @@ -16,7 +16,6 @@ # under the License. 
 from __future__ import annotations
 
-import json
 from functools import cached_property
 from typing import TYPE_CHECKING, Callable
 
@@ -199,16 +198,8 @@ def get_dag_id_from_dagrun_id(resource_pk):
         def get_dag_id_from_task_instance(resource_pk):
             if not resource_pk:
                 return None
-            composite_pk = json.loads(resource_pk)
             dag_id = session.scalar(
-                select(TaskInstance.dag_id)
-                .where(
-                    TaskInstance.dag_id == composite_pk[0],
-                    TaskInstance.task_id == composite_pk[1],
-                    TaskInstance.run_id == composite_pk[2],
-                    TaskInstance.map_index >= composite_pk[3],
-                )
-                .limit(1)
+                select(TaskInstance.dag_id).where(TaskInstance.id == resource_pk).limit(1)
             )
             if not dag_id:
                 raise AirflowException("Task instance not found")
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index e8ee39a6eba2..aad8dfbe06ef 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-5958e108ee31f4ca94d6a878767d8d1ca00ef4e664ecda9308b367c173a09226
\ No newline at end of file
+621811bf2476a947cf52d8b75149e2be7e0ec9d383eda551b1c6496c44f2a22d
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 7aed6e291273..c9232c64f2d8 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
(Regenerated ERD diagram SVG omitted: in the updated diagram the `task_instance` entity gains an `id [UUID] NOT NULL` primary-key column while `dag_id`, `task_id`, `run_id`, and `map_index` remain as NOT NULL non-key columns; the rest of the diff is re-rendered layout for the related entities.)
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 3b0c3fdd023c..3719ece1e95e 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description                                                  |
 +=========================+==================+===================+==============================================================+
-| ``05234396c6fc`` (head) | ``3a8972ecb8f9`` | ``3.0.0``         | Rename dataset as asset.                                     |
+| ``d59cbbef95eb`` (head) | ``05234396c6fc`` | ``3.0.0``         | Add UUID primary key to ``task_instance`` table.             |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``05234396c6fc``        | ``3a8972ecb8f9`` | ``3.0.0``         | Rename dataset as asset.                                     |
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``3a8972ecb8f9``        | ``fb2d4922cd79`` | ``3.0.0``         | Add exception_reason and logical_date to BackfillDagRun.
| +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/hatch_build.py b/hatch_build.py index 26f1233fb8f2..b342038cafcb 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -437,6 +437,7 @@ # Universal Pathlib 0.2.4 adds extra validation for Paths and our integration with local file paths # Does not work with it Tracked in https://github.com/fsspec/universal_pathlib/issues/276 "universal-pathlib>=0.2.2,!=0.2.4", + "uuid6>=2024.7.10", # Werkzug 3 breaks Flask-Login 0.6.2, also connexion needs to be updated to >= 3.0 # we should remove this limitation when FAB supports Flask 2.3 and we migrate connexion to 3+ "werkzeug>=2.0,<3", diff --git a/providers/tests/openlineage/extractors/test_manager.py b/providers/tests/openlineage/extractors/test_manager.py index 8cdf7d6d7c23..726d7262c9b6 100644 --- a/providers/tests/openlineage/extractors/test_manager.py +++ b/providers/tests/openlineage/extractors/test_manager.py @@ -313,6 +313,7 @@ def use_read(): dr = dag_maker.create_dagrun() ti = TaskInstance(task=task, run_id=dr.run_id) + ti.refresh_from_db() ti.state = State.QUEUED session.merge(ti) session.commit() diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index d8b9078bbeb4..bc8836981d42 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -2033,15 +2033,15 @@ def test_should_call_mocked_api(self, mock_set_task_instance_state, session): self.create_task_instances(session) NEW_STATE = "failed" - mock_set_task_instance_state.return_value = session.get( - TaskInstance, - { - "task_id": "print_the_context", - "dag_id": "example_python_operator", - "run_id": "TEST_DAG_RUN_ID", - "map_index": -1, - }, - ) + mock_set_task_instance_state.return_value = session.scalars( + select(TaskInstance).where( + TaskInstance.dag_id == "example_python_operator", + TaskInstance.task_id == "print_the_context", + TaskInstance.run_id == "TEST_DAG_RUN_ID", + TaskInstance.map_index == -1, + ) + ).one_or_none() + response = self.client.patch( self.ENDPOINT_URL, environ_overrides={"REMOTE_USER": "test"}, @@ -2078,15 +2078,15 @@ def test_should_not_call_mocked_api_for_dry_run(self, mock_set_task_instance_sta self.create_task_instances(session) NEW_STATE = "failed" - mock_set_task_instance_state.return_value = session.get( - TaskInstance, - { - "task_id": "print_the_context", - "dag_id": "example_python_operator", - "run_id": "TEST_DAG_RUN_ID", - "map_index": -1, - }, - ) + mock_set_task_instance_state.return_value = session.scalars( + select(TaskInstance).where( + TaskInstance.dag_id == "example_python_operator", + TaskInstance.task_id == "print_the_context", + TaskInstance.run_id == "TEST_DAG_RUN_ID", + TaskInstance.map_index == -1, + ) + ).one_or_none() + response = self.client.patch( self.ENDPOINT_URL, environ_overrides={"REMOTE_USER": "test"}, diff --git a/tests/callbacks/test_callback_requests.py b/tests/callbacks/test_callback_requests.py index 7bbe41387750..ea26dc13bf3d 100644 --- a/tests/callbacks/test_callback_requests.py +++ b/tests/callbacks/test_callback_requests.py @@ -124,6 +124,7 @@ def test_simple_ti_roundtrip_dates(self, dag_maker): external_trigger=True, ) ti = TaskInstance(task=op, run_id=dr.run_id) + ti.refresh_from_db() ti.set_state("SUCCESS") start_date = ti.start_date end_date = ti.end_date diff --git a/tests/jobs/test_local_task_job.py 
b/tests/jobs/test_local_task_job.py index 88c43ae5625f..f9efb2e3ce65 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -902,7 +902,9 @@ def test_fast_follow( ti_by_task_id = {} with create_session() as session: for task_id in init_state: - ti = TaskInstance(dag.get_task(task_id), run_id=dag_run.run_id, state=init_state[task_id]) + ti = TaskInstance(dag.get_task(task_id), run_id=dag_run.run_id) + ti.refresh_from_db() + ti.state = init_state[task_id] session.merge(ti) ti_by_task_id[task_id] = ti @@ -951,11 +953,21 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, get_test_dag) task_k = dag.get_task("K") task_l = dag.get_task("L") with create_session() as session: - ti_k = TaskInstance(task_k, run_id=dr.run_id, state=State.SUCCESS) - ti_b = TaskInstance(task_l, run_id=dr.run_id, state=State.SUCCESS) + ti_k = dr.get_task_instance(task_k.task_id, session=session) + ti_k.refresh_from_task(task_k) + ti_k.state = State.SUCCESS - ti2_k = TaskInstance(task_k, run_id=dr2.run_id, state=State.NONE) - ti2_l = TaskInstance(task_l, run_id=dr2.run_id, state=State.NONE) + ti_b = dr.get_task_instance(task_l.task_id, session=session) + ti_b.refresh_from_task(task_l) + ti_b.state = State.SUCCESS + + ti2_k = dr2.get_task_instance(task_k.task_id, session=session) + ti2_k.refresh_from_task(task_k) + ti2_k.state = State.NONE + + ti2_l = dr2.get_task_instance(task_l.task_id, session=session) + ti2_l.refresh_from_task(task_l) + ti2_l.state = State.NONE session.merge(ti_k) session.merge(ti_b) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 3d71d5987994..311de0ce2b64 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1153,8 +1153,8 @@ def test_tis_for_queued_dagruns_are_not_run(self, dag_maker): scheduler_job = Job() self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) session = settings.Session() - ti1 = TaskInstance(task1, run_id=dr1.run_id) - ti2 = TaskInstance(task1, run_id=dr2.run_id) + ti1 = dr1.get_task_instance(task1.task_id) + ti2 = dr2.get_task_instance(task1.task_id) ti1.state = State.SCHEDULED ti2.state = State.SCHEDULED session.merge(ti1) diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index b018f2bf2b27..6f7fb49ea45c 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -597,7 +597,7 @@ def test_dag_clear(self, dag_maker): ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id) ti0.refresh_from_task(task0) ti1.refresh_from_task(task1) - session.get(TaskInstance, ti0.key.primary).try_number += 1 + session.get(TaskInstance, ti0.id).try_number += 1 session.commit() # Next try to run will be try 1 assert ti0.try_number == 1 @@ -611,7 +611,7 @@ def test_dag_clear(self, dag_maker): assert ti0.max_tries == 1 assert ti1.max_tries == 2 - session.get(TaskInstance, ti1.key.primary).try_number += 1 + session.get(TaskInstance, ti1.id).try_number += 1 session.commit() ti1.run() @@ -658,7 +658,7 @@ def test_dags_clear(self): # test clear all dags for i in range(num_of_dags): - session.get(TaskInstance, tis[i].key.primary).try_number += 1 + session.get(TaskInstance, tis[i].id).try_number += 1 session.commit() tis[i].run() assert tis[i].state == State.SUCCESS @@ -675,7 +675,7 @@ def test_dags_clear(self): # test dry_run for i in range(num_of_dags): - session.get(TaskInstance, tis[i].key.primary).try_number += 1 + session.get(TaskInstance, tis[i].id).try_number += 1 session.commit() tis[i].run() 
         assert tis[i].state == State.SUCCESS
@@ -728,7 +728,7 @@ def test_operator_clear(self, dag_maker, session):
         ti1.task = op1
         ti2.task = op2
 
-        session.get(TaskInstance, ti2.key.primary).try_number += 1
+        session.get(TaskInstance, ti2.id).try_number += 1
         session.commit()
         ti2.run()
         # Dependency not met
@@ -737,14 +737,14 @@ def test_operator_clear(self, dag_maker, session):
 
         op2.clear(upstream=True)
         # max tries will be set to retries + curr try number == 1 + 1 == 2
-        assert session.get(TaskInstance, ti2.key.primary).max_tries == 2
+        assert session.get(TaskInstance, ti2.id).max_tries == 2
 
-        session.get(TaskInstance, ti1.key.primary).try_number += 1
+        session.get(TaskInstance, ti1.id).try_number += 1
         session.commit()
         ti1.run()
         assert ti1.try_number == 1
 
-        session.get(TaskInstance, ti2.key.primary).try_number += 1
+        session.get(TaskInstance, ti2.id).try_number += 1
         session.commit()
         ti2.run(ignore_ti_state=True)
         # max_tries is 0 because there is no task instance in db for ti1
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 1d7a69ba8437..afdfd9ab8889 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -507,12 +507,16 @@ def test_get_num_task_instances(self):
         )
 
         ti1 = TI(task=test_task, run_id=dr1.run_id)
+        ti1.refresh_from_db()
         ti1.state = None
         ti2 = TI(task=test_task, run_id=dr2.run_id)
+        ti2.refresh_from_db()
         ti2.state = State.RUNNING
         ti3 = TI(task=test_task, run_id=dr3.run_id)
+        ti3.refresh_from_db()
         ti3.state = State.QUEUED
         ti4 = TI(task=test_task, run_id=dr4.run_id)
+        ti4.refresh_from_db()
         ti4.state = State.RUNNING
         session = settings.Session()
         session.merge(ti1)
@@ -1794,6 +1798,7 @@ def test_clear_set_dagrun_state(self, dag_run_state):
         session.merge(dagrun_1)
 
         task_instance_1 = TI(t_1, run_id=dagrun_1.run_id, state=State.RUNNING)
+        task_instance_1.refresh_from_db()
         session.merge(task_instance_1)
         session.commit()
 
@@ -2006,7 +2011,7 @@ def test_clear_dag(
         self._clean_up(dag_id)
         task_id = "t1"
         dag = DAG(dag_id, schedule=None, start_date=DEFAULT_DATE, max_active_runs=1)
-        t_1 = EmptyOperator(task_id=task_id, dag=dag)
+        _ = EmptyOperator(task_id=task_id, dag=dag)
 
         session = settings.Session()  # type: ignore
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
@@ -2020,7 +2025,8 @@ def test_clear_dag(
         )
         session.merge(dagrun_1)
 
-        task_instance_1 = TI(t_1, run_id=dagrun_1.run_id, state=ti_state_begin)
+        task_instance_1 = dagrun_1.get_task_instance(task_id)
+        task_instance_1.state = ti_state_begin
         task_instance_1.job_id = 123
         session.merge(task_instance_1)
         session.commit()
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 4f788a2dccc6..d788c56218c8 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -854,7 +854,9 @@ def test_depends_on_past(self, dagbag, session, prev_ti_state, is_ti_success):
         )
 
         prev_ti = TI(task, run_id=dag_run_1.run_id)
+        prev_ti.refresh_from_db()
         ti = TI(task, run_id=dag_run_2.run_id)
+        ti.refresh_from_db()
 
         prev_ti.set_state(prev_ti_state)
         ti.set_state(TaskInstanceState.QUEUED)
@@ -892,7 +894,9 @@ def test_wait_for_downstream(self, dagbag, session, prev_ti_state, is_ti_success
         )
 
         prev_ti_downstream = TI(task=downstream, run_id=dag_run_1.run_id)
+        prev_ti_downstream.refresh_from_db()
         ti = TI(task=upstream, run_id=dag_run_2.run_id)
+        ti.refresh_from_db()
         prev_ti = ti.get_previous_ti()
         prev_ti.set_state(TaskInstanceState.SUCCESS)
         assert prev_ti.state == TaskInstanceState.SUCCESS
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 7711d828b678..e993ed6b09ca 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -83,9 +83,9 @@ def test_open_slots(self, dag_maker):
 
         dr = dag_maker.create_dagrun()
 
-        ti1 = TI(task=op1, run_id=dr.run_id)
-        ti2 = TI(task=op2, run_id=dr.run_id)
-        ti3 = TI(task=op3, run_id=dr.run_id)
+        ti1 = dr.get_task_instance(task_id=op1.task_id)
+        ti2 = dr.get_task_instance(task_id=op2.task_id)
+        ti3 = dr.get_task_instance(task_id=op3.task_id)
         ti1.state = State.RUNNING
         ti2.state = State.QUEUED
         ti3.state = State.DEFERRED
@@ -133,8 +133,8 @@ def test_open_slots_including_deferred(self, dag_maker):
 
         dr = dag_maker.create_dagrun()
 
-        ti1 = TI(task=op1, run_id=dr.run_id)
-        ti2 = TI(task=op2, run_id=dr.run_id)
+        ti1 = dr.get_task_instance(task_id=op1.task_id)
+        ti2 = dr.get_task_instance(task_id=op2.task_id)
         ti1.state = State.RUNNING
         ti2.state = State.DEFERRED
 
@@ -180,7 +180,9 @@ def test_infinite_slots(self, dag_maker):
 
         dr = dag_maker.create_dagrun()
 
         ti1 = TI(task=op1, run_id=dr.run_id)
+        ti1.refresh_from_db()
         ti2 = TI(task=op2, run_id=dr.run_id)
+        ti2.refresh_from_db()
         ti1.state = State.RUNNING
         ti2.state = State.QUEUED
@@ -230,8 +232,11 @@ def test_default_pool_open_slots(self, dag_maker):
         ti1 = TI(task=op1, run_id=dr.run_id)
         ti2 = TI(task=op2, run_id=dr.run_id)
         ti3 = TI(task=op3, run_id=dr.run_id)
+        ti1.refresh_from_db()
         ti1.state = State.RUNNING
+        ti2.refresh_from_db()
         ti2.state = State.QUEUED
+        ti3.refresh_from_db()
         ti3.state = State.SCHEDULED
 
         session = settings.Session()
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index eee3c6c591e0..fe4121fdffed 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -36,6 +36,7 @@
 import pendulum
 import pytest
 import time_machine
+import uuid6
 from sqlalchemy import select
 
 from airflow import settings
@@ -646,7 +647,7 @@ def run_with_error(ti):
         ti.task = task
 
         with create_session() as session:
-            session.get(TaskInstance, ti.key.primary).try_number += 1
+            session.get(TaskInstance, ti.id).try_number += 1
 
         # first run -- up for retry
         run_with_error(ti)
@@ -654,7 +655,7 @@ def run_with_error(ti):
         assert ti.try_number == 1
 
         with create_session() as session:
-            session.get(TaskInstance, ti.key.primary).try_number += 1
+            session.get(TaskInstance, ti.id).try_number += 1
 
         # second run -- still up for retry because retry_delay hasn't expired
         time_machine.coordinates.shift(3)
@@ -663,7 +664,7 @@ def run_with_error(ti):
         assert ti.try_number == 2
 
         with create_session() as session:
-            session.get(TaskInstance, ti.key.primary).try_number += 1
+            session.get(TaskInstance, ti.id).try_number += 1
 
         # third run -- failed
         time_machine.coordinates.shift(datetime.datetime.resolution)
@@ -698,7 +699,7 @@ def run_with_error(ti):
         ti.task = task
         assert ti.try_number == 0
 
-        session.get(TaskInstance, ti.key.primary).try_number += 1
+        session.get(TaskInstance, ti.id).try_number += 1
         session.commit()
 
         # first run -- up for retry
@@ -706,7 +707,7 @@ def run_with_error(ti):
         assert ti.state == State.UP_FOR_RETRY
         assert ti.try_number == 1
 
-        session.get(TaskInstance, ti.key.primary).try_number += 1
+        session.get(TaskInstance, ti.id).try_number += 1
         session.commit()
 
         # second run -- fail
@@ -718,7 +719,7 @@ def run_with_error(ti):
         # clearing it first
         dag.clear()
 
-        session.get(TaskInstance, ti.key.primary).try_number += 1
+        session.get(TaskInstance, ti.id).try_number += 1
         session.commit()
 
         # third run -- up for retry
@@ -726,7 +727,7 @@ def run_with_error(ti):
         assert ti.state == State.UP_FOR_RETRY
         assert ti.try_number == 3
 
-        session.get(TaskInstance, ti.key.primary).try_number += 1
+        session.get(TaskInstance, ti.id).try_number += 1
         session.commit()
 
         # fourth run -- fail
@@ -888,7 +889,7 @@ def run_ti_and_assert(
 
         # We increment the try number because that's what the scheduler would do
         with create_session() as session:
-            session.get(TaskInstance, ti.key.primary).try_number += 1
+            session.get(TaskInstance, ti.id).try_number += 1
 
         # After the retry the start date is reset, hence the duration is also reset.
 
@@ -900,7 +901,7 @@ def run_ti_and_assert(
 
         # scheduler would create a new try here
         with create_session() as session:
-            session.get(TaskInstance, ti.key.primary).try_number += 1
+            session.get(TaskInstance, ti.id).try_number += 1
 
         done, fail = False, False
         run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1)
@@ -993,7 +994,7 @@ def run_ti_and_assert(
 
         # We increment the try number because that's what the scheduler would do
         with create_session() as session:
-            session.get(TaskInstance, ti.key.primary).try_number += 1
+            session.get(TaskInstance, ti.id).try_number += 1
 
         # After the retry the start date is reset, hence the duration is also reset.
 
@@ -1004,7 +1005,7 @@ def run_ti_and_assert(
         run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 1, 1)
 
         with create_session() as session:
-            session.get(TaskInstance, ti.key.primary).try_number += 1
+            session.get(TaskInstance, ti.id).try_number += 1
 
         done, fail = False, False
         run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1)
@@ -3599,7 +3600,7 @@ def test_handle_failure_updates_queued_task_updates_state(self, dag_maker):
         with dag_maker():
             task = EmptyOperator(task_id="mytask", retries=1)
         dr = dag_maker.create_dagrun()
-        ti = TI(task=task, run_id=dr.run_id)
+        ti = dr.get_task_instance(task.task_id)
         ti.state = State.QUEUED
         session.merge(ti)
         session.flush()
@@ -3618,7 +3619,7 @@ def test_handle_failure_no_task(self, Stats_incr, dag_maker):
         with dag_maker():
             task = EmptyOperator(task_id="mytask", retries=1)
         dr = dag_maker.create_dagrun()
-        ti = TI(task=task, run_id=dr.run_id)
+        ti = dr.get_task_instance(task.task_id, session=session)
         ti.try_number += 1
         ti = session.merge(ti)
         ti.task = None
@@ -3789,7 +3790,7 @@ def test_echo_env_variables(self, dag_maker):
             run_type=DagRunType.MANUAL,
             external_trigger=False,
         )
-        ti = TI(task=op, run_id=dr.run_id)
+        ti = dr.get_task_instance(op.task_id)
         ti.state = State.RUNNING
         session = settings.Session()
         session.merge(ti)
@@ -3817,7 +3818,7 @@ def f(*args, **kwargs):
             task = PythonOperator(task_id="mytask", python_callable=f)
 
         dr = dag_maker.create_dagrun()
-        ti = TI(task=task, run_id=dr.run_id)
+        ti = dr.get_task_instance(task.task_id)
         ti.state = State.RUNNING
         session = settings.Session()
         session.merge(ti)
@@ -3999,6 +4000,7 @@ def test_refresh_from_db(self, create_task_instance):
             "try_number": 1,
             "max_tries": 1,
             "hostname": "some_unique_hostname",
+            "id": str(uuid6.uuid7()),
             "unixname": "some_unique_unixname",
             "job_id": 1234,
             "pool": "some_fake_pool_id",
@@ -5398,7 +5400,7 @@ def test__refresh_from_db_should_not_increment_try_number(dag_maker, session):
         BashOperator(task_id="hello", bash_command="hi")
     dag_maker.create_dagrun(state="success")
     ti = session.scalar(select(TaskInstance))
-    session.get(TaskInstance, ti.key.primary).try_number += 1
+    session.get(TaskInstance, ti.id).try_number += 1
     session.commit()
     assert ti.task_id == "hello"  # just to confirm...
     assert ti.try_number == 1  # starts out as 1
diff --git a/tests/models/test_timestamp.py b/tests/models/test_timestamp.py
index e174629736bc..bd9fd5d0766f 100644
--- a/tests/models/test_timestamp.py
+++ b/tests/models/test_timestamp.py
@@ -20,7 +20,7 @@
 import pytest
 import time_machine
 
-from airflow.models import Log, TaskInstance
+from airflow.models import Log
 from airflow.operators.empty import EmptyOperator
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
@@ -42,7 +42,8 @@ def add_log(execdate, session, dag_maker, timezone_override=None):
     with dag_maker(dag_id="logging", default_args={"start_date": execdate}):
         task = EmptyOperator(task_id="dummy")
     dag_run = dag_maker.create_dagrun()
-    task_instance = TaskInstance(task=task, run_id=dag_run.run_id, state="success")
+    task_instance = dag_run.get_task_instance(task.task_id)
+    task_instance.set_state(State.SUCCESS)
     session.merge(task_instance)
     log = Log(State.RUNNING, task_instance)
     if timezone_override:
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index c3d282765968..fabd104e8c26 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -931,7 +931,7 @@ def test_task_instance_clear_downstream(session, admin_client, dag_maker):
 
 
 def test_task_instance_clear_failure(admin_client):
-    rowid = '["12345"]'  # F.A.B. crashes if the rowid is *too* invalid.
+    rowid = "00000000-0000-0000-0000-000000000000"  # F.A.B. crashes if the rowid is *too* invalid.
     resp = admin_client.post(
         "/taskinstance/action_post",
         data={"action": "clear", "rowid": rowid},
@@ -979,7 +979,7 @@ def test_task_instance_set_state(session, admin_client, action, expected_state):
     ],
 )
 def test_task_instance_set_state_failure(admin_client, action):
-    rowid = '["12345"]'  # F.A.B. crashes if the rowid is *too* invalid.
+    rowid = "00000000-0000-0000-0000-000000000000"  # F.A.B. crashes if the rowid is *too* invalid.
     resp = admin_client.post(
         "/taskinstance/action_post",
         data={"action": action, "rowid": rowid},
@@ -1108,6 +1108,7 @@ def test_task_instances(admin_client):
             "executor_config": {},
             "external_executor_id": None,
             "hostname": "",
+            "id": unittest.mock.ANY,  # Ignore the `id` field
             "job_id": None,
             "map_index": -1,
             "max_tries": 0,
@@ -1143,6 +1144,7 @@
             "executor_config": {},
             "external_executor_id": None,
             "hostname": "",
+            "id": unittest.mock.ANY,  # Ignore the `id` field
             "job_id": None,
             "map_index": -1,
             "max_tries": 0,
@@ -1178,6 +1180,7 @@
             "executor_config": {},
             "external_executor_id": None,
             "hostname": "",
+            "id": unittest.mock.ANY,  # Ignore the `id` field
             "job_id": None,
             "map_index": -1,
             "max_tries": 0,
@@ -1213,6 +1216,7 @@
             "executor_config": {},
             "external_executor_id": None,
             "hostname": "",
+            "id": unittest.mock.ANY,  # Ignore the `id` field
             "job_id": None,
             "map_index": -1,
             "max_tries": 0,
@@ -1248,6 +1252,7 @@
             "executor_config": {},
             "external_executor_id": None,
             "hostname": "",
+            "id": unittest.mock.ANY,  # Ignore the `id` field
             "job_id": None,
             "map_index": -1,
             "max_tries": 0,
@@ -1283,6 +1288,7 @@
             "executor_config": {},
             "external_executor_id": None,
             "hostname": "",
+            "id": unittest.mock.ANY,  # Ignore the `id` field
             "job_id": None,
             "map_index": -1,
             "max_tries": 0,
@@ -1318,6 +1324,7 @@
             "executor_config": {},
             "external_executor_id": None,
             "hostname": "",
+            "id": unittest.mock.ANY,  # Ignore the `id` field
             "job_id": None,
             "map_index": -1,
             "max_tries": 0,