Merge branch 'main' into feature/materialized-tests/adap-892

colin-rogers-dbt authored Oct 11, 2023
2 parents 412fbc8 + 2d5c5a5 commit 3221886
Showing 8 changed files with 190 additions and 6 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20230426-152526.yaml
@@ -0,0 +1,7 @@
kind: Features
body: Support partition_by and cluster_by on python models when supplied in model
configurations
time: 2023-04-26T15:25:26.285021+09:00
custom:
Author: kalanyuz
Issue: "680"
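
For reference, a minimal sketch of a python model supplying these options (column names and values are illustrative, not taken from this commit; the actual test model added below in tests/functional/adapter/test_python_model.py is similar):

import pandas as pd

def model(dbt, spark):
    # Hypothetical configuration exercising the new options.
    dbt.config(
        materialized="table",
        partition_by={
            "field": "created_at",  # illustrative column name
            "data_type": "timestamp",
            "granularity": "day",
        },
        cluster_by=["category"],  # illustrative column name
    )
    df = pd.DataFrame([["x", 1.0], ["y", 2.0]], columns=["category", "value"])
    df["created_at"] = pd.to_datetime("now")
    return df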
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230817-095527.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Avoid setting lifetime on impersonation tokens.
time: 2023-08-17T09:55:27.333914673-04:00
custom:
Author: cmc333333
Issue: "769"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230906-141819.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix bigquery copy materialization
time: 2023-09-06T14:18:19.445262+02:00
custom:
Author: m-sche
Issue: "820"
1 change: 0 additions & 1 deletion dbt/adapters/bigquery/connections.py
@@ -356,7 +356,6 @@ def get_impersonated_credentials(cls, profile_credentials):
source_credentials=source_credentials,
target_principal=profile_credentials.impersonate_service_account,
target_scopes=list(profile_credentials.scopes),
- lifetime=(profile_credentials.job_execution_timeout_seconds or 300),
)

@classmethod
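With the lifetime argument removed, token duration falls back to the google-auth library default (3600 seconds at the time of writing) rather than dbt's job execution timeout. A minimal standalone sketch of the resulting call, outside of dbt; the target principal and scopes here are placeholders:

import google.auth
from google.auth import impersonated_credentials

# Source credentials come from the environment (e.g. gcloud ADC).
source_credentials, _ = google.auth.default()

# No explicit lifetime: google-auth applies its own default instead of
# dbt's job_execution_timeout_seconds.
target_credentials = impersonated_credentials.Credentials(
    source_credentials=source_credentials,
    target_principal="dbt-runner@my-project.iam.gserviceaccount.com",  # placeholder
    target_scopes=["https://www.googleapis.com/auth/bigquery"],
)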
2 changes: 1 addition & 1 deletion dbt/include/bigquery/macros/materializations/copy.sql
@@ -9,7 +9,7 @@
{# cycle over ref() and source() to create source tables array #}
{% set source_array = [] %}
{% for ref_table in model.refs %}
- {{ source_array.append(ref(*ref_table)) }}
+ {{ source_array.append(ref(ref_table.get('package'), ref_table.name, version=ref_table.get('version'))) }}
{% endfor %}

{% for src_table in model.sources %}
17 changes: 13 additions & 4 deletions dbt/include/bigquery/macros/materializations/table.sql
@@ -45,11 +45,11 @@

{% endmaterialization %}

- -- TODO dataproc requires a temp bucket to perform BQ write
- -- this is hard coded to internal testing ATM. need to adjust to render
- -- or find another way around
{% macro py_write_table(compiled_code, target_relation) %}
from pyspark.sql import SparkSession
+ {%- set raw_partition_by = config.get('partition_by', none) -%}
+ {%- set raw_cluster_by = config.get('cluster_by', none) -%}
+ {%- set partition_config = adapter.parse_partition_by(raw_partition_by) %}

spark = SparkSession.builder.appName('smallTest').getOrCreate()

@@ -109,6 +109,15 @@ else:
df.write \
.mode("overwrite") \
.format("bigquery") \
.option("writeMethod", "direct").option("writeDisposition", 'WRITE_TRUNCATE') \
.option("writeMethod", "indirect").option("writeDisposition", 'WRITE_TRUNCATE') \
+ {%- if partition_config.data_type | lower in ('date','timestamp','datetime') %}
+ .option("partitionField", "{{- partition_config.field -}}") \
+ {%- if partition_config.granularity is not none %}
+ .option("partitionType", "{{- partition_config.granularity -}}") \
+ {%- endif %}
+ {%- endif %}
+ {%- if raw_cluster_by is not none %}
+ .option("clusteredFields", "{{- raw_cluster_by|join(',') -}}") \
+ {%- endif %}
.save("{{target_relation}}")
{% endmacro %}
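
As a point of reference, a sketch of roughly what the updated macro renders to for a model partitioned by day on a timestamp column and clustered on one field. Column names and the save target are illustrative; the writer options are those of the spark-bigquery connector as used in the macro above:

# Approximate rendered output of py_write_table for a partitioned,
# clustered model; not produced verbatim by this commit's code.
df.write \
    .mode("overwrite") \
    .format("bigquery") \
    .option("writeMethod", "indirect") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .option("partitionField", "created_at") \
    .option("partitionType", "day") \
    .option("clusteredFields", "category") \
    .save("my-project.my_dataset.my_table")  # placeholder target relation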
62 changes: 62 additions & 0 deletions tests/functional/adapter/test_copy_materialization.py
@@ -0,0 +1,62 @@
import pytest
from pathlib import Path
from dbt.tests.util import run_dbt, write_file, check_relations_equal

_SEED_A = """
load_date,id,first_name,last_name,email,gender,ip_address
2021-03-05,1,Jack,Hunter,jhunter0@pbs.org,Male,59.80.20.168
2021-03-05,2,Kathryn,Walker,kwalker1@ezinearticles.com,Female,194.121.179.35
2021-03-05,3,Gerald,Ryan,gryan2@com.com,Male,11.3.212.243
""".lstrip()

_SEED_B = """
load_date,id,first_name,last_name,email,gender,ip_address
2021-03-05,4,Bonnie,Spencer,bspencer3@ameblo.jp,Female,216.32.196.175
2021-03-05,5,Harold,Taylor,htaylor4@people.com.cn,Male,253.10.246.136
""".lstrip()

_EXPECTED_RESULT = """
load_date,id,first_name,last_name,email,gender,ip_address
2021-03-05,1,Jack,Hunter,jhunter0@pbs.org,Male,59.80.20.168
2021-03-05,2,Kathryn,Walker,kwalker1@ezinearticles.com,Female,194.121.179.35
2021-03-05,3,Gerald,Ryan,gryan2@com.com,Male,11.3.212.243
2021-03-05,4,Bonnie,Spencer,bspencer3@ameblo.jp,Female,216.32.196.175
2021-03-05,5,Harold,Taylor,htaylor4@people.com.cn,Male,253.10.246.136
""".lstrip()

_COPY_MODEL = """
{{ config(
materialized="copy",
copy_materialization="incremental",
) }}
SELECT * FROM {{ ref("seed") }}
"""


class BaseCopyModelConfig:
@pytest.fixture(scope="class")
def models(self):
return {"copy_model.sql": _COPY_MODEL}

@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": _SEED_A,
"expected_result.csv": _EXPECTED_RESULT,
}


class TestCopyMaterialization(BaseCopyModelConfig):
def test_incremental_copy(self, project):
run_dbt(["seed"])
run_dbt(["run"])

# Replace original seed _SEED_A with _SEED_B
seed_file = project.project_root / Path("seeds") / Path("seed.csv")
write_file(_SEED_B, seed_file)

run_dbt(["seed"])
run_dbt(["run"])

check_relations_equal(project.adapter, ["copy_model", "expected_result"])
95 changes: 95 additions & 0 deletions tests/functional/adapter/test_python_model.py
@@ -54,6 +54,101 @@ def model(dbt, spark):
return spark.createDataFrame(data, schema=['test', 'test2'])
"""

macro__partition_count_sql = """
{% test number_partitions(model, expected) %}
{%- set result = get_partitions_metadata(model) %}
{% if result %}
{% set partitions = result.columns['partition_id'].values() %}
{% else %}
{% set partitions = () %}
{% endif %}
{% set actual = partitions | length %}
{% set success = 1 if model and actual == expected else 0 %}
select 'Expected {{ expected }}, but got {{ actual }}' as validation_error
from (select true)
where {{ success }} = 0
{% endtest %}
"""

models__partitioned_model_python = """
import pandas as pd
def model(dbt, spark):
dbt.config(
materialized='table',
partition_by={
"field": "C",
"data_type": "timestamp",
"granularity": "day",
},
cluster_by=["A"],
)
random_array = [
["A", -157.9871329592354],
["B", -528.9769041860632],
["B", 941.0504221837489],
["B", 919.5903586746183],
["A", -121.25678519054622],
["A", 254.9985130814921],
["A", 833.2963094260072],
]
df = pd.DataFrame(random_array, columns=["A", "B"])
df["C"] = pd.to_datetime('now')
final_df = df[["A", "B", "C"]]
return final_df
"""

models__partitioned_model_yaml = """
models:
- name: python_partitioned_model
description: A random table with a calculated column defined in python.
config:
batch_id: '{{ run_started_at.strftime("%Y-%m-%d-%H-%M-%S") }}-python-partitioned'
tests:
- number_partitions:
expected: "{{ var('expected', 1) }}"
columns:
- name: A
description: Column A
- name: B
description: Column B
- name: C
description: Column C
"""


class TestPythonPartitionedModels:
@pytest.fixture(scope="class")
def macros(self):
return {"partition_metadata.sql": macro__partition_count_sql}

@pytest.fixture(scope="class")
def models(self):
return {
"python_partitioned_model.py": models__partitioned_model_python,
"python_partitioned_model.yml": models__partitioned_model_yaml,
}

def test_multiple_named_python_models(self, project):
result = run_dbt(["run"])
assert len(result) == 1

test_results = run_dbt(["test"])
for result in test_results:
assert result.status == "pass"
assert not result.skipped
assert result.failures == 0


models__simple_python_model_v2 = """
import pandas
