Skip to content

Commit

Permalink
Add more robust handling of existing DagRun (#43168)
Browse files Browse the repository at this point in the history
Previously we would just try to create and if exists just move on.

Now we'll make sure that we create the BackfillDagRun record with a note documenting that we did not create the dag run because it already existed.

This is a stepping stone towards implementing "clear existing" behavior.
  • Loading branch information
dstandish authored Oct 18, 2024
1 parent 6e9c536 commit 025082a
Show file tree
Hide file tree
Showing 7 changed files with 2,092 additions and 1,979 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add exception_reason and logical_date to BackfillDagRun.
Revision ID: 3a8972ecb8f9
Revises: fb2d4922cd79
Create Date: 2024-10-18 16:24:38.932005
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.utils.sqlalchemy import UtcDateTime

revision = "3a8972ecb8f9"
down_revision = "fb2d4922cd79"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply Add exception_reason and logical_date to BackfillDagRun."""
with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("exception_reason", sa.String(length=250), nullable=True))
batch_op.add_column(sa.Column("logical_date", UtcDateTime(timezone=True), nullable=False))


def downgrade():
"""Unapply Add exception_reason and logical_date to BackfillDagRun."""
with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op:
batch_op.drop_column("logical_date")
batch_op.drop_column("exception_reason")
119 changes: 92 additions & 27 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from __future__ import annotations

import logging
from enum import Enum
from typing import TYPE_CHECKING

from sqlalchemy import Boolean, Column, ForeignKeyConstraint, Integer, UniqueConstraint, func, select, update
Expand Down Expand Up @@ -82,15 +83,26 @@ def __repr__(self):
return f"Backfill({self.dag_id=}, {self.from_date=}, {self.to_date=})"


class BackfillDagRunExceptionReason(str, Enum):
"""
Enum for storing reasons why dag run not created.
:meta private:
"""

ALREADY_EXISTS = "already exists"
UNKNOWN = "unknown"


class BackfillDagRun(Base):
"""Mapping table between backfill run and dag run."""

__tablename__ = "backfill_dag_run"
id = Column(Integer, primary_key=True, autoincrement=True)
backfill_id = Column(Integer, nullable=False)
dag_run_id = Column(
Integer, nullable=True
) # the run might already exist; we could store the reason we did not create
dag_run_id = Column(Integer, nullable=True)
exception_reason = Column(StringID())
logical_date = Column(UtcDateTime, nullable=False)
sort_ordinal = Column(Integer, nullable=False)

backfill = relationship("Backfill", back_populates="backfill_dag_run_associations")
Expand Down Expand Up @@ -119,6 +131,50 @@ def validate_sort_ordinal(self, key, val):
return val


def _create_backfill_dag_run(dag, info, backfill_id, dag_run_conf, backfill_sort_ordinal, session):
from airflow.models import DagRun

dr = session.scalar(select(DagRun).where(DagRun.execution_date == info.logical_date).limit(1))
if dr:
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=None,
logical_date=info.logical_date,
exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS,
sort_ordinal=backfill_sort_ordinal,
)
)
log.info(
"dag run already exists for dag_id=%s backfill_id=%s, info=%s",
dag.dag_id,
backfill_id,
info,
)
return
dr = dag.create_dagrun(
triggered_by=DagRunTriggeredByType.BACKFILL,
execution_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
state=DagRunState.QUEUED,
external_trigger=False,
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
creating_job_id=None,
session=session,
backfill_id=backfill_id,
)
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=dr.id,
sort_ordinal=backfill_sort_ordinal,
logical_date=info.logical_date,
)
)


def _create_backfill(
*,
dag_id: str,
Expand Down Expand Up @@ -168,37 +224,46 @@ def _create_backfill(
dagrun_info_list = reversed([x for x in dag.iter_dagrun_infos_between(from_date, to_date)])
for info in dagrun_info_list:
backfill_sort_ordinal += 1
log.info("creating backfill dag run %s dag_id=%s backfill_id=%s, info=", dag.dag_id, br.id, info)
dr = None
session.commit()
from tenacity import RetryError, Retrying, stop_after_attempt

try:
dr = dag.create_dagrun(
triggered_by=DagRunTriggeredByType.BACKFILL,
execution_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
state=DagRunState.QUEUED,
external_trigger=False,
conf=br.dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
creating_job_id=None,
session=session,
backfill_id=br.id,
)
except Exception:
for attempt in Retrying(stop=stop_after_attempt(3)):
# we do retries here because it's possible that we check to see if dr exists
# before we attempt to create the dag run. if something else creates the dag
# run in between, we'll have to retry the transaction
with attempt:
with session.begin():
_create_backfill_dag_run(
dag=dag,
info=info,
backfill_id=br.id,
dag_run_conf=br.dag_run_conf,
backfill_sort_ordinal=backfill_sort_ordinal,
session=session,
)
log.info(
"created backfill dag run dag_id=%s backfill_id=%s, info=%s",
dag.dag_id,
br.id,
info,
)
except RetryError:
dag.log.exception(
"Error while attempting to create a dag run dag_id='%s' logical_date='%s'",
dag.dag_id,
info.logical_date,
)
session.rollback()
session.add(
BackfillDagRun(
backfill_id=br.id,
dag_run_id=dr.id if dr else None, # this means we failed to create the dag run
sort_ordinal=backfill_sort_ordinal,
session.add(
BackfillDagRun(
backfill_id=br.id,
dag_run_id=None,
exception_reason=BackfillDagRunExceptionReason.UNKNOWN,
logical_date=info.logical_date,
sort_ordinal=backfill_sort_ordinal,
)
)
)
session.commit()
session.commit()
return br


Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class MappedClassProtocol(Protocol):
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"3.0.0": "fb2d4922cd79",
"3.0.0": "3a8972ecb8f9",
}


Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
b39a456ac1bd3aa91198d036eaa3859f25a47151164974f6b4fe7f998d41f720
15cd7929c7e5bd787115396bf4e3ba6e6a228824ee8fc7f8406d19da82051d01
Loading

0 comments on commit 025082a

Please sign in to comment.