Merge pull request #77 from fishtown-analytics/fix/catalog-gen
fix catalog generation
beckjake authored Apr 22, 2020
2 parents c1f7212 + 3c0f99a commit 4ec7854
Showing 1 changed file with 44 additions and 7 deletions.
51 changes: 44 additions & 7 deletions dbt/adapters/spark/impl.py
@@ -3,8 +3,9 @@
import agate
import dbt.exceptions
import dbt
+from dbt.adapters.base.relation import SchemaSearchMap
from dbt.adapters.sql import SQLAdapter
-from dbt.contracts.graph.manifest import Manifest
+from dbt.node_types import NodeType

from dbt.adapters.spark import SparkConnectionManager
from dbt.adapters.spark import SparkRelation
@@ -267,19 +268,55 @@ def _massage_column_for_catalog(
            dct['table_database'] = dct['table_schema']
        return dct

-    def get_catalog(self, manifest: Manifest) -> agate.Table:
-        schemas = manifest.get_used_schemas()
-        columns = []
-        for database, schema in schemas:
-            relations = self.list_relations(database, schema)
-            for relation in relations:
+    def _get_catalog_for_relations(self, database: str, schema: str):
+        with self.connection_named(f'{database}.{schema}'):
+            columns = []
+            for relation in self.list_relations(database, schema):
+                logger.debug("Getting table schema for relation {}", relation)
+                columns.extend(
+                    self._massage_column_for_catalog(col)
+                    for col in self.get_columns_in_relation(relation)
+                )
+            return agate.Table.from_object(columns)

+    def _get_cache_schemas(self, manifest, exec_only=False):
+        info_schema_name_map = SchemaSearchMap()
+        for node in manifest.nodes.values():
+            if exec_only and node.resource_type not in NodeType.executable():
+                continue
+            relation = self.Relation.create(
+                database=node.database,
+                schema=node.schema,
+                identifier='information_schema',
+                quote_policy=self.config.quoting,
+            )
+            key = relation.information_schema_only()
+            info_schema_name_map[key] = {node.schema}
+        return info_schema_name_map

+    def _get_one_catalog(
+        self, information_schema, schemas, manifest,
+    ) -> agate.Table:
+        name = f'{information_schema.database}.information_schema'
+
+        if len(schemas) != 1:
+            dbt.exceptions.raise_compiler_error(
+                'Expected only one schema in spark _get_one_catalog'
+            )
+
+        database = information_schema.database
+        schema = list(schemas)[0]
+
+        with self.connection_named(name):
+            columns = []
+            for relation in self.list_relations(database, schema):
                logger.debug("Getting table schema for relation {}", relation)
                columns.extend(
                    self._massage_column_for_catalog(col)
                    for col in self.get_columns_in_relation(relation)
                )
        return agate.Table.from_object(columns)

    def check_schema_exists(self, database, schema):
        results = self.execute_macro(
            LIST_SCHEMAS_MACRO_NAME,
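For context, the added methods are hooks for the base adapter's catalog flow rather than entry points called directly: `_get_cache_schemas` maps each node's `database.schema` to an `information_schema` relation, and `_get_one_catalog` then builds an agate table per schema over its own named connection. The sketch below shows one way such a driver could tie them together; it is a minimal illustration assuming a dbt-core 0.16-era `get_catalog(manifest)` dispatch, not code from this commit, and `adapter`/`manifest`/`build_catalog` are placeholder names.

```python
import agate


def build_catalog(adapter, manifest):
    """Hypothetical driver mirroring how the base adapter is assumed to
    use the two new hooks; not part of this commit."""
    # Group the manifest's executable nodes into one information_schema
    # relation per schema (exec_only=True skips non-executable nodes).
    schema_map = adapter._get_cache_schemas(manifest, exec_only=True)

    tables = []
    for information_schema, schemas in schema_map.items():
        # On Spark each entry covers exactly one schema; _get_one_catalog
        # raises a compiler error if it ever receives more than one.
        tables.append(
            adapter._get_one_catalog(information_schema, schemas, manifest)
        )

    # Stack the per-schema agate tables into a single catalog table.
    return agate.Table.merge(tables)
```

Keying the map by `information_schema_only()` is what lets each schema get its own `connection_named` block, which is the behavior this fix relies on.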
