diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index feafb0b6b637d..58c2aec6fdeb9 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -38,7 +38,7 @@ from airflow.utils.state import State, TaskInstanceState from airflow.utils.task_group import MappedTaskGroup from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.weight_rule import WeightRule +from airflow.utils.weight_rule import WeightRule, db_safe_priority if TYPE_CHECKING: from collections.abc import Mapping @@ -335,7 +335,7 @@ def priority_weight_total(self) -> int: ) if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy): - return self.priority_weight + return db_safe_priority(self.priority_weight) elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy): upstream = False elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy): @@ -344,10 +344,13 @@ def priority_weight_total(self) -> int: upstream = False dag = self.get_dag() if dag is None: - return self.priority_weight - return self.priority_weight + sum( - dag.task_dict[task_id].priority_weight - for task_id in self.get_flat_relative_ids(upstream=upstream) + return db_safe_priority(self.priority_weight) + return db_safe_priority( + self.priority_weight + + sum( + dag.task_dict[task_id].priority_weight + for task_id in self.get_flat_relative_ids(upstream=upstream) + ) ) @cached_property diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index c1448ef9cc550..fa26a2026f5d0 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -473,6 +473,8 @@ class derived from this one results in the creation of a task object, This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks. + As not all database engines support 64-bit integers, values are capped with 32-bit. 
+ Valid range is from -2,147,483,648 to 2,147,483,647. :param weight_rule: weighting method used for the effective total priority weight of the task. Options are: ``{ downstream | upstream | absolute }`` default is ``downstream`` @@ -494,7 +496,8 @@ class derived from this one results in the creation of a task object, Additionally, when set to ``absolute``, there is bonus effect of significantly speeding up the task creation process as for very large DAGs. Options can be set as string or using the constants defined in - the static class ``airflow.utils.WeightRule`` + the static class ``airflow.utils.WeightRule``. + Irrespective of the weight rule, the resulting priority values are capped to the 32-bit signed integer range. |experimental| Since 2.9.0, Airflow allows to define custom priority weight strategy, by creating a subclass of diff --git a/airflow/utils/weight_rule.py b/airflow/utils/weight_rule.py index a63358b0322ce..490bcfbe88843 100644 --- a/airflow/utils/weight_rule.py +++ b/airflow/utils/weight_rule.py @@ -21,6 +21,18 @@ import methodtools +# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights. 
+# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html) +# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html) +# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html) +DB_SAFE_MINIMUM = -2147483648 +DB_SAFE_MAXIMUM = 2147483647 + + +def db_safe_priority(priority_weight: int) -> int: + """Convert priority weight to a safe value for the database.""" + return max(DB_SAFE_MINIMUM, min(DB_SAFE_MAXIMUM, priority_weight)) + class WeightRule(str, Enum): """Weight rules.""" diff --git a/docs/apache-airflow/administration-and-deployment/priority-weight.rst b/docs/apache-airflow/administration-and-deployment/priority-weight.rst index dd61d25fcd4ee..7bdeff645026c 100644 --- a/docs/apache-airflow/administration-and-deployment/priority-weight.rst +++ b/docs/apache-airflow/administration-and-deployment/priority-weight.rst @@ -63,6 +63,12 @@ Below are the weighting methods. By default, Airflow's weighting method is ``dow The ``priority_weight`` parameter can be used in conjunction with :ref:`concepts:pool`. +.. note:: + + As most database engines are using 32-bit for integers, the maximum value for any calculated or + defined ``priority_weight`` is 2,147,483,647 and the minimum value is -2,147,483,648. + + Custom Weight Rule ------------------ diff --git a/task_sdk/src/airflow/sdk/definitions/abstractoperator.py b/task_sdk/src/airflow/sdk/definitions/abstractoperator.py index 524ff78b727ef..0e51a9748e824 100644 --- a/task_sdk/src/airflow/sdk/definitions/abstractoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/abstractoperator.py @@ -43,6 +43,12 @@ DEFAULT_OWNER: str = "airflow" DEFAULT_POOL_SLOTS: int = 1 DEFAULT_PRIORITY_WEIGHT: int = 1 +# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights. 
+# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html) +# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html) +# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html) +MINIMUM_PRIORITY_WEIGHT: int = -2147483648 +MAXIMUM_PRIORITY_WEIGHT: int = 2147483647 DEFAULT_EXECUTOR: str | None = None DEFAULT_QUEUE: str = "default" DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index fc16682a63cd3..5ffdd3ebc3302 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -46,6 +46,8 @@ DEFAULT_TRIGGER_RULE, DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, DEFAULT_WEIGHT_RULE, + MAXIMUM_PRIORITY_WEIGHT, + MINIMUM_PRIORITY_WEIGHT, AbstractOperator, ) from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack @@ -364,6 +366,8 @@ class derived from this one results in the creation of a task object, This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks. + As not all database engines support 64-bit integers, values are capped to the 32-bit signed integer range. + Valid range is from -2,147,483,648 to 2,147,483,647. :param weight_rule: weighting method used for the effective total priority weight of the task. Options are: ``{ downstream | upstream | absolute }`` default is ``downstream`` @@ -385,7 +389,8 @@ class derived from this one results in the creation of a task object, Additionally, when set to ``absolute``, there is bonus effect of significantly speeding up the task creation process as for very large DAGs. 
Options can be set as string or using the constants defined in - the static class ``airflow.utils.WeightRule`` + the static class ``airflow.utils.WeightRule``. + Irrespective of the weight rule, resulting priority values are capped with 32-bit. |experimental| Since 2.9.0, Airflow allows to define custom priority weight strategy, by creating a subclass of @@ -802,7 +807,7 @@ def __init__( self.params = ParamsDict(params) - self.priority_weight = priority_weight + self.priority_weight = max(MINIMUM_PRIORITY_WEIGHT, min(MAXIMUM_PRIORITY_WEIGHT, priority_weight)) self.weight_rule = validate_and_load_priority_weight_strategy(weight_rule) self.max_active_tis_per_dag: int | None = max_active_tis_per_dag diff --git a/tests/utils/test_weight_rule.py b/tests/utils/test_weight_rule.py index 73abafe782b86..387bb9b09e469 100644 --- a/tests/utils/test_weight_rule.py +++ b/tests/utils/test_weight_rule.py @@ -19,7 +19,14 @@ import pytest -from airflow.utils.weight_rule import WeightRule +from airflow.utils.weight_rule import DB_SAFE_MAXIMUM, DB_SAFE_MINIMUM, WeightRule, db_safe_priority + + +def test_db_safe_priority(): + assert db_safe_priority(1) == 1 + assert db_safe_priority(-1) == -1 + assert db_safe_priority(9999999999) == DB_SAFE_MAXIMUM + assert db_safe_priority(-9999999999) == DB_SAFE_MINIMUM class TestWeightRule: