Skip to content

Commit

Permalink
Allow use of multiple column unique keys in snapshots (#326)
Browse files Browse the repository at this point in the history
Co-authored-by: Antonio Papa <antoniogpapa@gmail.com>
Co-authored-by: Colin Rogers <111200756+colin-rogers-dbt@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 19, 2024
1 parent 937c8c7 commit 5fd5467
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 33 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240422-081302.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Allows unique_key for snapshots to take a list
time: 2024-04-22T08:13:02.937534-04:00
custom:
Author: agpapa
Issue: "181"
11 changes: 11 additions & 0 deletions dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Dict,
FrozenSet,
Iterator,
List,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -341,6 +342,16 @@ def create(
)
return cls.from_dict(kwargs)

@classmethod
def scd_args(cls: Type[Self], primary_key: Union[str, List[str]], updated_at) -> List[str]:
scd_args = []
if isinstance(primary_key, list):
scd_args.extend(primary_key)
else:
scd_args.append(primary_key)
scd_args.append(updated_at)
return scd_args

@property
def can_be_renamed(self) -> bool:
return self.type in self.renameable_relations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@

snapshotted_data as (

select *,
{{ strategy.unique_key }} as dbt_unique_key

select *, {{ unique_key_fields(strategy.unique_key) }}
from {{ target_relation }}
where
{% if config.get('dbt_valid_to_current') %}
Expand All @@ -65,9 +63,7 @@

insertions_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
select *, {{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }},
Expand All @@ -78,9 +74,7 @@

updates_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
select *, {{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
Expand All @@ -92,9 +86,7 @@

deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
select *, {{ unique_key_fields(strategy.unique_key) }}
from snapshot_query
),
{% endif %}
Expand All @@ -106,13 +98,11 @@
source_data.*

from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
left outer join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and {{ strategy.row_changed }})

)

),
Expand All @@ -125,7 +115,8 @@
snapshotted_data.{{ columns.dbt_scd_id }}

from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where (
{{ strategy.row_changed }}
)
Expand All @@ -145,8 +136,9 @@
snapshotted_data.{{ columns.dbt_scd_id }}

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}

Expand Down Expand Up @@ -217,8 +209,51 @@
{% endif %}
{% endmacro %}


{% macro get_dbt_valid_to_current(strategy, columns) %}
{% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "null" %}
coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{dbt_valid_to_current}})
as {{ columns.dbt_valid_to }}
{% endmacro %}


{% macro unique_key_fields(unique_key) %}
{% if unique_key | is_list %}
{% for key in unique_key %}
{{ key }} as dbt_unique_key_{{ loop.index }}
{%- if not loop.last %} , {%- endif %}
{% endfor %}
{% else %}
{{ unique_key }} as dbt_unique_key
{% endif %}
{% endmacro %}


{% macro unique_key_join_on(unique_key, identifier, from_identifier) %}
{% if strategy.unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ identifier }}.dbt_unique_key_{{ loop.index }} = {{ from_identifier }}.dbt_unique_key_{{ loop.index }}
{%- if not loop.last %} and {%- endif %}
{% endfor %}
{% else %}
{{ identifier }}.dbt_unique_key = {{ from_identifier }}.dbt_unique_key
{% endif %}
{% endmacro %}


{% macro unique_key_is_null(unique_key, identifier) %}
{% if unique_key | is_list %}
{{ identifier }}.dbt_unique_key_1 is null
{% else %}
{{ identifer }}.dbt_unique_key is null
{% endif %}
{% endmacro %}


{% macro unique_key_is_not_null(unique_key, identifier) %}
{% if unique_key | is_list %}
{{ identifier }}.dbt_unique_key_1 is not null
{% else %}
{{ identifer }}.dbt_unique_key is not null
{% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,22 @@
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
{% endfor %}
{% endif %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% set quoted_source_columns = [] %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }})
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}
{% set scd_args = api.Relation.scd_args(primary_key, updated_at) %}
{% set scd_id_expr = snapshot_hash_arguments(scd_args) %}

{% do return({
"unique_key": primary_key,
Expand Down Expand Up @@ -166,7 +167,8 @@
)
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}
{% set scd_args = api.Relation.scd_args(primary_key, updated_at) %}
{% set scd_id_expr = snapshot_hash_arguments(scd_args) %}

{% do return({
"unique_key": primary_key,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ classifiers = [
"Programming Language :: Python :: 3.12",
]
dependencies = [
"dbt-common>=1.10,<2.0",
"dbt-common>=1.11,<2.0",
"pytz>=2015.7",
# installed via dbt-common but used directly
"agate>=1.0,<2.0",
Expand Down

0 comments on commit 5fd5467

Please sign in to comment.