
Commit

migrate to create streaming table api from create streaming live table (#39)
ravi-databricks authored Mar 19, 2024
1 parent 4139c7f commit 077c71d
Showing 31 changed files with 251 additions and 41 deletions.
1 change: 1 addition & 0 deletions .coveragerc
@@ -6,6 +6,7 @@ omit =
*/site-packages/*
tests/*
src/install.py
src/uninstall.py
src/config.py
src/cli.py

5 changes: 5 additions & 0 deletions demo/conf/dqe/customers_silver_dqe.json
@@ -0,0 +1,5 @@
{
"expect_or_drop": {
"valid_customer_id": "customer_id IS NOT NULL"
}
}
5 changes: 5 additions & 0 deletions demo/conf/dqe/products_silver_dqe.json
@@ -0,0 +1,5 @@
{
"expect_or_drop": {
"valid_product_id": "product_id IS NOT NULL"
}
}
5 changes: 5 additions & 0 deletions demo/conf/dqe/stores_silver_dqe.json
@@ -0,0 +1,5 @@
{
"expect_or_drop": {
"valid_store_id": "store_id IS NOT NULL"
}
}
6 changes: 6 additions & 0 deletions demo/conf/dqe/transactions_silver_dqe.json
@@ -0,0 +1,6 @@
{
"expect_or_drop": {
"valid_transaction_id": "transaction_id IS NOT NULL",
"valid_customer_id": "customer_id IS NOT NULL"
}
}
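For context, DLT-META loads each of these rule files and hands the map to the matching Delta Live Tables expectation decorator. Below is a minimal sketch of how an `expect_or_drop` map like the ones above gets applied; the table and view names are hypothetical, not DLT-META's actual wiring:

```python
import json

import dlt

# Hypothetical: load a rules file such as demo/conf/dqe/customers_silver_dqe.json
with open("customers_silver_dqe.json") as f:
    rules = json.load(f)

@dlt.table(name="customers_silver")  # illustrative target table
@dlt.expect_all_or_drop(rules["expect_or_drop"])  # rows failing any rule are dropped
def customers_silver():
    return dlt.read_stream("customers_bronze")  # illustrative source
```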
13 changes: 9 additions & 4 deletions demo/conf/onboarding.template
@@ -38,7 +38,9 @@
"_rescued_data"
]
},
"silver_transformation_json_prod": "{dbfs_path}/demo/conf/silver_transformations.json"
"silver_transformation_json_prod": "{dbfs_path}/demo/conf/silver_transformations.json",
"silver_data_quality_expectations_json_prod": "{dbfs_path}/demo/conf/dqe/customers_silver_dqe.json"

},
{
"data_flow_id": "101",
@@ -80,7 +82,8 @@
]
},
"silver_table_path_prod": "{dbfs_path}/demo/resources/data/silver/transactions",
"silver_transformation_json_prod": "{dbfs_path}/demo/conf/silver_transformations.json"
"silver_transformation_json_prod": "{dbfs_path}/demo/conf/silver_transformations.json",
"silver_data_quality_expectations_json_prod": "{dbfs_path}/demo/conf/dqe/transactions_silver_dqe.json"
},
{
"data_flow_id": "103",
@@ -123,7 +126,8 @@
]
},
"silver_table_path_prod": "{dbfs_path}/demo/resources/data/silver/products",
"silver_transformation_json_prod": "{dbfs_path}/demo/conf/silver_transformations.json"
"silver_transformation_json_prod": "{dbfs_path}/demo/conf/silver_transformations.json",
"silver_data_quality_expectations_json_prod": "{dbfs_path}/demo/conf/dqe/products_silver_dqe.json"
},
{
"data_flow_id": "104",
@@ -166,6 +170,7 @@
]
},
"silver_table_path_prod": "{dbfs_path}/demo/resources/data/silver/stores",
"silver_transformation_json_prod": "{dbfs_path}/demo/conf/silver_transformations.json"
"silver_transformation_json_prod": "{dbfs_path}/demo/conf/silver_transformations.json",
"silver_data_quality_expectations_json_prod": "{dbfs_path}/demo/conf/dqe/stores_silver_dqe.json"
}
]
Binary file modified demo/dbc/tech_summit_dlt_meta_runners.dbc
Binary file not shown.
2 changes: 1 addition & 1 deletion demo/launch_techsummit_demo.py
@@ -178,7 +178,7 @@ def create_techsummit_demo_workflow(self, runner_conf: TechsummitRunnerConf):
"""
database, dlt_lib = self.init_db_dltlib(runner_conf)
return self.ws.jobs.create(
name=f"dlt-meta-dais-demo-{runner_conf.run_id}",
name=f"dlt-meta-techsummit-demo-{runner_conf.run_id}",
tasks=[
jobs.Task(
task_key="generate_data",
4 changes: 2 additions & 2 deletions docs/content/getting_started/metadatapreperation.md
@@ -38,15 +38,15 @@ draft: false
| silver_table_path_{env} | Silver table storage path. |
| silver_table_properties | DLT table properties map. e.g. `{"pipelines.autoOptimize.managed": "false" , "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false"}` |
| silver_transformation_json | Silver table sql transformation json path |
| silver_data_quality_expectations_json_{env} | Silver table data quality expectations json file path |

### Data Quality Rules File Structure
| Field | Description |
| :-----------: | :----------- |
| expect | Specify multiple data quality sql for each field when records that fail validation should be included in the target dataset|
| expect_or_fail | Specify multiple data quality sql for each field when records that fail validation should halt pipeline execution |
| expect_or_drop | Specify multiple data quality sql for each field when records that fail validation should be dropped from the target dataset |
| expect_or_quarantine | Specify multiple data quality sql for each field when records that fail validation will be dropped from the main table and inserted into the quarantine table specified in dataflowspec |
| expect_or_quarantine | Specify multiple data quality sql for each field when records that fail validation will be dropped from the main table and inserted into the quarantine table specified in dataflowspec (only applicable for Bronze layer) |
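For illustration, a rules file exercising these fields could look like the following; the rule names and SQL conditions are examples, mirroring the test fixtures added in this commit:

```json
{
  "expect": {
    "valid_id": "id IS NOT NULL"
  },
  "expect_or_fail": {
    "no_rescued_data": "_rescued_data IS NULL"
  },
  "expect_or_drop": {
    "valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
  },
  "expect_or_quarantine": {
    "quarantine_rule": "_rescued_data IS NOT NULL OR id IS NULL"
  }
}
```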

### Silver transformation File Structure
| Field | Description |
4 changes: 3 additions & 1 deletion integration_tests/conf/cloudfiles-onboarding.template
@@ -48,7 +48,8 @@
"silver_table_properties": {
"pipelines.reset.allowed": "false",
"pipelines.autoOptimize.zOrderCols": "id, email"
}
},
"silver_data_quality_expectations_json_it": "{dbfs_path}/integration_tests/conf/dqe/customers/silver_data_quality_expectations.json"
},
{
"data_flow_id": "101",
@@ -98,6 +99,7 @@
},
"silver_table_path_it": "{dbfs_path}/data/silver/transactions",
"silver_transformation_json_it": "{dbfs_path}/integration_tests/conf/silver_transformations.json",
"silver_data_quality_expectations_json_it": "{dbfs_path}/integration_tests/conf/dqe/transactions/silver_data_quality_expectations.json",
"silver_table_properties": {
"pipelines.reset.allowed": "false",
"pipelines.autoOptimize.zOrderCols": "id, customer_id"
@@ -0,0 +1,5 @@
{
"expect_all_or_drop": {
"valid_customer_id": "id IS NOT NULL"
}
}
@@ -0,0 +1,5 @@
{
"expect_all_or_drop": {
"valid_device_id": "device_id IS NOT NULL"
}
}
@@ -0,0 +1,5 @@
{
"expect_all_or_drop": {
"valid_customer_id": "id IS NOT NULL"
}
}
12 changes: 10 additions & 2 deletions integration_tests/run_integration_tests.py
@@ -668,7 +668,15 @@ def init_dltmeta_runner_conf(self, runner_conf: DLTMetaRunnerConf):
self.generate_onboarding_file(runner_conf)
print("int_tests_dir: ", runner_conf.int_tests_dir)
print(f"uploading to {runner_conf.dbfs_tmp_path}/{self.base_dir}/")
self.ws.dbfs.create(path=runner_conf.dbfs_tmp_path + f"/{self.base_dir}/", overwrite=True)
if runner_conf.uc_catalog_name:
self.ws.dbfs.create(path=runner_conf.dbfs_tmp_path + f"/{self.base_dir}/", overwrite=True)
else:
try:
self.ws.dbfs.mkdirs(runner_conf.dbfs_tmp_path + f"/{self.base_dir}/")
except Exception as e:
print(f"Error in creating directory {runner_conf.dbfs_tmp_path + f'/{self.base_dir}/'}")
print(e)
print(runner_conf.dbfs_tmp_path + f"/{self.base_dir}/ must be already present")
self.ws.dbfs.copy(runner_conf.int_tests_dir,
runner_conf.dbfs_tmp_path + f"/{self.base_dir}/",
overwrite=True, recursive=True)
@@ -726,7 +734,7 @@ def run(self, runner_conf: DLTMetaRunnerConf):
print(e)
finally:
print("Cleaning up...")
self.clean_up(runner_conf)
# self.clean_up(runner_conf)

def download_test_results(self, runner_conf: DLTMetaRunnerConf):
ws_output_file = self.ws.workspace.download(runner_conf.test_output_file_path)
2 changes: 1 addition & 1 deletion setup.py
@@ -19,7 +19,7 @@
"""
setup(
name="dlt_meta",
version="0.0.5",
version="0.0.6",
python_requires=">=3.8",
setup_requires=["wheel>=0.37.1,<=0.42.0"],
install_requires=INSTALL_REQUIRES,
29 changes: 28 additions & 1 deletion src/dataflow_pipeline.py
@@ -207,12 +207,18 @@ def write_bronze_with_dqe(self):
dlt_table_with_expectation = None
expect_or_quarantine_dict = None

if "expect_all" in data_quality_expectations_json:
expect_dict = data_quality_expectations_json["expect_all"]
if "expect" in data_quality_expectations_json:
expect_dict = data_quality_expectations_json["expect"]
if "expect_or_fail" in data_quality_expectations_json:
expect_or_fail_dict = data_quality_expectations_json["expect_or_fail"]
if "expect_all_or_fail" in data_quality_expectations_json:
expect_or_fail_dict = data_quality_expectations_json["expect_all_or_fail"]
if "expect_or_drop" in data_quality_expectations_json:
expect_or_drop_dict = data_quality_expectations_json["expect_or_drop"]
if "expect_all_or_drop" in data_quality_expectations_json:
expect_or_drop_dict = data_quality_expectations_json["expect_all_or_drop"]
if "expect_or_quarantine" in data_quality_expectations_json:
expect_or_quarantine_dict = data_quality_expectations_json["expect_or_quarantine"]
if bronzeDataflowSpec.cdcApplyChanges:
@@ -308,12 +314,33 @@ def cdc_apply_changes(self):

target_path = None if self.uc_enabled else self.dataflowSpec.targetDetails["path"]

dlt.create_streaming_live_table(
expect_all_dict = None
expect_all_or_drop_dict = None
expect_all_or_fail_dict = None
if self.table_has_expectations():
data_quality_expectations_json = json.loads(self.dataflowSpec.dataQualityExpectations)
if "expect_all" in data_quality_expectations_json:
expect_all_dict = data_quality_expectations_json["expect_all"]
if "expect" in data_quality_expectations_json:
expect_all_dict = data_quality_expectations_json["expect"]
if "expect_all_or_drop" in data_quality_expectations_json:
expect_all_or_drop_dict = data_quality_expectations_json["expect_all_or_drop"]
if "expect_or_drop" in data_quality_expectations_json:
expect_all_or_drop_dict = data_quality_expectations_json["expect_or_drop"]
if "expect_all_or_fail" in data_quality_expectations_json:
expect_all_or_fail_dict = data_quality_expectations_json["expect_all_or_fail"]
if "expect_or_fail" in data_quality_expectations_json:
expect_all_or_fail_dict = data_quality_expectations_json["expect_or_fail"]

dlt.create_streaming_table(
name=f"{self.dataflowSpec.targetDetails['table']}",
table_properties=self.dataflowSpec.tableProperties,
partition_cols=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.partitionColumns),
path=target_path,
schema=struct_schema,
expect_all=expect_all_dict,
expect_all_or_drop=expect_all_or_drop_dict,
expect_all_or_fail=expect_all_or_fail_dict,
)

apply_as_deletes = None
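This hunk is the heart of the commit: `cdc_apply_changes` now provisions its target with `dlt.create_streaming_table`, which accepts expectation maps directly, in place of the deprecated `dlt.create_streaming_live_table`. A condensed sketch of the migration, using illustrative names and rules rather than DLT-META's real spec values:

```python
import dlt

# Old API, removed by this commit (no expectations at table creation):
# dlt.create_streaming_live_table(name="customers_silver", ...)

# New API: expectation maps ride along, so rows violating
# expect_all_or_drop rules never reach the apply_changes merge.
dlt.create_streaming_table(
    name="customers_silver",  # illustrative target table
    expect_all={"valid_id": "id IS NOT NULL"},  # tracked only, rows kept
    expect_all_or_drop={"valid_op": "operation IN ('APPEND', 'DELETE', 'UPDATE')"},
    expect_all_or_fail=None,  # None when no fail-fast rules are configured
)
```

Note how the parsing above folds both the `expect*` and `expect_all*` spellings of each key into a single dict, so either form in a rules file reaches the new API.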
1 change: 1 addition & 0 deletions src/dataflow_spec.py
@@ -55,6 +55,7 @@ class SilverDataflowSpec:
whereClause: list
partitionColumns: list
cdcApplyChanges: str
dataQualityExpectations: str
version: str
createDate: datetime
createdBy: str
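For reference, the new field is carried on the spec as a raw JSON string and only parsed (via `json.loads`) inside the pipeline. A trimmed sketch of the dataclass shape, with most fields elided:

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SilverDataflowSpec:
    # ...other fields elided...
    cdcApplyChanges: str
    dataQualityExpectations: str  # raw JSON string; parsed later with json.loads
    version: str
    createDate: datetime
```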
28 changes: 19 additions & 9 deletions src/onboard_dataflowspec.py
@@ -224,9 +224,11 @@ def onboard_silver_dataflow_spec(self):

if dict_obj["overwrite"] == "True":
if self.uc_enabled:
silver_dataflow_spec_df.write.format("delta").mode("overwrite").saveAsTable(f"{database}.{table}")
(silver_dataflow_spec_df.write.format("delta").mode("overwrite").option("mergeSchema", "true")
.saveAsTable(f"{database}.{table}")
)
else:
silver_dataflow_spec_df.write.mode("overwrite").format("delta").save(
silver_dataflow_spec_df.write.mode("overwrite").format("delta").option("mergeSchema", "true").save(
dict_obj["silver_dataflowspec_path"]
)
else:
@@ -301,13 +303,13 @@ def onboard_bronze_dataflow_spec(self):
table = dict_obj["bronze_dataflowspec_table"]
if dict_obj["overwrite"] == "True":
if self.uc_enabled:
bronze_dataflow_spec_df.write.format("delta").mode("overwrite").saveAsTable(
f"{database}.{table}"
)
(bronze_dataflow_spec_df.write.format("delta").mode("overwrite").option("mergeSchema", "true")
.saveAsTable(f"{database}.{table}")
)
else:
bronze_dataflow_spec_df.write.mode("overwrite").format("delta").save(
path=dict_obj["bronze_dataflowspec_path"]
)
(bronze_dataflow_spec_df.write.mode("overwrite").format("delta").option("mergeSchema", "true")
.save(path=dict_obj["bronze_dataflowspec_path"])
)
else:
if self.uc_enabled:
original_dataflow_df = self.spark.read.format("delta").table(f"{database}.{table}")
@@ -615,6 +617,7 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
"tableProperties",
"partitionColumns",
"cdcApplyChanges",
"dataQualityExpectations"
]
data_flow_spec_schema = StructType(
[
@@ -632,6 +635,7 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
StructField("tableProperties", MapType(StringType(), StringType(), True), True),
StructField("partitionColumns", ArrayType(StringType(), True), True),
StructField("cdcApplyChanges", StringType(), True),
StructField("dataQualityExpectations", StringType(), True)
]
)
data = []
@@ -679,7 +683,12 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
silver_cdc_apply_changes_row = onboarding_row["silver_cdc_apply_changes"]
if self.onboard_file_type == "json":
silver_cdc_apply_changes = json.dumps(self.__delete_none(silver_cdc_apply_changes_row.asDict()))

data_quality_expectations = None
if f"silver_data_quality_expectations_json_{env}" in onboarding_row:
silver_data_quality_expectations_json = onboarding_row[f"silver_data_quality_expectations_json_{env}"]
if silver_data_quality_expectations_json:
data_quality_expectations = (
self.__get_data_quality_expecations(silver_data_quality_expectations_json))
silver_row = (
silver_data_flow_spec_id,
silver_data_flow_spec_group,
Expand All @@ -691,6 +700,7 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
silver_table_properties,
silver_parition_columns,
silver_cdc_apply_changes,
data_quality_expectations
)
data.append(silver_row)
logger.info(f"silver_data ==== {data}")
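The onboarding writers now pass `mergeSchema` so that overwriting an existing dataflowspec table picks up the new `dataQualityExpectations` column instead of failing on a schema mismatch. A standalone sketch of the pattern, with a hypothetical table name and toy data:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("100", '{"expect_or_drop": {"valid_id": "id IS NOT NULL"}}')],
    ["dataFlowId", "dataQualityExpectations"],  # column introduced by this commit
)

# mergeSchema lets the overwrite evolve the target Delta table's schema,
# adding the new column rather than erroring out.
(df.write.format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("dlt_meta.silver_dataflowspec_table"))  # hypothetical name
```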
@@ -0,0 +1,7 @@
{
"expect_or_drop":{
"no_rescued_data":"_rescued_data IS NULL",
"valid_id":"id IS NOT NULL",
"valid_operation":"operation IN ('APPEND', 'DELETE', 'UPDATE')"
}
}
@@ -0,0 +1,9 @@
{
"expect": {
"no_rescued_data": "_rescued_data IS NULL",
"valid_id": "id IS NOT NULL"
},
"expect_or_quarantine": {
"valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
}
}
3 changes: 3 additions & 0 deletions tests/resources/dqe/product/products.json
@@ -2,6 +2,9 @@
"expect":{
"valid_product_id": "product_id IS NOT NULL"
},
"expect_all":{
"valid_product_id": "product_id IS NOT NULL"
},
"expect_or_fail":{
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
27 changes: 27 additions & 0 deletions tests/resources/dqe/product/silver_data_quality_expectations.json
@@ -0,0 +1,27 @@
{
"expect":{
"valid_product_id": "product_id IS NOT NULL"
},
"expect_all":{
"valid_product_id": "product_id IS NOT NULL"
},
"expect_all_or_fail":{
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
},
"expect_or_fail":{
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
},
"expect_all_or_drop": {
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
},
"expect_or_drop": {
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
},
"expect_or_quarantine": {
"quarantine_rule": "_rescued_data IS NOT NULL OR product_id IS NULL"
}
}
@@ -0,0 +1,9 @@
{
"expect_or_drop": {
"no_rescued_data": "_rescued_data IS NULL",
"valid_store_id": "store_id IS NOT NULL"
},
"expect_or_quarantine": {
"quarantine_rule": "_rescued_data IS NOT NULL OR store_id IS NULL"
}
}
@@ -0,0 +1,9 @@
{
"expect": {
"no_rescued_data": "_rescued_data IS NULL",
"valid_id": "id IS NOT NULL"
},
"expect_or_quarantine": {
"valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
}
}