From b161238ff0448ccc6d22cbc57d48f4c0ceb2d84f Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Mon, 21 Oct 2024 15:42:16 -0700 Subject: [PATCH] updates to dynamic tables and roles --- load/dlt/loans/datacoves_snowflake.py | 29 ++++++ load/dlt/loans/loans_data.py | 59 ++++++++++++ secure/roles.yml | 8 +- secure/warehouses.yml | 6 ++ .../models/L1_inlets/loans/personal_loans.sql | 90 +++++++++---------- .../models/L1_inlets/loans/personal_loans.yml | 36 +++++--- .../loans_by_state__dynamic.sql | 6 +- visualize/streamlit/loans-example/example.sql | 25 ++++-- 8 files changed, 192 insertions(+), 67 deletions(-) create mode 100644 load/dlt/loans/datacoves_snowflake.py create mode 100755 load/dlt/loans/loans_data.py diff --git a/load/dlt/loans/datacoves_snowflake.py b/load/dlt/loans/datacoves_snowflake.py new file mode 100644 index 00000000..110c45e8 --- /dev/null +++ b/load/dlt/loans/datacoves_snowflake.py @@ -0,0 +1,29 @@ +import os +import dlt + +def set_config_value(key, config_key, env_var_prefix = 'DATACOVES__MAIN_LOAD__'): + + env_var = env_var_prefix + key.upper() + + value = os.getenv(env_var, dlt.config[config_key]) + + if key != 'password': + print(key + ": " +value) + return value + +config_keys = ["account", "database", "warehouse", "role", "user", "password"] + +db_config = {} +for key in config_keys: + config_key = "destination.snowflake.credentials." + key + + try: + dlt.config[config_key] + except dlt.common.configuration.exceptions.ConfigFieldMissingException: + dlt.config[config_key] = '' + + db_config[key] = set_config_value(key, config_key) + +# This is needed because by default dlt calls the snowflake account host +db_config['host'] = db_config['account'] +db_config['username'] = db_config['user'] diff --git a/load/dlt/loans/loans_data.py b/load/dlt/loans/loans_data.py new file mode 100755 index 00000000..944ee44d --- /dev/null +++ b/load/dlt/loans/loans_data.py @@ -0,0 +1,59 @@ +#!/usr/bin/env -S uv run --cache-dir /tmp/.uv_cache +# /// script +# dependencies = [ +# "dlt[snowflake, parquet]==1.1.0", +# "enlighten~=1.12.4", +# "psutil~=6.0.0", +# "pandas==2.2.2", +# ] +# /// +"""Loads a CSV file to Snowflake""" +import dlt +import pandas as pd +from datacoves_snowflake import db_config + +# a resource is the individual endpoints or tables +@dlt.resource(write_disposition="replace") +# method name = table name +def personal_loans(): + personal_loans = "https://datacoves-sample-data-public.s3.us-west-2.amazonaws.com/PERSONAL_LOANS.csv" + df = pd.read_csv(personal_loans) + yield df + +def zip_coordinates(): + zip_coordinates = "https://datacoves-sample-data-public.s3.us-west-2.amazonaws.com/ZIP_COORDINATES.csv" + df = pd.read_csv(zip_coordinates) + yield df + +# Source (corresponds to API or database) +@dlt.source +def personal_loans_source(): + return [personal_loans] + +@dlt.source +def zip_coordinates_source(): + return [zip_coordinates] + +if __name__ == "__main__": + datacoves_snowflake = dlt.destinations.snowflake( + db_config, + destination_name="datacoves_snowflake" + ) + + pipeline = dlt.pipeline( + progress = "enlighten", + pipeline_name = "loans", + destination = datacoves_snowflake, + pipelines_dir = "/tmp/", + + # dataset_name is the target schema name + dataset_name="loans" + ) + + load_info = pipeline.run(personal_loans()) + + print(load_info) + + load_info = pipeline.run(zip_coordinates()) + + print(load_info) diff --git a/secure/roles.yml b/secure/roles.yml index f1614512..7e7cf5df 100644 --- a/secure/roles.yml +++ b/secure/roles.yml @@ -39,6 +39,7 @@ - z_stage_resources_read - z_wh_transforming + - z_wh_transforming_dynamic_tables - analyst: member_of: @@ -83,6 +84,7 @@ - z_tables_views_general - z_wh_integration - z_wh_orchestrating + - z_wh_transforming_dynamic_tables - z_stage_dbt_artifacts_write owns: databases: @@ -118,7 +120,7 @@ - z_tables_views_general - z_policy_row_region_all - + ########################## # Global Roles ########################## @@ -224,6 +226,10 @@ warehouses: - wh_transforming +- z_wh_transforming_dynamic_tables: + warehouses: + - wh_transforming_dynamic_tables + - z_wh_orchestrating: warehouses: - wh_orchestrating diff --git a/secure/warehouses.yml b/secure/warehouses.yml index f9aaac66..1cc5b1c5 100644 --- a/secure/warehouses.yml +++ b/secure/warehouses.yml @@ -36,6 +36,12 @@ auto_resume: true initially_suspended: true +- wh_transforming_dynamic_tables: + size: x-small + auto_suspend: 60 + auto_resume: true + initially_suspended: true + - wh_transforming_sqlmesh: size: x-small auto_suspend: 60 diff --git a/transform/models/L1_inlets/loans/personal_loans.sql b/transform/models/L1_inlets/loans/personal_loans.sql index 9f87c7b1..5e21ba83 100644 --- a/transform/models/L1_inlets/loans/personal_loans.sql +++ b/transform/models/L1_inlets/loans/personal_loans.sql @@ -1,9 +1,8 @@ -{# {{ config( +{{ config( materialized = 'dynamic_table', - snowflake_warehouse = 'wh_transforming', + snowflake_warehouse = 'wh_transforming_dynamic_tables', target_lag = 'downstream', - persist_docs={"relation": false}, -) }} #} +) }} with raw_source as ( @@ -15,62 +14,61 @@ with raw_source as ( final as ( select - "TOTAL_ACC"::float as total_acc, - "ANNUAL_INC"::float as annual_inc, - "EMP_LENGTH"::varchar as emp_length, - "DESC"::varchar as desc, - "TOTAL_PYMNT"::float as total_pymnt, - "LAST_PYMNT_D"::varchar as last_pymnt_d, - "ADDR_STATE"::varchar as addr_state, - "NEXT_PYMNT_D"::varchar as next_pymnt_d, - "EMP_TITLE"::varchar as emp_title, - "COLLECTION_RECOVERY_FEE"::float as collection_recovery_fee, - "MTHS_SINCE_LAST_MAJOR_DEROG"::float as mths_since_last_major_derog, - "INQ_LAST_6MTHS"::float as inq_last_6mths, - "SUB_GRADE"::varchar as sub_grade, - "FUNDED_AMNT_INV"::float as funded_amnt_inv, - "DELINQ_2YRS"::float as delinq_2yrs, "LOAN_ID"::varchar as loan_id, - "FUNDED_AMNT"::float as funded_amnt, - "VERIFICATION_STATUS"::varchar as verification_status, - "DTI"::float as dti, - "TOTAL_REC_PRNCP"::float as total_rec_prncp, + "MEMBER_ID"::number as member_id, + "LOAN_AMNT"::number as loan_amnt, + "FUNDED_AMNT"::number as funded_amnt, + "FUNDED_AMNT_INV"::float as funded_amnt_inv, + "TERM"::varchar as term, + "INT_RATE"::float as int_rate, + "INSTALLMENT"::float as installment, "GRADE"::varchar as grade, + "SUB_GRADE"::varchar as sub_grade, + "EMP_TITLE"::varchar as emp_title, + "EMP_LENGTH"::varchar as emp_length, "HOME_OWNERSHIP"::varchar as home_ownership, + "ANNUAL_INC"::float as annual_inc, + "VERIFICATION_STATUS"::varchar as verification_status, "ISSUE_D"::varchar as issue_d, - "MTHS_SINCE_LAST_DELINQ"::float as mths_since_last_delinq, - "OUT_PRNCP"::float as out_prncp, - "PUB_REC"::float as pub_rec, - "INT_RATE"::float as int_rate, - "ZIP_CODE"::varchar as zip_code, - "OPEN_ACC"::float as open_acc, - "TERM"::varchar as term, + "LOAN_STATUS"::varchar as loan_status, "PYMNT_PLAN"::varchar as pymnt_plan, "URL"::varchar as url, - "REVOL_BAL"::float as revol_bal, - "RECOVERIES"::float as recoveries, - "LAST_PYMNT_AMNT"::float as last_pymnt_amnt, - "LOAN_AMNT"::float as loan_amnt, + "DESC"::varchar as desc, "PURPOSE"::varchar as purpose, - "INITIAL_LIST_STATUS"::varchar as initial_list_status, - "TOTAL_REC_INT"::float as total_rec_int, - "TOTAL_PYMNT_INV"::float as total_pymnt_inv, - "MTHS_SINCE_LAST_RECORD"::float as mths_since_last_record, - "LAST_CREDIT_PULL_D"::varchar as last_credit_pull_d, - "TOTAL_REC_LATE_FEE"::float as total_rec_late_fee, - "MEMBER_ID"::float as member_id, - "POLICY_CODE"::float as policy_code, "TITLE"::varchar as title, - "LOAN_STATUS"::varchar as loan_status, - "INSTALLMENT"::float as installment, + "ZIP_CODE"::varchar as zip_code, + "ADDR_STATE"::varchar as addr_state, + "DTI"::float as dti, + "DELINQ_2_YRS"::float as delinq_2_yrs, "EARLIEST_CR_LINE"::varchar as earliest_cr_line, + "INQ_LAST_6_MTHS"::float as inq_last_6_mths, + "MTHS_SINCE_LAST_DELINQ"::float as mths_since_last_delinq, + "MTHS_SINCE_LAST_RECORD"::float as mths_since_last_record, + "OPEN_ACC"::float as open_acc, + "PUB_REC"::float as pub_rec, + "REVOL_BAL"::number as revol_bal, "REVOL_UTIL"::varchar as revol_util, + "TOTAL_ACC"::float as total_acc, + "INITIAL_LIST_STATUS"::varchar as initial_list_status, + "OUT_PRNCP"::float as out_prncp, "OUT_PRNCP_INV"::float as out_prncp_inv, - "COLLECTIONS_12_MTHS_EX_MED"::float as collections_12_mths_ex_med + "TOTAL_PYMNT"::float as total_pymnt, + "TOTAL_PYMNT_INV"::float as total_pymnt_inv, + "TOTAL_REC_PRNCP"::float as total_rec_prncp, + "TOTAL_REC_INT"::float as total_rec_int, + "TOTAL_REC_LATE_FEE"::float as total_rec_late_fee, + "RECOVERIES"::float as recoveries, + "COLLECTION_RECOVERY_FEE"::float as collection_recovery_fee, + "LAST_PYMNT_D"::varchar as last_pymnt_d, + "LAST_PYMNT_AMNT"::float as last_pymnt_amnt, + "NEXT_PYMNT_D"::varchar as next_pymnt_d, + "LAST_CREDIT_PULL_D"::varchar as last_credit_pull_d, + "COLLECTIONS_12_MTHS_EX_MED"::float as collections_12_mths_ex_med, + "MTHS_SINCE_LAST_MAJOR_DEROG"::float as mths_since_last_major_derog, + "POLICY_CODE"::number as policy_code from raw_source ) select * from final -where addr_state = 'CA' diff --git a/transform/models/L1_inlets/loans/personal_loans.yml b/transform/models/L1_inlets/loans/personal_loans.yml index f447807d..ade7e199 100644 --- a/transform/models/L1_inlets/loans/personal_loans.yml +++ b/transform/models/L1_inlets/loans/personal_loans.yml @@ -17,11 +17,13 @@ models: meta: masking_policy: masking_policy_pii_float - name: collections_12_mths_ex_med - description: Number of collections in the last 12 months excluding medical collections + description: Number of collections in the last 12 months excluding medical + collections - name: collection_recovery_fee description: Post charge off collection fee - name: delinq_2yrs - description: The number of 30+ days past-due incidences of delinquency in the borrower's credit file for the past 2 years + description: The number of 30+ days past-due incidences of delinquency in + the borrower's credit file for the past 2 years - name: desc description: Loan description provided by the borrower meta: @@ -35,11 +37,13 @@ models: meta: masking_policy: masking_policy_pii_string - name: emp_title - description: The job title supplied by the borrower when applying for the loan + description: The job title supplied by the borrower when applying for the + loan - name: funded_amnt description: The total amount committed to that loan at that point in time - name: funded_amnt_inv - description: The total amount committed by investors for that loan at that point in time + description: The total amount committed by investors for that loan at that + point in time - name: grade description: LC assigned loan grade - name: home_ownership @@ -47,7 +51,8 @@ models: - name: initial_list_status description: The initial listing status of the loan - name: inq_last_6mths - description: The number of inquiries in past 6 months (excluding auto and mortgage inquiries) + description: The number of inquiries in past 6 months (excluding auto and + mortgage inquiries) - name: installment description: The monthly payment owed by the borrower if the loan originates - name: int_rate @@ -81,7 +86,8 @@ models: - name: out_prncp description: Remaining outstanding principal for total amount funded - name: out_prncp_inv - description: Remaining outstanding principal for portion of total amount funded by investors + description: Remaining outstanding principal for portion of total amount funded + by investors - name: policy_code description: Publicly available - name: pub_rec @@ -95,21 +101,25 @@ models: - name: revol_bal description: Total credit revolving balance - name: revol_util - description: Revolving line utilization rate, or the amount of credit the borrower is using relative to all available revolving credit + description: Revolving line utilization rate, or the amount of credit the + borrower is using relative to all available revolving credit - name: sub_grade description: LC assigned loan subgrade - name: term - description: The number of payments on the loan. Values are in months and can be either 36 or 60 + description: The number of payments on the loan. Values are in months and + can be either 36 or 60 - name: title description: The loan title provided by the borrower - name: total_acc - description: The total num(#) of credit lines currently in the borrower's credit file + description: The total num(#) of credit lines currently in the borrower's + credit file - name: total_pymnt description: Payments received to date for total amount funded meta: masking_policy: masking_policy_pii_float - name: total_pymnt_inv - description: Payments received to date for portion of total amount funded by investors + description: Payments received to date for portion of total amount funded + by investors - name: total_rec_int description: Interest received to date - name: total_rec_late_fee @@ -119,6 +129,8 @@ models: - name: url description: URL for the LC page with listing data - name: verification_status - description: Indicates if income was verified by LC, not verified, or if the income source was verified + description: Indicates if income was verified by LC, not verified, or if the + income source was verified - name: zip_code - description: The first 3 numbers of the zip code provided by the borrower in the loan application + description: The first 3 numbers of the zip code provided by the borrower + in the loan application diff --git a/transform/models/L3_coves/loan_analytics/loans_by_state__dynamic.sql b/transform/models/L3_coves/loan_analytics/loans_by_state__dynamic.sql index dc54fc8a..4c7babab 100644 --- a/transform/models/L3_coves/loan_analytics/loans_by_state__dynamic.sql +++ b/transform/models/L3_coves/loan_analytics/loans_by_state__dynamic.sql @@ -1,8 +1,8 @@ -{# {{ config( +{{ config( materialized = 'dynamic_table', - snowflake_warehouse = 'wh_transforming', + snowflake_warehouse = 'wh_transforming_dynamic_tables', target_lag = '1 minute', -) }} #} +) }} select personal_loans.addr_state as state, diff --git a/visualize/streamlit/loans-example/example.sql b/visualize/streamlit/loans-example/example.sql index 2b96c37b..2909e00b 100644 --- a/visualize/streamlit/loans-example/example.sql +++ b/visualize/streamlit/loans-example/example.sql @@ -1,3 +1,16 @@ +/* +/config/workspace/visualize/streamlit/start_app.sh + +cd $DATACOVES__REPO_PATH/load/dlt +./loans/loans_data.py + +https://app.snowflake.com/datacoves/main/#/data/databases/BALBOA/schemas/L3_LOAN_ANALYTICS/dynamic-table/LOANS_BY_STATE__DYNAMIC + +dlt pipeline loans_data show + +cd $DATACOVES__REPO_PATH/transform +*/ + -- These are useful queries to run for demo purposes use role loader; @@ -6,19 +19,16 @@ use warehouse wh_loading; -- source table needs to have change tracking enabled alter table RAW.LOANS.PERSONAL_LOANS set CHANGE_TRACKING = true; --- see the rows in a table -select count(*) -from RAW.LOANS.PERSONAL_LOANS; - -- delete records from a table delete from RAW.LOANS.PERSONAL_LOANS where left(addr_state, 1)> 'A'; +-- see the rows in a table select count(*) from RAW.LOANS.PERSONAL_LOANS; - +select distinct addr_state from raw.loans.personal_loans limit 10; -- dropping dymanic table use role analyst; @@ -29,6 +39,11 @@ drop dynamic table balboa_dev.gomezn.loans_by_state__standard; drop schema balboa_dev.fivetran_centre_straighten_staging; ------ +use warehouse wh_orchestrating; +select distinct addr_state from balboa.l1_loans.personal_loans; +drop table balboa.l1_loans.personal_loans; +drop table balboa.l3_loan_analytics.loans_by_state__dynamic; + -- Creating Streamlit App use role transformer_dbt;