Skip to content

Commit

Permalink
Swap dataproc batch_id declaration to model config (dbt-labs#804)
Browse files Browse the repository at this point in the history
* swap batch_id declaration to model config

* address changie req, fix python submission

* Update bug that is being resolved

* implement dbeatty's suggestion

* Update .changes/unreleased/Fixes-20230630-092618.yaml

Co-authored-by: Doug Beatty <44704949+dbeatty10@users.noreply.github.com>

* Add 2 tests

---------

Co-authored-by: Doug Beatty <44704949+dbeatty10@users.noreply.github.com>
Co-authored-by: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com>
Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com>
  • Loading branch information
4 people authored Aug 11, 2023
1 parent dc56130 commit 3510f76
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230630-092618.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Change batch_id to model override
time: 2023-06-30T09:26:18.854492+01:00
custom:
  Author: nickozilla
  Issue: "671"
4 changes: 4 additions & 0 deletions dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,17 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
client_options=self.client_options, credentials=self.GoogleCredentials
)

def _get_batch_id(self) -> str:
    """Look up the user-supplied batch id in this model's config.

    Reads the ``batch_id`` key from ``self.parsed_model["config"]``. The
    lookup goes through ``.get``, so a model that does not set ``batch_id``
    yields ``None`` instead of raising.
    """
    model_config = self.parsed_model["config"]
    return model_config.get("batch_id")

def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
batch = self._configure_batch()
parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"

request = dataproc_v1.CreateBatchRequest(
parent=parent,
batch=batch,
batch_id=self._get_batch_id(),
)
# make the request
operation = self.job_client.create_batch(request=request) # type: ignore
Expand Down
88 changes: 88 additions & 0 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import pytest
import time
from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file
import dbt.tests.adapter.python_model.test_python_model as dbt_tests

Expand Down Expand Up @@ -64,6 +65,93 @@ def model(dbt, spark):
return spark.createDataFrame(data, schema=['test1', 'test3'])
"""

# Source of a dbt Python model that builds a small pandas DataFrame with
# columns A and B and adds a derived column C = A * B.
# pandas is imported under the alias ``pd`` because the model body calls
# ``pd.DataFrame``; a bare ``import pandas`` would leave ``pd`` undefined.
models__python_array_batch_id_python = """
import pandas as pd
def model(dbt, spark):
    random_array = [
        [9001.3985362160208, -157.9871329592354],
        [-817.8786101352823, -528.9769041860632],
        [-886.6488625065194, 941.0504221837489],
        [6.69525238666165, 919.5903586746183],
        [754.3718741592056, -121.25678519054622],
        [-352.3158889341157, 254.9985130814921],
        [563.0633042715097, 833.2963094260072],
    ]
    df = pd.DataFrame(random_array, columns=["A", "B"])
    df["C"] = df["A"] * df["B"]
    final_df = df[["A", "B", "C"]]
    return final_df
"""

# dbt schema file for the ``python_array_batch_id`` model.
# ``batch_id`` is a Jinja expression built from ``run_started_at``, so it is
# rendered per dbt invocation rather than fixed when this module is imported.
# The nesting matters: ``config`` and ``columns`` must be properties of the
# model entry, otherwise ``batch_id`` never reaches the model's config.
models__python_array_batch_id_yaml = """
models:
  - name: python_array_batch_id
    description: A random table with a calculated column defined in python.
    config:
      batch_id: '{{ run_started_at.strftime("%Y-%m-%d-%H-%M-%S") }}-python-array'
    columns:
      - name: A
        description: Column A
      - name: B
        description: Column B
      - name: C
        description: Column C
"""

# Identifier computed once, when this module is imported: "custom-" followed
# by the current epoch time with the decimal point replaced by a dash.
custom_ts_id = "custom-" + str(time.time()).replace(".", "-")

# dbt schema file whose ``batch_id`` is baked in at import time via the
# f-string below, so every dbt run in the same test session submits the same
# batch id. ``config`` and ``columns`` are nested under the model entry so
# that ``batch_id`` is part of the model's config.
models__bad_python_array_batch_id_yaml = f"""
models:
  - name: python_array_batch_id
    description: A random table with a calculated column defined in python.
    config:
      batch_id: {custom_ts_id}-python-array
    columns:
      - name: A
        description: Column A
      - name: B
        description: Column B
      - name: C
        description: Column C
"""


class TestPythonBatchIdModels:
    """Run the batch-id model twice using the Jinja-templated ``batch_id``."""

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the Python model and its schema file with the templated batch id."""
        model_files = {}
        model_files["python_array_batch_id.py"] = models__python_array_batch_id_python
        model_files["python_array_batch_id.yml"] = models__python_array_batch_id_yaml
        return model_files

    def test_multiple_named_python_models(self, project):
        """Both consecutive ``dbt run`` invocations must pass with one result each."""
        run_args = ["run"]
        first_results, _ = run_dbt_and_capture(run_args, expect_pass=True)
        # In case both runs are submitted simultaneously
        time.sleep(5)
        second_results, _ = run_dbt_and_capture(run_args, expect_pass=True)
        assert len(first_results) == 1
        assert len(second_results) == 1


class TestPythonDuplicateBatchIdModels:
    """Run the batch-id model twice with a ``batch_id`` fixed at import time.

    The schema file used here embeds ``custom_ts_id``, which is computed once
    when the module is imported, so both runs submit the same batch id and
    the second run is expected to be rejected.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the Python model and the schema file with the fixed batch id."""
        return {
            "python_array_batch_id.py": models__python_array_batch_id_python,
            "python_array_batch_id.yml": models__bad_python_array_batch_id_yaml,
        }

    def test_multiple_python_models_fixed_id(self, project):
        """First run passes; second run fails with a 409 "Already exists" message."""
        first_results, _ = run_dbt_and_capture(["run"], expect_pass=True)
        second_results, _ = run_dbt_and_capture(["run"], expect_pass=False)
        # Check the result counts before indexing, so an unexpectedly empty
        # result list fails with a clear assertion instead of an IndexError.
        assert len(first_results) == 1
        assert len(second_results) == 1
        assert second_results[0].message.startswith(
            "409 Already exists: Failed to create batch:"
        )


@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestChangingSchemaDataproc:
Expand Down

0 comments on commit 3510f76

Please sign in to comment.