diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index de84e4bf8..cdfc3bbe5 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -492,17 +492,23 @@ def raw_execute( job_creation_timeout = self.get_job_creation_timeout_seconds(conn) job_execution_timeout = self.get_job_execution_timeout_seconds(conn) - def fn(): + def job_creation_fn(): return self._query_and_results( - client, - sql, - job_params, - job_creation_timeout=job_creation_timeout, + client, sql, job_params, job_creation_timeout=job_creation_timeout + ) + + query_job = self._retry_and_handle(msg=sql, conn=conn, fn=job_creation_fn) + + def wait_for_job_results_fn(): + return self._wait_for_results( + query_job=query_job, job_execution_timeout=job_execution_timeout, limit=limit, ) - query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn) + query_job, iterator = self._retry_and_handle( + msg=sql, conn=conn, fn=wait_for_job_results_fn + ) return query_job, iterator @@ -729,15 +735,7 @@ def query_schemas(): return self._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas) - def _query_and_results( - self, - client, - sql, - job_params, - job_creation_timeout=None, - job_execution_timeout=None, - limit: Optional[int] = None, - ): + def _query_and_results(self, client, sql, job_params, job_creation_timeout=None): """Query the client and wait for results.""" # Cannot reuse job_config if destination is set and ddl is used job_config = google.cloud.bigquery.QueryJobConfig(**job_params) @@ -750,7 +748,11 @@ def _query_and_results( logger.debug( self._bq_job_link(query_job.location, query_job.project, query_job.job_id) ) + return query_job + def _wait_for_results( + self, query_job, job_execution_timeout=None, limit: Optional[int] = None + ): # only use async logic if user specifies a timeout if job_execution_timeout: loop = asyncio.new_event_loop() @@ -787,7 +789,7 @@ def reopen_conn_on_error(error): target=fn, predicate=_ErrorCounter(self.get_job_retries(conn)).count_error, sleep_generator=self._retry_generator(), - deadline=self.get_job_retry_deadline_seconds(conn), + timeout=self.get_job_retry_deadline_seconds(conn), on_error=reopen_conn_on_error, )