[ADAP-394] support change monitoring for materialized views #914

Merged · 73 commits · Oct 11, 2023

Commits (73)
7efee19
init push of ADAP-394
McKnight-42 Sep 7, 2023
aed5f99
update lots of framework and update to main
McKnight-42 Sep 12, 2023
956b609
Merge branch 'main' into feature/materialized-views/ADAP-394
McKnight-42 Sep 12, 2023
b7cd870
updating based on feedback
McKnight-42 Sep 13, 2023
824be38
Merge branch 'feature/materialized-views/ADAP-394' of https://github.…
McKnight-42 Sep 13, 2023
b42fe01
clear conflicts
McKnight-42 Sep 13, 2023
ed0a69d
add changelog
McKnight-42 Sep 13, 2023
52f5dfd
remove in preview option
McKnight-42 Sep 14, 2023
eb34007
fill out changeset and config change classes for specific options
McKnight-42 Sep 14, 2023
971979e
change partition_by and cluster_by to FrozenSet, initial attempt at d…
McKnight-42 Sep 20, 2023
30fc389
Merge branch 'main' of https://github.com/dbt-labs/dbt-bigquery into …
McKnight-42 Sep 20, 2023
9e1d559
create utility.py to add bool_setting method, update parse_model_node…
McKnight-42 Sep 20, 2023
71c903d
update describe.sql query
McKnight-42 Sep 21, 2023
377414c
update describe sql to be able to create list of cluster by field names
McKnight-42 Sep 21, 2023
d219af2
initial attempt at modifying get_alter_materialized_view_as_sql
McKnight-42 Sep 22, 2023
dfc4dd4
Merge branch 'main' of https://github.com/dbt-labs/dbt-bigquery into …
McKnight-42 Sep 22, 2023
0e184a6
Merge branch 'main' of https://github.com/dbt-labs/dbt-bigquery into …
McKnight-42 Sep 25, 2023
3915a7f
Merge branch 'main' into feature/materialized-views/ADAP-394
McKnight-42 Sep 25, 2023
241f431
Merge branch 'feature/materialized-views/ADAP-394' of https://github.…
McKnight-42 Sep 25, 2023
b5349b9
Merge branch 'main' of https://github.com/dbt-labs/dbt-bigquery into …
McKnight-42 Sep 26, 2023
6bedc5e
update to main and add space
McKnight-42 Sep 26, 2023
da4d295
initial build out of mini classes for bigquery cluster, partition, au…
McKnight-42 Sep 27, 2023
9f10d36
Merge branch 'main' of https://github.com/dbt-labs/dbt-bigquery into …
McKnight-42 Sep 27, 2023
0eba246
remove local package (dbt-bigquery) on `make dev-uninstall`
mikealfare Sep 27, 2023
c56076e
update changelog entry to encompass all features in this branch
mikealfare Sep 27, 2023
4c2904e
remove alteration to setup/teardown for materialized view materializa…
mikealfare Sep 27, 2023
8f722fe
fix spelling error, prepend underscore on base class module to mark a…
mikealfare Sep 27, 2023
fe9aa65
update call to relation to include quote and include policies, update…
mikealfare Sep 27, 2023
47b787d
update create statement to include partition, cluster, and options cl…
mikealfare Sep 27, 2023
8398d65
update partition config to align with existing dbt-bigquery table config
mikealfare Sep 28, 2023
2d744f2
update cluster config to align with existing dbt-bigquery table config
mikealfare Sep 28, 2023
f75a406
update auto refresh config to align with other configs
mikealfare Sep 28, 2023
ae42de0
revert parse results to accept an agate Row
mikealfare Sep 28, 2023
3a682b4
update how defaults are handled
mikealfare Sep 28, 2023
894fdb2
add description option to materialized view since it is handled for t…
mikealfare Sep 28, 2023
95c6c01
add description option to materialized view since it is handled for t…
mikealfare Sep 28, 2023
3ea6cbe
fix method call chain in parse_relation_results on cluster, partition…
McKnight-42 Sep 28, 2023
8a31379
move PartitionConfig into relation_configs to be used by materialized…
mikealfare Sep 28, 2023
630913c
move PartitionConfig into relation_configs to be used by materialized…
mikealfare Sep 28, 2023
97b025c
update create materialized view to use the relation config
mikealfare Sep 28, 2023
e90af61
Merge remote-tracking branch 'origin/feature/materialized-views/ADAP-…
mikealfare Sep 28, 2023
d08c54b
condition on existence of properties before templating them
mikealfare Sep 28, 2023
630145d
allow for "drop if exists" functionality via the google sdk
mikealfare Sep 28, 2023
288afe2
remove unnecessary trailing semicolon
mikealfare Sep 29, 2023
37b2ccd
implement replace based on create
mikealfare Sep 29, 2023
0696752
implement clustering, partitioning, and auto refresh for materialized…
mikealfare Oct 5, 2023
d0bf159
Merge branch 'main' into feature/materialized-views/ADAP-394
mikealfare Oct 5, 2023
d4634ab
remove include_policy from BigQueryRelation, it's causing unit tests …
mikealfare Oct 5, 2023
f4f9cf5
partition type cannot be queried for materialized views, adjust the d…
mikealfare Oct 5, 2023
ee32026
add describe_relation for materialized views
mikealfare Oct 6, 2023
32d9b89
break out common utilities into a mixin for materialized view tests
mikealfare Oct 6, 2023
a9a4581
change refresh_interval_minutes from an int to a float to match the b…
mikealfare Oct 6, 2023
b84beef
make partition optional on relation results since it cannot be querie…
mikealfare Oct 6, 2023
0b609f2
initial draft of materialized view change tests
mikealfare Oct 6, 2023
779504b
build changeset for materialized view
mikealfare Oct 6, 2023
876dad8
implement change monitoring for autorefresh and clustering on materia…
mikealfare Oct 10, 2023
6989ff3
Merge branch 'main' into feature/materialized-views/ADAP-394
mikealfare Oct 10, 2023
9a23f2f
committing to park changes and wrap up other 1.7 items
mikealfare Oct 10, 2023
9f1ef3d
Merge branch 'main' into feature/materialized-views/ADAP-394
mikealfare Oct 10, 2023
f258781
Merge branch 'main' into feature/materialized-views/ADAP-940
colin-rogers-dbt Oct 11, 2023
0ebff83
Merge branch 'main' into feature/materialized-views/ADAP-394
colin-rogers-dbt Oct 11, 2023
9aafbb9
update describe to use the sdk instead of sql to pick up partition in…
mikealfare Oct 11, 2023
a2e9fa3
basic tests pass
mikealfare Oct 11, 2023
b2187bd
existing change monitoring tests pass
mikealfare Oct 11, 2023
120c1ce
partition change monitoring tests pass
mikealfare Oct 11, 2023
4551047
Merge remote-tracking branch 'origin/feature/materialized-views/ADAP-…
mikealfare Oct 11, 2023
2581b62
ADAP-940: Add change monitoring for partitioning clause (#962)
mikealfare Oct 11, 2023
f41a716
Merge branch 'main' into feature/materialized-views/ADAP-394
mikealfare Oct 11, 2023
f3b1e71
implement PR review feedback
mikealfare Oct 11, 2023
79489c3
Merge branch 'feature/materialized-views/ADAP-940' into feature/mater…
mikealfare Oct 11, 2023
b5c19d9
delete empty file
mikealfare Oct 11, 2023
9f57900
Merge branch 'main' into feature/materialized-views/ADAP-394
mikealfare Oct 11, 2023
b134fb3
add MV tests for cluster and partition alone, update combined tests t…
mikealfare Oct 11, 2023
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230913-130445.yaml
@@ -0,0 +1,6 @@
kind: Features
body: "Support change monitoring for materialized views, including: autorefresh, clustering, partitioning"
time: 2023-09-13T13:04:45.761294-05:00
custom:
Author: McKnight-42
Issue: "924"
2 changes: 2 additions & 0 deletions .flake8
@@ -12,3 +12,5 @@ ignore =
E741,
E501,
exclude = tests
per-file-ignores =
*/__init__.py: F401
197 changes: 62 additions & 135 deletions dbt/adapters/bigquery/impl.py
@@ -1,58 +1,56 @@
from dataclasses import dataclass
import json
import threading
from typing import Dict, List, Optional, Any, Set, Union, Type

from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ModelLevelConstraint, ConstraintType # type: ignore
from dbt.dataclass_schema import dbtClassMixin, ValidationError

import dbt.deprecations
import dbt.exceptions
import dbt.clients.agate_helper
import time
from typing import Any, Dict, List, Optional, Type, Set, Union

import agate
from dbt import ui # type: ignore
from dbt.adapters.base import ( # type: ignore
AdapterConfig,
BaseAdapter,
BaseRelation,
ConstraintSupport,
available,
PythonJobHelper,
RelationType,
BaseRelation,
SchemaSearchMap,
AdapterConfig,
PythonJobHelper,
available,
)

from dbt.adapters.cache import _make_ref_key_dict # type: ignore

from dbt.adapters.bigquery.column import get_nested_column_data_types
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
from dbt.adapters.bigquery.python_submissions import (
ClusterDataprocHelper,
ServerlessDataProcHelper,
)
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
import dbt.clients.agate_helper
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.manifest import Manifest
from dbt.events import (
AdapterLogger,
)
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint # type: ignore
from dbt.dataclass_schema import dbtClassMixin
import dbt.deprecations
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import SchemaCreation, SchemaDrop
import dbt.exceptions
from dbt.utils import filter_null_values

import google.auth
import google.api_core
import google.auth
import google.oauth2
import google.cloud.exceptions
import google.cloud.bigquery
from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable
import google.cloud.exceptions

from google.cloud.bigquery import AccessEntry, SchemaField
from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager
from dbt.adapters.bigquery.column import get_nested_column_data_types
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
from dbt.adapters.bigquery.python_submissions import (
ClusterDataprocHelper,
ServerlessDataProcHelper,
)
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery.relation_configs import (
BigQueryBaseRelationConfig,
BigQueryMaterializedViewConfig,
PartitionConfig,
)
from dbt.adapters.bigquery.utility import sql_escape

import time
import agate
import json

logger = AdapterLogger("BigQuery")

@@ -64,105 +62,6 @@
_dataset_lock = threading.Lock()


def sql_escape(string):
if not isinstance(string, str):
raise dbt.exceptions.CompilationError(f"cannot escape a non-string: {string}")
return json.dumps(string)[1:-1]


@dataclass
class PartitionConfig(dbtClassMixin):
field: str
data_type: str = "date"
granularity: str = "day"
range: Optional[Dict[str, Any]] = None
time_ingestion_partitioning: bool = False
copy_partitions: bool = False

PARTITION_DATE = "_PARTITIONDATE"
PARTITION_TIME = "_PARTITIONTIME"

def data_type_for_partition(self):
"""Return the data type of partitions for replacement.
When time_ingestion_partitioning is enabled, the data type supported are date & timestamp.
"""
if not self.time_ingestion_partitioning:
return self.data_type

return "date" if self.data_type == "date" else "timestamp"

def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
return [c for c in columns if not c.name.upper() == self.field.upper()]

def data_type_should_be_truncated(self):
"""Return true if the data type should be truncated instead of cast to the data type."""
return not (
self.data_type == "int64" or (self.data_type == "date" and self.granularity == "day")
)

def time_partitioning_field(self) -> str:
"""Return the time partitioning field name based on the data type.
The default is _PARTITIONTIME, but for date it is _PARTITIONDATE
else it will fail statements for type mismatch."""
if self.data_type == "date":
return self.PARTITION_DATE
else:
return self.PARTITION_TIME

def insertable_time_partitioning_field(self) -> str:
"""Return the insertable time partitioning field name based on the data type.
Practically, only _PARTITIONTIME works so far.
The function is meant to keep the call sites consistent as it might evolve."""
return self.PARTITION_TIME

def render(self, alias: Optional[str] = None):
column: str = (
self.field if not self.time_ingestion_partitioning else self.time_partitioning_field()
)
if alias:
column = f"{alias}.{column}"

if self.data_type_should_be_truncated():
return f"{self.data_type}_trunc({column}, {self.granularity})"
else:
return column

def render_wrapped(self, alias: Optional[str] = None):
"""Wrap the partitioning column when time involved to ensure it is properly cast to matching time."""
# if data type is going to be truncated, no need to wrap
if (
self.data_type in ("date", "timestamp", "datetime")
and not self.data_type_should_be_truncated()
and not (
self.time_ingestion_partitioning and self.data_type == "date"
) # _PARTITIONDATE is already a date
):
return f"{self.data_type}({self.render(alias)})"
else:
return self.render(alias)

@classmethod
def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:
if raw_partition_by is None:
return None
try:
cls.validate(raw_partition_by)
return cls.from_dict(
{
key: (value.lower() if isinstance(value, str) else value)
for key, value in raw_partition_by.items()
}
)
except ValidationError as exc:
raise dbt.exceptions.DbtValidationError("Could not parse partition config") from exc
except TypeError:
raise dbt.exceptions.CompilationError(
f"Invalid partition_by config:\n"
f" Got: {raw_partition_by}\n"
f' Expected a dictionary with "field" and "data_type" keys'
)


@dataclass
class GrantTarget(dbtClassMixin):
dataset: str
@@ -241,7 +140,9 @@ def drop_relation(self, relation: BigQueryRelation) -> None:
conn = self.connections.get_thread_connection()

table_ref = self.get_table_ref_from_relation(relation)
conn.handle.delete_table(table_ref)

# mimic "drop if exists" functionality that's ubiquitous in most sql implementations
conn.handle.delete_table(table_ref, not_found_ok=True)

def truncate_relation(self, relation: BigQueryRelation) -> None:
raise dbt.exceptions.NotImplementedError("`truncate` is not implemented for this adapter!")
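
For reference, not_found_ok is a documented flag on the BigQuery Python client's delete_table, which is what gives the "drop if exists" semantics mentioned in the comment above. A minimal standalone sketch, not part of the diff; the project, dataset, and table names are placeholders:

from google.cloud import bigquery

client = bigquery.Client()  # assumes application-default credentials are configured

# not_found_ok=True makes the client swallow the 404 for a missing table,
# mirroring "DROP TABLE IF EXISTS" in most SQL dialects.
client.delete_table("my-project.my_dataset.my_table", not_found_ok=True)
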
@@ -849,6 +750,32 @@ def get_view_options(self, config: Dict[str, Any], node: Dict[str, Any]) -> Dict
opts = self.get_common_options(config, node)
return opts

@available.parse(lambda *a, **k: True)
def get_bq_table(self, relation: BigQueryRelation) -> Optional[BigQueryTable]:
try:
table = self.connections.get_bq_table(
relation.database, relation.schema, relation.identifier
)
except google.cloud.exceptions.NotFound:
table = None
return table

@available.parse(lambda *a, **k: True)
def describe_relation(
self, relation: BigQueryRelation
) -> Optional[BigQueryBaseRelationConfig]:
if relation.type == RelationType.MaterializedView:
bq_table = self.get_bq_table(relation)
parser = BigQueryMaterializedViewConfig
else:
raise dbt.exceptions.DbtRuntimeError(
f"The method `BigQueryAdapter.describe_relation` is not implemented "
f"for the relation type: {relation.type}"
)
if bq_table:
return parser.from_bq_table(bq_table)
return None

@available.parse_none
def grant_access_to(self, entity, entity_type, role, grant_target_dict):
"""
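
The new get_bq_table helper above treats a missing relation as None rather than an error, and describe_relation builds on that. A self-contained sketch of the same lookup pattern against the BigQuery client; the function name and table ID below are placeholders, not taken from this PR:

from typing import Optional

import google.cloud.exceptions
from google.cloud import bigquery


def get_table_or_none(client: bigquery.Client, table_id: str) -> Optional[bigquery.Table]:
    # Mirror the adapter's pattern: a missing table yields None instead of raising.
    try:
        return client.get_table(table_id)
    except google.cloud.exceptions.NotFound:
        return None


if __name__ == "__main__":
    client = bigquery.Client()  # assumes application-default credentials
    table = get_table_or_none(client, "my-project.my_dataset.my_materialized_view")
    print("exists" if table else "missing")
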
58 changes: 52 additions & 6 deletions dbt/adapters/bigquery/relation.py
@@ -1,13 +1,21 @@
from dataclasses import dataclass
from typing import Optional
from typing import FrozenSet, Optional, TypeVar

from itertools import chain, islice

from dbt.context.providers import RuntimeConfigObject
from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema
from dbt.adapters.relation_configs import RelationConfigChangeAction
from dbt.adapters.bigquery.relation_configs import (
BigQueryClusterConfigChange,
BigQueryMaterializedViewConfig,
BigQueryMaterializedViewConfigChangeset,
BigQueryOptionsConfigChange,
BigQueryPartitionConfigChange,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import RelationType
from dbt.exceptions import CompilationError
from dbt.utils import filter_null_values
from typing import TypeVar


Self = TypeVar("Self", bound="BigQueryRelation")
@@ -17,9 +25,10 @@
class BigQueryRelation(BaseRelation):
quote_character: str = "`"
location: Optional[str] = None
# why do we need to use default_factory here but we can assign it directly in dbt-postgres?
renameable_relations = frozenset({RelationType.Table})
replaceable_relations = frozenset({RelationType.Table, RelationType.View})
renameable_relations: FrozenSet[RelationType] = frozenset({RelationType.Table})
replaceable_relations: FrozenSet[RelationType] = frozenset(
{RelationType.Table, RelationType.View}
)

def matches(
self,
@@ -53,6 +62,43 @@ def project(self):
def dataset(self):
return self.schema

@classmethod
def materialized_view_from_model_node(
cls, model_node: ModelNode
) -> BigQueryMaterializedViewConfig:
return BigQueryMaterializedViewConfig.from_model_node(model_node) # type: ignore

@classmethod
def materialized_view_config_changeset(
cls,
existing_materialized_view: BigQueryMaterializedViewConfig,
runtime_config: RuntimeConfigObject,
) -> Optional[BigQueryMaterializedViewConfigChangeset]:
config_change_collection = BigQueryMaterializedViewConfigChangeset()
new_materialized_view = cls.materialized_view_from_model_node(runtime_config.model)

if new_materialized_view.options != existing_materialized_view.options:
config_change_collection.options = BigQueryOptionsConfigChange(
action=RelationConfigChangeAction.alter,
context=new_materialized_view.options,
)

if new_materialized_view.partition != existing_materialized_view.partition:
config_change_collection.partition = BigQueryPartitionConfigChange(
action=RelationConfigChangeAction.alter,
context=new_materialized_view.partition,
)

if new_materialized_view.cluster != existing_materialized_view.cluster:
config_change_collection.cluster = BigQueryClusterConfigChange(
action=RelationConfigChangeAction.alter,
context=new_materialized_view.cluster,
)

if config_change_collection:
return config_change_collection
return None

def information_schema(self, identifier: Optional[str] = None) -> "BigQueryInformationSchema":
return BigQueryInformationSchema.from_relation(self, identifier)

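
The changeset method above relies on plain equality checks between config objects, recording a change only for the pieces that differ and returning the changeset only if it is non-empty. A minimal, self-contained sketch of that comparison pattern using simplified stand-in classes (not the PR's actual classes):

from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class ClusterConfig:
    # Frozen dataclass: equality is field-by-field, and frozensets of cluster
    # fields compare regardless of ordering.
    fields: FrozenSet[str]


@dataclass
class Changeset:
    cluster: Optional[ClusterConfig] = None

    def __bool__(self) -> bool:
        # Truthy only when at least one change was recorded, mirroring how the
        # changeset above is returned only if it contains changes.
        return self.cluster is not None


existing = ClusterConfig(fields=frozenset({"id"}))
new = ClusterConfig(fields=frozenset({"id", "updated_at"}))

changeset = Changeset()
if new != existing:
    changeset.cluster = new  # record the desired end state, analogous to the "alter" action

print(bool(changeset))  # True -> the materialization would alter the materialized view
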
21 changes: 21 additions & 0 deletions dbt/adapters/bigquery/relation_configs/__init__.py
@@ -0,0 +1,21 @@
from dbt.adapters.bigquery.relation_configs._base import BigQueryBaseRelationConfig
from dbt.adapters.bigquery.relation_configs._cluster import (
BigQueryClusterConfig,
BigQueryClusterConfigChange,
)
from dbt.adapters.bigquery.relation_configs._materialized_view import (
BigQueryMaterializedViewConfig,
BigQueryMaterializedViewConfigChangeset,
)
from dbt.adapters.bigquery.relation_configs._options import (
BigQueryOptionsConfig,
BigQueryOptionsConfigChange,
)
from dbt.adapters.bigquery.relation_configs._partition import (
PartitionConfig,
BigQueryPartitionConfigChange,
)
from dbt.adapters.bigquery.relation_configs._policies import (
BigQueryIncludePolicy,
BigQueryQuotePolicy,
)