Skip to content

Commit

Permalink
Fixed issue
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulgoyal2987 committed Dec 9, 2024
1 parent 0ccce2f commit 107769d
Showing 1 changed file with 59 additions and 89 deletions.
148 changes: 59 additions & 89 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,107 +171,77 @@ def _get_relation_information(self, row: "agate.Row") -> RelationInfo:

return _schema, name, information

def _get_relation_information_using_describe(self, row: "agate.Row") -> RelationInfo:
"""Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement"""
try:
_schema, name, _ = row
except ValueError:
raise DbtRuntimeError(
f'Invalid value from "show tables ...", got {len(row)} values, expected 3'
)

table_name = f"{_schema}.{name}"
def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]:
"""Distinct Spark compute engines may not support the same SQL featureset. Thus, we must
try different methods to fetch relation information."""
try:
table_results = self.execute_macro(
DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs={"table_name": table_name}
relations = []
kwargs = {"schema_relation": schema_relation}
# Iceberg behavior: 3-row result of relations obtained
show_table_rows = self.execute_macro(
LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs
)
except DbtRuntimeError as e:
logger.debug(f"Error while retrieving information about {table_name}: {e.msg}")
table_results = AttrDict()
for row in show_table_rows:
_schema, name, _ = row
information = ""

rel_type: RelationType = (
RelationType.View if "Type: VIEW" in information else RelationType.Table
)
is_delta: bool = "Provider: delta" in information
is_hudi: bool = "Provider: hudi" in information
is_iceberg: bool = "Provider: iceberg" in information

relation: BaseRelation = self.Relation.create(
schema=_schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
is_iceberg=is_iceberg,
is_hudi=is_hudi,
)
relations.append(relation)
return relations
except DbtRuntimeError as e:
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation}: {e.msg}")
return []

def set_relation_information(self, relation: BaseRelation):
if relation.information:
return relation
rows: List[agate.Row] = super().get_columns_in_relation(relation)
information = ""
for info_row in table_results:
info_type, info_value, _ = info_row
for info_row in rows:
info_type, info_value, _ = info_row.values()
if not info_type.startswith("#"):
information += f"{info_type}: {info_value}\n"
rel_type: RelationType = (
RelationType.View if "Type: VIEW" in information else RelationType.Table
)
is_delta: bool = "Provider: delta" in information
is_hudi: bool = "Provider: hudi" in information
is_iceberg: bool = "Provider: iceberg" in information
relation: BaseRelation = self.Relation.create(
schema=relation.schema,
identifier=relation.identifier,
type=rel_type,
information=information,
is_delta=is_delta,
is_iceberg=is_iceberg,
is_hudi=is_hudi,
)
return relation

return _schema, name, information

def _build_spark_relation_list(
self,
row_list: "agate.Table",
relation_info_func: Callable[["agate.Row"], RelationInfo],
) -> List[BaseRelation]:
"""Aggregate relations with format metadata included."""
relations = []
for row in row_list:
_schema, name, information = relation_info_func(row)

rel_type: RelationType = (
RelationType.View if "Type: VIEW" in information else RelationType.Table
)
is_delta: bool = "Provider: delta" in information
is_hudi: bool = "Provider: hudi" in information
is_iceberg: bool = "Provider: iceberg" in information

relation: BaseRelation = self.Relation.create(
schema=_schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
is_iceberg=is_iceberg,
is_hudi=is_hudi,
)
relations.append(relation)

return relations

def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]:
"""Distinct Spark compute engines may not support the same SQL featureset. Thus, we must
try different methods to fetch relation information."""

kwargs = {"schema_relation": schema_relation}

try:
# Default compute engine behavior: show tables extended
show_table_extended_rows = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
return self._build_spark_relation_list(
row_list=show_table_extended_rows,
relation_info_func=self._get_relation_information,
)
except DbtRuntimeError as e:
errmsg = getattr(e, "msg", "")
if f"Database '{schema_relation}' not found" in errmsg:
return []
# Iceberg compute engine behavior: show table
elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg:
# this happens with spark-iceberg with v2 iceberg tables
# https://issues.apache.org/jira/browse/SPARK-33393
try:
# Iceberg behavior: 3-row result of relations obtained
show_table_rows = self.execute_macro(
LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs
)
return self._build_spark_relation_list(
row_list=show_table_rows,
relation_info_func=self._get_relation_information_using_describe,
)
except DbtRuntimeError as e:
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation}: {e.msg}")
return []
else:
logger.debug(
f"Error while retrieving information about {schema_relation}: {errmsg}"
)
return []

def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]:
if not self.Relation.get_default_include_policy().database:
database = None # type: ignore

return super().get_relation(database, schema, identifier)
relation = super().get_relation(database, schema, identifier)
self.set_relation_information(relation) if relation else None

def parse_describe_extended(
self, relation: BaseRelation, raw_rows: AttrDict
Expand Down Expand Up @@ -549,4 +519,4 @@ def debug_query(self) -> None:
diff_count.num_missing as num_mismatched
from row_count_diff
cross join diff_count
""".strip()
""".strip()

0 comments on commit 107769d

Please sign in to comment.