From 225464bdc0c1a76b5fc384fb97d8797aa83d7b50 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 2 Nov 2024 21:52:08 +0100 Subject: [PATCH 1/4] Ensure priority weight is capped at 32-bit integer to prevent roll-over --- airflow/models/abstractoperator.py | 15 +++++++++------ airflow/models/baseoperator.py | 5 ++++- airflow/utils/weight_rule.py | 12 ++++++++++++ .../priority-weight.rst | 6 ++++++ .../airflow/sdk/definitions/abstractoperator.py | 6 ++++++ .../src/airflow/sdk/definitions/baseoperator.py | 9 +++++++-- tests/utils/test_weight_rule.py | 9 ++++++++- 7 files changed, 52 insertions(+), 10 deletions(-) diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index feafb0b6b637..58c2aec6fdeb 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -38,7 +38,7 @@ from airflow.utils.state import State, TaskInstanceState from airflow.utils.task_group import MappedTaskGroup from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.weight_rule import WeightRule +from airflow.utils.weight_rule import WeightRule, db_safe_priority if TYPE_CHECKING: from collections.abc import Mapping @@ -335,7 +335,7 @@ def priority_weight_total(self) -> int: ) if isinstance(self.weight_rule, _AbsolutePriorityWeightStrategy): - return self.priority_weight + return db_safe_priority(self.priority_weight) elif isinstance(self.weight_rule, _DownstreamPriorityWeightStrategy): upstream = False elif isinstance(self.weight_rule, _UpstreamPriorityWeightStrategy): @@ -344,10 +344,13 @@ def priority_weight_total(self) -> int: upstream = False dag = self.get_dag() if dag is None: - return self.priority_weight - return self.priority_weight + sum( - dag.task_dict[task_id].priority_weight - for task_id in self.get_flat_relative_ids(upstream=upstream) + return db_safe_priority(self.priority_weight) + return db_safe_priority( + self.priority_weight + + sum( + dag.task_dict[task_id].priority_weight + for task_id in 
self.get_flat_relative_ids(upstream=upstream) + ) ) @cached_property diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index c1448ef9cc55..fa26a2026f5d 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -473,6 +473,8 @@ class derived from this one results in the creation of a task object, This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks. + As not all database engines support 64-bit integers, values are capped with 32-bit. + Valid range is from -2,147,483,648 to 2,147,483,647. :param weight_rule: weighting method used for the effective total priority weight of the task. Options are: ``{ downstream | upstream | absolute }`` default is ``downstream`` @@ -494,7 +496,8 @@ class derived from this one results in the creation of a task object, Additionally, when set to ``absolute``, there is bonus effect of significantly speeding up the task creation process as for very large DAGs. Options can be set as string or using the constants defined in - the static class ``airflow.utils.WeightRule`` + the static class ``airflow.utils.WeightRule``. + Irrespective of the weight rule, resulting priority values are capped with 32-bit. |experimental| Since 2.9.0, Airflow allows to define custom priority weight strategy, by creating a subclass of diff --git a/airflow/utils/weight_rule.py b/airflow/utils/weight_rule.py index a63358b0322c..490bcfbe8884 100644 --- a/airflow/utils/weight_rule.py +++ b/airflow/utils/weight_rule.py @@ -21,6 +21,18 @@ import methodtools +# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights. 
+# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html) +# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html) +# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html) +DB_SAFE_MINIMUM = -2147483648 +DB_SAFE_MAXIMUM = 2147483647 + + +def db_safe_priority(priority_weight: int) -> int: + """Convert priority weight to a safe value for the database.""" + return max(DB_SAFE_MINIMUM, min(DB_SAFE_MAXIMUM, priority_weight)) + class WeightRule(str, Enum): """Weight rules.""" diff --git a/docs/apache-airflow/administration-and-deployment/priority-weight.rst b/docs/apache-airflow/administration-and-deployment/priority-weight.rst index dd61d25fcd4e..7bdeff645026 100644 --- a/docs/apache-airflow/administration-and-deployment/priority-weight.rst +++ b/docs/apache-airflow/administration-and-deployment/priority-weight.rst @@ -63,6 +63,12 @@ Below are the weighting methods. By default, Airflow's weighting method is ``dow The ``priority_weight`` parameter can be used in conjunction with :ref:`concepts:pool`. +.. note:: + + As most database engines are using 32-bit for integers, the maximum value for any calculated or + defined ``priority_weight`` is 2,147,483,647 and the minimum value is -2,147,483,648. + + Custom Weight Rule ------------------ diff --git a/task_sdk/src/airflow/sdk/definitions/abstractoperator.py b/task_sdk/src/airflow/sdk/definitions/abstractoperator.py index 524ff78b727e..0e51a9748e82 100644 --- a/task_sdk/src/airflow/sdk/definitions/abstractoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/abstractoperator.py @@ -43,6 +43,12 @@ DEFAULT_OWNER: str = "airflow" DEFAULT_POOL_SLOTS: int = 1 DEFAULT_PRIORITY_WEIGHT: int = 1 +# Databases do not support arbitrary precision integers, so we need to limit the range of priority weights. 
+# postgres: -2147483648 to +2147483647 (see https://www.postgresql.org/docs/current/datatype-numeric.html) +# mysql: -2147483648 to +2147483647 (see https://dev.mysql.com/doc/refman/8.4/en/integer-types.html) +# sqlite: -9223372036854775808 to +9223372036854775807 (see https://sqlite.org/datatype3.html) +MINIMUM_PRIORITY_WEIGHT: int = -2147483648 +MAXIMUM_PRIORITY_WEIGHT: int = 2147483647 DEFAULT_EXECUTOR: str | None = None DEFAULT_QUEUE: str = "default" DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index fc16682a63cd..5ffdd3ebc330 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -46,6 +46,8 @@ DEFAULT_TRIGGER_RULE, DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, DEFAULT_WEIGHT_RULE, + MAXIMUM_PRIORITY_WEIGHT, + MINIMUM_PRIORITY_WEIGHT, AbstractOperator, ) from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack @@ -364,6 +366,8 @@ class derived from this one results in the creation of a task object, This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks. + As not all database engines support 64-bit integers, values are capped with 32-bit. + Valid range is from -2,147,483,648 to 2,147,483,647. :param weight_rule: weighting method used for the effective total priority weight of the task. Options are: ``{ downstream | upstream | absolute }`` default is ``downstream`` @@ -385,7 +389,8 @@ class derived from this one results in the creation of a task object, Additionally, when set to ``absolute``, there is bonus effect of significantly speeding up the task creation process as for very large DAGs. 
Options can be set as string or using the constants defined in - the static class ``airflow.utils.WeightRule`` + the static class ``airflow.utils.WeightRule``. + Irrespective of the weight rule, resulting priority values are capped with 32-bit. |experimental| Since 2.9.0, Airflow allows to define custom priority weight strategy, by creating a subclass of @@ -802,7 +807,7 @@ def __init__( self.params = ParamsDict(params) - self.priority_weight = priority_weight + self.priority_weight = max(MINIMUM_PRIORITY_WEIGHT, min(MAXIMUM_PRIORITY_WEIGHT, priority_weight)) self.weight_rule = validate_and_load_priority_weight_strategy(weight_rule) self.max_active_tis_per_dag: int | None = max_active_tis_per_dag diff --git a/tests/utils/test_weight_rule.py b/tests/utils/test_weight_rule.py index 73abafe782b8..387bb9b09e46 100644 --- a/tests/utils/test_weight_rule.py +++ b/tests/utils/test_weight_rule.py @@ -19,7 +19,14 @@ import pytest -from airflow.utils.weight_rule import WeightRule +from airflow.utils.weight_rule import DB_SAFE_MAXIMUM, DB_SAFE_MINIMUM, WeightRule, db_safe_priority + + +def test_db_safe_priority(): + assert db_safe_priority(1) == 1 + assert db_safe_priority(-1) == -1 + assert db_safe_priority(9999999999) == DB_SAFE_MAXIMUM + assert db_safe_priority(-9999999999) == DB_SAFE_MINIMUM class TestWeightRule: From ca130da37dbefc7bb5bb37bfe3f0a87942596009 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Tue, 12 Nov 2024 22:24:25 +0100 Subject: [PATCH 2/4] Add newsfragment --- newsfragments/43611.significant.rst | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 newsfragments/43611.significant.rst diff --git a/newsfragments/43611.significant.rst b/newsfragments/43611.significant.rst new file mode 100644 index 000000000000..e25fb2a5bba4 --- /dev/null +++ b/newsfragments/43611.significant.rst @@ -0,0 +1,6 @@ +TaskInstance ``priority_weight`` is capped in 32-bit signed integer ranges. + +Some database engines are limited to 32-bit integer values. 
As some users reported errors caused by +weights rolling over to negative values, we decided to cap the value to the 32-bit integer range. Even +though Python itself supports integers outside this range, ``priority_weight`` is +capped and only stores values from -2147483648 to 2147483647. From 0f825ffc7cb1a4dcb1b6b0f79c10a6e04c281cd5 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Tue, 12 Nov 2024 23:12:55 +0100 Subject: [PATCH 3/4] Move range check post type check --- task_sdk/src/airflow/sdk/definitions/baseoperator.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index 5ffdd3ebc330..526c8a584587 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -807,7 +807,7 @@ def __init__( self.params = ParamsDict(params) - self.priority_weight = max(MINIMUM_PRIORITY_WEIGHT, min(MAXIMUM_PRIORITY_WEIGHT, priority_weight)) + self.priority_weight = priority_weight self.weight_rule = validate_and_load_priority_weight_strategy(weight_rule) self.max_active_tis_per_dag: int | None = max_active_tis_per_dag @@ -874,6 +874,11 @@ def __init__( validate_instance_args(self, BASEOPERATOR_ARGS_EXPECTED_TYPES) + # Ensure priority_weight is within the valid range + self.priority_weight = max( + MINIMUM_PRIORITY_WEIGHT, min(MAXIMUM_PRIORITY_WEIGHT, self.priority_weight) + ) + def __eq__(self, other): if type(self) is type(other): # Use getattr() instead of __dict__ as __dict__ doesn't return From 2e3bfd5774f9563a6188ba23cb7b0ef01724ca98 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 14 Nov 2024 22:01:15 +0100 Subject: [PATCH 4/4] Review feedback - consolidate to single implementation for now --- task_sdk/src/airflow/sdk/definitions/baseoperator.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py 
b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index 526c8a584587..1eae39576c8e 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -46,8 +46,6 @@ DEFAULT_TRIGGER_RULE, DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, DEFAULT_WEIGHT_RULE, - MAXIMUM_PRIORITY_WEIGHT, - MINIMUM_PRIORITY_WEIGHT, AbstractOperator, ) from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack @@ -62,6 +60,7 @@ from airflow.utils.setup_teardown import SetupTeardownContext from airflow.utils.trigger_rule import TriggerRule from airflow.utils.types import AttributeRemoved +from airflow.utils.weight_rule import db_safe_priority T = TypeVar("T", bound=FunctionType) @@ -875,9 +874,8 @@ def __init__( validate_instance_args(self, BASEOPERATOR_ARGS_EXPECTED_TYPES) # Ensure priority_weight is within the valid range - self.priority_weight = max( - MINIMUM_PRIORITY_WEIGHT, min(MAXIMUM_PRIORITY_WEIGHT, self.priority_weight) - ) + # Note: Cross-import from airflow.utils to be cleaned up later + self.priority_weight = db_safe_priority(self.priority_weight) def __eq__(self, other): if type(self) is type(other):