Skip to content

Commit

Permalink
retry wait for result independently from job creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt committed Dec 5, 2023
1 parent c489332 commit ec8886c
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,17 +492,23 @@ def raw_execute(
job_creation_timeout = self.get_job_creation_timeout_seconds(conn)
job_execution_timeout = self.get_job_execution_timeout_seconds(conn)

def fn():
def job_creation_fn():
return self._query_and_results(
client,
sql,
job_params,
job_creation_timeout=job_creation_timeout,
client, sql, job_params, job_creation_timeout=job_creation_timeout
)

query_job = self._retry_and_handle(msg=sql, conn=conn, fn=job_creation_fn)

def wait_for_job_results_fn():
return self._wait_for_results(
query_job=query_job,
job_execution_timeout=job_execution_timeout,
limit=limit,
)

query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn)
query_job, iterator = self._retry_and_handle(
msg=sql, conn=conn, fn=wait_for_job_results_fn
)

return query_job, iterator

Expand Down Expand Up @@ -729,15 +735,7 @@ def query_schemas():

return self._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas)

def _query_and_results(
self,
client,
sql,
job_params,
job_creation_timeout=None,
job_execution_timeout=None,
limit: Optional[int] = None,
):
def _query_and_results(self, client, sql, job_params, job_creation_timeout=None):
"""Query the client and wait for results."""
# Cannot reuse job_config if destination is set and ddl is used
job_config = google.cloud.bigquery.QueryJobConfig(**job_params)
Expand All @@ -750,7 +748,11 @@ def _query_and_results(
logger.debug(
self._bq_job_link(query_job.location, query_job.project, query_job.job_id)
)
return query_job

def _wait_for_results(
self, query_job, job_execution_timeout=None, limit: Optional[int] = None
):
# only use async logic if user specifies a timeout
if job_execution_timeout:
loop = asyncio.new_event_loop()
Expand Down Expand Up @@ -787,7 +789,7 @@ def reopen_conn_on_error(error):
target=fn,
predicate=_ErrorCounter(self.get_job_retries(conn)).count_error,
sleep_generator=self._retry_generator(),
deadline=self.get_job_retry_deadline_seconds(conn),
timeout=self.get_job_retry_deadline_seconds(conn),
on_error=reopen_conn_on_error,
)

Expand Down

0 comments on commit ec8886c

Please sign in to comment.