From 2a71760e8b19aacf2fc917923e29e43c3e315254 Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Tue, 7 Nov 2023 16:35:18 -0500 Subject: [PATCH 1/9] fix: cast dataproc batch runtime props There is a bug currently that requires `runtime_config` `properties` values to be strings. As a result, integer values need quotes around them in the yml files, which is awkward. With this PR, values are cast as strings, so that the properties can be set in a more expected way. [ADAP-1007] --- dbt/adapters/bigquery/python_submissions.py | 25 +++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 8fd354eb5..bd815c7de 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -1,18 +1,17 @@ from typing import Dict, Union -from dbt.adapters.base import PythonJobHelper -from google.api_core.future.polling import POLLING_PREDICATE - -from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials from google.api_core import retry from google.api_core.client_options import ClientOptions -from google.cloud import storage, dataproc_v1 # type: ignore +from google.api_core.future.polling import POLLING_PREDICATE +from google.cloud import dataproc_v1, storage # type: ignore from google.cloud.dataproc_v1.types.batches import Batch +from dbt.adapters.base import PythonJobHelper +from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials from dbt.adapters.bigquery.dataproc.batch import ( + DEFAULT_JAR_FILE_URI, create_batch_request, poll_batch_job, - DEFAULT_JAR_FILE_URI, update_batch_from_config, ) @@ -43,9 +42,12 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None: self.credential = credential self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential) self.storage_client = storage.Client( - project=self.credential.execution_project, credentials=self.GoogleCredentials + project=self.credential.execution_project, + credentials=self.GoogleCredentials, + ) + self.gcs_location = "gs://{}/{}".format( + self.credential.gcs_bucket, self.model_file_name ) - self.gcs_location = "gs://{}/{}".format(self.credential.gcs_bucket, self.model_file_name) # set retry policy, default to timeout after 24 hours self.timeout = self.parsed_model["config"].get( @@ -55,7 +57,9 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None: predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout ) self.client_options = ClientOptions( - api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region) + api_endpoint="{}-dataproc.googleapis.com:443".format( + self.credential.dataproc_region + ) ) self.job_client = self._get_job_client() @@ -177,4 +181,7 @@ def _configure_batch(self): # Apply configuration from dataproc_batch key, possibly overriding defaults. if self.credential.dataproc_batch: batch = update_batch_from_config(self.credential.dataproc_batch, batch) + if batch.runtime_config.properties: + for key, val in batch.runtime_config.properties.items(): + batch.runtime_config.properties[key] = str(val) return batch From 0c3596b0b8f8685378565da81d38608b55d99411 Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Tue, 7 Nov 2023 16:58:55 -0500 Subject: [PATCH 2/9] add changie file --- .changes/unreleased/Features-20231107-165830.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20231107-165830.yaml diff --git a/.changes/unreleased/Features-20231107-165830.yaml b/.changes/unreleased/Features-20231107-165830.yaml new file mode 100644 index 000000000..be523ecfb --- /dev/null +++ b/.changes/unreleased/Features-20231107-165830.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for integers as dataproc batch runtime config properties +time: 2023-11-07T16:58:30.065966-05:00 +custom: + Author: jimmyshah + Issue: "1001" From c49376f8645893ebc39595ae3275cf8d8a1aaebf Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Tue, 7 Nov 2023 17:06:11 -0500 Subject: [PATCH 3/9] Update python_submissions.py --- dbt/adapters/bigquery/python_submissions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index bd815c7de..ae11c26d4 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -183,5 +183,5 @@ def _configure_batch(self): batch = update_batch_from_config(self.credential.dataproc_batch, batch) if batch.runtime_config.properties: for key, val in batch.runtime_config.properties.items(): - batch.runtime_config.properties[key] = str(val) + batch.runtime_config.properties[key] = str(val) return batch From a1f1608effe59944aec97f6e96859eb545e0108d Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Fri, 10 Nov 2023 14:08:23 -0500 Subject: [PATCH 4/9] use pre-commit --- dbt/adapters/bigquery/python_submissions.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index bbd205293..1985dba56 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -47,9 +47,7 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None: project=self.credential.execution_project, credentials=self.GoogleCredentials, ) - self.gcs_location = "gs://{}/{}".format( - self.credential.gcs_bucket, self.model_file_name - ) + self.gcs_location = "gs://{}/{}".format(self.credential.gcs_bucket, self.model_file_name) # set retry policy, default to timeout after 24 hours self.timeout = self.parsed_model["config"].get( @@ -59,9 +57,7 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None: predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout ) self.client_options = ClientOptions( - api_endpoint="{}-dataproc.googleapis.com:443".format( - self.credential.dataproc_region - ) + api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region) ) self.job_client = self._get_job_client() @@ -189,5 +185,5 @@ def _configure_batch(self): batch = update_batch_from_config(self.credential.dataproc_batch, batch) if batch.runtime_config.properties: for key, val in batch.runtime_config.properties.items(): - batch.runtime_config.properties[key] = str(val) + batch.runtime_config.properties[key] = str(val) return batch From 3072f30d71918f02f8c5de43ff3ffff976c0af54 Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Fri, 10 Nov 2023 14:10:45 -0500 Subject: [PATCH 5/9] fix --- dbt/adapters/bigquery/python_submissions.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 1985dba56..1325a94e3 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -47,7 +47,9 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None: project=self.credential.execution_project, credentials=self.GoogleCredentials, ) - self.gcs_location = "gs://{}/{}".format(self.credential.gcs_bucket, self.model_file_name) + self.gcs_location = "gs://{}/{}".format( + self.credential.gcs_bucket, self.model_file_name + ) # set retry policy, default to timeout after 24 hours self.timeout = self.parsed_model["config"].get( @@ -57,7 +59,9 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None: predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout ) self.client_options = ClientOptions( - api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region) + api_endpoint="{}-dataproc.googleapis.com:443".format( + self.credential.dataproc_region + ) ) self.job_client = self._get_job_client() From 8b6f7a21ef0f1496a36c197ded57d00fb63eb300 Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Fri, 10 Nov 2023 14:14:05 -0500 Subject: [PATCH 6/9] fix --- dbt/adapters/bigquery/python_submissions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 1325a94e3..9c2af4a9e 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -176,7 +176,7 @@ def _configure_batch(self): ) } ) - # Apply defaults + # Apply the defaults batch.pyspark_batch.main_python_file_uri = self.gcs_location jar_file_uri = self.parsed_model["config"].get( "jar_file_uri", From 0ca740b95610e832ff4b47a330f97a2c788230a2 Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Fri, 10 Nov 2023 14:15:09 -0500 Subject: [PATCH 7/9] Update python_submissions.py --- dbt/adapters/bigquery/python_submissions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 9c2af4a9e..1325a94e3 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -176,7 +176,7 @@ def _configure_batch(self): ) } ) - # Apply the defaults + # Apply defaults batch.pyspark_batch.main_python_file_uri = self.gcs_location jar_file_uri = self.parsed_model["config"].get( "jar_file_uri", From bf7d6ba6d85cf3db66bcba68730c6e377adf4919 Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Fri, 10 Nov 2023 14:17:33 -0500 Subject: [PATCH 8/9] Update python_submissions.py --- dbt/adapters/bigquery/python_submissions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 1325a94e3..9c2af4a9e 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -176,7 +176,7 @@ def _configure_batch(self): ) } ) - # Apply defaults + # Apply the defaults batch.pyspark_batch.main_python_file_uri = self.gcs_location jar_file_uri = self.parsed_model["config"].get( "jar_file_uri", From f1a37a3a694bd44e40abfa1b119afb06f4070610 Mon Sep 17 00:00:00 2001 From: Jimmy Shah Date: Fri, 10 Nov 2023 14:22:35 -0500 Subject: [PATCH 9/9] Update python_submissions.py --- dbt/adapters/bigquery/python_submissions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 9c2af4a9e..1325a94e3 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -176,7 +176,7 @@ def _configure_batch(self): ) } ) - # Apply the defaults + # Apply defaults batch.pyspark_batch.main_python_file_uri = self.gcs_location jar_file_uri = self.parsed_model["config"].get( "jar_file_uri",