From 1f20dbe6a7e68e1ce14e764456e592dada02cc19 Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Tue, 8 Oct 2024 14:39:26 -0700 Subject: [PATCH 1/9] enable overriding the dataproc project for python models --- dbt/adapters/bigquery/connections.py | 5 +++++ dbt/adapters/bigquery/python_submissions.py | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index d3eee3ef3..c389e398f 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -148,6 +148,7 @@ class BigQueryCredentials(Credentials): dataproc_region: Optional[str] = None dataproc_cluster_name: Optional[str] = None + dataproc_project: Optional[str] = None gcs_bucket: Optional[str] = None dataproc_batch: Optional[DataprocBatchConfig] = field( @@ -228,6 +229,10 @@ def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]: # `execution_project` default to dataset/project if "execution_project" not in d: d["execution_project"] = d["database"] + + if "dataproc_project" not in d: + d["dataproc_project"] = d["execution_project"] + return d diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 368ed9d07..bb48c6c8a 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -36,6 +36,7 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None: self.parsed_model = parsed_model python_required_configs = [ "dataproc_region", + "dataproc_project", "gcs_bucket", ] for required_config in python_required_configs: @@ -107,7 +108,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job: } operation = self.job_client.submit_job_as_operation( request={ - "project_id": self.credential.execution_project, + "project_id": self.credential.dataproc_project, "region": self.credential.dataproc_region, "job": job, } From 0ccd79f2b751254c7b5be7712b50a0247542fe8a Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Tue, 8 Oct 2024 17:52:11 -0700 Subject: [PATCH 2/9] fix trailing whitespace --- dbt/adapters/bigquery/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index c389e398f..1429f63a7 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -229,7 +229,7 @@ def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]: # `execution_project` default to dataset/project if "execution_project" not in d: d["execution_project"] = d["database"] - + # if no dataproc_project default to execution_project if "dataproc_project" not in d: d["dataproc_project"] = d["execution_project"] From 90cf030dbac9f986da49030303cf6610b6b8312c Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Tue, 15 Oct 2024 09:48:17 -0600 Subject: [PATCH 3/9] push up to replicate duplicate package name error --- dbt/include/bigquery/dbt_project.yml | 7 +++++++ dbt/include/bigquery/models/_config.yml | 6 ++++++ dbt/include/bigquery/models/my_py_model.py | 2 ++ dbt/include/bigquery/models/my_upstream_model.sql | 1 + 4 files changed, 16 insertions(+) create mode 100644 dbt/include/bigquery/models/_config.yml create mode 100644 dbt/include/bigquery/models/my_py_model.py create mode 100644 dbt/include/bigquery/models/my_upstream_model.sql diff --git a/dbt/include/bigquery/dbt_project.yml b/dbt/include/bigquery/dbt_project.yml index b4e88b7b0..fce8934e6 100644 --- a/dbt/include/bigquery/dbt_project.yml +++ b/dbt/include/bigquery/dbt_project.yml @@ -2,4 +2,11 @@ config-version: 2 name: dbt_bigquery version: 1.0 +profile: bigquery_core_dev + macro-paths: ["macros"] +model-paths: ["models"] + +models: + dbt_bigquery: + +materialized: view diff --git a/dbt/include/bigquery/models/_config.yml b/dbt/include/bigquery/models/_config.yml new file mode 100644 index 000000000..259227efc --- /dev/null +++ b/dbt/include/bigquery/models/_config.yml @@ -0,0 +1,6 @@ +models: + - name: my_py_model + config: + submission_method: cluster + dataproc_cluster_name: tdunlap-dbt-python-cluster + dataproc_project: some-other-test-project diff --git a/dbt/include/bigquery/models/my_py_model.py b/dbt/include/bigquery/models/my_py_model.py new file mode 100644 index 000000000..5b05754ac --- /dev/null +++ b/dbt/include/bigquery/models/my_py_model.py @@ -0,0 +1,2 @@ +def model(dbt, session): + return dbt.ref("my_upstream_model") diff --git a/dbt/include/bigquery/models/my_upstream_model.sql b/dbt/include/bigquery/models/my_upstream_model.sql new file mode 100644 index 000000000..43258a714 --- /dev/null +++ b/dbt/include/bigquery/models/my_upstream_model.sql @@ -0,0 +1 @@ +select 1 as id From a12dc5c8f1598cca857ddb95fdbf962f748e7257 Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Wed, 16 Oct 2024 15:34:05 -0600 Subject: [PATCH 4/9] additional connection config --- dbt/adapters/bigquery/connections.py | 1 + dbt/include/bigquery/dbt_project.yml | 12 ------------ dbt/include/bigquery/models/_config.yml | 6 ------ dbt/include/bigquery/models/my_py_model.py | 2 -- 4 files changed, 1 insertion(+), 20 deletions(-) delete mode 100644 dbt/include/bigquery/dbt_project.yml delete mode 100644 dbt/include/bigquery/models/_config.yml delete mode 100644 dbt/include/bigquery/models/my_py_model.py diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index 1429f63a7..cfca3ee16 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -214,6 +214,7 @@ def _connection_keys(self): "dataproc_cluster_name", "gcs_bucket", "dataproc_batch", + "dataproc_project", ) @classmethod diff --git a/dbt/include/bigquery/dbt_project.yml b/dbt/include/bigquery/dbt_project.yml deleted file mode 100644 index fce8934e6..000000000 --- a/dbt/include/bigquery/dbt_project.yml +++ /dev/null @@ -1,12 +0,0 @@ -config-version: 2 -name: dbt_bigquery -version: 1.0 - -profile: bigquery_core_dev - -macro-paths: ["macros"] -model-paths: ["models"] - -models: - dbt_bigquery: - +materialized: view diff --git a/dbt/include/bigquery/models/_config.yml b/dbt/include/bigquery/models/_config.yml deleted file mode 100644 index 259227efc..000000000 --- a/dbt/include/bigquery/models/_config.yml +++ /dev/null @@ -1,6 +0,0 @@ -models: - - name: my_py_model - config: - submission_method: cluster - dataproc_cluster_name: tdunlap-dbt-python-cluster - dataproc_project: some-other-test-project diff --git a/dbt/include/bigquery/models/my_py_model.py b/dbt/include/bigquery/models/my_py_model.py deleted file mode 100644 index 5b05754ac..000000000 --- a/dbt/include/bigquery/models/my_py_model.py +++ /dev/null @@ -1,2 +0,0 @@ -def model(dbt, session): - return dbt.ref("my_upstream_model") From 3c4f51951d89957bb780c805c0af9ad624535123 Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Wed, 16 Oct 2024 15:50:12 -0600 Subject: [PATCH 5/9] add changelog --- .changes/unreleased/Features-20241016-154952.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20241016-154952.yaml diff --git a/.changes/unreleased/Features-20241016-154952.yaml b/.changes/unreleased/Features-20241016-154952.yaml new file mode 100644 index 000000000..93328046e --- /dev/null +++ b/.changes/unreleased/Features-20241016-154952.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Enables overriding execution project for .py models run via dataproc +time: 2024-10-16T15:49:52.295605-06:00 +custom: + Author: matt-winkler + Issue: "1364" From a40c781354b5e4baafd864cb704868fd5f0bb372 Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Wed, 16 Oct 2024 15:58:31 -0600 Subject: [PATCH 6/9] add back dbt project yml --- dbt/include/bigquery/dbt_project.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 dbt/include/bigquery/dbt_project.yml diff --git a/dbt/include/bigquery/dbt_project.yml b/dbt/include/bigquery/dbt_project.yml new file mode 100644 index 000000000..b4e88b7b0 --- /dev/null +++ b/dbt/include/bigquery/dbt_project.yml @@ -0,0 +1,5 @@ +config-version: 2 +name: dbt_bigquery +version: 1.0 + +macro-paths: ["macros"] From 01a9f46ca5f11c28205d9c9cc957b2b834e6f1fc Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Wed, 16 Oct 2024 15:59:13 -0600 Subject: [PATCH 7/9] remove upstream model from adapter code --- dbt/include/bigquery/models/my_upstream_model.sql | 1 - 1 file changed, 1 deletion(-) delete mode 100644 dbt/include/bigquery/models/my_upstream_model.sql diff --git a/dbt/include/bigquery/models/my_upstream_model.sql b/dbt/include/bigquery/models/my_upstream_model.sql deleted file mode 100644 index 43258a714..000000000 --- a/dbt/include/bigquery/models/my_upstream_model.sql +++ /dev/null @@ -1 +0,0 @@ -select 1 as id From 7688a82b19f07d96b084f0027f8c40b54a0a5456 Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Mon, 21 Oct 2024 07:27:18 -0600 Subject: [PATCH 8/9] copy from existing test --- .../functional/python_model_tests/fixtures.py | 30 +++ .../test_override_dataproc_project.py | 204 ++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 tests/functional/python_model_tests/fixtures.py create mode 100644 tests/functional/python_model_tests/test_override_dataproc_project.py diff --git a/tests/functional/python_model_tests/fixtures.py b/tests/functional/python_model_tests/fixtures.py new file mode 100644 index 000000000..24cfcfdbc --- /dev/null +++ b/tests/functional/python_model_tests/fixtures.py @@ -0,0 +1,30 @@ +import os +import pytest +from dbt.tests.fixtures.project import write_project_files + +ALT_DATABASE = os.getenv("BIGQUERY_TEST_ALT_DATABASE") + +models__view_1_sql = """ +select 1 as id +""" + +models__python_model_py = """ +def model(dbt, session): + return dbt.ref("view_1") +""" + + +@pytest.fixture(scope="class") +def models(): + return { + "view_1.sql": models__view_1_sql, + "python_model.py": models__python_model_py, + } + + +@pytest.fixture(scope="class") +def project_files( + project_root, + models, +): + write_project_files(project_root, "models", models) diff --git a/tests/functional/python_model_tests/test_override_dataproc_project.py b/tests/functional/python_model_tests/test_override_dataproc_project.py new file mode 100644 index 000000000..21cf7d58e --- /dev/null +++ b/tests/functional/python_model_tests/test_override_dataproc_project.py @@ -0,0 +1,204 @@ +import os +from unittest.mock import patch + +from dbt.tests.util import run_dbt, check_relations_equal_with_relations +from tests.unit.test_bigquery_adapter import BaseTestBigQueryAdapter + +from tests.functional.python_model_tests.files import SINGLE_RECORD # noqa: F401 + +ALT_DATABASE = os.getenv("BIGQUERY_TEST_ALT_DATABASE") + +""" +dataset: dbt_mwinkler_core_dev +keyfile: /Users/matt-winkler/Downloads/sales-demo-project-314714-2e886a5c2612.json +method: service-account +project: sales-demo-project-314714 +threads: 4 +type: bigquery +gcs_bucket: matt-w-python-demo +dataproc_cluster_name: matt-w-python-demo +dataproc_region: us-west1 +dataproc_project: test-some-other-project +""" + + +# Test application of dataproc_batch configuration to a +# google.cloud.dataproc_v1.Batch object. +# This reuses the machinery from BaseTestBigQueryAdapter to get hold of the +# parsed credentials +class TestOverrideDataprocProject(BaseTestBigQueryAdapter): + @patch( + "dbt.adapters.bigquery.connections.get_bigquery_defaults", + return_value=("credentials", "project_id"), + ) + def test_update_dataproc_cluster(self): + adapter = self.get_adapter("dataproc-serverless-configured") + raw_profile = self.raw_profile["outputs"]["dataproc-serverless-configured"][ + "dataproc_batch" + ] + print("showing raw_profile") + print(raw_profile) + + +# class BaseOverrideDatabase: +# @pytest.fixture(scope="class") +# def model_path(self): +# return "models" + +# @pytest.fixture(scope="class") +# def project_config_update(self): +# return { +# "config-version": 2, +# "seed-paths": ["seeds"], +# "vars": { +# "alternate_db": ALT_DATABASE, +# }, +# "quoting": { +# "database": True, +# }, +# "seeds": { +# "quote_columns": False, +# }, +# } + +# @pytest.fixture(scope="function") +# def clean_up(self, project): +# yield +# relation = project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema +# ) +# project.adapter.drop_schema(relation) + + +# class TestModelOverrideBigQuery(BaseOverrideDatabase): +# def run_database_override(self, project): +# run_dbt(["seed"]) +# assert len(run_dbt(["run"])) == 4 +# check_relations_equal_with_relations( +# project.adapter, +# [ +# project.adapter.Relation.create(schema=project.test_schema, identifier="seed"), +# project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema, identifier="view_2" +# ), +# project.adapter.Relation.create(schema=project.test_schema, identifier="view_1"), +# project.adapter.Relation.create(schema=project.test_schema, identifier="view_3"), +# project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema, identifier="view_4" +# ), +# ], +# ) + +# def test_bigquery_database_override(self, project, clean_up): +# self.run_database_override(project) + + +# class BaseTestProjectModelOverrideBigQuery(BaseOverrideDatabase): +# def run_database_override(self, project): +# run_dbt(["seed"]) +# assert len(run_dbt(["run"])) == 4 +# self.assertExpectedRelations(project) + +# def assertExpectedRelations(self, project): +# check_relations_equal_with_relations( +# project.adapter, +# [ +# project.adapter.Relation.create(schema=project.test_schema, identifier="seed"), +# project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema, identifier="view_2" +# ), +# project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema, identifier="view_1" +# ), +# project.adapter.Relation.create(schema=project.test_schema, identifier="view_3"), +# project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema, identifier="view_4" +# ), +# ], +# ) + + +# class TestProjectModelOverrideBigQuery(BaseTestProjectModelOverrideBigQuery): +# @pytest.fixture(scope="class") +# def project_config_update(self): +# return { +# "config-version": 2, +# "models": { +# "database": ALT_DATABASE, +# "test": {"subfolder": {"database": "{{ target.database }}"}}, +# }, +# "seed-paths": ["seeds"], +# "vars": { +# "alternate_db": ALT_DATABASE, +# }, +# "quoting": { +# "database": True, +# }, +# "seeds": { +# "quote_columns": False, +# }, +# } + +# def test_bigquery_database_override(self, project, clean_up): +# self.run_database_override(project) + + +# class TestProjectModelAliasOverrideBigQuery(BaseTestProjectModelOverrideBigQuery): +# @pytest.fixture(scope="class") +# def project_config_update(self): +# return { +# "config-version": 2, +# "models": { +# "project": ALT_DATABASE, +# "test": {"subfolder": {"project": "{{ target.database }}"}}, +# }, +# "seed-paths": ["seeds"], +# "vars": { +# "alternate_db": ALT_DATABASE, +# }, +# "quoting": { +# "database": True, +# }, +# "seeds": { +# "quote_columns": False, +# }, +# } + +# def test_bigquery_project_override(self, project, clean_up): +# self.run_database_override(project) + + +# class TestProjectSeedOverrideBigQuery(BaseOverrideDatabase): +# @pytest.fixture(scope="class") +# def project_config_update(self): +# return { +# "config-version": 2, +# "seed-paths": ["seeds"], +# "vars": { +# "alternate_db": ALT_DATABASE, +# }, +# "seeds": {"database": ALT_DATABASE}, +# } + +# def run_database_override(self, project): +# run_dbt(["seed"]) +# assert len(run_dbt(["run"])) == 4 +# check_relations_equal_with_relations( +# project.adapter, +# [ +# project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema, identifier="seed" +# ), +# project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema, identifier="view_2" +# ), +# project.adapter.Relation.create(schema=project.test_schema, identifier="view_1"), +# project.adapter.Relation.create(schema=project.test_schema, identifier="view_3"), +# project.adapter.Relation.create( +# database=ALT_DATABASE, schema=project.test_schema, identifier="view_4" +# ), +# ], +# ) + +# def test_bigquery_database_override(self, project, clean_up): +# self.run_database_override(project) From 24472a63970671372e01f096a1911b6ce1375302 Mon Sep 17 00:00:00 2001 From: matt-winkler Date: Mon, 21 Oct 2024 10:20:17 -0600 Subject: [PATCH 9/9] maybe this is runnable? --- .../functional/python_model_tests/fixtures.py | 30 ------------- .../test_override_dataproc_project.py | 44 +++++++------------ 2 files changed, 16 insertions(+), 58 deletions(-) delete mode 100644 tests/functional/python_model_tests/fixtures.py diff --git a/tests/functional/python_model_tests/fixtures.py b/tests/functional/python_model_tests/fixtures.py deleted file mode 100644 index 24cfcfdbc..000000000 --- a/tests/functional/python_model_tests/fixtures.py +++ /dev/null @@ -1,30 +0,0 @@ -import os -import pytest -from dbt.tests.fixtures.project import write_project_files - -ALT_DATABASE = os.getenv("BIGQUERY_TEST_ALT_DATABASE") - -models__view_1_sql = """ -select 1 as id -""" - -models__python_model_py = """ -def model(dbt, session): - return dbt.ref("view_1") -""" - - -@pytest.fixture(scope="class") -def models(): - return { - "view_1.sql": models__view_1_sql, - "python_model.py": models__python_model_py, - } - - -@pytest.fixture(scope="class") -def project_files( - project_root, - models, -): - write_project_files(project_root, "models", models) diff --git a/tests/functional/python_model_tests/test_override_dataproc_project.py b/tests/functional/python_model_tests/test_override_dataproc_project.py index 21cf7d58e..43cd0deb5 100644 --- a/tests/functional/python_model_tests/test_override_dataproc_project.py +++ b/tests/functional/python_model_tests/test_override_dataproc_project.py @@ -1,25 +1,9 @@ -import os -from unittest.mock import patch +import pytest -from dbt.tests.util import run_dbt, check_relations_equal_with_relations +from dbt.tests.util import run_dbt from tests.unit.test_bigquery_adapter import BaseTestBigQueryAdapter - from tests.functional.python_model_tests.files import SINGLE_RECORD # noqa: F401 - -ALT_DATABASE = os.getenv("BIGQUERY_TEST_ALT_DATABASE") - -""" -dataset: dbt_mwinkler_core_dev -keyfile: /Users/matt-winkler/Downloads/sales-demo-project-314714-2e886a5c2612.json -method: service-account -project: sales-demo-project-314714 -threads: 4 -type: bigquery -gcs_bucket: matt-w-python-demo -dataproc_cluster_name: matt-w-python-demo -dataproc_region: us-west1 -dataproc_project: test-some-other-project -""" +from unittest.mock import patch # Test application of dataproc_batch configuration to a @@ -27,17 +11,21 @@ # This reuses the machinery from BaseTestBigQueryAdapter to get hold of the # parsed credentials class TestOverrideDataprocProject(BaseTestBigQueryAdapter): - @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", - return_value=("credentials", "project_id"), - ) + @pytest.fixture(scope="class") + def model_path(self): + return "models" + def test_update_dataproc_cluster(self): + # update the raw profile to set dataproc_project config + self.raw_profile["outputs"]["dataproc-serverless-configured"]["dataproc_batch"][ + "dataproc_project" + ] = "test" adapter = self.get_adapter("dataproc-serverless-configured") - raw_profile = self.raw_profile["outputs"]["dataproc-serverless-configured"][ - "dataproc_batch" - ] - print("showing raw_profile") - print(raw_profile) + run_dbt(["models"]) + + # raw_profile = self.raw_profile["outputs"]["dataproc-serverless-configured"][ + # "dataproc_batch" + # ] # class BaseOverrideDatabase: