Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable overriding the dataproc project for python models #1365

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241016-154952.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enables overriding the execution project for Python models run via Dataproc
time: 2024-10-16T15:49:52.295605-06:00
custom:
Author: matt-winkler
Issue: "1364"
6 changes: 6 additions & 0 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class BigQueryCredentials(Credentials):

dataproc_region: Optional[str] = None
dataproc_cluster_name: Optional[str] = None
dataproc_project: Optional[str] = None
gcs_bucket: Optional[str] = None

dataproc_batch: Optional[DataprocBatchConfig] = field(
Expand Down Expand Up @@ -213,6 +214,7 @@ def _connection_keys(self):
"dataproc_cluster_name",
"gcs_bucket",
"dataproc_batch",
"dataproc_project",
)

@classmethod
Expand All @@ -228,6 +230,10 @@ def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]:
# `execution_project` default to dataset/project
if "execution_project" not in d:
d["execution_project"] = d["database"]
# if no dataproc_project default to execution_project
if "dataproc_project" not in d:
d["dataproc_project"] = d["execution_project"]

return d


Expand Down
3 changes: 2 additions & 1 deletion dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
self.parsed_model = parsed_model
python_required_configs = [
"dataproc_region",
"dataproc_project",
"gcs_bucket",
]
for required_config in python_required_configs:
Expand Down Expand Up @@ -107,7 +108,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
}
operation = self.job_client.submit_job_as_operation(
request={
"project_id": self.credential.execution_project,
"project_id": self.credential.dataproc_project,
"region": self.credential.dataproc_region,
"job": job,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import pytest

from dbt.tests.util import run_dbt
from tests.unit.test_bigquery_adapter import BaseTestBigQueryAdapter
from tests.functional.python_model_tests.files import SINGLE_RECORD # noqa: F401
from unittest.mock import patch


# Test that a `dataproc_project` entry in the profile overrides the project
# used when submitting Dataproc jobs for python models.
# This reuses the machinery from BaseTestBigQueryAdapter to get hold of the
# parsed credentials
class TestOverrideDataprocProject(BaseTestBigQueryAdapter):
    @pytest.fixture(scope="class")
    def model_path(self):
        return "models"

    def test_update_dataproc_cluster(self):
        # `dataproc_project` is a top-level profile key (a sibling of
        # `dataproc_batch`), so it must be set on the output itself —
        # not inside the `dataproc_batch` sub-dict.
        self.raw_profile["outputs"]["dataproc-serverless-configured"][
            "dataproc_project"
        ] = "test"
        adapter = self.get_adapter("dataproc-serverless-configured")
        # The parsed credentials should carry the override; this is the
        # value `_submit_dataproc_job` passes as `project_id`.
        credentials = adapter.connections.profile.credentials
        assert credentials.dataproc_project == "test"


# class BaseOverrideDatabase:
# @pytest.fixture(scope="class")
# def model_path(self):
# return "models"

# @pytest.fixture(scope="class")
# def project_config_update(self):
# return {
# "config-version": 2,
# "seed-paths": ["seeds"],
# "vars": {
# "alternate_db": ALT_DATABASE,
# },
# "quoting": {
# "database": True,
# },
# "seeds": {
# "quote_columns": False,
# },
# }

# @pytest.fixture(scope="function")
# def clean_up(self, project):
# yield
# relation = project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema
# )
# project.adapter.drop_schema(relation)


# class TestModelOverrideBigQuery(BaseOverrideDatabase):
# def run_database_override(self, project):
# run_dbt(["seed"])
# assert len(run_dbt(["run"])) == 4
# check_relations_equal_with_relations(
# project.adapter,
# [
# project.adapter.Relation.create(schema=project.test_schema, identifier="seed"),
# project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema, identifier="view_2"
# ),
# project.adapter.Relation.create(schema=project.test_schema, identifier="view_1"),
# project.adapter.Relation.create(schema=project.test_schema, identifier="view_3"),
# project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema, identifier="view_4"
# ),
# ],
# )

# def test_bigquery_database_override(self, project, clean_up):
# self.run_database_override(project)


# class BaseTestProjectModelOverrideBigQuery(BaseOverrideDatabase):
# def run_database_override(self, project):
# run_dbt(["seed"])
# assert len(run_dbt(["run"])) == 4
# self.assertExpectedRelations(project)

# def assertExpectedRelations(self, project):
# check_relations_equal_with_relations(
# project.adapter,
# [
# project.adapter.Relation.create(schema=project.test_schema, identifier="seed"),
# project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema, identifier="view_2"
# ),
# project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema, identifier="view_1"
# ),
# project.adapter.Relation.create(schema=project.test_schema, identifier="view_3"),
# project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema, identifier="view_4"
# ),
# ],
# )


# class TestProjectModelOverrideBigQuery(BaseTestProjectModelOverrideBigQuery):
# @pytest.fixture(scope="class")
# def project_config_update(self):
# return {
# "config-version": 2,
# "models": {
# "database": ALT_DATABASE,
# "test": {"subfolder": {"database": "{{ target.database }}"}},
# },
# "seed-paths": ["seeds"],
# "vars": {
# "alternate_db": ALT_DATABASE,
# },
# "quoting": {
# "database": True,
# },
# "seeds": {
# "quote_columns": False,
# },
# }

# def test_bigquery_database_override(self, project, clean_up):
# self.run_database_override(project)


# class TestProjectModelAliasOverrideBigQuery(BaseTestProjectModelOverrideBigQuery):
# @pytest.fixture(scope="class")
# def project_config_update(self):
# return {
# "config-version": 2,
# "models": {
# "project": ALT_DATABASE,
# "test": {"subfolder": {"project": "{{ target.database }}"}},
# },
# "seed-paths": ["seeds"],
# "vars": {
# "alternate_db": ALT_DATABASE,
# },
# "quoting": {
# "database": True,
# },
# "seeds": {
# "quote_columns": False,
# },
# }

# def test_bigquery_project_override(self, project, clean_up):
# self.run_database_override(project)


# class TestProjectSeedOverrideBigQuery(BaseOverrideDatabase):
# @pytest.fixture(scope="class")
# def project_config_update(self):
# return {
# "config-version": 2,
# "seed-paths": ["seeds"],
# "vars": {
# "alternate_db": ALT_DATABASE,
# },
# "seeds": {"database": ALT_DATABASE},
# }

# def run_database_override(self, project):
# run_dbt(["seed"])
# assert len(run_dbt(["run"])) == 4
# check_relations_equal_with_relations(
# project.adapter,
# [
# project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema, identifier="seed"
# ),
# project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema, identifier="view_2"
# ),
# project.adapter.Relation.create(schema=project.test_schema, identifier="view_1"),
# project.adapter.Relation.create(schema=project.test_schema, identifier="view_3"),
# project.adapter.Relation.create(
# database=ALT_DATABASE, schema=project.test_schema, identifier="view_4"
# ),
# ],
# )

# def test_bigquery_database_override(self, project, clean_up):
# self.run_database_override(project)
Loading