From 9f6c449619cf245f1788fcf7e5cf13f3e3a855a6 Mon Sep 17 00:00:00 2001 From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Fri, 22 Sep 2023 11:24:14 -0700 Subject: [PATCH 1/6] Address type annotation issues and clean up impl (#933) * use dynamic schema in test_grant_access_to.py * use dynamic schema in test_grant_access_to.py * update impl ConnectionManager typing and move list_datasets into BigQueryConnectionManager * refactor unit tests and add one to cover list_datasets * accidental commit rever * add changie * Rework constructor for mypy. Remove unused functions. * Add changelog entry. * merge paw/type-fix --------- Co-authored-by: Peter Allen Webb --- .../Under the Hood-20230921-155645.yaml | 6 + .../Under the Hood-20230922-125327.yaml | 6 + dbt/adapters/bigquery/connections.py | 14 ++ dbt/adapters/bigquery/impl.py | 51 +---- tests/unit/test_bigquery_adapter.py | 182 +--------------- .../unit/test_bigquery_connection_manager.py | 198 ++++++++++++++++++ 6 files changed, 230 insertions(+), 227 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20230921-155645.yaml create mode 100644 .changes/unreleased/Under the Hood-20230922-125327.yaml create mode 100644 tests/unit/test_bigquery_connection_manager.py diff --git a/.changes/unreleased/Under the Hood-20230921-155645.yaml b/.changes/unreleased/Under the Hood-20230921-155645.yaml new file mode 100644 index 000000000..12cd663f8 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20230921-155645.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Address type annotation issues and remove protected method ref from impl +time: 2023-09-21T15:56:45.329798-07:00 +custom: + Author: colin-rogers-dbt + Issue: "933" diff --git a/.changes/unreleased/Under the Hood-20230922-125327.yaml b/.changes/unreleased/Under the Hood-20230922-125327.yaml new file mode 100644 index 000000000..9ce871321 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20230922-125327.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Fixed a mypy failure by reworking BigQueryAdapter constructor. +time: 2023-09-22T12:53:27.339599-04:00 +custom: + Author: peterallenwebb + Issue: "934" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index c136042c3..7799ecb8a 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -700,6 +700,20 @@ def fn(): self._retry_and_handle(msg="create dataset", conn=conn, fn=fn) + def list_dataset(self, database: str): + # the database string we get here is potentially quoted. Strip that off + # for the API call. 
+ database = database.strip("`") + conn = self.get_thread_connection() + client = conn.handle + + def query_schemas(): + # this is similar to how we have to deal with listing tables + all_datasets = client.list_datasets(project=database, max_results=10000) + return [ds.dataset_id for ds in all_datasets] + + return self._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas) + def _query_and_results( self, client, diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index bb04c78b8..8fc1b69bb 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -217,6 +217,10 @@ class BigQueryAdapter(BaseAdapter): ConstraintType.foreign_key: ConstraintSupport.ENFORCED, } + def __init__(self, config) -> None: + super().__init__(config) + self.connections: BigQueryConnectionManager = self.connections + ### # Implementations of abstract methods ### @@ -267,18 +271,7 @@ def rename_relation( @available def list_schemas(self, database: str) -> List[str]: - # the database string we get here is potentially quoted. Strip that off - # for the API call. - database = database.strip("`") - conn = self.connections.get_thread_connection() - client = conn.handle - - def query_schemas(): - # this is similar to how we have to deal with listing tables - all_datasets = client.list_datasets(project=database, max_results=10000) - return [ds.dataset_id for ds in all_datasets] - - return self.connections._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas) + return self.connections.list_dataset(database) @available.parse(lambda *a, **k: False) def check_schema_exists(self, database: str, schema: str) -> bool: @@ -481,40 +474,6 @@ def _agate_to_schema( bq_schema.append(SchemaField(col_name, type_)) # type: ignore[arg-type] return bq_schema - def _materialize_as_view(self, model: Dict[str, Any]) -> str: - model_database = model.get("database") - model_schema = model.get("schema") - model_alias = model.get("alias") - model_code = model.get("compiled_code") - - logger.debug("Model SQL ({}):\n{}".format(model_alias, model_code)) - self.connections.create_view( - database=model_database, schema=model_schema, table_name=model_alias, sql=model_code - ) - return "CREATE VIEW" - - def _materialize_as_table( - self, - model: Dict[str, Any], - model_sql: str, - decorator: Optional[str] = None, - ) -> str: - model_database = model.get("database") - model_schema = model.get("schema") - model_alias = model.get("alias") - - if decorator is None: - table_name = model_alias - else: - table_name = "{}${}".format(model_alias, decorator) - - logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql)) - self.connections.create_table( - database=model_database, schema=model_schema, table_name=table_name, sql=model_sql - ) - - return "CREATE TABLE" - @available.parse(lambda *a, **k: "") def copy_table(self, source, destination, materialization): if materialization == "incremental": diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py index 10cb3f530..4db2ce83d 100644 --- a/tests/unit/test_bigquery_adapter.py +++ b/tests/unit/test_bigquery_adapter.py @@ -1,26 +1,19 @@ -import time - import agate import decimal -import json import string import random import re import pytest import unittest -from contextlib import contextmanager -from requests.exceptions import ConnectionError -from unittest.mock import patch, MagicMock, Mock, create_autospec, ANY +from unittest.mock import patch, MagicMock, create_autospec import dbt.dataclass_schema from 
dbt.adapters.bigquery import PartitionConfig -from dbt.adapters.bigquery import BigQueryCredentials from dbt.adapters.bigquery import BigQueryAdapter from dbt.adapters.bigquery import BigQueryRelation from dbt.adapters.bigquery import Plugin as BigQueryPlugin from google.cloud.bigquery.table import Table -from dbt.adapters.bigquery.connections import BigQueryConnectionManager from dbt.adapters.bigquery.connections import _sanitize_label, _VALIDATE_LABEL_LENGTH_LIMIT from dbt.adapters.base.query_headers import MacroQueryStringSetter from dbt.clients import agate_helper @@ -543,179 +536,6 @@ def test_replace(self): assert other_schema.quote_policy.database is False -class TestBigQueryConnectionManager(unittest.TestCase): - def setUp(self): - credentials = Mock(BigQueryCredentials) - profile = Mock(query_comment=None, credentials=credentials) - self.connections = BigQueryConnectionManager(profile=profile) - - self.mock_client = Mock(dbt.adapters.bigquery.impl.google.cloud.bigquery.Client) - self.mock_connection = MagicMock() - - self.mock_connection.handle = self.mock_client - - self.connections.get_thread_connection = lambda: self.mock_connection - self.connections.get_job_retry_deadline_seconds = lambda x: None - self.connections.get_job_retries = lambda x: 1 - - @patch("dbt.adapters.bigquery.connections._is_retryable", return_value=True) - def test_retry_and_handle(self, is_retryable): - self.connections.DEFAULT_MAXIMUM_DELAY = 2.0 - - @contextmanager - def dummy_handler(msg): - yield - - self.connections.exception_handler = dummy_handler - - class DummyException(Exception): - """Count how many times this exception is raised""" - - count = 0 - - def __init__(self): - DummyException.count += 1 - - def raiseDummyException(): - raise DummyException() - - with self.assertRaises(DummyException): - self.connections._retry_and_handle( - "some sql", Mock(credentials=Mock(retries=8)), raiseDummyException - ) - self.assertEqual(DummyException.count, 9) - - @patch("dbt.adapters.bigquery.connections._is_retryable", return_value=True) - def test_retry_connection_reset(self, is_retryable): - self.connections.open = MagicMock() - self.connections.close = MagicMock() - self.connections.DEFAULT_MAXIMUM_DELAY = 2.0 - - @contextmanager - def dummy_handler(msg): - yield - - self.connections.exception_handler = dummy_handler - - def raiseConnectionResetError(): - raise ConnectionResetError("Connection broke") - - mock_conn = Mock(credentials=Mock(retries=1)) - with self.assertRaises(ConnectionResetError): - self.connections._retry_and_handle("some sql", mock_conn, raiseConnectionResetError) - self.connections.close.assert_called_once_with(mock_conn) - self.connections.open.assert_called_once_with(mock_conn) - - def test_is_retryable(self): - _is_retryable = dbt.adapters.bigquery.connections._is_retryable - exceptions = dbt.adapters.bigquery.impl.google.cloud.exceptions - internal_server_error = exceptions.InternalServerError("code broke") - bad_request_error = exceptions.BadRequest("code broke") - connection_error = ConnectionError("code broke") - client_error = exceptions.ClientError("bad code") - rate_limit_error = exceptions.Forbidden( - "code broke", errors=[{"reason": "rateLimitExceeded"}] - ) - - self.assertTrue(_is_retryable(internal_server_error)) - self.assertTrue(_is_retryable(bad_request_error)) - self.assertTrue(_is_retryable(connection_error)) - self.assertFalse(_is_retryable(client_error)) - self.assertTrue(_is_retryable(rate_limit_error)) - - def test_drop_dataset(self): - mock_table = Mock() - 
mock_table.reference = "table1" - self.mock_client.list_tables.return_value = [mock_table] - - self.connections.drop_dataset("project", "dataset") - - self.mock_client.list_tables.assert_not_called() - self.mock_client.delete_table.assert_not_called() - self.mock_client.delete_dataset.assert_called_once() - - @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") - def test_query_and_results(self, mock_bq): - self.mock_client.query = Mock(return_value=Mock(state="DONE")) - self.connections._query_and_results( - self.mock_client, - "sql", - {"job_param_1": "blah"}, - job_creation_timeout=15, - job_execution_timeout=3, - ) - - mock_bq.QueryJobConfig.assert_called_once() - self.mock_client.query.assert_called_once_with( - query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 - ) - - @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") - def test_query_and_results_timeout(self, mock_bq): - self.mock_client.query = Mock( - return_value=Mock(result=lambda *args, **kwargs: time.sleep(4)) - ) - with pytest.raises(dbt.exceptions.DbtRuntimeError) as exc: - self.connections._query_and_results( - self.mock_client, - "sql", - {"job_param_1": "blah"}, - job_creation_timeout=15, - job_execution_timeout=1, - ) - - mock_bq.QueryJobConfig.assert_called_once() - self.mock_client.query.assert_called_once_with( - query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 - ) - assert "Query exceeded configured timeout of 1s" in str(exc.value) - - def test_copy_bq_table_appends(self): - self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND) - args, kwargs = self.mock_client.copy_table.call_args - self.mock_client.copy_table.assert_called_once_with( - [self._table_ref("project", "dataset", "table1")], - self._table_ref("project", "dataset", "table2"), - job_config=ANY, - ) - args, kwargs = self.mock_client.copy_table.call_args - self.assertEqual( - kwargs["job_config"].write_disposition, dbt.adapters.bigquery.impl.WRITE_APPEND - ) - - def test_copy_bq_table_truncates(self): - self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_TRUNCATE) - args, kwargs = self.mock_client.copy_table.call_args - self.mock_client.copy_table.assert_called_once_with( - [self._table_ref("project", "dataset", "table1")], - self._table_ref("project", "dataset", "table2"), - job_config=ANY, - ) - args, kwargs = self.mock_client.copy_table.call_args - self.assertEqual( - kwargs["job_config"].write_disposition, dbt.adapters.bigquery.impl.WRITE_TRUNCATE - ) - - def test_job_labels_valid_json(self): - expected = {"key": "value"} - labels = self.connections._labels_from_query_comment(json.dumps(expected)) - self.assertEqual(labels, expected) - - def test_job_labels_invalid_json(self): - labels = self.connections._labels_from_query_comment("not json") - self.assertEqual(labels, {"query_comment": "not_json"}) - - def _table_ref(self, proj, ds, table): - return self.connections.table_ref(proj, ds, table) - - def _copy_table(self, write_disposition): - source = BigQueryRelation.create(database="project", schema="dataset", identifier="table1") - destination = BigQueryRelation.create( - database="project", schema="dataset", identifier="table2" - ) - self.connections.copy_bq_table(source, destination, write_disposition) - - class TestBigQueryAdapter(BaseTestBigQueryAdapter): def test_copy_table_materialization_table(self): adapter = self.get_adapter("oauth") diff --git a/tests/unit/test_bigquery_connection_manager.py b/tests/unit/test_bigquery_connection_manager.py new file mode 100644 index 
000000000..d6c3f64fc --- /dev/null +++ b/tests/unit/test_bigquery_connection_manager.py @@ -0,0 +1,198 @@ +import time +import json +import pytest +import unittest +from contextlib import contextmanager +from requests.exceptions import ConnectionError +from unittest.mock import patch, MagicMock, Mock, ANY + +import dbt.dataclass_schema + +from dbt.adapters.bigquery import BigQueryCredentials +from dbt.adapters.bigquery import BigQueryRelation +from dbt.adapters.bigquery.connections import BigQueryConnectionManager +import dbt.exceptions +from dbt.logger import GLOBAL_LOGGER as logger # noqa + + +class TestBigQueryConnectionManager(unittest.TestCase): + def setUp(self): + credentials = Mock(BigQueryCredentials) + profile = Mock(query_comment=None, credentials=credentials) + self.connections = BigQueryConnectionManager(profile=profile) + + self.mock_client = Mock(dbt.adapters.bigquery.impl.google.cloud.bigquery.Client) + self.mock_connection = MagicMock() + + self.mock_connection.handle = self.mock_client + + self.connections.get_thread_connection = lambda: self.mock_connection + self.connections.get_job_retry_deadline_seconds = lambda x: None + self.connections.get_job_retries = lambda x: 1 + + @patch("dbt.adapters.bigquery.connections._is_retryable", return_value=True) + def test_retry_and_handle(self, is_retryable): + self.connections.DEFAULT_MAXIMUM_DELAY = 2.0 + + @contextmanager + def dummy_handler(msg): + yield + + self.connections.exception_handler = dummy_handler + + class DummyException(Exception): + """Count how many times this exception is raised""" + + count = 0 + + def __init__(self): + DummyException.count += 1 + + def raiseDummyException(): + raise DummyException() + + with self.assertRaises(DummyException): + self.connections._retry_and_handle( + "some sql", Mock(credentials=Mock(retries=8)), raiseDummyException + ) + self.assertEqual(DummyException.count, 9) + + @patch("dbt.adapters.bigquery.connections._is_retryable", return_value=True) + def test_retry_connection_reset(self, is_retryable): + self.connections.open = MagicMock() + self.connections.close = MagicMock() + self.connections.DEFAULT_MAXIMUM_DELAY = 2.0 + + @contextmanager + def dummy_handler(msg): + yield + + self.connections.exception_handler = dummy_handler + + def raiseConnectionResetError(): + raise ConnectionResetError("Connection broke") + + mock_conn = Mock(credentials=Mock(retries=1)) + with self.assertRaises(ConnectionResetError): + self.connections._retry_and_handle("some sql", mock_conn, raiseConnectionResetError) + self.connections.close.assert_called_once_with(mock_conn) + self.connections.open.assert_called_once_with(mock_conn) + + def test_is_retryable(self): + _is_retryable = dbt.adapters.bigquery.connections._is_retryable + exceptions = dbt.adapters.bigquery.impl.google.cloud.exceptions + internal_server_error = exceptions.InternalServerError("code broke") + bad_request_error = exceptions.BadRequest("code broke") + connection_error = ConnectionError("code broke") + client_error = exceptions.ClientError("bad code") + rate_limit_error = exceptions.Forbidden( + "code broke", errors=[{"reason": "rateLimitExceeded"}] + ) + + self.assertTrue(_is_retryable(internal_server_error)) + self.assertTrue(_is_retryable(bad_request_error)) + self.assertTrue(_is_retryable(connection_error)) + self.assertFalse(_is_retryable(client_error)) + self.assertTrue(_is_retryable(rate_limit_error)) + + def test_drop_dataset(self): + mock_table = Mock() + mock_table.reference = "table1" + 
self.mock_client.list_tables.return_value = [mock_table] + + self.connections.drop_dataset("project", "dataset") + + self.mock_client.list_tables.assert_not_called() + self.mock_client.delete_table.assert_not_called() + self.mock_client.delete_dataset.assert_called_once() + + @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") + def test_query_and_results(self, mock_bq): + self.mock_client.query = Mock(return_value=Mock(state="DONE")) + self.connections._query_and_results( + self.mock_client, + "sql", + {"job_param_1": "blah"}, + job_creation_timeout=15, + job_execution_timeout=3, + ) + + mock_bq.QueryJobConfig.assert_called_once() + self.mock_client.query.assert_called_once_with( + query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 + ) + + @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") + def test_query_and_results_timeout(self, mock_bq): + self.mock_client.query = Mock( + return_value=Mock(result=lambda *args, **kwargs: time.sleep(4)) + ) + with pytest.raises(dbt.exceptions.DbtRuntimeError) as exc: + self.connections._query_and_results( + self.mock_client, + "sql", + {"job_param_1": "blah"}, + job_creation_timeout=15, + job_execution_timeout=1, + ) + + mock_bq.QueryJobConfig.assert_called_once() + self.mock_client.query.assert_called_once_with( + query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 + ) + assert "Query exceeded configured timeout of 1s" in str(exc.value) + + def test_copy_bq_table_appends(self): + self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND) + args, kwargs = self.mock_client.copy_table.call_args + self.mock_client.copy_table.assert_called_once_with( + [self._table_ref("project", "dataset", "table1")], + self._table_ref("project", "dataset", "table2"), + job_config=ANY, + ) + args, kwargs = self.mock_client.copy_table.call_args + self.assertEqual( + kwargs["job_config"].write_disposition, dbt.adapters.bigquery.impl.WRITE_APPEND + ) + + def test_copy_bq_table_truncates(self): + self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_TRUNCATE) + args, kwargs = self.mock_client.copy_table.call_args + self.mock_client.copy_table.assert_called_once_with( + [self._table_ref("project", "dataset", "table1")], + self._table_ref("project", "dataset", "table2"), + job_config=ANY, + ) + args, kwargs = self.mock_client.copy_table.call_args + self.assertEqual( + kwargs["job_config"].write_disposition, dbt.adapters.bigquery.impl.WRITE_TRUNCATE + ) + + def test_job_labels_valid_json(self): + expected = {"key": "value"} + labels = self.connections._labels_from_query_comment(json.dumps(expected)) + self.assertEqual(labels, expected) + + def test_job_labels_invalid_json(self): + labels = self.connections._labels_from_query_comment("not json") + self.assertEqual(labels, {"query_comment": "not_json"}) + + def test_list_dataset_correctly_calls_lists_datasets(self): + mock_dataset = Mock(dataset_id="d1") + mock_list_dataset = Mock(return_value=[mock_dataset]) + self.mock_client.list_datasets = mock_list_dataset + result = self.connections.list_dataset("project") + self.mock_client.list_datasets.assert_called_once_with( + project="project", max_results=10000 + ) + assert result == ["d1"] + + def _table_ref(self, proj, ds, table): + return self.connections.table_ref(proj, ds, table) + + def _copy_table(self, write_disposition): + source = BigQueryRelation.create(database="project", schema="dataset", identifier="table1") + destination = BigQueryRelation.create( + database="project", schema="dataset", identifier="table2" 
+ ) + self.connections.copy_bq_table(source, destination, write_disposition) From 4c02c07b95f8c1c582320558384a349e1d4fd51a Mon Sep 17 00:00:00 2001 From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Fri, 22 Sep 2023 14:03:18 -0700 Subject: [PATCH 2/6] Update SQLQuery to include node_info (#936) * use dynamic schema in test_grant_access_to.py * use dynamic schema in test_grant_access_to.py * update SQLQuery to include node_info * add changie * revert setup --- .changes/unreleased/Under the Hood-20230922-114217.yaml | 6 ++++++ dbt/adapters/bigquery/connections.py | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 .changes/unreleased/Under the Hood-20230922-114217.yaml diff --git a/.changes/unreleased/Under the Hood-20230922-114217.yaml b/.changes/unreleased/Under the Hood-20230922-114217.yaml new file mode 100644 index 000000000..78fee33c4 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20230922-114217.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: update SQLQuery to include node_info +time: 2023-09-22T11:42:17.770033-07:00 +custom: + Author: colin-rogers-dbt + Issue: "936" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index 7799ecb8a..a5c7b9355 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -5,6 +5,7 @@ from contextlib import contextmanager from dataclasses import dataclass, field +from dbt.events.contextvars import get_node_info from mashumaro.helper import pass_through from functools import lru_cache @@ -444,7 +445,8 @@ def raw_execute( conn = self.get_thread_connection() client = conn.handle - fire_event(SQLQuery(conn_name=conn.name, sql=sql)) + fire_event(SQLQuery(conn_name=conn.name, sql=sql, node_info=get_node_info())) + if ( hasattr(self.profile, "query_comment") and self.profile.query_comment From 63ae274c8723025790b7b256da4655fbfe7c2ed6 Mon Sep 17 00:00:00 2001 From: Emily Rockman Date: Tue, 26 Sep 2023 11:49:18 -0500 Subject: [PATCH 3/6] automate repo cleanup (#944) --- .github/workflows/repository-cleanup.yml | 30 ++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 .github/workflows/repository-cleanup.yml diff --git a/.github/workflows/repository-cleanup.yml b/.github/workflows/repository-cleanup.yml new file mode 100644 index 000000000..c1d780281 --- /dev/null +++ b/.github/workflows/repository-cleanup.yml @@ -0,0 +1,30 @@ +# **what?** +# Cleanup branches left over from automation and testing. Also cleanup +# draft releases from release testing. + +# **why?** +# The automations are leaving behind branches and releases that clutter +# the repository. Sometimes we need them to debug processes so we don't +# want them immediately deleted. Running on Saturday to avoid running +# at the same time as an actual release to prevent breaking a release +# mid-release. + +# **when?** +# Mainly on a schedule of 12:00 Saturday. 
+# Manual trigger can also run on demand + +name: Repository Cleanup + +on: + schedule: + - cron: '0 12 * * SAT' # At 12:00 on Saturday - details in `why` above + + workflow_dispatch: # for manual triggering + +permissions: + contents: write + +jobs: + cleanup-repo: + uses: dbt-labs/actions/.github/workflows/repository-cleanup.yml@main + secrets: inherit From e5a89afdf7820905e44e850556e2901b7b872303 Mon Sep 17 00:00:00 2001 From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Tue, 26 Sep 2023 15:48:57 -0700 Subject: [PATCH 4/6] poll `.GetBatch()` instead of using `operation.result()` (#929) * re:PR https://github.com/dbt-labs/dbt-bigquery/pull/840/files * adding back comment # check if job failed * adding changelog * precommit code format * sleep(2) first in the while loop before the request to eliminate the last 2 seconds sleep if the response is in one of the 3 options * removing empty spaces * update batch request to handle `GetBatchRequest` * conditionally run python model tests and factor out batch functions to own module * Move events to common * fix import * fix mistaken import change * update unit test * clean up and typing --------- Co-authored-by: Zi Wang Co-authored-by: wazi55 <134335723+wazi55@users.noreply.github.com> Co-authored-by: Anders Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com> --- .../unreleased/Fixes-20230721-101041.yaml | 6 ++ .github/workflows/integration.yml | 21 ++++++ dbt/adapters/bigquery/dataproc/__init__.py | 0 dbt/adapters/bigquery/dataproc/batch.py | 67 +++++++++++++++++++ dbt/adapters/bigquery/python_submissions.py | 61 +++++++---------- tests/functional/adapter/test_python_model.py | 2 +- tests/unit/test_configure_dataproc_batch.py | 4 +- 7 files changed, 123 insertions(+), 38 deletions(-) create mode 100644 .changes/unreleased/Fixes-20230721-101041.yaml create mode 100644 dbt/adapters/bigquery/dataproc/__init__.py create mode 100644 dbt/adapters/bigquery/dataproc/batch.py diff --git a/.changes/unreleased/Fixes-20230721-101041.yaml b/.changes/unreleased/Fixes-20230721-101041.yaml new file mode 100644 index 000000000..6db81cf50 --- /dev/null +++ b/.changes/unreleased/Fixes-20230721-101041.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Serverless Spark to Poll with .GetBatch() instead of using operation.result() +time: 2023-07-21T10:10:41.64843-07:00 +custom: + Author: wazi55 + Issue: "734" diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 99f78e33d..bb0211b35 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -64,6 +64,7 @@ jobs: outputs: matrix: ${{ steps.generate-matrix.outputs.result }} + run-python-tests: ${{ steps.filter.outputs.bigquery-python }} steps: - name: Check out the repository (non-PR) @@ -96,6 +97,11 @@ jobs: - 'dbt/**' - 'tests/**' - 'dev-requirements.txt' + bigquery-python: + - 'dbt/adapters/bigquery/dataproc/**' + - 'dbt/adapters/bigquery/python_submissions.py' + - 'dbt/include/bigquery/python_model/**' + - name: Generate integration test matrix id: generate-matrix uses: actions/github-script@v6 @@ -186,6 +192,21 @@ jobs: GCS_BUCKET: dbt-ci run: tox -- --ddtrace + # python models tests are slow so we only want to run them if we're changing them + - name: Run tox (python models) + if: needs.test-metadata.outputs.run-python-tests == 'true' + env: + BIGQUERY_TEST_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_TEST_SERVICE_ACCOUNT_JSON }} + BIGQUERY_TEST_ALT_DATABASE: ${{ secrets.BIGQUERY_TEST_ALT_DATABASE }} + 
BIGQUERY_TEST_NO_ACCESS_DATABASE: ${{ secrets.BIGQUERY_TEST_NO_ACCESS_DATABASE }} + DBT_TEST_USER_1: group:buildbot@dbtlabs.com + DBT_TEST_USER_2: group:engineering-core-team@dbtlabs.com + DBT_TEST_USER_3: serviceAccount:dbt-integration-test-user@dbt-test-env.iam.gserviceaccount.com + DATAPROC_REGION: us-central1 + DATAPROC_CLUSTER_NAME: dbt-test-1 + GCS_BUCKET: dbt-ci + run: tox -e python-tests -- --ddtrace + - uses: actions/upload-artifact@v3 if: always() with: diff --git a/dbt/adapters/bigquery/dataproc/__init__.py b/dbt/adapters/bigquery/dataproc/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbt/adapters/bigquery/dataproc/batch.py b/dbt/adapters/bigquery/dataproc/batch.py new file mode 100644 index 000000000..0dc54aa78 --- /dev/null +++ b/dbt/adapters/bigquery/dataproc/batch.py @@ -0,0 +1,67 @@ +from typing import Union, Dict + +import time +from datetime import datetime +from google.cloud.dataproc_v1 import ( + CreateBatchRequest, + BatchControllerClient, + Batch, + GetBatchRequest, +) +from google.protobuf.json_format import ParseDict + +from dbt.adapters.bigquery.connections import DataprocBatchConfig + +_BATCH_RUNNING_STATES = [Batch.State.PENDING, Batch.State.RUNNING] +DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar" + + +def create_batch_request( + batch: Batch, batch_id: str, project: str, region: str +) -> CreateBatchRequest: + return CreateBatchRequest( + parent=f"projects/{project}/locations/{region}", # type: ignore + batch_id=batch_id, # type: ignore + batch=batch, # type: ignore + ) + + +def poll_batch_job( + parent: str, batch_id: str, job_client: BatchControllerClient, timeout: int +) -> Batch: + batch_name = "".join([parent, "/batches/", batch_id]) + state = Batch.State.PENDING + response = None + run_time = 0 + while state in _BATCH_RUNNING_STATES and run_time < timeout: + time.sleep(1) + response = job_client.get_batch( # type: ignore + request=GetBatchRequest(name=batch_name), # type: ignore + ) + run_time = datetime.now().timestamp() - response.create_time.timestamp() # type: ignore + state = response.state + if not response: + raise ValueError("No response from Dataproc") + if state != Batch.State.SUCCEEDED: + if run_time >= timeout: + raise ValueError( + f"Operation did not complete within the designated timeout of {timeout} seconds." + ) + else: + raise ValueError(response.state_message) + return response + + +def update_batch_from_config(config_dict: Union[Dict, DataprocBatchConfig], target: Batch): + try: + # updates in place + ParseDict(config_dict, target._pb) + except Exception as e: + docurl = ( + "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1" + "#google.cloud.dataproc.v1.Batch" + ) + raise ValueError( + f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. 
{str(e)}" + ) from e + return target diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 6e5a11e52..8fd354eb5 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -4,11 +4,17 @@ from google.api_core.future.polling import POLLING_PREDICATE from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials -from dbt.adapters.bigquery.connections import DataprocBatchConfig from google.api_core import retry from google.api_core.client_options import ClientOptions from google.cloud import storage, dataproc_v1 # type: ignore -from google.protobuf.json_format import ParseDict +from google.cloud.dataproc_v1.types.batches import Batch + +from dbt.adapters.bigquery.dataproc.batch import ( + create_batch_request, + poll_batch_job, + DEFAULT_JAR_FILE_URI, + update_batch_from_config, +) OPERATION_RETRY_TIME = 10 @@ -102,8 +108,8 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job: "job": job, } ) - response = operation.result(polling=self.result_polling_policy) # check if job failed + response = operation.result(polling=self.result_polling_policy) if response.status.state == 6: raise ValueError(response.status.details) return response @@ -118,21 +124,22 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient: def _get_batch_id(self) -> str: return self.parsed_model["config"].get("batch_id") - def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job: - batch = self._configure_batch() - parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}" - - request = dataproc_v1.CreateBatchRequest( - parent=parent, - batch=batch, - batch_id=self._get_batch_id(), - ) + def _submit_dataproc_job(self) -> Batch: + batch_id = self._get_batch_id() + request = create_batch_request( + batch=self._configure_batch(), + batch_id=batch_id, + region=self.credential.dataproc_region, # type: ignore + project=self.credential.execution_project, # type: ignore + ) # type: ignore # make the request - operation = self.job_client.create_batch(request=request) # type: ignore - # this takes quite a while, waiting on GCP response to resolve - # (not a google-api-core issue, more likely a dataproc serverless issue) - response = operation.result(polling=self.result_polling_policy) - return response + self.job_client.create_batch(request=request) # type: ignore + return poll_batch_job( + parent=request.parent, + batch_id=batch_id, + job_client=self.job_client, # type: ignore + timeout=self.timeout, + ) # there might be useful results here that we can parse and return # Dataproc job output is saved to the Cloud Storage bucket # allocated to the job. Use regex to obtain the bucket and blob info. @@ -163,27 +170,11 @@ def _configure_batch(self): batch.pyspark_batch.main_python_file_uri = self.gcs_location jar_file_uri = self.parsed_model["config"].get( "jar_file_uri", - "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar", + DEFAULT_JAR_FILE_URI, ) batch.pyspark_batch.jar_file_uris = [jar_file_uri] # Apply configuration from dataproc_batch key, possibly overriding defaults. 
if self.credential.dataproc_batch: - self._update_batch_from_config(self.credential.dataproc_batch, batch) + batch = update_batch_from_config(self.credential.dataproc_batch, batch) return batch - - @classmethod - def _update_batch_from_config( - cls, config_dict: Union[Dict, DataprocBatchConfig], target: dataproc_v1.Batch - ): - try: - # updates in place - ParseDict(config_dict, target._pb) - except Exception as e: - docurl = ( - "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1" - "#google.cloud.dataproc.v1.Batch" - ) - raise ValueError( - f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}" - ) from e diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py index 241082cdb..b389fe8aa 100644 --- a/tests/functional/adapter/test_python_model.py +++ b/tests/functional/adapter/test_python_model.py @@ -66,7 +66,7 @@ def model(dbt, spark): """ models__python_array_batch_id_python = """ -import pandas +import pandas as pd def model(dbt, spark): random_array = [ diff --git a/tests/unit/test_configure_dataproc_batch.py b/tests/unit/test_configure_dataproc_batch.py index 58ff52bab..94cb28efb 100644 --- a/tests/unit/test_configure_dataproc_batch.py +++ b/tests/unit/test_configure_dataproc_batch.py @@ -1,6 +1,6 @@ from unittest.mock import patch -from dbt.adapters.bigquery.python_submissions import ServerlessDataProcHelper +from dbt.adapters.bigquery.dataproc.batch import update_batch_from_config from google.cloud import dataproc_v1 from .test_bigquery_adapter import BaseTestBigQueryAdapter @@ -39,7 +39,7 @@ def test_update_dataproc_serverless_batch(self, mock_get_bigquery_defaults): batch = dataproc_v1.Batch() - ServerlessDataProcHelper._update_batch_from_config(raw_batch_config, batch) + batch = update_batch_from_config(raw_batch_config, batch) def to_str_values(d): """google's protobuf types expose maps as dict[str, str]""" From 0fe8afc105ed196bcd201c0c487f03e2f93aba8b Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 28 Sep 2023 19:32:25 +0100 Subject: [PATCH 5/6] Add dbt show tests (#927) * add dbt show tests * changelog entry * repoint to core main * reuse core fixture for dbt show sql header test --------- Co-authored-by: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Co-authored-by: Matthew McKnight --- .changes/unreleased/Under the Hood-20230925-143628.yaml | 6 ++++++ dev-requirements.txt | 4 ++-- tests/functional/adapter/dbt_show/test_dbt_show.py | 9 +++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20230925-143628.yaml create mode 100644 tests/functional/adapter/dbt_show/test_dbt_show.py diff --git a/.changes/unreleased/Under the Hood-20230925-143628.yaml b/.changes/unreleased/Under the Hood-20230925-143628.yaml new file mode 100644 index 000000000..fb925ae40 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20230925-143628.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Add tests for inlined limit + sql-header in dbt show query +time: 2023-09-25T14:36:28.335466+01:00 +custom: + Author: michelleark + Issue: "940" diff --git a/dev-requirements.txt b/dev-requirements.txt index 54ca40f26..fa5c37b49 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? 
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git@improve-show-fixture#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@improve-show-fixture#egg=dbt-tests-adapter&subdirectory=tests/adapter # if version 1.x or greater -> pin to major version # if version 0.x -> pin to minor diff --git a/tests/functional/adapter/dbt_show/test_dbt_show.py b/tests/functional/adapter/dbt_show/test_dbt_show.py new file mode 100644 index 000000000..c60a26aec --- /dev/null +++ b/tests/functional/adapter/dbt_show/test_dbt_show.py @@ -0,0 +1,9 @@ +from dbt.tests.adapter.dbt_show.test_dbt_show import BaseShowSqlHeader, BaseShowLimit + + +class TestBigQueryShowLimit(BaseShowLimit): + pass + + +class TestBigQueryShowSqlHeader(BaseShowSqlHeader): + pass From d683dec3c3d725057526199bbcdaedbcc892248b Mon Sep 17 00:00:00 2001 From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Thu, 28 Sep 2023 11:47:52 -0700 Subject: [PATCH 6/6] fix mistaken dev-requirements change (#945) * add dbt show tests * changelog entry * repoint to core main * reuse core fixture for dbt show sql header test * fix dev-requirements.txt * use dynamic schema in test_grant_access_to.py * use dynamic schema in test_grant_access_to.py * revert setup * fix dev-requirements.txt --------- Co-authored-by: Michelle Ark Co-authored-by: Michelle Ark Co-authored-by: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Co-authored-by: Matthew McKnight --- dev-requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index fa5c37b49..54ca40f26 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? -git+https://github.com/dbt-labs/dbt-core.git@improve-show-fixture#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git@improve-show-fixture#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter # if version 1.x or greater -> pin to major version # if version 0.x -> pin to minor
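For context on PATCH 4/6 above: the serverless Python-model submitter now creates the Dataproc batch and then polls `GetBatch` through the new `poll_batch_job` helper instead of blocking on `operation.result()`. A minimal usage sketch of the helpers added in `dbt/adapters/bigquery/dataproc/batch.py` follows; it is an illustration only, not part of the patch — the project, region, GCS path, batch id, and the `runtime_config` override are assumed values, and the endpoint construction mirrors how the adapter builds its `BatchControllerClient`.

```python
from google.api_core.client_options import ClientOptions
from google.cloud import dataproc_v1

from dbt.adapters.bigquery.dataproc.batch import (
    create_batch_request,
    poll_batch_job,
    update_batch_from_config,
)

# Illustrative values; in the adapter these come from BigQueryCredentials
# and the model config rather than being hard-coded.
project = "my-gcp-project"
region = "us-central1"
batch_id = "dbt-python-model-example"

job_client = dataproc_v1.BatchControllerClient(
    client_options=ClientOptions(api_endpoint=f"{region}-dataproc.googleapis.com:443")
)

# Build the batch spec, then merge any profile-level dataproc_batch overrides
# into the protobuf in place (the override dict here is an assumed example).
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://my-bucket/model.py"
batch = update_batch_from_config({"runtime_config": {"version": "1.1"}}, batch)

request = create_batch_request(
    batch=batch, batch_id=batch_id, project=project, region=region
)
job_client.create_batch(request=request)

# Poll GetBatch until the batch leaves PENDING/RUNNING or the timeout elapses,
# rather than blocking on operation.result().
response = poll_batch_job(
    parent=request.parent, batch_id=batch_id, job_client=job_client, timeout=1800
)
print(response.state)
```

Polling `GetBatch` this way lets the helper enforce its own timeout and raise with the batch's `state_message` on failure, which the previous blocking `operation.result()` call did not surface as directly for long-running serverless batches.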