Skip to content

Commit

Permalink
feat: sink feature2 (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasMTS authored Mar 6, 2024
1 parent 317bd71 commit 4b2f057
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 37 deletions.
54 changes: 43 additions & 11 deletions dbt/include/risingwave/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
-- Here we only query table, view, materialized view and source. (without index and SINK)
{% macro risingwave__list_relations_without_caching(schema_relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
select
'{{ schema_relation.database }}' as database,
rw_relations.name as name,
rw_schemas.name as schema,
Expand Down Expand Up @@ -61,7 +61,7 @@

create index if not exists
"{{ index_name }}"
on {{ relation }}
on {{ relation }}
({{ comma_separated_columns }});
{%- endmacro %}

Expand All @@ -88,38 +88,70 @@
{% endmacro %}

{% macro risingwave__create_view_as(relation, sql) -%}
create view if not exists {{ relation }}
create view if not exists {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{%- endif %}
as (
{{ sql }}
as (
{{ sql }}
);
{%- endmacro %}

{% macro risingwave__create_table_as(relation, sql) -%}
create table if not exists {{ relation }}
create table if not exists {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{%- endif %}
as (
{{ sql }}
as (
{{ sql }}
);
{%- endmacro %}

{% macro risingwave__create_materialized_view_as(relation, sql) -%}
create materialized view if not exists {{ relation }}
create materialized view if not exists {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{%- endif %}
as (
{{ sql }}
as (
{{ sql }}
);
{%- endmacro %}

{% macro rising_wave__create_sink(relation, sql) -%}
{%- set _format_parameters = config.get("format_parameters") -%}
{%- set data_format = config.get("data_format") -%}
{%- set data_encode = config.get("data_encode") -%}

{%- set _connector_parameters = config.require("connector_parameters") -%}
{%- set connector = config.require("connector") -%}

create sink if not exists {{ relation }}
{% if "select" in sql.lower() -%}
as {{ sql }}
{%- else -%}
from {{ sql }}
{%- endif %}
with (
connector = '{{ connector }}',
{%- for key, value in _connector_parameters.items() %}
{{ key }} = '{{ value }}'
{%- if not loop.last -%},{%- endif -%}
{% endfor %}
)
{% if _format_parameters and data_format and data_encode -%}
format {{ data_format }} encode {{ data_encode }} (
{%- for key, value in _format_parameters.items() %}
{{ key }} = '{{ value }}'
{%- if not loop.last -%},{%- endif -%}
{% endfor %}
)
{%- endif -%}
;
{%- endmacro %}

{% macro risingwave__run_sql(sql) -%}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
Expand Down
61 changes: 35 additions & 26 deletions dbt/include/risingwave/macros/materializations/sink.sql
Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
{% materialization sink, adapter='risingwave' %}
{%- set identifier = model['alias'] -%}
{%- set full_refresh_mode = should_full_refresh() -%}
{%- set old_relation = adapter.get_relation(identifier=identifier,
schema=schema,
database=database) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database,
type='sink') -%}
{% materialization sink, adapter = "risingwave" %}
{%- set identifier = model["alias"] -%}
{%- set full_refresh_mode = should_full_refresh() -%}
{%- set old_relation = adapter.get_relation(
identifier=identifier,
schema=schema,
database=database,
) -%}
{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
database=database,
type="sink",
) -%}

{% if full_refresh_mode and old_relation %}
{{ adapter.drop_relation(old_relation) }}
{% endif %}
{% if full_refresh_mode and old_relation %}
{{ adapter.drop_relation(old_relation) }}
{% endif %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if old_relation is none or (full_refresh_mode and old_relation) %}
{% call statement('main') -%}
{{ risingwave__run_sql(sql) }}
{%- endcall %}
{% else %}
{{ risingwave__execute_no_op(target_relation) }}
{% endif %}
{% if old_relation is none or (full_refresh_mode and old_relation) %}
{%- set connector = config.get("connector") -%}
{% call statement("main") -%}
{% if connector %}
{{ rising_wave__create_sink(target_relation, sql) }}
{% else %}
{{ risingwave__run_sql(sql) }}
{% endif %}
{%- endcall %}
{% else %}
{{ risingwave__execute_no_op(target_relation) }}
{% endif %}

{% do persist_docs(target_relation, model) %}
{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ return({'relations': [target_relation]}) }}
{{ return({"relations": [target_relation]}) }}
{% endmaterialization %}

0 comments on commit 4b2f057

Please sign in to comment.