diff --git a/.changes/unreleased/Features-20241001-161422.yaml b/.changes/unreleased/Features-20241001-161422.yaml new file mode 100644 index 00000000000..60f10d8d667 --- /dev/null +++ b/.changes/unreleased/Features-20241001-161422.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Change gating of microbatch feature to be behind project flag / behavior flag +time: 2024-10-01T16:14:22.267253-05:00 +custom: + Author: MichelleArk QMalcolm + Issue: "10798" diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index dfc8c9bb40b..ad83d445408 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -238,7 +238,7 @@ def resolve_limit(self) -> Optional[int]: def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]: event_time_filter = None if ( - os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") + get_flags().require_builtin_microbatch_strategy and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig)) and target.config.event_time and self.model.config.materialized == "incremental" diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index f5a4ec605ec..4a722eb66d7 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -338,6 +338,7 @@ class ProjectFlags(ExtensibleDbtClassMixin): write_json: Optional[bool] = None # legacy behaviors - https://github.com/dbt-labs/dbt-core/blob/main/docs/guides/behavior-change-flags.md + require_builtin_microbatch_strategy: bool = False require_explicit_package_overrides_for_builtin_materializations: bool = True require_resource_names_without_spaces: bool = False source_freshness_run_project_hooks: bool = False @@ -348,6 +349,7 @@ class ProjectFlags(ExtensibleDbtClassMixin): @property def project_only_flags(self) -> Dict[str, Any]: return { + "require_builtin_microbatch_strategy": self.require_builtin_microbatch_strategy, "require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations, "require_resource_names_without_spaces": self.require_resource_names_without_spaces, "source_freshness_run_project_hooks": self.source_freshness_run_project_hooks, diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 7ffd00febc5..0e7d6a3dea9 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1383,7 +1383,7 @@ def check_valid_snapshot_config(self): node.config.final_validate() def check_valid_microbatch_config(self): - if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"): + if get_flags().require_builtin_microbatch_strategy: for node in self.manifest.nodes.values(): if ( node.config.materialized == "incremental" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index b1f706d72ec..49bd80a2382 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,5 +1,4 @@ import functools -import os import threading import time from copy import deepcopy @@ -33,6 +32,7 @@ RunningOperationCaughtError, ) from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError +from dbt.flags import get_flags from dbt.graph import ResourceTypeSelector from dbt.hooks import get_hook_dict from dbt.materializations.incremental.microbatch import MicrobatchBuilder @@ -466,9 +466,8 @@ def execute(self, model, manifest): ) hook_ctx = self.adapter.pre_model_hook(context_config) - if ( - os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") + get_flags().require_builtin_microbatch_strategy and model.config.materialized == "incremental" and model.config.incremental_strategy == "microbatch" ): diff --git a/dev-requirements.txt b/dev-requirements.txt index 20605e632b8..83b2911011b 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,5 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@main -git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter +git+https://github.com/dbt-labs/dbt-adapters.git@microbatch-behavior-flag +git+https://github.com/dbt-labs/dbt-adapters.git@microbatch-behavior-flag#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@main # black must match what's in .pre-commit-config.yaml to be sure local env matches CI diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index f6b49e1405a..dee17b6fae8 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -1,4 +1,3 @@ -import os from unittest import mock import pytest @@ -113,8 +112,24 @@ def models(self): def macros(self): return {"microbatch.sql": custom_microbatch_strategy} + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + class TestMicrobatchCustomUserStrategyDefault(BaseMicrobatchCustomUserStrategy): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": False, + } + } + def test_use_custom_microbatch_strategy_by_default(self, project): with mock.patch.object( type(project.adapter), "valid_incremental_strategies", lambda _: [] @@ -128,7 +143,6 @@ def test_use_custom_microbatch_strategy_by_default(self, project): class TestMicrobatchCustomUserStrategyEnvVarTrueValid(BaseMicrobatchCustomUserStrategy): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy( self, project ): @@ -145,10 +159,7 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg assert "custom microbatch strategy" in logs -# TODO: Consider a behaviour flag here if DBT_EXPERIMENTAL_MICROBATCH is removed -# Since this causes an exception prior to using an override class TestMicrobatchCustomUserStrategyEnvVarTrueInvalid(BaseMicrobatchCustomUserStrategy): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy( self, project ): @@ -173,6 +184,14 @@ def models(self): "microbatch_model.sql": microbatch_model_sql, } + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def assert_row_count(self, project, relation_name: str, expected_row_count: int): relation = relation_from_name(project.adapter, relation_name) result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") @@ -185,7 +204,6 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) class TestMicrobatchCLI(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run without --event-time-start or --event-time-end - 3 expected rows in output catcher = EventCatcher(event_to_catch=LogModelResult) @@ -228,7 +246,6 @@ def test_run_with_event_time(self, project): class TestMicroBatchBoundsDefault(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -280,7 +297,6 @@ def models(self): "seeds.yml": seeds_yaml, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # ensure seed is created for source run_dbt(["seed"]) @@ -328,7 +344,6 @@ def models(self): "microbatch_model.sql": microbatch_model_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -356,7 +371,6 @@ def test_run_with_event_time(self, project): class TestMicrobatchUsingRefRenderSkipsFilter(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -409,7 +423,6 @@ def models(self): "microbatch_model.sql": microbatch_model_context_vars, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time_logs(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): _, logs = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) @@ -442,7 +455,6 @@ def models(self): "downstream_model.sql": downstream_model_of_microbatch_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -467,7 +479,6 @@ def models(self): "microbatch_model.sql": microbatch_model_failing_incremental_partition_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -507,7 +518,6 @@ def models(self): "microbatch_model.sql": microbatch_model_failing_incremental_partition_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -554,7 +564,6 @@ def models(self): "microbatch_model.sql": microbatch_model_first_partition_failing_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -563,7 +572,6 @@ def test_run_with_event_time(self, project): class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -646,7 +654,6 @@ def models(self): "downstream_model.sql": downstream_model_of_microbatch_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output with patch_microbatch_end_time("2020-01-03 13:57:00"): diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py index cdebd3a791b..f97e85a8f8a 100644 --- a/tests/functional/microbatch/test_microbatch_config_validation.py +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -1,6 +1,3 @@ -import os -from unittest import mock - import pytest from dbt.exceptions import ParsingError @@ -86,7 +83,14 @@ class BaseMicrobatchTestParseError: def models(self): return {} - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def test_parsing_error_raised(self, project): with pytest.raises(ParsingError): run_dbt(["parse"]) @@ -97,7 +101,14 @@ class BaseMicrobatchTestNoError: def models(self): return {} - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def test_parsing_error_not_raised(self, project): run_dbt(["parse"]) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 46c29254a9a..daa8d1d1bda 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -1,4 +1,4 @@ -import os +from argparse import Namespace from unittest import mock import pytest @@ -13,6 +13,7 @@ RuntimeRefResolver, RuntimeSourceResolver, ) +from dbt.flags import set_from_args class TestBaseResolver: @@ -56,8 +57,9 @@ def test_resolve_event_time_filter( incremental_strategy: str, expect_filter: bool, ) -> None: - if dbt_experimental_microbatch: - mocker.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + set_from_args( + Namespace(require_builtin_microbatch_strategy=dbt_experimental_microbatch), None + ) # Target mocking target = mock.Mock() @@ -117,6 +119,8 @@ def test_create_relation_with_empty(self, resolver, empty, is_ephemeral_model, e mock_node.is_ephemeral_model = is_ephemeral_model mock_node.defer_relation = None + set_from_args(Namespace(require_builtin_microbatch_strategy=False), None) + # create limited relation with mock.patch("dbt.contracts.graph.nodes.ParsedNode", new=mock.Mock): relation = resolver.create_relation(mock_node) @@ -156,6 +160,8 @@ def test_create_relation_with_empty(self, resolver, empty, expected_limit): mock_source.quoting_dict = {} resolver.manifest.resolve_source.return_value = mock_source + set_from_args(Namespace(require_builtin_microbatch_strategy=False), None) + # create limited relation relation = resolver.resolve("test", "test") assert relation.limit == expected_limit