From edc17a8f06909093488cebadf7b61b8c31f395d8 Mon Sep 17 00:00:00 2001
From: Rishabh Tatiraju
Date: Fri, 19 Jan 2024 12:01:14 -0500
Subject: [PATCH 1/2] Update read_dlt_delta() to read "source_database"
 instead of "database"

onboard_dataflowspec.py sets the key "source_database" in the source
details when it builds the bronze dataflow spec DataFrame, but
read_dlt_delta() reads the key "database" instead, so the reader never
finds the source table.
---
 src/pipeline_readers.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/pipeline_readers.py b/src/pipeline_readers.py
index 1bb7097..4874e68 100644
--- a/src/pipeline_readers.py
+++ b/src/pipeline_readers.py
@@ -62,14 +62,14 @@ def read_dlt_delta(spark, bronze_dataflow_spec) -> DataFrame:
     if reader_config_options and len(reader_config_options) > 0:
         return (
             spark.readStream.options(**reader_config_options).table(
-                f"""{bronze_dataflow_spec.sourceDetails["database"]}
+                f"""{bronze_dataflow_spec.sourceDetails["source_database"]}
                 .{bronze_dataflow_spec.sourceDetails["table"]}"""
             )
         )
     else:
         return (
             spark.readStream.table(
-                f"""{bronze_dataflow_spec.sourceDetails["database"]}
+                f"""{bronze_dataflow_spec.sourceDetails["source_database"]}
                 .{bronze_dataflow_spec.sourceDetails["table"]}"""
             )
         )

From efae9df3181772c595dacbb77cc28980174bc789 Mon Sep 17 00:00:00 2001
From: Ravi Gawai <37003292+ravi-databricks@users.noreply.github.com>
Date: Mon, 15 Apr 2024 11:17:55 -0700
Subject: [PATCH 2/2] Update test_pipeline_readers.py

Fix the unit tests to use the renamed "source_database" key in
sourceDetails, matching the read_dlt_delta() change in the previous
commit.
---
 tests/test_pipeline_readers.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_pipeline_readers.py b/tests/test_pipeline_readers.py
index 63784d4..4ae4690 100644
--- a/tests/test_pipeline_readers.py
+++ b/tests/test_pipeline_readers.py
@@ -212,7 +212,7 @@ def test_read_delta_positive(self):
         full_path = os.path.abspath("tests/resources/delta/customers")
         self.spark.sql(f"CREATE TABLE if not exists source_bronze.customer USING DELTA LOCATION '{full_path}' ")
-        source_details_map = {"sourceDetails": {"database": "source_bronze", "table": "customer"}}
+        source_details_map = {"sourceDetails": {"source_database": "source_bronze", "table": "customer"}}
         bronze_map.update(source_details_map)
         bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map)
@@ -227,7 +227,7 @@ def test_read_delta_with_read_config_positive(self):
         self.spark.sql("CREATE DATABASE IF NOT EXISTS source_bronze")
         full_path = os.path.abspath("tests/resources/delta/customers")
         self.spark.sql(f"CREATE TABLE if not exists source_bronze.customer USING DELTA LOCATION '{full_path}' ")
-        source_details_map = {"sourceDetails": {"database": "source_bronze", "table": "customer"}}
+        source_details_map = {"sourceDetails": {"source_database": "source_bronze", "table": "customer"}}
         bronze_map.update(source_details_map)
         reader_config = {"readerConfigOptions": {"maxFilesPerTrigger": "1"}}
         bronze_map.update(reader_config)
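
Note (editorial, not part of the patch series): below is a minimal,
self-contained Python sketch of the key mismatch these two commits fix.
The dict literal mirrors the sourceDetails map built in the tests above;
the qualify() helper is hypothetical and exists only for illustration.

# Sketch of the bug: onboarding writes "source_database" into the source
# details, but the old reader looked up "database".
source_details = {"source_database": "source_bronze", "table": "customer"}

def qualify(details: dict, db_key: str) -> str:
    """Build the qualified table name passed to spark.readStream.table()."""
    return f'{details[db_key]}.{details["table"]}'

# Before the patch: read_dlt_delta() looked up "database", a key the
# onboarding step never sets, so the lookup raises KeyError.
try:
    qualify(source_details, "database")
except KeyError as err:
    print(f"missing key: {err}")  # -> missing key: 'database'

# After the patch: the reader uses the key the onboarding step actually sets.
print(qualify(source_details, "source_database"))  # -> source_bronze.customer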