diff --git a/.changes/unreleased/Features-20231107-165830.yaml b/.changes/unreleased/Features-20231107-165830.yaml
new file mode 100644
index 000000000..be523ecfb
--- /dev/null
+++ b/.changes/unreleased/Features-20231107-165830.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add support for integers as dataproc batch runtime config properties
+time: 2023-11-07T16:58:30.065966-05:00
+custom:
+  Author: jimmyshah
+  Issue: "1001"
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 114ebf979..1325a94e3 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -1,22 +1,20 @@
 from typing import Dict, Union
 
-from dbt.events import AdapterLogger
-
-from dbt.adapters.base import PythonJobHelper
-from google.api_core.future.polling import POLLING_PREDICATE
-
-from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
 from google.api_core import retry
 from google.api_core.client_options import ClientOptions
-from google.cloud import storage, dataproc_v1  # type: ignore
+from google.api_core.future.polling import POLLING_PREDICATE
+from google.cloud import dataproc_v1, storage  # type: ignore
 from google.cloud.dataproc_v1.types.batches import Batch
 
+from dbt.adapters.base import PythonJobHelper
+from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
 from dbt.adapters.bigquery.dataproc.batch import (
+    DEFAULT_JAR_FILE_URI,
     create_batch_request,
     poll_batch_job,
-    DEFAULT_JAR_FILE_URI,
     update_batch_from_config,
 )
+from dbt.events import AdapterLogger
 
 OPERATION_RETRY_TIME = 10
 logger = AdapterLogger("BigQuery")
@@ -46,9 +44,12 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
         self.credential = credential
         self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
         self.storage_client = storage.Client(
-            project=self.credential.execution_project, credentials=self.GoogleCredentials
+            project=self.credential.execution_project,
+            credentials=self.GoogleCredentials,
+        )
+        self.gcs_location = "gs://{}/{}".format(
+            self.credential.gcs_bucket, self.model_file_name
         )
-        self.gcs_location = "gs://{}/{}".format(self.credential.gcs_bucket, self.model_file_name)
 
         # set retry policy, default to timeout after 24 hours
         self.timeout = self.parsed_model["config"].get(
@@ -58,7 +59,9 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
             predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout
         )
         self.client_options = ClientOptions(
-            api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
+            api_endpoint="{}-dataproc.googleapis.com:443".format(
+                self.credential.dataproc_region
+            )
         )
         self.job_client = self._get_job_client()
 
@@ -184,4 +187,7 @@ def _configure_batch(self):
         # Apply configuration from dataproc_batch key, possibly overriding defaults.
         if self.credential.dataproc_batch:
             batch = update_batch_from_config(self.credential.dataproc_batch, batch)
+        if batch.runtime_config.properties:
+            for key, val in batch.runtime_config.properties.items():
+                batch.runtime_config.properties[key] = str(val)
         return batch