Merge branch 'main' into dependabot/pip/ddtrace-approx-eq-1.19
McKnight-42 authored Oct 2, 2023
2 parents 5216d7e + d683dec commit 2010080
Showing 17 changed files with 407 additions and 266 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230721-101041.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Serverless Spark to Poll with .GetBatch() instead of using operation.result()
time: 2023-07-21T10:10:41.64843-07:00
custom:
Author: wazi55
Issue: "734"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230921-155645.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Address type annotation issues and remove protected method ref from impl
time: 2023-09-21T15:56:45.329798-07:00
custom:
Author: colin-rogers-dbt
Issue: "933"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230922-114217.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: update SQLQuery to include node_info
time: 2023-09-22T11:42:17.770033-07:00
custom:
Author: colin-rogers-dbt
Issue: "936"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230922-125327.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fixed a mypy failure by reworking BigQueryAdapter constructor.
time: 2023-09-22T12:53:27.339599-04:00
custom:
Author: peterallenwebb
Issue: "934"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230925-143628.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add tests for inlined limit + sql-header in dbt show query
time: 2023-09-25T14:36:28.335466+01:00
custom:
Author: michelleark
Issue: "940"
21 changes: 21 additions & 0 deletions .github/workflows/integration.yml
@@ -64,6 +64,7 @@ jobs:

outputs:
matrix: ${{ steps.generate-matrix.outputs.result }}
run-python-tests: ${{ steps.filter.outputs.bigquery-python }}

steps:
- name: Check out the repository (non-PR)
@@ -96,6 +97,11 @@ jobs:
- 'dbt/**'
- 'tests/**'
- 'dev-requirements.txt'
bigquery-python:
- 'dbt/adapters/bigquery/dataproc/**'
- 'dbt/adapters/bigquery/python_submissions.py'
- 'dbt/include/bigquery/python_model/**'
- name: Generate integration test matrix
id: generate-matrix
uses: actions/github-script@v6
@@ -186,6 +192,21 @@ jobs:
GCS_BUCKET: dbt-ci
run: tox -- --ddtrace

# python models tests are slow so we only want to run them if we're changing them
- name: Run tox (python models)
if: needs.test-metadata.outputs.run-python-tests == 'true'
env:
BIGQUERY_TEST_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_TEST_SERVICE_ACCOUNT_JSON }}
BIGQUERY_TEST_ALT_DATABASE: ${{ secrets.BIGQUERY_TEST_ALT_DATABASE }}
BIGQUERY_TEST_NO_ACCESS_DATABASE: ${{ secrets.BIGQUERY_TEST_NO_ACCESS_DATABASE }}
DBT_TEST_USER_1: group:buildbot@dbtlabs.com
DBT_TEST_USER_2: group:engineering-core-team@dbtlabs.com
DBT_TEST_USER_3: serviceAccount:dbt-integration-test-user@dbt-test-env.iam.gserviceaccount.com
DATAPROC_REGION: us-central1
DATAPROC_CLUSTER_NAME: dbt-test-1
GCS_BUCKET: dbt-ci
run: tox -e python-tests -- --ddtrace

- uses: actions/upload-artifact@v3
if: always()
with:
30 changes: 30 additions & 0 deletions .github/workflows/repository-cleanup.yml
@@ -0,0 +1,30 @@
# **what?**
# Cleanup branches left over from automation and testing. Also cleanup
# draft releases from release testing.

# **why?**
# The automations are leaving behind branches and releases that clutter
# the repository. Sometimes we need them to debug processes so we don't
# want them immediately deleted. Running on Saturday to avoid running
# at the same time as an actual release to prevent breaking a release
# mid-release.

# **when?**
# Mainly on a schedule of 12:00 Saturday.
# Manual trigger can also run on demand

name: Repository Cleanup

on:
schedule:
- cron: '0 12 * * SAT' # At 12:00 on Saturday - details in `why` above

workflow_dispatch: # for manual triggering

permissions:
contents: write

jobs:
cleanup-repo:
uses: dbt-labs/actions/.github/workflows/repository-cleanup.yml@main
secrets: inherit
18 changes: 17 additions & 1 deletion dbt/adapters/bigquery/connections.py
@@ -5,6 +5,7 @@
from contextlib import contextmanager
from dataclasses import dataclass, field

from dbt.events.contextvars import get_node_info
from mashumaro.helper import pass_through

from functools import lru_cache
@@ -444,7 +445,8 @@ def raw_execute(
conn = self.get_thread_connection()
client = conn.handle

fire_event(SQLQuery(conn_name=conn.name, sql=sql))
fire_event(SQLQuery(conn_name=conn.name, sql=sql, node_info=get_node_info()))

if (
hasattr(self.profile, "query_comment")
and self.profile.query_comment
@@ -700,6 +702,20 @@ def fn():

self._retry_and_handle(msg="create dataset", conn=conn, fn=fn)

def list_dataset(self, database: str):
# the database string we get here is potentially quoted. Strip that off
# for the API call.
database = database.strip("`")
conn = self.get_thread_connection()
client = conn.handle

def query_schemas():
# this is similar to how we have to deal with listing tables
all_datasets = client.list_datasets(project=database, max_results=10000)
return [ds.dataset_id for ds in all_datasets]

return self._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas)

def _query_and_results(
self,
client,
Empty file.
67 changes: 67 additions & 0 deletions dbt/adapters/bigquery/dataproc/batch.py
@@ -0,0 +1,67 @@
from typing import Union, Dict

import time
from datetime import datetime
from google.cloud.dataproc_v1 import (
CreateBatchRequest,
BatchControllerClient,
Batch,
GetBatchRequest,
)
from google.protobuf.json_format import ParseDict

from dbt.adapters.bigquery.connections import DataprocBatchConfig

_BATCH_RUNNING_STATES = [Batch.State.PENDING, Batch.State.RUNNING]
DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar"


def create_batch_request(
batch: Batch, batch_id: str, project: str, region: str
) -> CreateBatchRequest:
return CreateBatchRequest(
parent=f"projects/{project}/locations/{region}", # type: ignore
batch_id=batch_id, # type: ignore
batch=batch, # type: ignore
)


def poll_batch_job(
parent: str, batch_id: str, job_client: BatchControllerClient, timeout: int
) -> Batch:
batch_name = "".join([parent, "/batches/", batch_id])
state = Batch.State.PENDING
response = None
run_time = 0
while state in _BATCH_RUNNING_STATES and run_time < timeout:
time.sleep(1)
response = job_client.get_batch( # type: ignore
request=GetBatchRequest(name=batch_name), # type: ignore
)
run_time = datetime.now().timestamp() - response.create_time.timestamp() # type: ignore
state = response.state
if not response:
raise ValueError("No response from Dataproc")
if state != Batch.State.SUCCEEDED:
if run_time >= timeout:
raise ValueError(
f"Operation did not complete within the designated timeout of {timeout} seconds."
)
else:
raise ValueError(response.state_message)
return response


def update_batch_from_config(config_dict: Union[Dict, DataprocBatchConfig], target: Batch):
try:
# updates in place
ParseDict(config_dict, target._pb)
except Exception as e:
docurl = (
"https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1"
"#google.cloud.dataproc.v1.Batch"
)
raise ValueError(
f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}"
) from e
return target
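
As a usage illustration of `update_batch_from_config` above, here is a hypothetical `dataproc_batch` mapping (the keys are standard `google.cloud.dataproc.v1.Batch` fields; the specific values are made up) merged onto a batch that already has its PySpark source configured, mirroring how `_configure_batch` applies profile-level overrides:

```python
from google.cloud.dataproc_v1 import Batch
from google.protobuf.json_format import ParseDict

# Hypothetical profile-level override; field names follow the Batch proto.
dataproc_batch = {
    "runtime_config": {"properties": {"spark.executor.instances": "4"}},
    "labels": {"owner": "analytics"},
}

batch = Batch()
batch.pyspark_batch.main_python_file_uri = "gs://my-bucket/model.py"  # placeholder URI

# update_batch_from_config() wraps this ParseDict call with error handling;
# ParseDict merges the dict onto the underlying protobuf message in place.
ParseDict(dataproc_batch, batch._pb)

print(batch.runtime_config.properties["spark.executor.instances"])  # "4"
print(batch.pyspark_batch.main_python_file_uri)                     # unchanged
```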
51 changes: 5 additions & 46 deletions dbt/adapters/bigquery/impl.py
@@ -217,6 +217,10 @@ class BigQueryAdapter(BaseAdapter):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

def __init__(self, config) -> None:
super().__init__(config)
self.connections: BigQueryConnectionManager = self.connections

###
# Implementations of abstract methods
###
@@ -267,18 +271,7 @@ def rename_relation(

@available
def list_schemas(self, database: str) -> List[str]:
# the database string we get here is potentially quoted. Strip that off
# for the API call.
database = database.strip("`")
conn = self.connections.get_thread_connection()
client = conn.handle

def query_schemas():
# this is similar to how we have to deal with listing tables
all_datasets = client.list_datasets(project=database, max_results=10000)
return [ds.dataset_id for ds in all_datasets]

return self.connections._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas)
return self.connections.list_dataset(database)

@available.parse(lambda *a, **k: False)
def check_schema_exists(self, database: str, schema: str) -> bool:
@@ -481,40 +474,6 @@ def _agate_to_schema(
bq_schema.append(SchemaField(col_name, type_)) # type: ignore[arg-type]
return bq_schema

def _materialize_as_view(self, model: Dict[str, Any]) -> str:
model_database = model.get("database")
model_schema = model.get("schema")
model_alias = model.get("alias")
model_code = model.get("compiled_code")

logger.debug("Model SQL ({}):\n{}".format(model_alias, model_code))
self.connections.create_view(
database=model_database, schema=model_schema, table_name=model_alias, sql=model_code
)
return "CREATE VIEW"

def _materialize_as_table(
self,
model: Dict[str, Any],
model_sql: str,
decorator: Optional[str] = None,
) -> str:
model_database = model.get("database")
model_schema = model.get("schema")
model_alias = model.get("alias")

if decorator is None:
table_name = model_alias
else:
table_name = "{}${}".format(model_alias, decorator)

logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql))
self.connections.create_table(
database=model_database, schema=model_schema, table_name=table_name, sql=model_sql
)

return "CREATE TABLE"

@available.parse(lambda *a, **k: "")
def copy_table(self, source, destination, materialization):
if materialization == "incremental":
61 changes: 26 additions & 35 deletions dbt/adapters/bigquery/python_submissions.py
Expand Up @@ -4,11 +4,17 @@
from google.api_core.future.polling import POLLING_PREDICATE

from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
from dbt.adapters.bigquery.connections import DataprocBatchConfig
from google.api_core import retry
from google.api_core.client_options import ClientOptions
from google.cloud import storage, dataproc_v1 # type: ignore
from google.protobuf.json_format import ParseDict
from google.cloud.dataproc_v1.types.batches import Batch

from dbt.adapters.bigquery.dataproc.batch import (
create_batch_request,
poll_batch_job,
DEFAULT_JAR_FILE_URI,
update_batch_from_config,
)

OPERATION_RETRY_TIME = 10

@@ -102,8 +108,8 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
"job": job,
}
)
response = operation.result(polling=self.result_polling_policy)
# check if job failed
response = operation.result(polling=self.result_polling_policy)
if response.status.state == 6:
raise ValueError(response.status.details)
return response
@@ -118,21 +124,22 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
def _get_batch_id(self) -> str:
return self.parsed_model["config"].get("batch_id")

def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
batch = self._configure_batch()
parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"

request = dataproc_v1.CreateBatchRequest(
parent=parent,
batch=batch,
batch_id=self._get_batch_id(),
)
def _submit_dataproc_job(self) -> Batch:
batch_id = self._get_batch_id()
request = create_batch_request(
batch=self._configure_batch(),
batch_id=batch_id,
region=self.credential.dataproc_region, # type: ignore
project=self.credential.execution_project, # type: ignore
) # type: ignore
# make the request
operation = self.job_client.create_batch(request=request) # type: ignore
# this takes quite a while, waiting on GCP response to resolve
# (not a google-api-core issue, more likely a dataproc serverless issue)
response = operation.result(polling=self.result_polling_policy)
return response
self.job_client.create_batch(request=request) # type: ignore
return poll_batch_job(
parent=request.parent,
batch_id=batch_id,
job_client=self.job_client, # type: ignore
timeout=self.timeout,
)
# there might be useful results here that we can parse and return
# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
@@ -163,27 +170,11 @@ def _configure_batch(self):
batch.pyspark_batch.main_python_file_uri = self.gcs_location
jar_file_uri = self.parsed_model["config"].get(
"jar_file_uri",
"gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar",
DEFAULT_JAR_FILE_URI,
)
batch.pyspark_batch.jar_file_uris = [jar_file_uri]

# Apply configuration from dataproc_batch key, possibly overriding defaults.
if self.credential.dataproc_batch:
self._update_batch_from_config(self.credential.dataproc_batch, batch)
batch = update_batch_from_config(self.credential.dataproc_batch, batch)
return batch

@classmethod
def _update_batch_from_config(
cls, config_dict: Union[Dict, DataprocBatchConfig], target: dataproc_v1.Batch
):
try:
# updates in place
ParseDict(config_dict, target._pb)
except Exception as e:
docurl = (
"https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1"
"#google.cloud.dataproc.v1.Batch"
)
raise ValueError(
f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}"
) from e
9 changes: 9 additions & 0 deletions tests/functional/adapter/dbt_show/test_dbt_show.py
@@ -0,0 +1,9 @@
from dbt.tests.adapter.dbt_show.test_dbt_show import BaseShowSqlHeader, BaseShowLimit


class TestBigQueryShowLimit(BaseShowLimit):
pass


class TestBigQueryShowSqlHeader(BaseShowSqlHeader):
pass