From 516c4261c7882629974b3fb2c88a5251df1cc378 Mon Sep 17 00:00:00 2001 From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Mon, 11 Sep 2023 11:11:59 -0700 Subject: [PATCH] Timeout BQ queries (#902) * use dynamic schema in test_grant_access_to.py * use dynamic schema in test_grant_access_to.py * experiment with query job cancel on timeout * modify unit tests * remove test grants change * starting functional test * update functional test and experiment with polling logic * experiment with async wait_for * modifying connections.py for asyncio logic * swap back to new_event_loop * close loop, now seeing asyncio timeoutError * improve order and update functional test * update unit test * add changie * add max_result back to result call in async path * rescope the dbt_profile_target to being a class fixture * raise DbtRuntimeError instead database * remove exception type check in job timeout --------- Co-authored-by: Matthew McKnight Co-authored-by: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> (cherry picked from commit 2eb407d27fddb839946d727c9d13a398f9f3ec1f) --- .../unreleased/Fixes-20230829-162111.yaml | 6 ++ dbt/adapters/bigquery/connections.py | 23 ++++++- tests/conftest.py | 2 +- tests/functional/test_job_timeout.py | 62 +++++++++++++++++++ tests/unit/test_bigquery_adapter.py | 25 +++++++- 5 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 .changes/unreleased/Fixes-20230829-162111.yaml create mode 100644 tests/functional/test_job_timeout.py diff --git a/.changes/unreleased/Fixes-20230829-162111.yaml b/.changes/unreleased/Fixes-20230829-162111.yaml new file mode 100644 index 000000000..5d34acd3e --- /dev/null +++ b/.changes/unreleased/Fixes-20230829-162111.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Time out queries if user supplies `job_execution_timeout` +time: 2023-08-29T16:21:11.69291-07:00 +custom: + Author: colin-rogers-dbt McKnight-42 + Issue: "231" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index 15c983480..6bfd9b130 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -1,7 +1,10 @@ +import asyncio +import functools import json import re from contextlib import contextmanager from dataclasses import dataclass, field + from mashumaro.helper import pass_through from functools import lru_cache @@ -703,7 +706,6 @@ def _query_and_results( # Cannot reuse job_config if destination is set and ddl is used job_config = google.cloud.bigquery.QueryJobConfig(**job_params) query_job = client.query(query=sql, job_config=job_config, timeout=job_creation_timeout) - if ( query_job.location is not None and query_job.job_id is not None @@ -713,8 +715,25 @@ def _query_and_results( self._bq_job_link(query_job.location, query_job.project, query_job.job_id) ) - iterator = query_job.result(max_results=limit, timeout=job_execution_timeout) + # only use async logic if user specifies a timeout + if job_execution_timeout: + loop = asyncio.new_event_loop() + future_iterator = asyncio.wait_for( + loop.run_in_executor(None, functools.partial(query_job.result, max_results=limit)), + timeout=job_execution_timeout, + ) + try: + iterator = loop.run_until_complete(future_iterator) + except asyncio.TimeoutError: + query_job.cancel() + raise DbtRuntimeError( + f"Query exceeded configured timeout of {job_execution_timeout}s" + ) + finally: + loop.close() + else: + iterator = query_job.result(max_results=limit) return query_job, iterator def _retry_and_handle(self, msg, conn, fn): diff --git a/tests/conftest.py b/tests/conftest.py index 0ba0091fb..78f3d82e1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,7 @@ def pytest_addoption(parser): parser.addoption("--profile", action="store", default="oauth", type=str) -@pytest.fixture(scope="session") +@pytest.fixture(scope="class") def dbt_profile_target(request): profile_type = request.config.getoption("--profile") if profile_type == "oauth": diff --git a/tests/functional/test_job_timeout.py b/tests/functional/test_job_timeout.py new file mode 100644 index 000000000..be559e816 --- /dev/null +++ b/tests/functional/test_job_timeout.py @@ -0,0 +1,62 @@ +import pytest + +from dbt.tests.util import run_dbt + +_REASONABLE_TIMEOUT = 300 +_SHORT_TIMEOUT = 1 + +_LONG_RUNNING_MODEL_SQL = """ + {{ config(materialized='table') }} + with array_1 as ( + select generated_ids from UNNEST(GENERATE_ARRAY(1, 200000)) AS generated_ids + ), + array_2 as ( + select generated_ids from UNNEST(GENERATE_ARRAY(2, 200000)) AS generated_ids + ) + + SELECT array_1.generated_ids + FROM array_1 + LEFT JOIN array_1 as jnd on 1=1 + LEFT JOIN array_2 as jnd2 on 1=1 + LEFT JOIN array_1 as jnd3 on jnd3.generated_ids >= jnd2.generated_ids +""" + +_SHORT_RUNNING_QUERY = """ + SELECT 1 as id + """ + + +class TestSuccessfulJobRun: + @pytest.fixture(scope="class") + def models(self): + return { + "model.sql": _SHORT_RUNNING_QUERY, + } + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + outputs = {"default": dbt_profile_target} + outputs["default"]["job_execution_timeout_seconds"] = _REASONABLE_TIMEOUT + return {"test": {"outputs": outputs, "target": "default"}} + + def test_bigquery_job_run_succeeds_within_timeout(self, project): + result = run_dbt() + assert len(result) == 1 + + +class TestJobTimeout: + @pytest.fixture(scope="class") + def models(self): + return { + "model.sql": _LONG_RUNNING_MODEL_SQL, + } + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + outputs = {"default": dbt_profile_target} + outputs["default"]["job_execution_timeout_seconds"] = _SHORT_TIMEOUT + return {"test": {"outputs": outputs, "target": "default"}} + + def test_job_timeout(self, project): + result = run_dbt(["run"], expect_pass=False) # project setup will fail + assert f"Query exceeded configured timeout of {_SHORT_TIMEOUT}s" in result[0].message diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py index bb98db86f..10cb3f530 100644 --- a/tests/unit/test_bigquery_adapter.py +++ b/tests/unit/test_bigquery_adapter.py @@ -1,3 +1,5 @@ +import time + import agate import decimal import json @@ -634,18 +636,39 @@ def test_drop_dataset(self): @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") def test_query_and_results(self, mock_bq): + self.mock_client.query = Mock(return_value=Mock(state="DONE")) self.connections._query_and_results( self.mock_client, "sql", {"job_param_1": "blah"}, job_creation_timeout=15, - job_execution_timeout=100, + job_execution_timeout=3, + ) + + mock_bq.QueryJobConfig.assert_called_once() + self.mock_client.query.assert_called_once_with( + query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 ) + @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") + def test_query_and_results_timeout(self, mock_bq): + self.mock_client.query = Mock( + return_value=Mock(result=lambda *args, **kwargs: time.sleep(4)) + ) + with pytest.raises(dbt.exceptions.DbtRuntimeError) as exc: + self.connections._query_and_results( + self.mock_client, + "sql", + {"job_param_1": "blah"}, + job_creation_timeout=15, + job_execution_timeout=1, + ) + mock_bq.QueryJobConfig.assert_called_once() self.mock_client.query.assert_called_once_with( query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 ) + assert "Query exceeded configured timeout of 1s" in str(exc.value) def test_copy_bq_table_appends(self): self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND)