Skip to content

Commit

Permalink
Merge pull request #255 from SoryRawyer/rds/add-dictionary-materialization
Browse files Browse the repository at this point in the history

Add materialization macro for dictionaries
  • Loading branch information
BentsiLeviav authored Mar 21, 2024
2 parents d811e90 + b722679 commit 940ecde
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 10 deletions.
26 changes: 22 additions & 4 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dbt.adapters.sql import SQLAdapter
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ConstraintType, ModelLevelConstraint
from dbt.contracts.relation import Path, RelationType
from dbt.contracts.relation import Path
from dbt.events.functions import warn_or_error
from dbt.events.types import ConstraintNotSupported
from dbt.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
Expand All @@ -28,7 +28,7 @@
)
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.relation import ClickHouseRelation
from dbt.adapters.clickhouse.relation import ClickHouseRelation, ClickHouseRelationType
from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions

GET_CATALOG_MACRO_NAME = 'get_catalog'
Expand Down Expand Up @@ -271,10 +271,15 @@ def list_relations_without_caching(
relations = []
for row in results:
name, schema, type_info, db_engine, on_cluster = row
rel_type = RelationType.View if 'view' in type_info else RelationType.Table
if 'view' in type_info:
rel_type = ClickHouseRelationType.View
elif type_info == 'dictionary':
rel_type = ClickHouseRelationType.Dictionary
else:
rel_type = ClickHouseRelationType.Table
can_exchange = (
conn_supports_exchange
and rel_type == RelationType.Table
and rel_type == ClickHouseRelationType.Table
and db_engine in ('Atomic', 'Replicated')
)

Expand Down Expand Up @@ -445,6 +450,19 @@ def get_column_schema_from_query(self, sql: str, *_) -> List[ClickHouseColumn]:
def format_columns(self, columns) -> List[Dict]:
    """Describe each column as a plain ``name`` / ``data_type`` dictionary.

    Args:
        columns: Iterable of objects exposing ``name`` and ``dtype``
            attributes.

    Returns:
        One dictionary per input column, in input order, with the keys
        ``'name'`` (from ``column.name``) and ``'data_type'`` (from
        ``column.dtype``).
    """
    formatted = []
    for col in columns:
        formatted.append({'name': col.name, 'data_type': col.dtype})
    return formatted

@available
def get_credentials(self) -> Dict:
    """Return the current connection's credentials as a plain dictionary.

    The dictionary-source macro calls this as ``adapter.get_credentials()``
    to fill in the ``CLICKHOUSE(...)`` source clause.

    Returns:
        An empty dict when there is no current connection or it carries no
        credentials; otherwise a dict with the keys ``user``, ``password``,
        ``database``, ``host`` and ``port`` copied from the credentials
        object.

    Note:
        The password is returned as stored on the credentials object, so
        callers must take care where the result is rendered or logged.
    """
    connection = self.connections.get_if_exists()
    credentials = None if connection is None else connection.credentials
    if credentials is None:
        return {}

    # Key order matches the order the fields are exposed to macros.
    exposed_fields = ('user', 'password', 'database', 'host', 'port')
    return {field_name: getattr(credentials, field_name) for field_name in exposed_fields}

@classmethod
def render_raw_columns_constraints(cls, raw_columns: Dict[str, Dict[str, Any]]) -> List:
rendered_columns = []
Expand Down
16 changes: 14 additions & 2 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from dbt.adapters.base.relation import BaseRelation, Policy, Self
from dbt.contracts.graph.nodes import ManifestNode, SourceDefinition
from dbt.contracts.relation import HasQuoting, Path, RelationType
from dbt.contracts.relation import HasQuoting, Path
from dbt.dataclass_schema import StrEnum
from dbt.exceptions import DbtRuntimeError
from dbt.utils import deep_merge, merge

Expand All @@ -24,8 +25,19 @@ class ClickHouseIncludePolicy(Policy):
identifier: bool = True


class ClickHouseRelationType(StrEnum):
    """Relation types used by the ClickHouse adapter.

    Adapter-local string enum used in place of dbt's ``RelationType`` so that
    a ``Dictionary`` member is available. Members are strings, so they can be
    looked up by value (``ClickHouseRelationType("dictionary")``).

    NOTE(review): the non-dictionary members presumably mirror dbt's own
    ``RelationType`` values -- confirm against the installed dbt version.
    """

    Table = "table"
    View = "view"
    CTE = "cte"
    # Looked up by value, e.g. ``derivative('_mv', 'materialized_view')``.
    MaterializedView = "materialized_view"
    External = "external"
    Ephemeral = "ephemeral"
    # ClickHouse dictionaries, as produced by the ``dictionary`` materialization.
    Dictionary = "dictionary"


@dataclass(frozen=True, eq=False, repr=False)
class ClickHouseRelation(BaseRelation):
type: Optional[ClickHouseRelationType] = None
quote_policy: Policy = field(default_factory=lambda: ClickHouseQuotePolicy())
include_policy: Policy = field(default_factory=lambda: ClickHouseIncludePolicy())
quote_character: str = '`'
Expand All @@ -42,7 +54,7 @@ def render(self) -> str:

def derivative(self, suffix: str, relation_type: Optional[str] = None) -> BaseRelation:
path = Path(schema=self.path.schema, database='', identifier=self.path.identifier + suffix)
derivative_type = RelationType[relation_type] if relation_type else self.type
derivative_type = ClickHouseRelationType(relation_type) if relation_type else self.type
return ClickHouseRelation(type=derivative_type, path=path)

def matches(
Expand Down
6 changes: 5 additions & 1 deletion dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
select
t.name as name,
t.database as schema,
if(engine not in ('MaterializedView', 'View'), 'table', 'view') as type,
multiIf(
engine in ('MaterializedView', 'View'), 'view',
engine = 'Dictionary', 'dictionary',
'table'
) as type,
db.engine as db_engine,
{%- if adapter.get_clickhouse_cluster_name() -%}
count(distinct _shard_num) > 1 as is_on_cluster
Expand Down
116 changes: 116 additions & 0 deletions dbt/include/clickhouse/macros/materializations/dictionary.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
{%- materialization dictionary, adapter='clickhouse' -%}
{#
  Materializes a model as a ClickHouse dictionary.

  Flow: create the dictionary under an intermediate name, rename any existing
  relation to a backup name, rename the intermediate relation to the target
  name, apply grants and docs, commit, then drop the backup.
#}

{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='dictionary') -%}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set existing_intermediate_relation = load_cached_relation(intermediate_relation) -%}
{# The backup keeps the type of whatever currently exists; with nothing cached it defaults to 'dictionary'. #}
{%- set backup_relation_type = 'dictionary' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{%- set existing_backup_relation = load_cached_relation(backup_relation) -%}

{%- set grant_config = config.get('grants') -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

{# Drop any backup / intermediate relation already in the cache (presumably leftovers of an earlier run) so the names are free. #}
{{ drop_dictionary_if_exists(existing_backup_relation) }}
{{ drop_dictionary_if_exists(existing_intermediate_relation) }}


{{ run_hooks(pre_hooks, inside_transaction=True) }}

{# Create the new dictionary under the intermediate name; the target name is still untouched. #}
{% call statement('main') -%}
{{ clickhouse__get_create_dictionary_as_sql(intermediate_relation, sql) }}
{%- endcall %}

{# Swap: move the existing relation (if it is still cached) to the backup name, then move the new dictionary into place. #}
{% if existing_relation is not none %}
{# Re-read the cache before renaming; presumably guards against the relation having disappeared since the top of the run. #}
{% set existing_relation = load_cached_relation(existing_relation) %}
{% if existing_relation is not none %}
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}
{% endif %}
{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{# The target was just recreated, so grants are evaluated with full_refresh_mode=True. #}
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{# The backup is only dropped after the commit above. #}
{{ drop_dictionary_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}


{% macro clickhouse__get_create_dictionary_as_sql(relation, sql) %}
{#
  Render the CREATE DICTIONARY statement for `relation`.

  Arguments:
    relation - relation to create (rendered directly after CREATE DICTIONARY).
    sql      - the model's SQL; passed on to clickhouse_source() when the
               source type is not 'http'.

  Model config read here:
    fields      - iterated as (name, data_type) pairs to build the column list.
    source_type - 'http' selects http_source(); any other value, including
                  unset, selects clickhouse_source(sql).
    layout      - rendered verbatim inside LAYOUT(...).
    lifetime    - rendered verbatim inside LIFETIME(...).
#}
{%- set fields = config.get('fields') -%}
{%- set source_type = config.get('source_type') -%}

CREATE DICTIONARY {{ relation }} {{ on_cluster_clause(relation) }}
(
{%- for (name, data_type) in fields -%}
{{ name }} {{ data_type }}{%- if not loop.last -%},{%- endif -%}
{%- endfor -%}
)
{{ primary_key_clause(label="primary key") }}
SOURCE(
{%- if source_type == 'http' %}
{{ http_source() }}
{% else %}
{{ clickhouse_source(sql) }}
{% endif -%}
)
LAYOUT({{ config.get('layout') }})
LIFETIME({{ config.get('lifetime') }})
{% endmacro %}


{% macro http_source() %}
  {#- Render the HTTP(...) dictionary source from the model's `url` and `format` config values. -#}
  {%- set source_url = config.get("url") -%}
  {%- set source_format = config.get("format") %}
HTTP(URL '{{ source_url }}' FORMAT '{{ source_format }}')
{% endmacro %}


{% macro clickhouse_source(sql) %}
  {#-
    Render the CLICKHOUSE(...) dictionary source.

    Arguments:
      sql - the model's SQL; used as the source query when no `table` is
            configured on the model.

    Connection parameters come from adapter.get_credentials(). Each one is
    emitted only when it has a truthy value: the previous `!= ''` comparisons
    let None through, which rendered the literal string 'None' into the DDL
    (e.g. port 'None' when no port is set, or every field when
    get_credentials() returns an empty dict).

    host / port are only emitted for a non-localhost host, as before.

    NOTE: the password is written into the CREATE DICTIONARY statement in
    plain text.
    NOTE(review): `sql` is interpolated inside double quotes without any
    escaping -- a model query containing a double quote would presumably
    break the statement; confirm and escape if needed.
  -#}
  {%- set credentials = adapter.get_credentials() -%}
  {%- set table = config.get('table') -%}
  {%- set host = credentials.get("host") -%}
  CLICKHOUSE(
      {% if credentials.get("user") -%}
      user '{{ credentials.get("user") }}'
      {%- endif %}
      {% if credentials.get("password") -%}
      password '{{ credentials.get("password") }}'
      {%- endif %}
      {% if credentials.get("database") -%}
      db '{{ credentials.get("database") }}'
      {%- endif %}
      {% if host and host != 'localhost' -%}
      host '{{ host }}'
      {% if credentials.get("port") -%}
      port '{{ credentials.get("port") }}'
      {%- endif %}
      {%- endif %}
      {%- if table is not none %}
      table '{{ table }}'
      {% else %}
      query "{{ sql }}"
      {% endif -%}
  )
{% endmacro %}


{% macro drop_dictionary_if_exists(relation) %}
  {#-
    Drop `relation` if it exists, using DROP DICTIONARY for dictionaries.

    Arguments:
      relation - relation to drop, or none. The dictionary materialization
                 passes results of load_cached_relation(), which can be none,
                 so that case is an explicit no-op instead of falling into
                 the "not a dictionary" branch and logging a misleading
                 message about `None`.

    Relations of any other type are handed to drop_relation_if_exists.
  -#}
  {% if relation is none %}
    {# Nothing to drop. #}
  {% elif relation.type != 'dictionary' %}
    {{ log(relation ~ ' is not a dictionary; defaulting to drop_relation_if_exists') }}
    {{ drop_relation_if_exists(relation) }}
  {% else %}
    {% call statement('drop_dictionary_if_exists') %}
      drop dictionary if exists {{ relation }}
    {% endcall %}
  {% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{%- materialization materialized_view, adapter='clickhouse' -%}

{%- set target_relation = this.incorporate(type='table') -%}
{%- set mv_relation = target_relation.derivative('_mv', 'MaterializedView') -%}
{%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}

{# look for an existing relation for the target table and create backup relations if necessary #}
Expand Down Expand Up @@ -87,7 +87,7 @@
{{ get_create_table_as_sql(False, relation, sql) }}
{% endcall %}
{%- set cluster_clause = on_cluster_clause(relation) -%}
{%- set mv_relation = relation.derivative('_mv', 'MaterializedView') -%}
{%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%}
{{ clickhouse__create_mv_sql(mv_relation, relation, cluster_clause, sql) }}
{%- endmacro %}

Expand All @@ -102,7 +102,7 @@
{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) %}
{# drop existing materialized view while we recreate the target table #}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}
{%- set mv_relation = target_relation.derivative('_mv', 'MaterializedView') -%}
{%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%}
{% call statement('drop existing mv') -%}
drop view if exists {{ mv_relation }} {{ cluster_clause }}
{%- endcall %}
Expand Down
Loading

0 comments on commit 940ecde

Please sign in to comment.