Skip to content

Commit

Permalink
add _get_one_catalog_by_relations method, redirect _get_one_catalog t…
Browse files Browse the repository at this point in the history
…o this method to reduce code duplication
  • Loading branch information
mikealfare committed Dec 16, 2023
1 parent bbac669 commit 28dd554
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Dict, Iterable, List, Optional, Union, Type, Tuple, Callable, Set

from dbt.adapters.base.relation import InformationSchema
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.contracts.graph.manifest import Manifest

from typing_extensions import TypeAlias
Expand Down Expand Up @@ -101,6 +102,10 @@ class SparkAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED,
}

_capabilities = CapabilityDict(
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
)

Relation: TypeAlias = SparkRelation
RelationInfo = Tuple[str, str, str]
Column: TypeAlias = SparkColumn
Expand Down Expand Up @@ -387,13 +392,18 @@ def _get_one_catalog(
raise dbt.exceptions.CompilationError(
f"Expected only one schema in spark _get_one_catalog, found " f"{schemas}"
)
relations = self.list_relations(information_schema.database, list(schemas)[0])
return self._get_one_catalog_by_relations(information_schema, relations, manifest)

database = information_schema.database
schema = list(schemas)[0]

def _get_one_catalog_by_relations(
self,
information_schema: InformationSchema,
relations: List[BaseRelation],
manifest: Manifest,
) -> agate.Table:
columns: List[Dict[str, Any]] = []
for relation in self.list_relations(database, schema):
logger.debug("Getting table schema for relation {}", str(relation))
for relation in relations:
logger.debug(f"Getting table schema for relation {relation}")
columns.extend(self._get_columns_for_catalog(relation))
return agate.Table.from_object(columns, column_types=DEFAULT_TYPE_TESTER)

Expand Down

0 comments on commit 28dd554

Please sign in to comment.