Skip to content

Commit

Permalink
Merge pull request #92 from tekliner/BI-483-merge-fork-of-dbt-ch-conn…
Browse files Browse the repository at this point in the history
…ector

Adaptive materialization strategy when using Atomic schema engine
  • Loading branch information
genzgd authored Sep 2, 2022
2 parents 0952cf5 + c79d2fc commit a72555b
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ __pycache__/
# Distribution / packaging
.Python
env*/
venv*/
dbt_env/
build/
develop-eggs/
Expand Down
31 changes: 31 additions & 0 deletions dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,34 @@
{{ sql }}
{{ adapter.get_model_settings(model) }}
{%- endmacro %}


{% macro engine_exchange_support(rel) %}
{% if rel is none %}
{% do return(None) %}
{% endif %}

{% if not execute %}
{% do return(None) %}
{% endif %}

{% set relation = adapter.get_relation(rel.database, rel.schema, rel.table) %}
{% if relation is none %}
{% do return(None) %}
{% endif %}

{% set sel %}
( SELECT engine FROM system.databases WHERE name='{{relation.schema}}' )
{% endset %}

{% set results = run_query(sel) %}
{% set engine = results.columns[0].values()[0] %}
{% do return(engine in ['Atomic', 'Replicated']) %}
{% endmacro %}


{% macro exchange_tables_atomic(intermediate_relation, target_relation) %}
{%- call statement('exchange_tables_atomic') -%}
EXCHANGE TABLES {{ intermediate_relation }} AND {{ target_relation }}
{% endcall %}
{% endmacro %}
70 changes: 50 additions & 20 deletions dbt/include/clickhouse/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
{% materialization incremental, adapter='clickhouse' -%}
{% materialization incremental, adapter='clickhouse' %}

{% set unique_key = config.get('unique_key') %}
{% set inserts_only = config.get('inserts_only') %}

{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(target_relation) %}

{% set is_atomic = engine_exchange_support(this) %}

{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
Expand Down Expand Up @@ -48,7 +51,6 @@

{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% do to_drop.append(backup_relation) %}
{% else %}
{%- if inserts_only or unique_key is none -%}
-- Run incremental insert without updates - updated rows will be added too to the table and will create
Expand All @@ -68,31 +70,59 @@
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% endif %}

{% do adapter.rename_relation(target_relation, old_relation) %}
-- Create a new target table.
{% set create_sql = clickhouse__incremental_create(old_relation, target_relation) %}
{% call statement('main') %}
{{ create_sql }}
{% endcall %}
-- Insert all untouched rows to the target table.
{% set currect_insert_sql = clickhouse__incremental_cur_insert(old_relation, tmp_relation, target_relation, unique_key=unique_key) %}
{% call statement('main') %}
{{ currect_insert_sql }}
{% endcall %}
-- Insert all incremental updates to the target table.
{% set build_sql = clickhouse__incremental_insert_from_table(tmp_relation, target_relation) %}
{% if is_atomic %}
-- Create a new target table from existing (old_relation will be new table here, then EXCHANGE'd with target_relation)
{% set create_sql = clickhouse__incremental_create(target_relation, old_relation) %}
{% call statement('main') %}
{{ create_sql }}
{% endcall %}
-- Insert all current rows from target table to the new table (old_relation)
{% set currect_insert_sql = clickhouse__incremental_cur_insert(target_relation, tmp_relation, old_relation, unique_key=unique_key) %}
{% call statement('main') %}
{{ currect_insert_sql }}
{% endcall %}
-- Insert all incremental updates from tmp table to the new table (old_relation)
{% set build_sql_insert = clickhouse__incremental_insert_from_table(tmp_relation, old_relation) %}
{% call statement('main') %}
{{ build_sql_insert }}
{% endcall %}
-- Exchange tables
{% do exchange_tables_atomic(old_relation, target_relation) %}
{% else %}
{% do adapter.rename_relation(target_relation, old_relation) %}
-- Create a new target table.
{% set create_sql = clickhouse__incremental_create(old_relation, target_relation) %}
{% call statement('main') %}
{{ create_sql }}
{% endcall %}
-- Insert all untouched rows to the target table.
{% set currect_insert_sql = clickhouse__incremental_cur_insert(old_relation, tmp_relation, target_relation, unique_key=unique_key) %}
{% call statement('main') %}
{{ currect_insert_sql }}
{% endcall %}
-- Insert all incremental updates to the target table.
{% set build_sql = clickhouse__incremental_insert_from_table(tmp_relation, target_relation) %}
{% endif %}
{% do to_drop.append(old_relation) %}
{% do to_drop.append(tmp_relation) %}
{% endif %}
{% endif %}

{% call statement('main') %}
{{ build_sql }}
{% endcall %}
{% if build_sql %}
{% call statement('main') %}
{{ build_sql }}
{% endcall %}
{% endif %}

{% if need_swap %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% if is_atomic %}
{% do adapter.rename_relation(intermediate_relation, backup_relation) %}
{% do exchange_tables_atomic(backup_relation, target_relation) %}
{% else %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% endif %}
{% do to_drop.append(backup_relation) %}
{% endif %}

{% do persist_docs(target_relation, model) %}
Expand Down
25 changes: 19 additions & 6 deletions dbt/include/clickhouse/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,24 @@
where {{ source }}.dbt_change_type IN ('insert');
{% endcall %}

{% call statement('drop_target_relation') %}
drop table if exists {{ target }}
{% endcall %}
{% if engine_exchange_support(target) %}

{% do exchange_tables_atomic(upsert, target) %}

{% call statement('drop_exchanged_relation') %}
drop table if exists {{ upsert }};
{% endcall %}

{% else %}

{% call statement('drop_target_relation') %}
drop table if exists {{ target }};
{% endcall %}

{% call statement('rename_upsert_relation') %}
rename table {{ upsert }} to {{ target }};
{% endcall %}

{% endif %}

{% call statement('rename_upsert_relation') %}
rename table {{ upsert }} to {{ target }}
{% endcall %}
{% endmacro %}
85 changes: 85 additions & 0 deletions dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{% materialization table, adapter='clickhouse' %}
{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
{%- set backup_identifier = model['name'] + '__dbt_backup' -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database,
type='table') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema,
database=database,
type='table') -%}
-- the intermediate_relation should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation
{%- set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier,
schema=schema,
database=database) -%}
/*
See ../view/view.sql for more information about this relation.
*/
{%- set backup_relation_type = 'table' if old_relation is none else old_relation.type -%}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema,
database=database,
type=backup_relation_type) -%}
-- as above, the backup_relation should not already exist
{%- set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier,
schema=schema,
database=database) -%}


-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if is_atomic and old_relation is not none %}
-- only makes sense to EXCHANGE when the relation already exists

-- build model as backup_relation so it is saved after EXCHANGE
{% call statement('main') -%}
{{ get_create_table_as_sql(False, backup_relation, sql) }}
{%- endcall %}

{% do exchange_tables_atomic(backup_relation, old_relation) %}

{% else %}

-- build model
{% call statement('main') -%}
{{ get_create_table_as_sql(False, intermediate_relation, sql) }}
{%- endcall %}

-- cleanup, move the existing table out of the way and rename intermediate to target
{% if old_relation is not none %}
{{ adapter.rename_relation(old_relation, backup_relation) }}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{% endif %}

{% do create_indexes(target_relation) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{% do persist_docs(target_relation, model) %}

-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
89 changes: 89 additions & 0 deletions dbt/include/clickhouse/macros/materializations/view.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{%- materialization view, adapter='clickhouse' -%}

{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
{%- set backup_identifier = model['name'] + '__dbt_backup' -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set is_atomic = engine_exchange_support(old_relation) -%}

{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database,
type='view') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, database=database, type='view') -%}
-- the intermediate_relation should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation
{%- set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier,
schema=schema,
database=database) -%}
/*
This relation (probably) doesn't exist yet. If it does exist, it's a leftover from
a previous run, and we're going to try to drop it immediately. At the end of this
materialization, we're going to rename the "old_relation" to this identifier,
and then we're going to drop it. In order to make sure we run the correct one of:
- drop view ...
- drop table ...
We need to set the type of this relation to be the type of the old_relation, if it exists,
or else "view" as a sane default if it does not. Note that if the old_relation does not
exist, then there is nothing to move out of the way and subsequentally drop. In that case,
this relation will be effectively unused.
*/
{%- set backup_relation_type = 'view' if old_relation is none else old_relation.type -%}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema, database=database,
type=backup_relation_type) -%}
-- as above, the backup_relation should not already exist
{%- set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier,
schema=schema,
database=database) -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if is_atomic and old_relation is not none %}
-- only makes sense to EXCHANGE when the relation already exists

-- build model as backup_relation so it is saved after EXCHANGE
{% call statement('main') -%}
{{ create_view_as(backup_relation, sql) }}
{%- endcall %}

{% do exchange_tables_atomic(backup_relation, old_relation) %}

{% else %}

-- build model
{% call statement('main') -%}
{{ create_view_as(intermediate_relation, sql) }}
{%- endcall %}

-- cleanup, move the existing view out of the way and rename intermediate to target
{% if old_relation is not none %}
{{ adapter.rename_relation(old_relation, backup_relation) }}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{% endif %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}

0 comments on commit a72555b

Please sign in to comment.