Skip to content

Commit

Permalink
Ensure priority weight is capped at 32-bit integer to prevent roll-over
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Nov 2, 2024
1 parent 17e5100 commit c18bfe7
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 10 deletions.
15 changes: 9 additions & 6 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.task_group import MappedTaskGroup
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weight_rule import WeightRule
from airflow.utils.weight_rule import WeightRule, db_safe_priority

if TYPE_CHECKING:
from collections.abc import Mapping
Expand Down Expand Up @@ -335,7 +335,7 @@ def priority_weight_total(self) -> int:
)

if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy):
return self.priority_weight
return db_safe_priority(self.priority_weight)
elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy):
upstream = False
elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy):
Expand All @@ -344,10 +344,13 @@ def priority_weight_total(self) -> int:
upstream = False
dag = self.get_dag()
if dag is None:
return self.priority_weight
return self.priority_weight + sum(
dag.task_dict[task_id].priority_weight
for task_id in self.get_flat_relative_ids(upstream=upstream)
return db_safe_priority(self.priority_weight)
return db_safe_priority(
self.priority_weight
+ sum(
dag.task_dict[task_id].priority_weight
for task_id in self.get_flat_relative_ids(upstream=upstream)
)
)

@cached_property
Expand Down
5 changes: 4 additions & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ class derived from this one results in the creation of a task object,
This allows the executor to trigger higher priority tasks before
others when things get backed up. Set priority_weight as a higher
number for more important tasks.
As not all database engines support 64-bit integers, values are capped with 32-bit.
Valid range is from -2,147,483,648 to 2,147,483,647.
:param weight_rule: weighting method used for the effective total
priority weight of the task. Options are:
``{ downstream | upstream | absolute }`` default is ``downstream``
Expand All @@ -494,7 +496,8 @@ class derived from this one results in the creation of a task object,
Additionally, when set to ``absolute``, there is bonus effect of
significantly speeding up the task creation process as for very large
DAGs. Options can be set as string or using the constants defined in
the static class ``airflow.utils.WeightRule``
the static class ``airflow.utils.WeightRule``.
Irrespective of the weight rule, resulting priority values are capped with 32-bit.
|experimental|
Since 2.9.0, Airflow allows to define custom priority weight strategy,
by creating a subclass of
Expand Down
12 changes: 12 additions & 0 deletions airflow/utils/weight_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@

import methodtools

# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights.
# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html)
# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html)
# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html)
DB_SAFE_MINIMUM = -2147483648
DB_SAFE_MAXIMUM = 2147483647


def db_safe_priority(priority_weight: int) -> int:
"""Convert priority weight to a safe value for the database."""
return max(DB_SAFE_MINIMUM, min(DB_SAFE_MAXIMUM, priority_weight))


class WeightRule(str, Enum):
"""Weight rules."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ Below are the weighting methods. By default, Airflow's weighting method is ``dow

The ``priority_weight`` parameter can be used in conjunction with :ref:`concepts:pool`.

.. note::

As most database engines are using 32-bit for integers, the maximum value for any calculated or
defined ``priority_weight`` is 2,147,483,647 and the minimum value is -2,147,483,648.


Custom Weight Rule
------------------

Expand Down
6 changes: 6 additions & 0 deletions task_sdk/src/airflow/sdk/definitions/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
DEFAULT_OWNER: str = "airflow"
DEFAULT_POOL_SLOTS: int = 1
DEFAULT_PRIORITY_WEIGHT: int = 1
# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights.
# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html)
# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html)
# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html)
MINIMUM_PRIORITY_WEIGHT: int = -2147483648
MAXIMUM_PRIORITY_WEIGHT: int = 2147483647
DEFAULT_EXECUTOR: str | None = None
DEFAULT_QUEUE: str = "default"
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
Expand Down
9 changes: 7 additions & 2 deletions task_sdk/src/airflow/sdk/definitions/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
DEFAULT_TRIGGER_RULE,
DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
DEFAULT_WEIGHT_RULE,
MAXIMUM_PRIORITY_WEIGHT,
MINIMUM_PRIORITY_WEIGHT,
AbstractOperator,
)
from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack
Expand Down Expand Up @@ -364,6 +366,8 @@ class derived from this one results in the creation of a task object,
This allows the executor to trigger higher priority tasks before
others when things get backed up. Set priority_weight as a higher
number for more important tasks.
As not all database engines support 64-bit integers, values are capped with 32-bit.
Valid range is from -2,147,483,648 to 2,147,483,647.
:param weight_rule: weighting method used for the effective total
priority weight of the task. Options are:
``{ downstream | upstream | absolute }`` default is ``downstream``
Expand All @@ -385,7 +389,8 @@ class derived from this one results in the creation of a task object,
Additionally, when set to ``absolute``, there is bonus effect of
significantly speeding up the task creation process as for very large
DAGs. Options can be set as string or using the constants defined in
the static class ``airflow.utils.WeightRule``
the static class ``airflow.utils.WeightRule``.
Irrespective of the weight rule, resulting priority values are capped with 32-bit.
|experimental|
Since 2.9.0, Airflow allows to define custom priority weight strategy,
by creating a subclass of
Expand Down Expand Up @@ -802,7 +807,7 @@ def __init__(

self.params = ParamsDict(params)

self.priority_weight = priority_weight
self.priority_weight = max(MINIMUM_PRIORITY_WEIGHT, min(MAXIMUM_PRIORITY_WEIGHT, priority_weight))
self.weight_rule = validate_and_load_priority_weight_strategy(weight_rule)

self.max_active_tis_per_dag: int | None = max_active_tis_per_dag
Expand Down
9 changes: 8 additions & 1 deletion tests/utils/test_weight_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@

import pytest

from airflow.utils.weight_rule import WeightRule
from airflow.utils.weight_rule import DB_SAFE_MAXIMUM, DB_SAFE_MINIMUM, WeightRule, db_safe_priority


def test_db_safe_priority():
assert db_safe_priority(1) == 1
assert db_safe_priority(-1) == -1
assert db_safe_priority(9999999999) == DB_SAFE_MAXIMUM
assert db_safe_priority(-9999999999) == DB_SAFE_MINIMUM


class TestWeightRule:
Expand Down

0 comments on commit c18bfe7

Please sign in to comment.