Skip to content

Commit

Permalink
update batch request to handle GetBatchRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-rogers-dbt committed Sep 19, 2023
1 parent 4b9f10a commit 47d820c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
40 changes: 25 additions & 15 deletions dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Dict, Union

from dbt.adapters.base import PythonJobHelper
Expand All @@ -9,8 +10,8 @@
from google.api_core.client_options import ClientOptions
from google.cloud import storage, dataproc_v1 # type: ignore
from google.protobuf.json_format import ParseDict
from google.cloud.dataproc_v1.types.batches import Batch
import time
import uuid

OPERATION_RETRY_TIME = 10

Expand Down Expand Up @@ -120,24 +121,33 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
def _get_batch_id(self) -> str:
return self.parsed_model["config"].get("batch_id")

def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
def _submit_dataproc_job(self) -> Batch:
batch = self._configure_batch()
parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"
batch_id = uuid.uuid4().hex

request = dataproc_v1.CreateBatchRequest(parent=parent, batch=batch, batch_id=batch_id)
batch_id = self._get_batch_id()
request = dataproc_v1.CreateBatchRequest(parent=parent, batch=batch, batch_id=batch_id) # type: ignore
# make the request
self.job_client.create_batch(request=request)
# this takes quite a while, waiting on GCP response to resolve
# (not a google-api-core issue, more likely a dataproc serverless issue)

state = "State.PENDING"
while state not in ["State.SUCCEEDED", "State.FAILED", "State.CANCELLED"]:
time.sleep(2)
response = self.job_client.get_batch(
request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])),
self.job_client.create_batch(request=request) # type: ignore
# using the create_batch `.result()` method takes quite a while as it waits for all
# resources to tear down before returning, so we do the polling ourselves. This is a bit hacky but it works.
state = Batch.State.PENDING
response = None
run_time = 0
while state in [Batch.State.PENDING, Batch.State.RUNNING] and run_time < self.timeout:
time.sleep(1)
response = self.job_client.get_batch( # type: ignore
request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])), # type: ignore
)
run_time = datetime.now().timestamp() - response.create_time.timestamp() # type: ignore
state = response.state
if not response:
raise ValueError("No response from Dataproc")
if run_time >= self.timeout:
raise ValueError(
f"Operation did not complete within the designated timeout of {self.timeout} seconds."
)
state = str(response.state)
if state != Batch.State.SUCCEEDED:
raise ValueError(response.state_message)
return response
# there might be useful results here that we can parse and return
# Dataproc job output is saved to the Cloud Storage bucket
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def model(dbt, spark):
"""

models__python_array_batch_id_python = """
import pandas
import pandas as pd
def model(dbt, spark):
random_array = [
Expand Down

0 comments on commit 47d820c

Please sign in to comment.