diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql
index eeb920493..019dad452 100644
--- a/dbt/include/spark/macros/materializations/incremental/strategies.sql
+++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql
@@ -75,6 +75,9 @@
   {%- elif strategy == 'insert_overwrite' -%}
     {#-- insert statements don't like CTEs, so support them via a temp view #}
     {{ get_insert_overwrite_sql(source, target, existing) }}
+  {%- elif strategy == 'microbatch' -%}
+    {#-- insert statements don't like CTEs, so support them via a temp view #}
+    {{ get_insert_overwrite_sql(source, target, existing) }}
   {%- elif strategy == 'merge' -%}
   {#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #}
     {{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
diff --git a/tests/functional/adapter/incremental_strategies/test_microbatch.py b/tests/functional/adapter/incremental_strategies/test_microbatch.py
new file mode 100644
index 000000000..7806b0299
--- /dev/null
+++ b/tests/functional/adapter/incremental_strategies/test_microbatch.py
@@ -0,0 +1,17 @@
+import pytest
+
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)
+
+# No requirement for a unique_id for spark microbatch!
+_microbatch_model_no_unique_id_sql = """
+{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
+select * from {{ ref('input_model') }}
+"""
+
+
+class TestMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def microbatch_model_sql(self) -> str:
+        return _microbatch_model_no_unique_id_sql
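
Note (not part of the patch): the new microbatch branch reuses get_insert_overwrite_sql, so each event-time batch dbt hands to the adapter is written with the same insert-overwrite statement the insert_overwrite strategy already uses, and no unique_key is needed, as the test model above shows. A minimal sketch of a user-facing model opting into the strategy, assuming a hypothetical upstream model named events and an illustrative begin date (event_time and batch_size mirror the test model; begin is dbt-core's microbatch start-date config and is not exercised in this diff):

    -- models/sessions.sql (hypothetical model; names are illustrative)
    {{ config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='event_time',
        batch_size='day',
        begin='2024-01-01'
    ) }}

    -- dbt filters ref('events') down to a single event_time batch per run;
    -- on Spark each batch then lands via the insert-overwrite path added above
    select * from {{ ref('events') }}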