Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: cast dataproc batch runtime props #1008

Closed
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231107-165830.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for integers as dataproc batch runtime config properties
time: 2023-11-07T16:58:30.065966-05:00
custom:
Author: jimmyshah
Issue: "1001"
28 changes: 17 additions & 11 deletions dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
from typing import Dict, Union

from dbt.events import AdapterLogger

from dbt.adapters.base import PythonJobHelper
from google.api_core.future.polling import POLLING_PREDICATE

from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
from google.api_core import retry
from google.api_core.client_options import ClientOptions
from google.cloud import storage, dataproc_v1 # type: ignore
from google.api_core.future.polling import POLLING_PREDICATE
from google.cloud import dataproc_v1, storage # type: ignore
from google.cloud.dataproc_v1.types.batches import Batch

from dbt.adapters.base import PythonJobHelper
from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
from dbt.adapters.bigquery.dataproc.batch import (
DEFAULT_JAR_FILE_URI,
create_batch_request,
poll_batch_job,
DEFAULT_JAR_FILE_URI,
update_batch_from_config,
)
from dbt.events import AdapterLogger

OPERATION_RETRY_TIME = 10
logger = AdapterLogger("BigQuery")
Expand Down Expand Up @@ -46,9 +44,12 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
self.credential = credential
self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
self.storage_client = storage.Client(
project=self.credential.execution_project, credentials=self.GoogleCredentials
project=self.credential.execution_project,
credentials=self.GoogleCredentials,
)
self.gcs_location = "gs://{}/{}".format(
self.credential.gcs_bucket, self.model_file_name
)
self.gcs_location = "gs://{}/{}".format(self.credential.gcs_bucket, self.model_file_name)

# set retry policy, default to timeout after 24 hours
self.timeout = self.parsed_model["config"].get(
Expand All @@ -58,7 +59,9 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout
)
self.client_options = ClientOptions(
api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
api_endpoint="{}-dataproc.googleapis.com:443".format(
self.credential.dataproc_region
)
)
self.job_client = self._get_job_client()

Expand Down Expand Up @@ -184,4 +187,7 @@ def _configure_batch(self):
# Apply configuration from dataproc_batch key, possibly overriding defaults.
if self.credential.dataproc_batch:
batch = update_batch_from_config(self.credential.dataproc_batch, batch)
if batch.runtime_config.properties:
for key, val in batch.runtime_config.properties.items():
batch.runtime_config.properties[key] = str(val)
return batch
Loading