[Backport 1.6.latest] Timeout BQ queries #926

Merged: 1 commit, Sep 15, 2023
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230829-162111.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Time out queries if user supplies `job_execution_timeout`
time: 2023-08-29T16:21:11.69291-07:00
custom:
Author: colin-rogers-dbt McKnight-42
Issue: "231"
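
For context: this adapter-level timeout is configured per target through the `job_execution_timeout_seconds` profile field, which the functional tests below exercise. A minimal sketch of such a target, expressed as the Python dict shape used by the test fixtures (the project and dataset values are placeholders, not from this PR):

# Hypothetical BigQuery profile target; mirrors the shape used by the
# profiles_config_update fixtures in tests/functional/test_job_timeout.py.
profile_target = {
    "type": "bigquery",
    "method": "oauth",
    "project": "my-gcp-project",   # placeholder
    "dataset": "my_dataset",       # placeholder
    "threads": 1,
    "job_execution_timeout_seconds": 300,  # queries exceeding this now raise DbtRuntimeError
}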
23 changes: 21 additions & 2 deletions dbt/adapters/bigquery/connections.py
@@ -1,7 +1,10 @@
import asyncio
import functools
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass, field

from mashumaro.helper import pass_through

from functools import lru_cache
@@ -703,7 +706,6 @@ def _query_and_results(
# Cannot reuse job_config if destination is set and ddl is used
job_config = google.cloud.bigquery.QueryJobConfig(**job_params)
query_job = client.query(query=sql, job_config=job_config, timeout=job_creation_timeout)

if (
query_job.location is not None
and query_job.job_id is not None
@@ -713,8 +715,25 @@
self._bq_job_link(query_job.location, query_job.project, query_job.job_id)
)

iterator = query_job.result(max_results=limit, timeout=job_execution_timeout)
# only use async logic if user specifies a timeout
if job_execution_timeout:
loop = asyncio.new_event_loop()
future_iterator = asyncio.wait_for(
loop.run_in_executor(None, functools.partial(query_job.result, max_results=limit)),
timeout=job_execution_timeout,
)

try:
iterator = loop.run_until_complete(future_iterator)
except asyncio.TimeoutError:
query_job.cancel()
raise DbtRuntimeError(
f"Query exceeded configured timeout of {job_execution_timeout}s"
)
finally:
loop.close()
else:
iterator = query_job.result(max_results=limit)
return query_job, iterator

def _retry_and_handle(self, msg, conn, fn):
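The core of this change bounds the blocking `query_job.result()` call by running it in an executor and enforcing the deadline with `asyncio.wait_for`, cancelling the BigQuery job on timeout. A minimal, self-contained sketch of that pattern, using a stand-in blocking function (`blocking_result` and `run_with_timeout` are illustrative names, not part of the adapter):

import asyncio
import functools
import time


def blocking_result(max_results=None):
    # Stand-in for query_job.result(); the sleep simulates a long-running query.
    time.sleep(5)
    return []


def run_with_timeout(timeout: float):
    # Same shape as _query_and_results: run the blocking call in the default
    # executor and bound it with asyncio.wait_for on a fresh event loop.
    loop = asyncio.new_event_loop()
    future = asyncio.wait_for(
        loop.run_in_executor(None, functools.partial(blocking_result, max_results=10)),
        timeout=timeout,
    )
    try:
        return loop.run_until_complete(future)
    except asyncio.TimeoutError:
        # The adapter cancels the BigQuery job at this point before raising DbtRuntimeError.
        raise RuntimeError(f"Query exceeded configured timeout of {timeout}s")
    finally:
        loop.close()

Calling run_with_timeout(1) raises after roughly one second, while run_with_timeout(10) lets the stand-in query finish and returns its result.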
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -12,7 +12,7 @@ def pytest_addoption(parser):
parser.addoption("--profile", action="store", default="oauth", type=str)


@pytest.fixture(scope="session")
@pytest.fixture(scope="class")
def dbt_profile_target(request):
profile_type = request.config.getoption("--profile")
if profile_type == "oauth":
62 changes: 62 additions & 0 deletions tests/functional/test_job_timeout.py
@@ -0,0 +1,62 @@
import pytest

from dbt.tests.util import run_dbt

_REASONABLE_TIMEOUT = 300
_SHORT_TIMEOUT = 1

_LONG_RUNNING_MODEL_SQL = """
{{ config(materialized='table') }}
with array_1 as (
select generated_ids from UNNEST(GENERATE_ARRAY(1, 200000)) AS generated_ids
),
array_2 as (
select generated_ids from UNNEST(GENERATE_ARRAY(2, 200000)) AS generated_ids
)

SELECT array_1.generated_ids
FROM array_1
LEFT JOIN array_1 as jnd on 1=1
LEFT JOIN array_2 as jnd2 on 1=1
LEFT JOIN array_1 as jnd3 on jnd3.generated_ids >= jnd2.generated_ids
"""

_SHORT_RUNNING_QUERY = """
SELECT 1 as id
"""


class TestSuccessfulJobRun:
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": _SHORT_RUNNING_QUERY,
}

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
outputs = {"default": dbt_profile_target}
outputs["default"]["job_execution_timeout_seconds"] = _REASONABLE_TIMEOUT
return {"test": {"outputs": outputs, "target": "default"}}

def test_bigquery_job_run_succeeds_within_timeout(self, project):
result = run_dbt()
assert len(result) == 1


class TestJobTimeout:
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": _LONG_RUNNING_MODEL_SQL,
}

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
outputs = {"default": dbt_profile_target}
outputs["default"]["job_execution_timeout_seconds"] = _SHORT_TIMEOUT
return {"test": {"outputs": outputs, "target": "default"}}

def test_job_timeout(self, project):
result = run_dbt(["run"], expect_pass=False) # project setup will fail
assert f"Query exceeded configured timeout of {_SHORT_TIMEOUT}s" in result[0].message
25 changes: 24 additions & 1 deletion tests/unit/test_bigquery_adapter.py
@@ -1,3 +1,5 @@
import time

import agate
import decimal
import json
@@ -634,18 +636,39 @@ def test_drop_dataset(self):

@patch("dbt.adapters.bigquery.impl.google.cloud.bigquery")
def test_query_and_results(self, mock_bq):
self.mock_client.query = Mock(return_value=Mock(state="DONE"))
self.connections._query_and_results(
self.mock_client,
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=100,
job_execution_timeout=3,
)

mock_bq.QueryJobConfig.assert_called_once()
self.mock_client.query.assert_called_once_with(
query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15
)

@patch("dbt.adapters.bigquery.impl.google.cloud.bigquery")
def test_query_and_results_timeout(self, mock_bq):
self.mock_client.query = Mock(
return_value=Mock(result=lambda *args, **kwargs: time.sleep(4))
)
with pytest.raises(dbt.exceptions.DbtRuntimeError) as exc:
self.connections._query_and_results(
self.mock_client,
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=1,
)

mock_bq.QueryJobConfig.assert_called_once()
self.mock_client.query.assert_called_once_with(
query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15
)
assert "Query exceeded configured timeout of 1s" in str(exc.value)

def test_copy_bq_table_appends(self):
self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND)