Skip to content

Commit

Permalink
Merge pull request #33 from rtdtwo/patch-1
Browse files Browse the repository at this point in the history
Mismatched Keys: Update read_dlt_delta() with key "source_database" instead of "database"
  • Loading branch information
ravi-databricks authored Apr 15, 2024
2 parents 423a776 + efae9df commit 3555aaa
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/pipeline_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ def read_dlt_delta(spark, bronze_dataflow_spec) -> DataFrame:
if reader_config_options and len(reader_config_options) > 0:
return (
spark.readStream.options(**reader_config_options).table(
f"""{bronze_dataflow_spec.sourceDetails["database"]}
f"""{bronze_dataflow_spec.sourceDetails["source_database"]}
.{bronze_dataflow_spec.sourceDetails["table"]}"""
)
)
else:
return (
spark.readStream.table(
f"""{bronze_dataflow_spec.sourceDetails["database"]}
f"""{bronze_dataflow_spec.sourceDetails["source_database"]}
.{bronze_dataflow_spec.sourceDetails["table"]}"""
)
)
Expand Down
4 changes: 2 additions & 2 deletions tests/test_pipeline_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def test_read_delta_positive(self):
full_path = os.path.abspath("tests/resources/delta/customers")
self.spark.sql(f"CREATE TABLE if not exists source_bronze.customer USING DELTA LOCATION '{full_path}' ")

source_details_map = {"sourceDetails": {"database": "source_bronze", "table": "customer"}}
source_details_map = {"sourceDetails": {"source_database": "source_bronze", "table": "customer"}}

bronze_map.update(source_details_map)
bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map)
Expand All @@ -227,7 +227,7 @@ def test_read_delta_with_read_config_positive(self):
self.spark.sql("CREATE DATABASE IF NOT EXISTS source_bronze")
full_path = os.path.abspath("tests/resources/delta/customers")
self.spark.sql(f"CREATE TABLE if not exists source_bronze.customer USING DELTA LOCATION '{full_path}' ")
source_details_map = {"sourceDetails": {"database": "source_bronze", "table": "customer"}}
source_details_map = {"sourceDetails": {"source_database": "source_bronze", "table": "customer"}}
bronze_map.update(source_details_map)
reader_config = {"readerConfigOptions": {"maxFilesPerTrigger": "1"}}
bronze_map.update(reader_config)
Expand Down

0 comments on commit 3555aaa

Please sign in to comment.