This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Merge pull request #66 from github/backfillkeyerror
Ensure backfill won't crash
kimyen authored Aug 1, 2022
2 parents 5946cf1 + 221aa50 commit e61b945
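The crash this commit guards against is the plain KeyError that dict.pop raises when the key is absent: if a task instance's reduced key was never tracked in ti_status.running (or was already removed), the bare pop call could abort the whole counter update. A minimal sketch of that failure mode and the tolerant alternatives, using a hypothetical running dict in place of ti_status.running:

running = {("example_dag", "task_1", "2022-08-01"): "ti"}
missing_key = ("example_dag", "task_2", "2022-08-01")

# Bare pop on an untracked key raises KeyError and would crash the caller.
try:
    running.pop(missing_key)
except KeyError:
    print("KeyError: the key is not in the running dict")

# Tolerant variants: pass a default, or swallow the KeyError as the new helper does.
running.pop(missing_key, None)  # no-op when the key is absent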
Showing 1 changed file with 21 additions and 10 deletions.
airflow/jobs/backfill_job.py (21 additions, 10 deletions)
@@ -50,6 +50,14 @@
 from airflow.utils.types import DagRunType
 
 
+def _pop_running_ti(ti_status, reduced_key):
+    try:
+        ti_status.running.pop(reduced_key)
+    except KeyError:
+        # the task is not in the running tracking dict
+        pass
+
+
 class BackfillJob(BaseJob):
     """
     A backfill job consists of a dag or subdag for a specific time range. It
@@ -203,33 +211,39 @@ def _update_counters(self, ti_status, session=None):
         if filter_for_tis is not None:
             refreshed_tis = session.query(TI).filter(filter_for_tis).all()
 
+        self.log.debug("ti_status: %s", ti_status)
         for ti in refreshed_tis:
             # Here we remake the key by subtracting 1 to match in memory information
             reduced_key = ti.key.reduced
             if ti.state == State.SUCCESS:
                 ti_status.succeeded.add(reduced_key)
                 self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
-                ti_status.running.pop(reduced_key)
+                self.log.debug("reduced_key: %s", reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 continue
             if ti.state == State.SKIPPED:
                 ti_status.skipped.add(reduced_key)
                 self.log.debug("Task instance %s skipped. Don't rerun.", ti)
-                ti_status.running.pop(reduced_key)
+                self.log.debug("reduced_key: %s", reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 continue
             if ti.state == State.FAILED:
                 self.log.error("Task instance %s failed", ti)
+                self.log.debug("reduced_key: %s", reduced_key)
                 ti_status.failed.add(reduced_key)
-                ti_status.running.pop(reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 continue
             # special case: if the task needs to run again put it back
             if ti.state == State.UP_FOR_RETRY:
                 self.log.warning("Task instance %s is up for retry", ti)
-                ti_status.running.pop(reduced_key)
+                self.log.debug("reduced_key: %s", reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 ti_status.to_run[ti.key] = ti
             # special case: if the task needs to be rescheduled put it back
             elif ti.state == State.UP_FOR_RESCHEDULE:
                 self.log.warning("Task instance %s is up for reschedule", ti)
-                ti_status.running.pop(reduced_key)
+                self.log.debug("reduced_key: %s", reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 ti_status.to_run[ti.key] = ti
             # special case: The state of the task can be set to NONE by the task itself
             # when it reaches concurrency limits. It could also happen when the state
@@ -242,12 +256,9 @@ def _update_counters(self, ti_status, session=None):
                     "reaching concurrency limits. Re-adding task to queue.",
                     ti,
                 )
+                self.log.debug("reduced_key: %s", reduced_key)
                 tis_to_be_scheduled.append(ti)
-                try:
-                    ti_status.running.pop(reduced_key)
-                except KeyError:
-                    # the task is not running
-                    pass
+                _pop_running_ti(ti_status, reduced_key)
                 ti_status.to_run[ti.key] = ti
 
         # Batch schedule of task instances
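For reference, a minimal sketch of how the new helper behaves, using a hypothetical stand-in for the backfill's internal ti_status object (only the running dict is modeled here):

from types import SimpleNamespace

def _pop_running_ti(ti_status, reduced_key):
    try:
        ti_status.running.pop(reduced_key)
    except KeyError:
        # the task is not in the running tracking dict
        pass

# Hypothetical stand-in; the real ti_status is BackfillJob's internal task-status tracker.
ti_status = SimpleNamespace(running={"key_a": "ti_a"})

_pop_running_ti(ti_status, "key_a")  # removes the tracked entry
_pop_running_ti(ti_status, "key_b")  # absent key: silently ignored instead of crashing
print(ti_status.running)  # -> {}

An equivalent one-liner would be ti_status.running.pop(reduced_key, None); the helper simply keeps the try/except and its explanatory comment in one place instead of repeating them at every call site.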
