From 647e91a3bf430b2432da32b97f12379d764b4feb Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 26 Sep 2024 14:25:26 -0700 Subject: [PATCH 01/25] update dlt script --- load/dlt/.dlt/.sources | 24 ---- load/dlt/csv_to_snowflake/load_csv_data.py | 10 ++ .../dlt/mysql_to_snowflake/load_mysql_data.py | 26 ---- .../mysql_to_snowflake/sql_database/README.md | 56 -------- .../sql_database/__init__.py | 97 ------------- .../sql_database/helpers.py | 132 ------------------ .../sql_database/settings.py | 3 - 7 files changed, 10 insertions(+), 338 deletions(-) delete mode 100644 load/dlt/.dlt/.sources mode change 100644 => 100755 load/dlt/csv_to_snowflake/load_csv_data.py delete mode 100644 load/dlt/mysql_to_snowflake/load_mysql_data.py delete mode 100644 load/dlt/mysql_to_snowflake/sql_database/README.md delete mode 100644 load/dlt/mysql_to_snowflake/sql_database/__init__.py delete mode 100644 load/dlt/mysql_to_snowflake/sql_database/helpers.py delete mode 100644 load/dlt/mysql_to_snowflake/sql_database/settings.py diff --git a/load/dlt/.dlt/.sources b/load/dlt/.dlt/.sources deleted file mode 100644 index 00b88cdc..00000000 --- a/load/dlt/.dlt/.sources +++ /dev/null @@ -1,24 +0,0 @@ -engine_version: 1 -sources: - sql_database: - is_dirty: false - last_commit_sha: c412cf871b440d21d6548b499df1c96301e56f39 - last_commit_timestamp: '2023-10-10T15:17:19+02:00' - files: - sql_database/__init__.py: - commit_sha: c412cf871b440d21d6548b499df1c96301e56f39 - git_sha: 49a498a8f0d636473a1f8bd7c093109ea09c0bd2 - sha3_256: a950fa88a86fed3e8ce109e5c8bd71e1ae50a92decd7bf7ab4d13334f40b82c4 - sql_database/helpers.py: - commit_sha: c412cf871b440d21d6548b499df1c96301e56f39 - git_sha: b71fd379bf7f478e3b4e2f7c3021bf624c29631a - sha3_256: c0458939b7a3aaaeab1f6ee0d141b32f83ce5a1a19c6f628fd4d0dba1e1eac0c - sql_database/settings.py: - commit_sha: c412cf871b440d21d6548b499df1c96301e56f39 - git_sha: f1aa2d4b264019aff29a4303dca3ca87a683bbe4 - sha3_256: c78809df0ad03ffe5a2aed03348b3a58d80e8f69a2df2ec892de4480265956a1 - sql_database/README.md: - commit_sha: c412cf871b440d21d6548b499df1c96301e56f39 - git_sha: a1990321a937e717f53590d1f3d59ac775621c50 - sha3_256: 955e751e0c482bc314075a2bb274fb329a1cbdd9cfe1d34a02ae60e5a17253f5 - dlt_version_constraint: <0.4,>=0.3.5 diff --git a/load/dlt/csv_to_snowflake/load_csv_data.py b/load/dlt/csv_to_snowflake/load_csv_data.py old mode 100644 new mode 100755 index 3130ffe8..fd8d2569 --- a/load/dlt/csv_to_snowflake/load_csv_data.py +++ b/load/dlt/csv_to_snowflake/load_csv_data.py @@ -1,3 +1,13 @@ +#!/usr/bin/env -S uv run +# /// script +# dependencies = [ +# "dlt[snowflake, parquet]==1.1.0", +# "enlighten~=1.12.4", +# "psutil~=6.0.0", +# "pandas==2.2.2", +# ] +# /// +"""Loads a CSV file to Snowflake""" import dlt import pandas as pd from datacoves_snowflake import db_config diff --git a/load/dlt/mysql_to_snowflake/load_mysql_data.py b/load/dlt/mysql_to_snowflake/load_mysql_data.py deleted file mode 100644 index 7cb46b48..00000000 --- a/load/dlt/mysql_to_snowflake/load_mysql_data.py +++ /dev/null @@ -1,26 +0,0 @@ -# run mysql -> snowflake pipeline using: -# python sql_database_pipeline.py - -# Use this to delete target table -# dlt pipeline drop datacoves_tc2 --drop-all - - -import dlt -from sql_database import sql_table - -if __name__ == "__main__": - - # dataset_name is the target schema name - pipeline = dlt.pipeline( - pipeline_name="datacoves_tc2", - destination="snowflake", - dataset_name="tc2", - progress="enlighten") - - table = sql_table(table="tc2_lineitem") - - table.apply_hints( - 
write_disposition="merge", - primary_key=["L_ORDERKEY", "L_LINENUMBER"]) - - print(pipeline.run(table)) diff --git a/load/dlt/mysql_to_snowflake/sql_database/README.md b/load/dlt/mysql_to_snowflake/sql_database/README.md deleted file mode 100644 index a1990321..00000000 --- a/load/dlt/mysql_to_snowflake/sql_database/README.md +++ /dev/null @@ -1,56 +0,0 @@ -# SQL Database -SQL database, or Structured Query Language database, are a type of database management system (DBMS) that stores and manages data in a structured format. The SQL Database `dlt` is a verified source and pipeline example that makes it easy to load data from your SQL database to a destination of your choice. It offers flexibility in terms of loading either the entire database or specific tables to the target. - -## Initialize the pipeline with SQL Database verified source -```bash -dlt init sql_database bigquery -``` -Here, we chose BigQuery as the destination. Alternatively, you can also choose redshift, duckdb, or any of the otherĀ [destinations.](https://dlthub.com/docs/dlt-ecosystem/destinations/) - -## Setup verified source - -To setup the SQL Database Verified Source read the [full documentation here.](https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database) - -## Add credentials -1. Open `.dlt/secrets.toml`. -2. In order to continue, we will use the supplied connection URL to establish credentials. The connection URL is associated with a public database and looks like this: - ```bash - connection_url = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" - ``` - Here's what the `secrets.toml` looks like: - ```toml - # Put your secret values and credentials here. do not share this file and do not upload it to github. - # We will set up creds with the following connection URL, which is a public database - - # The credentials are as follows - drivername = "mysql+pymysql" # Driver name for the database - database = "Rfam # Database name - username = "rfamro" # username associated with the database - host = "mysql-rfam-public.ebi.ac.uk" # host address - port = "4497 # port required for connection - ``` -3. Enter credentials for your chosen destination as per the [docs.](https://dlthub.com/docs/dlt-ecosystem/destinations/) - -## Running the pipeline example - -1. Install the required dependencies by running the following command: - ```bash - pip install -r requirements.txt - ``` - -2. Now you can build the verified source by using the command: - ```bash - python3 sql_database_pipeline.py - ``` - -3. To ensure that everything loads as expected, use the command: - ```bash - dlt pipeline show - ``` - - For example, the pipeline_name for the above pipeline example is `rfam`, you can use any custom name instead. - - - -šŸ’” To explore additional customizations for this pipeline, we recommend referring to the official DLT SQL Database verified documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. 
You can find the DLT SQL Database documentation in [Setup Guide: SQL Database.](https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database) - diff --git a/load/dlt/mysql_to_snowflake/sql_database/__init__.py b/load/dlt/mysql_to_snowflake/sql_database/__init__.py deleted file mode 100644 index 49a498a8..00000000 --- a/load/dlt/mysql_to_snowflake/sql_database/__init__.py +++ /dev/null @@ -1,97 +0,0 @@ -"""Source that loads tables form any SQLAlchemy supported database, supports batching requests and incremental loads.""" - -from typing import List, Optional, Union, Iterable, Any -from sqlalchemy import MetaData, Table -from sqlalchemy.engine import Engine - -import dlt -from dlt.extract.source import DltResource - - -from dlt.sources.credentials import ConnectionStringCredentials - -from .helpers import ( - table_rows, - engine_from_credentials, - get_primary_key, - SqlDatabaseTableConfiguration, - SqlTableResourceConfiguration, -) - - -@dlt.source -def sql_database( - credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value, - schema: Optional[str] = dlt.config.value, - metadata: Optional[MetaData] = None, - table_names: Optional[List[str]] = dlt.config.value, -) -> Iterable[DltResource]: - """ - A DLT source which loads data from an SQL database using SQLAlchemy. - Resources are automatically created for each table in the schema or from the given list of tables. - - Args: - credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `sqlalchemy.Engine` instance. - schema (Optional[str]): Name of the database schema to load (if different from default). - metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used. - table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. - - Returns: - Iterable[DltResource]: A list of DLT resources for each table to be loaded. - """ - - # set up alchemy engine - engine = engine_from_credentials(credentials) - engine.execution_options(stream_results=True) - metadata = metadata or MetaData(schema=schema) - - # use provided tables or all tables - if table_names: - tables = [Table(name, metadata, autoload_with=engine) for name in table_names] - else: - metadata.reflect(bind=engine) - tables = list(metadata.tables.values()) - - for table in tables: - yield dlt.resource( - table_rows, - name=table.name, - primary_key=get_primary_key(table), - spec=SqlDatabaseTableConfiguration, - )(engine, table) - - -@dlt.common.configuration.with_config( - sections=("sources", "sql_database"), spec=SqlTableResourceConfiguration -) -def sql_table( - credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value, - table: str = dlt.config.value, - schema: Optional[str] = dlt.config.value, - metadata: Optional[MetaData] = None, - incremental: Optional[dlt.sources.incremental[Any]] = None, -) -> DltResource: - """ - A dlt resource which loads data from an SQL database table using SQLAlchemy. - - Args: - credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `Engine` instance representing the database connection. - table (str): Name of the table to load. - schema (Optional[str]): Optional name of the schema the table belongs to. - metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. If provided, the `schema` argument is ignored. - incremental (Optional[dlt.sources.incremental[Any]]): Option to enable incremental loading for the table. 
- E.g., `incremental=dlt.sources.incremental('updated_at', pendulum.parse('2022-01-01T00:00:00Z'))` - write_disposition (str): Write disposition of the resource. - - Returns: - DltResource: The dlt resource for loading data from the SQL database table. - """ - engine = engine_from_credentials(credentials) - engine.execution_options(stream_results=True) - metadata = metadata or MetaData(schema=schema) - - table_obj = Table(table, metadata, autoload_with=engine) - - return dlt.resource( - table_rows, name=table_obj.name, primary_key=get_primary_key(table_obj) - )(engine, table_obj, incremental=incremental) diff --git a/load/dlt/mysql_to_snowflake/sql_database/helpers.py b/load/dlt/mysql_to_snowflake/sql_database/helpers.py deleted file mode 100644 index b71fd379..00000000 --- a/load/dlt/mysql_to_snowflake/sql_database/helpers.py +++ /dev/null @@ -1,132 +0,0 @@ -"""SQL database source helpers""" - -from typing import ( - cast, - Any, - List, - Optional, - Iterator, - Dict, - Union, -) -import operator - -import dlt -from dlt.sources.credentials import ConnectionStringCredentials -from dlt.extract.source import DltResource -from dlt.common.configuration.specs import BaseConfiguration, configspec -from dlt.common.typing import TDataItem -from .settings import DEFAULT_CHUNK_SIZE - -from sqlalchemy import Table, create_engine -from sqlalchemy.engine import Engine, Row -from sqlalchemy.sql import Select -from sqlalchemy import MetaData, Table - - -class TableLoader: - def __init__( - self, - engine: Engine, - table: Table, - chunk_size: int = 1000, - incremental: Optional[dlt.sources.incremental[Any]] = None, - ) -> None: - self.engine = engine - self.table = table - self.chunk_size = chunk_size - self.incremental = incremental - if incremental: - try: - self.cursor_column = table.c[incremental.cursor_path] - except KeyError as e: - raise KeyError( - f"Cursor column '{incremental.cursor_path}' does not exist in table '{table.name}'" - ) from e - self.last_value = incremental.last_value - else: - self.cursor_column = None - self.last_value = None - - def make_query(self) -> Select[Any]: - table = self.table - query = table.select() - if not self.incremental: - return query - last_value_func = self.incremental.last_value_func - if ( - last_value_func is max - ): # Query ordered and filtered according to last_value function - order_by = self.cursor_column.asc() - filter_op = operator.ge - elif last_value_func is min: - order_by = self.cursor_column.desc() - filter_op = operator.le - else: # Custom last_value, load everything and let incremental handle filtering - return query - query = query.order_by(order_by) - if self.last_value is None: - return cast(Select[Any], query) # TODO: typing in sqlalchemy 2 - return cast( - Select[Any], query.where(filter_op(self.cursor_column, self.last_value)) - ) - - def load_rows(self) -> Iterator[List[TDataItem]]: - query = self.make_query() - with self.engine.connect() as conn: - result = conn.execution_options(yield_per=self.chunk_size).execute(query) - for partition in result.partitions(): - yield [dict(row._mapping) for row in partition] - - -def table_rows( - engine: Engine, - table: Table, - chunk_size: int = DEFAULT_CHUNK_SIZE, - incremental: Optional[dlt.sources.incremental[Any]] = None, -) -> Iterator[TDataItem]: - """ - A DLT source which loads data from an SQL database using SQLAlchemy. - Resources are automatically created for each table in the schema or from the given list of tables. 
- - Args: - credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `sqlalchemy.Engine` instance. - schema (Optional[str]): Name of the database schema to load (if different from default). - metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used. - table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. - - Returns: - Iterable[DltResource]: A list of DLT resources for each table to be loaded. - """ - loader = TableLoader(engine, table, incremental=incremental, chunk_size=chunk_size) - yield from loader.load_rows() - - -def engine_from_credentials( - credentials: Union[ConnectionStringCredentials, Engine, str] -) -> Engine: - if isinstance(credentials, Engine): - return credentials - if isinstance(credentials, ConnectionStringCredentials): - credentials = credentials.to_native_representation() - return create_engine(credentials) - - -def get_primary_key(table: Table) -> List[str]: - return [c.name for c in table.primary_key] - - -@configspec -class SqlDatabaseTableConfiguration(BaseConfiguration): - incremental: Optional[dlt.sources.incremental] = None # type: ignore[type-arg] - - -@configspec -class SqlTableResourceConfiguration(BaseConfiguration): - credentials: ConnectionStringCredentials - table: str - incremental: Optional[dlt.sources.incremental] = None # type: ignore[type-arg] - schema: Optional[str] - - -__source_name__ = "sql_database" diff --git a/load/dlt/mysql_to_snowflake/sql_database/settings.py b/load/dlt/mysql_to_snowflake/sql_database/settings.py deleted file mode 100644 index f1aa2d4b..00000000 --- a/load/dlt/mysql_to_snowflake/sql_database/settings.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Sql Database source settings and constants""" - -DEFAULT_CHUNK_SIZE = 1000 From 63ec60b223b249f0434cef923ff2d0f9f646d3f8 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 26 Sep 2024 14:25:45 -0700 Subject: [PATCH 02/25] update dlt step in dag --- orchestrate/dags_yml_definitions/daily_loan_run.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrate/dags_yml_definitions/daily_loan_run.yml b/orchestrate/dags_yml_definitions/daily_loan_run.yml index 5bde8d96..bd507b4a 100644 --- a/orchestrate/dags_yml_definitions/daily_loan_run.yml +++ b/orchestrate/dags_yml_definitions/daily_loan_run.yml @@ -33,7 +33,7 @@ nodes: # activate_venv: true # Virtual Environment is automatically activated - bash_command: "python load/dlt/csv_to_snowflake/load_csv_data.py" + bash_command: "./load/dlt/csv_to_snowflake/load_csv_data.py" transform: From c4dcfe0a292d5fe335b37dce689b8258c943a473 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 26 Sep 2024 14:34:20 -0700 Subject: [PATCH 03/25] update python dag --- orchestrate/dags/daily_loan_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrate/dags/daily_loan_run.py b/orchestrate/dags/daily_loan_run.py index d5069c26..fc624f6d 100644 --- a/orchestrate/dags/daily_loan_run.py +++ b/orchestrate/dags/daily_loan_run.py @@ -59,7 +59,7 @@ def extract_and_load_fivetran(): def extract_and_load_dlt(): load_us_population = DatacovesBashOperator( task_id="load_us_population", - bash_command="python load/dlt/csv_to_snowflake/load_csv_data.py", + bash_command="./load/dlt/csv_to_snowflake/load_csv_data.py", ) tg_extract_and_load_dlt = extract_and_load_dlt() From 8b9a4bbaf38d73019a762c4771ee69899ac01312 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 26 Sep 2024 
14:45:14 -0700 Subject: [PATCH 04/25] add cache location --- load/dlt/csv_to_snowflake/load_csv_data.py | 1 + 1 file changed, 1 insertion(+) diff --git a/load/dlt/csv_to_snowflake/load_csv_data.py b/load/dlt/csv_to_snowflake/load_csv_data.py index fd8d2569..6e174a52 100755 --- a/load/dlt/csv_to_snowflake/load_csv_data.py +++ b/load/dlt/csv_to_snowflake/load_csv_data.py @@ -1,5 +1,6 @@ #!/usr/bin/env -S uv run # /// script +# cache-dir = "/tmp/.uv_cache" # dependencies = [ # "dlt[snowflake, parquet]==1.1.0", # "enlighten~=1.12.4", From 51ac58e79bd541dcd24a53bce6cfec3b3892fa1d Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 26 Sep 2024 14:58:15 -0700 Subject: [PATCH 05/25] update cache dir --- load/dlt/csv_to_snowflake/load_csv_data.py | 3 +-- orchestrate/dags/daily_loan_run.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/load/dlt/csv_to_snowflake/load_csv_data.py b/load/dlt/csv_to_snowflake/load_csv_data.py index 6e174a52..3a92a25e 100755 --- a/load/dlt/csv_to_snowflake/load_csv_data.py +++ b/load/dlt/csv_to_snowflake/load_csv_data.py @@ -1,6 +1,5 @@ -#!/usr/bin/env -S uv run +#!/usr/bin/env -S uv run --cache-dir /tmp/.uv_cache # /// script -# cache-dir = "/tmp/.uv_cache" # dependencies = [ # "dlt[snowflake, parquet]==1.1.0", # "enlighten~=1.12.4", diff --git a/orchestrate/dags/daily_loan_run.py b/orchestrate/dags/daily_loan_run.py index fc624f6d..ad5cc07c 100644 --- a/orchestrate/dags/daily_loan_run.py +++ b/orchestrate/dags/daily_loan_run.py @@ -13,7 +13,7 @@ default_args={"start_date": "2021-01"}, description="Loan Run", schedule_interval="0 0 1 */12 *", - tags=["version_5"], + tags=["version_7"], catchup=False, ) def daily_loan_run(): From 735b8abac530f11f4533a86307abd2edbeb8695c Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 26 Sep 2024 15:08:27 -0700 Subject: [PATCH 06/25] update cache location for uv --- load/dlt/csv_to_snowflake/load_csv_data.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/load/dlt/csv_to_snowflake/load_csv_data.py b/load/dlt/csv_to_snowflake/load_csv_data.py index 6e174a52..3a92a25e 100755 --- a/load/dlt/csv_to_snowflake/load_csv_data.py +++ b/load/dlt/csv_to_snowflake/load_csv_data.py @@ -1,6 +1,5 @@ -#!/usr/bin/env -S uv run +#!/usr/bin/env -S uv run --cache-dir /tmp/.uv_cache # /// script -# cache-dir = "/tmp/.uv_cache" # dependencies = [ # "dlt[snowflake, parquet]==1.1.0", # "enlighten~=1.12.4", From 1c433187f7363bae39893b77fbc1764d5f91f1c4 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Wed, 2 Oct 2024 17:04:41 -0700 Subject: [PATCH 07/25] test reading aws var --- orchestrate/dags/yaml_dbt_dag.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/orchestrate/dags/yaml_dbt_dag.py b/orchestrate/dags/yaml_dbt_dag.py index ebcd0819..f7e228ee 100644 --- a/orchestrate/dags/yaml_dbt_dag.py +++ b/orchestrate/dags/yaml_dbt_dag.py @@ -17,6 +17,10 @@ catchup=False, ) def yaml_dbt_dag(): + my_var = Variable.get("ng_test_var") + + if my_var: + print(my_var) run_dbt = DatacovesDbtOperator( task_id="run_dbt", bash_command="dbt run -s personal_loans" ) From e9e8f9ae954794d6a3f6216f000faf6315fd4fd5 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Wed, 2 Oct 2024 17:07:28 -0700 Subject: [PATCH 08/25] add missing import --- orchestrate/dags/yaml_dbt_dag.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestrate/dags/yaml_dbt_dag.py b/orchestrate/dags/yaml_dbt_dag.py index f7e228ee..ac120f55 100644 --- a/orchestrate/dags/yaml_dbt_dag.py +++ b/orchestrate/dags/yaml_dbt_dag.py @@ -1,6 +1,7 @@ import datetime from 
airflow.decorators import dag +from airflow.models import Variable from operators.datacoves.dbt import DatacovesDbtOperator From 5e9104c6ec81599c82699cef877a702f05d49c36 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:09:37 -0700 Subject: [PATCH 09/25] Add airflow CI checks --- .github/workflows/pull_request_build.yml | 4 ++-- orchestrate/dags/daily_loan_run.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index 57140590..cb0148c2 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -115,7 +115,7 @@ jobs: name: Pull Request Airflow Tests runs-on: ubuntu-latest - container: datacoves/ci-airflow-dbt-snowflake:3 + container: datacoves/ci-airflow-dbt-snowflake:3.2 env: AIRBYTE__EXTRACT_LOCATION: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/load @@ -133,4 +133,4 @@ jobs: run: "python /usr/app/load_dagbag.py" - name: Test DBT Sources against DAGs' YAML files - run: "python /usr/app/test_dags.py" + run: "python /usr/app/test_dags.py --dag-loadtime-threshold 1 --check-variable-usage" diff --git a/orchestrate/dags/daily_loan_run.py b/orchestrate/dags/daily_loan_run.py index fc624f6d..b1325b4b 100644 --- a/orchestrate/dags/daily_loan_run.py +++ b/orchestrate/dags/daily_loan_run.py @@ -8,6 +8,7 @@ from operators.datacoves.bash import DatacovesBashOperator from operators.datacoves.dbt import DatacovesDbtOperator +my_var = Variable.get("ng_test_var") @dag( default_args={"start_date": "2021-01"}, From bf0320fe8a4a36b834c2b6b536fe5da39401d626 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:15:25 -0700 Subject: [PATCH 10/25] update job run conditions for GH Action --- .github/workflows/pull_request_build.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index cb0148c2..acde2aa7 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -18,6 +18,7 @@ jobs: dbt: name: Pull Request dbt Tests runs-on: ubuntu-latest + if: ${{ contains(github.event.head_commit.modified, 'transform/') }} # Set environment variables in # https://github.com////settings/variables/actions @@ -114,6 +115,7 @@ jobs: airflow: name: Pull Request Airflow Tests runs-on: ubuntu-latest + if: ${{ contains(github.event.head_commit.modified, 'orchestrate/') }} container: datacoves/ci-airflow-dbt-snowflake:3.2 From 1f636474d5a0a064a388d62cfe4e09cc8f012ea0 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:22:03 -0700 Subject: [PATCH 11/25] update job run condition --- .github/workflows/pull_request_build.yml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index acde2aa7..f1451396 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -16,9 +16,10 @@ concurrency: jobs: dbt: + if: ${{ github.event_name == 'pull_request' && steps.changed_files.outputs.files_contains_transform == 'true' }} + name: Pull Request dbt Tests runs-on: ubuntu-latest - if: ${{ contains(github.event.head_commit.modified, 'transform/') }} # Set environment variables in # https://github.com////settings/variables/actions @@ -64,6 +65,10 @@ jobs: - name: Set Secure Directory run: git config --global --add safe.directory /__w/${{ 
github.event.repository.name }}/${{ github.event.repository.name }} + - name: Check for transform changes + id: changed_files + run: echo ::set-output name=files_contains_transform::$(if git diff --name-only ${{ github.event.before }} ${{ github.sha }} | grep -q '^transform/'; then echo 'true'; else echo 'false'; fi) + - name: List of files changed run: "git diff origin/${{ github.event.pull_request.base.ref }} HEAD --name-status" @@ -113,6 +118,8 @@ jobs: run: "dbt --no-write-json run-operation drop_recreate_db --args '{db_name: ${{env.DATACOVES__MAIN__DATABASE}}, recreate: False}'" # yamllint disable-line rule:line-length airflow: + if: ${{ github.event_name == 'pull_request' && steps.changed_files.outputs.files_contains_orchestrate == 'true' }} + name: Pull Request Airflow Tests runs-on: ubuntu-latest if: ${{ contains(github.event.head_commit.modified, 'orchestrate/') }} @@ -131,6 +138,10 @@ jobs: fetch-depth: 0 ref: ${{ github.event.pull_request.head.sha }} + - name: Check for orchestrate changes + id: changed_files + run: echo ::set-output name=files_contains_orchestrate::$(if git diff --name-only ${{ github.event.before }} ${{ github.sha }} | grep -q '^orchestrate/'; then echo 'true'; else echo 'false'; fi) + - name: Test DAG structure integrity (DagBag Loading) run: "python /usr/app/load_dagbag.py" From 758e47eed6afa0a1ff1dfb70d6a5f103f050f024 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:25:16 -0700 Subject: [PATCH 12/25] fix workflow file --- .github/workflows/pull_request_build.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index f1451396..71cd7019 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -122,7 +122,6 @@ jobs: name: Pull Request Airflow Tests runs-on: ubuntu-latest - if: ${{ contains(github.event.head_commit.modified, 'orchestrate/') }} container: datacoves/ci-airflow-dbt-snowflake:3.2 From d007604e0c3161ed5c5168678a40ad34782b17b8 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:29:03 -0700 Subject: [PATCH 13/25] update step id --- .github/workflows/pull_request_build.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index 71cd7019..dcb08ad2 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -66,7 +66,7 @@ jobs: run: git config --global --add safe.directory /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }} - name: Check for transform changes - id: changed_files + id: changed_files_transform run: echo ::set-output name=files_contains_transform::$(if git diff --name-only ${{ github.event.before }} ${{ github.sha }} | grep -q '^transform/'; then echo 'true'; else echo 'false'; fi) - name: List of files changed @@ -138,7 +138,7 @@ jobs: ref: ${{ github.event.pull_request.head.sha }} - name: Check for orchestrate changes - id: changed_files + id: changed_files_orchestrate run: echo ::set-output name=files_contains_orchestrate::$(if git diff --name-only ${{ github.event.before }} ${{ github.sha }} | grep -q '^orchestrate/'; then echo 'true'; else echo 'false'; fi) - name: Test DAG structure integrity (DagBag Loading) From f35e51e081996270d75dcbcb7de86148adee59c1 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:31:38 -0700 Subject: [PATCH 14/25] add orchestrate to paths --- 
.github/workflows/pull_request_build.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index dcb08ad2..73a65b4d 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -5,6 +5,8 @@ on: # yamllint disable-line rule:truthy paths: - transform/* - transform/**/* + - orchestrate/* + - orchestrate/**/* # Allows you to run this workflow manually from the Actions tab workflow_dispatch: From 05aaf2558bef736079cc45681baaf5c9279411d4 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:44:52 -0700 Subject: [PATCH 15/25] update workflow --- .github/workflows/pull_request_build.yml | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index 73a65b4d..64d330fb 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -18,7 +18,7 @@ concurrency: jobs: dbt: - if: ${{ github.event_name == 'pull_request' && steps.changed_files.outputs.files_contains_transform == 'true' }} + if: contains(github.event.pull_request.head.ref, 'transform/') name: Pull Request dbt Tests runs-on: ubuntu-latest @@ -67,10 +67,6 @@ jobs: - name: Set Secure Directory run: git config --global --add safe.directory /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }} - - name: Check for transform changes - id: changed_files_transform - run: echo ::set-output name=files_contains_transform::$(if git diff --name-only ${{ github.event.before }} ${{ github.sha }} | grep -q '^transform/'; then echo 'true'; else echo 'false'; fi) - - name: List of files changed run: "git diff origin/${{ github.event.pull_request.base.ref }} HEAD --name-status" @@ -120,7 +116,7 @@ jobs: run: "dbt --no-write-json run-operation drop_recreate_db --args '{db_name: ${{env.DATACOVES__MAIN__DATABASE}}, recreate: False}'" # yamllint disable-line rule:line-length airflow: - if: ${{ github.event_name == 'pull_request' && steps.changed_files.outputs.files_contains_orchestrate == 'true' }} + if: contains(github.event.pull_request.head.ref, 'orchestrate/') name: Pull Request Airflow Tests runs-on: ubuntu-latest @@ -139,10 +135,6 @@ jobs: fetch-depth: 0 ref: ${{ github.event.pull_request.head.sha }} - - name: Check for orchestrate changes - id: changed_files_orchestrate - run: echo ::set-output name=files_contains_orchestrate::$(if git diff --name-only ${{ github.event.before }} ${{ github.sha }} | grep -q '^orchestrate/'; then echo 'true'; else echo 'false'; fi) - - name: Test DAG structure integrity (DagBag Loading) run: "python /usr/app/load_dagbag.py" From 6464899c2346825110c5e6dc0bf2e24fff5652c8 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:49:10 -0700 Subject: [PATCH 16/25] update PR if condition --- .github/workflows/pull_request_build.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index 64d330fb..69b04089 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -18,7 +18,7 @@ concurrency: jobs: dbt: - if: contains(github.event.pull_request.head.ref, 'transform/') + if: ${{ contains(github.event.pull_request.changed_files, 'transform/') }} name: Pull Request dbt Tests runs-on: ubuntu-latest @@ -116,7 +116,7 @@ jobs: run: "dbt --no-write-json run-operation 
drop_recreate_db --args '{db_name: ${{env.DATACOVES__MAIN__DATABASE}}, recreate: False}'" # yamllint disable-line rule:line-length airflow: - if: contains(github.event.pull_request.head.ref, 'orchestrate/') + if: ${{ contains(github.event.pull_request.changed_files, 'orchestrate/') }} name: Pull Request Airflow Tests runs-on: ubuntu-latest From c86690bddeeb4432eb6d4e908fec765e5c94daa5 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Thu, 17 Oct 2024 17:59:14 -0700 Subject: [PATCH 17/25] separate dbt and airflow integration jobs --- .../{merge_to_main.yml => deployment.yml} | 0 .github/workflows/integration_airflow.yml | 40 +++++++++++++++++++ ..._request_build.yml => integration_dbt.yml} | 30 -------------- 3 files changed, 40 insertions(+), 30 deletions(-) rename .github/workflows/{merge_to_main.yml => deployment.yml} (100%) create mode 100644 .github/workflows/integration_airflow.yml rename .github/workflows/{pull_request_build.yml => integration_dbt.yml} (82%) diff --git a/.github/workflows/merge_to_main.yml b/.github/workflows/deployment.yml similarity index 100% rename from .github/workflows/merge_to_main.yml rename to .github/workflows/deployment.yml diff --git a/.github/workflows/integration_airflow.yml b/.github/workflows/integration_airflow.yml new file mode 100644 index 00000000..e219746e --- /dev/null +++ b/.github/workflows/integration_airflow.yml @@ -0,0 +1,40 @@ +name: Test and Check on Pull Request + +on: # yamllint disable-line rule:truthy + pull_request: + paths: + - orchestrate/* + - orchestrate/**/* + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: + +# This cancels a run if another change is pushed to the same branch +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + airflow: + name: Pull Request Airflow Tests + runs-on: ubuntu-latest + + container: datacoves/ci-airflow-dbt-snowflake:3.2 + + env: + AIRBYTE__EXTRACT_LOCATION: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/load + AIRFLOW__CORE__DAGS_FOLDER: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/automate/airflow/dags + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 300 + + steps: + - name: Checkout branch + uses: actions/checkout@v3.5.0 + with: + fetch-depth: 0 + ref: ${{ github.event.pull_request.head.sha }} + + - name: Test DAG structure integrity (DagBag Loading) + run: "python /usr/app/load_dagbag.py" + + - name: Test DBT Sources against DAGs' YAML files + run: "python /usr/app/test_dags.py --dag-loadtime-threshold 1 --check-variable-usage" diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/integration_dbt.yml similarity index 82% rename from .github/workflows/pull_request_build.yml rename to .github/workflows/integration_dbt.yml index 69b04089..f2dd77af 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/integration_dbt.yml @@ -5,8 +5,6 @@ on: # yamllint disable-line rule:truthy paths: - transform/* - transform/**/* - - orchestrate/* - - orchestrate/**/* # Allows you to run this workflow manually from the Actions tab workflow_dispatch: @@ -18,8 +16,6 @@ concurrency: jobs: dbt: - if: ${{ contains(github.event.pull_request.changed_files, 'transform/') }} - name: Pull Request dbt Tests runs-on: ubuntu-latest @@ -114,29 +110,3 @@ jobs: - name: Drop PR database on Failure to grant security access if: always() && (env.DATACOVES__DROP_DB_ON_FAIL == 'true') && (steps.grant-access-to-database.outcome == 'failure') run: "dbt 
--no-write-json run-operation drop_recreate_db --args '{db_name: ${{env.DATACOVES__MAIN__DATABASE}}, recreate: False}'" # yamllint disable-line rule:line-length - - airflow: - if: ${{ contains(github.event.pull_request.changed_files, 'orchestrate/') }} - - name: Pull Request Airflow Tests - runs-on: ubuntu-latest - - container: datacoves/ci-airflow-dbt-snowflake:3.2 - - env: - AIRBYTE__EXTRACT_LOCATION: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/load - AIRFLOW__CORE__DAGS_FOLDER: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/automate/airflow/dags - AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 300 - - steps: - - name: Checkout branch - uses: actions/checkout@v3.5.0 - with: - fetch-depth: 0 - ref: ${{ github.event.pull_request.head.sha }} - - - name: Test DAG structure integrity (DagBag Loading) - run: "python /usr/app/load_dagbag.py" - - - name: Test DBT Sources against DAGs' YAML files - run: "python /usr/app/test_dags.py --dag-loadtime-threshold 1 --check-variable-usage" From 5804bab1d812cad1f7f491c4ac38e44289c74120 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Sat, 19 Oct 2024 19:13:32 -0700 Subject: [PATCH 18/25] organize dags folder --- .../yaml_slack_dag.py | 0 .../yaml_teams_dag.py | 0 .../notifications_examples/yaml_slack_dag.yml | 47 +++++++++++++++++++ .../notifications_examples/yaml_teams_dag.yml | 44 +++++++++++++++++ 4 files changed, 91 insertions(+) rename orchestrate/dags/{ => notifications_examples}/yaml_slack_dag.py (100%) rename orchestrate/dags/{ => notifications_examples}/yaml_teams_dag.py (100%) create mode 100644 orchestrate/dags_yml_definitions/notifications_examples/yaml_slack_dag.yml create mode 100644 orchestrate/dags_yml_definitions/notifications_examples/yaml_teams_dag.yml diff --git a/orchestrate/dags/yaml_slack_dag.py b/orchestrate/dags/notifications_examples/yaml_slack_dag.py similarity index 100% rename from orchestrate/dags/yaml_slack_dag.py rename to orchestrate/dags/notifications_examples/yaml_slack_dag.py diff --git a/orchestrate/dags/yaml_teams_dag.py b/orchestrate/dags/notifications_examples/yaml_teams_dag.py similarity index 100% rename from orchestrate/dags/yaml_teams_dag.py rename to orchestrate/dags/notifications_examples/yaml_teams_dag.py diff --git a/orchestrate/dags_yml_definitions/notifications_examples/yaml_slack_dag.yml b/orchestrate/dags_yml_definitions/notifications_examples/yaml_slack_dag.yml new file mode 100644 index 00000000..9dbe91c1 --- /dev/null +++ b/orchestrate/dags_yml_definitions/notifications_examples/yaml_slack_dag.yml @@ -0,0 +1,47 @@ +description: "Sample DAG with Slack notification, custom image, and resource requests" +schedule_interval: "0 0 1 */12 *" +tags: + - version_2 + - slack_notification + - blue_green +default_args: + start_date: 2023-01-01 + owner: Noel Gomez + # Replace with the email of the recipient for failures + email: gomezn@example.com + email_on_failure: true +catchup: false + +# Optional callbacks used to send Slack notifications +notifications: + on_success_callback: + notifier: airflow.providers.slack.notifications.slack.send_slack_notification + args: + text: "The DAG {{ dag.dag_id }} succeeded" + channel: "#general" + on_failure_callback: + notifier: airflow.providers.slack.notifications.slack.send_slack_notification + args: + text: "The DAG {{ dag.dag_id }} failed" + channel: "#general" + +# DAG Tasks +nodes: + transform: + operator: operators.datacoves.dbt.DatacovesDbtOperator + type: task + config: + image: datacoves/airflow-pandas:latest + 
resources: + memory: 8Gi + cpu: 1000m + + bash_command: "dbt run -s personal_loans" + + # Sample failing task to test that notification is sent + # failing_task: + # operator: operators.datacoves.bash.DatacovesBashOperator + # type: task + + # bash_command: "some_non_existant_command" + # dependencies: ["transform"] diff --git a/orchestrate/dags_yml_definitions/notifications_examples/yaml_teams_dag.yml b/orchestrate/dags_yml_definitions/notifications_examples/yaml_teams_dag.yml new file mode 100644 index 00000000..900981e9 --- /dev/null +++ b/orchestrate/dags_yml_definitions/notifications_examples/yaml_teams_dag.yml @@ -0,0 +1,44 @@ +description: "Sample DAG with MS Teams notification" +schedule_interval: "0 0 1 */12 *" +tags: + - version_2 + - ms_teams_notification + - blue_green +default_args: + start_date: 2023-01-01 + owner: Noel Gomez + # Replace with the email of the recipient for failures + email: gomezn@example.com + email_on_failure: true +catchup: false + +# Optional callbacks used to send MS Teams notifications +notifications: + on_success_callback: + notifier: notifiers.datacoves.ms_teams.MSTeamsNotifier + args: + connection_id: DATACOVES_MS_TEAMS + # message: Custom success message + theme_color: 0000FF + on_failure_callback: + notifier: notifiers.datacoves.ms_teams.MSTeamsNotifier + args: + connection_id: DATACOVES_MS_TEAMS + # message: Custom error message + theme_color: 9900FF + +# DAG Tasks +nodes: + transform: + operator: operators.datacoves.dbt.DatacovesDbtOperator + type: task + + bash_command: "dbt run -s personal_loans" + + # Sample failing task to test that notification is sent + # failing_task: + # operator: operators.datacoves.bash.DatacovesBashOperator + # type: task + + # bash_command: "some_non_existant_command" + # dependencies: ["transform"] From b161238ff0448ccc6d22cbc57d48f4c0ceb2d84f Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Mon, 21 Oct 2024 15:42:16 -0700 Subject: [PATCH 19/25] updates to dynamic tables and roles --- load/dlt/loans/datacoves_snowflake.py | 29 ++++++ load/dlt/loans/loans_data.py | 59 ++++++++++++ secure/roles.yml | 8 +- secure/warehouses.yml | 6 ++ .../models/L1_inlets/loans/personal_loans.sql | 90 +++++++++---------- .../models/L1_inlets/loans/personal_loans.yml | 36 +++++--- .../loans_by_state__dynamic.sql | 6 +- visualize/streamlit/loans-example/example.sql | 25 ++++-- 8 files changed, 192 insertions(+), 67 deletions(-) create mode 100644 load/dlt/loans/datacoves_snowflake.py create mode 100755 load/dlt/loans/loans_data.py diff --git a/load/dlt/loans/datacoves_snowflake.py b/load/dlt/loans/datacoves_snowflake.py new file mode 100644 index 00000000..110c45e8 --- /dev/null +++ b/load/dlt/loans/datacoves_snowflake.py @@ -0,0 +1,29 @@ +import os +import dlt + +def set_config_value(key, config_key, env_var_prefix = 'DATACOVES__MAIN_LOAD__'): + + env_var = env_var_prefix + key.upper() + + value = os.getenv(env_var, dlt.config[config_key]) + + if key != 'password': + print(key + ": " +value) + return value + +config_keys = ["account", "database", "warehouse", "role", "user", "password"] + +db_config = {} +for key in config_keys: + config_key = "destination.snowflake.credentials." 
+ key + + try: + dlt.config[config_key] + except dlt.common.configuration.exceptions.ConfigFieldMissingException: + dlt.config[config_key] = '' + + db_config[key] = set_config_value(key, config_key) + +# This is needed because by default dlt calls the snowflake account host +db_config['host'] = db_config['account'] +db_config['username'] = db_config['user'] diff --git a/load/dlt/loans/loans_data.py b/load/dlt/loans/loans_data.py new file mode 100755 index 00000000..944ee44d --- /dev/null +++ b/load/dlt/loans/loans_data.py @@ -0,0 +1,59 @@ +#!/usr/bin/env -S uv run --cache-dir /tmp/.uv_cache +# /// script +# dependencies = [ +# "dlt[snowflake, parquet]==1.1.0", +# "enlighten~=1.12.4", +# "psutil~=6.0.0", +# "pandas==2.2.2", +# ] +# /// +"""Loads a CSV file to Snowflake""" +import dlt +import pandas as pd +from datacoves_snowflake import db_config + +# a resource is the individual endpoints or tables +@dlt.resource(write_disposition="replace") +# method name = table name +def personal_loans(): + personal_loans = "https://datacoves-sample-data-public.s3.us-west-2.amazonaws.com/PERSONAL_LOANS.csv" + df = pd.read_csv(personal_loans) + yield df + +def zip_coordinates(): + zip_coordinates = "https://datacoves-sample-data-public.s3.us-west-2.amazonaws.com/ZIP_COORDINATES.csv" + df = pd.read_csv(zip_coordinates) + yield df + +# Source (corresponds to API or database) +@dlt.source +def personal_loans_source(): + return [personal_loans] + +@dlt.source +def zip_coordinates_source(): + return [zip_coordinates] + +if __name__ == "__main__": + datacoves_snowflake = dlt.destinations.snowflake( + db_config, + destination_name="datacoves_snowflake" + ) + + pipeline = dlt.pipeline( + progress = "enlighten", + pipeline_name = "loans", + destination = datacoves_snowflake, + pipelines_dir = "/tmp/", + + # dataset_name is the target schema name + dataset_name="loans" + ) + + load_info = pipeline.run(personal_loans()) + + print(load_info) + + load_info = pipeline.run(zip_coordinates()) + + print(load_info) diff --git a/secure/roles.yml b/secure/roles.yml index f1614512..7e7cf5df 100644 --- a/secure/roles.yml +++ b/secure/roles.yml @@ -39,6 +39,7 @@ - z_stage_resources_read - z_wh_transforming + - z_wh_transforming_dynamic_tables - analyst: member_of: @@ -83,6 +84,7 @@ - z_tables_views_general - z_wh_integration - z_wh_orchestrating + - z_wh_transforming_dynamic_tables - z_stage_dbt_artifacts_write owns: databases: @@ -118,7 +120,7 @@ - z_tables_views_general - z_policy_row_region_all - + ########################## # Global Roles ########################## @@ -224,6 +226,10 @@ warehouses: - wh_transforming +- z_wh_transforming_dynamic_tables: + warehouses: + - wh_transforming_dynamic_tables + - z_wh_orchestrating: warehouses: - wh_orchestrating diff --git a/secure/warehouses.yml b/secure/warehouses.yml index f9aaac66..1cc5b1c5 100644 --- a/secure/warehouses.yml +++ b/secure/warehouses.yml @@ -36,6 +36,12 @@ auto_resume: true initially_suspended: true +- wh_transforming_dynamic_tables: + size: x-small + auto_suspend: 60 + auto_resume: true + initially_suspended: true + - wh_transforming_sqlmesh: size: x-small auto_suspend: 60 diff --git a/transform/models/L1_inlets/loans/personal_loans.sql b/transform/models/L1_inlets/loans/personal_loans.sql index 9f87c7b1..5e21ba83 100644 --- a/transform/models/L1_inlets/loans/personal_loans.sql +++ b/transform/models/L1_inlets/loans/personal_loans.sql @@ -1,9 +1,8 @@ -{# {{ config( +{{ config( materialized = 'dynamic_table', - snowflake_warehouse = 'wh_transforming', + 
snowflake_warehouse = 'wh_transforming_dynamic_tables', target_lag = 'downstream', - persist_docs={"relation": false}, -) }} #} +) }} with raw_source as ( @@ -15,62 +14,61 @@ with raw_source as ( final as ( select - "TOTAL_ACC"::float as total_acc, - "ANNUAL_INC"::float as annual_inc, - "EMP_LENGTH"::varchar as emp_length, - "DESC"::varchar as desc, - "TOTAL_PYMNT"::float as total_pymnt, - "LAST_PYMNT_D"::varchar as last_pymnt_d, - "ADDR_STATE"::varchar as addr_state, - "NEXT_PYMNT_D"::varchar as next_pymnt_d, - "EMP_TITLE"::varchar as emp_title, - "COLLECTION_RECOVERY_FEE"::float as collection_recovery_fee, - "MTHS_SINCE_LAST_MAJOR_DEROG"::float as mths_since_last_major_derog, - "INQ_LAST_6MTHS"::float as inq_last_6mths, - "SUB_GRADE"::varchar as sub_grade, - "FUNDED_AMNT_INV"::float as funded_amnt_inv, - "DELINQ_2YRS"::float as delinq_2yrs, "LOAN_ID"::varchar as loan_id, - "FUNDED_AMNT"::float as funded_amnt, - "VERIFICATION_STATUS"::varchar as verification_status, - "DTI"::float as dti, - "TOTAL_REC_PRNCP"::float as total_rec_prncp, + "MEMBER_ID"::number as member_id, + "LOAN_AMNT"::number as loan_amnt, + "FUNDED_AMNT"::number as funded_amnt, + "FUNDED_AMNT_INV"::float as funded_amnt_inv, + "TERM"::varchar as term, + "INT_RATE"::float as int_rate, + "INSTALLMENT"::float as installment, "GRADE"::varchar as grade, + "SUB_GRADE"::varchar as sub_grade, + "EMP_TITLE"::varchar as emp_title, + "EMP_LENGTH"::varchar as emp_length, "HOME_OWNERSHIP"::varchar as home_ownership, + "ANNUAL_INC"::float as annual_inc, + "VERIFICATION_STATUS"::varchar as verification_status, "ISSUE_D"::varchar as issue_d, - "MTHS_SINCE_LAST_DELINQ"::float as mths_since_last_delinq, - "OUT_PRNCP"::float as out_prncp, - "PUB_REC"::float as pub_rec, - "INT_RATE"::float as int_rate, - "ZIP_CODE"::varchar as zip_code, - "OPEN_ACC"::float as open_acc, - "TERM"::varchar as term, + "LOAN_STATUS"::varchar as loan_status, "PYMNT_PLAN"::varchar as pymnt_plan, "URL"::varchar as url, - "REVOL_BAL"::float as revol_bal, - "RECOVERIES"::float as recoveries, - "LAST_PYMNT_AMNT"::float as last_pymnt_amnt, - "LOAN_AMNT"::float as loan_amnt, + "DESC"::varchar as desc, "PURPOSE"::varchar as purpose, - "INITIAL_LIST_STATUS"::varchar as initial_list_status, - "TOTAL_REC_INT"::float as total_rec_int, - "TOTAL_PYMNT_INV"::float as total_pymnt_inv, - "MTHS_SINCE_LAST_RECORD"::float as mths_since_last_record, - "LAST_CREDIT_PULL_D"::varchar as last_credit_pull_d, - "TOTAL_REC_LATE_FEE"::float as total_rec_late_fee, - "MEMBER_ID"::float as member_id, - "POLICY_CODE"::float as policy_code, "TITLE"::varchar as title, - "LOAN_STATUS"::varchar as loan_status, - "INSTALLMENT"::float as installment, + "ZIP_CODE"::varchar as zip_code, + "ADDR_STATE"::varchar as addr_state, + "DTI"::float as dti, + "DELINQ_2_YRS"::float as delinq_2_yrs, "EARLIEST_CR_LINE"::varchar as earliest_cr_line, + "INQ_LAST_6_MTHS"::float as inq_last_6_mths, + "MTHS_SINCE_LAST_DELINQ"::float as mths_since_last_delinq, + "MTHS_SINCE_LAST_RECORD"::float as mths_since_last_record, + "OPEN_ACC"::float as open_acc, + "PUB_REC"::float as pub_rec, + "REVOL_BAL"::number as revol_bal, "REVOL_UTIL"::varchar as revol_util, + "TOTAL_ACC"::float as total_acc, + "INITIAL_LIST_STATUS"::varchar as initial_list_status, + "OUT_PRNCP"::float as out_prncp, "OUT_PRNCP_INV"::float as out_prncp_inv, - "COLLECTIONS_12_MTHS_EX_MED"::float as collections_12_mths_ex_med + "TOTAL_PYMNT"::float as total_pymnt, + "TOTAL_PYMNT_INV"::float as total_pymnt_inv, + "TOTAL_REC_PRNCP"::float as total_rec_prncp, + 
"TOTAL_REC_INT"::float as total_rec_int, + "TOTAL_REC_LATE_FEE"::float as total_rec_late_fee, + "RECOVERIES"::float as recoveries, + "COLLECTION_RECOVERY_FEE"::float as collection_recovery_fee, + "LAST_PYMNT_D"::varchar as last_pymnt_d, + "LAST_PYMNT_AMNT"::float as last_pymnt_amnt, + "NEXT_PYMNT_D"::varchar as next_pymnt_d, + "LAST_CREDIT_PULL_D"::varchar as last_credit_pull_d, + "COLLECTIONS_12_MTHS_EX_MED"::float as collections_12_mths_ex_med, + "MTHS_SINCE_LAST_MAJOR_DEROG"::float as mths_since_last_major_derog, + "POLICY_CODE"::number as policy_code from raw_source ) select * from final -where addr_state = 'CA' diff --git a/transform/models/L1_inlets/loans/personal_loans.yml b/transform/models/L1_inlets/loans/personal_loans.yml index f447807d..ade7e199 100644 --- a/transform/models/L1_inlets/loans/personal_loans.yml +++ b/transform/models/L1_inlets/loans/personal_loans.yml @@ -17,11 +17,13 @@ models: meta: masking_policy: masking_policy_pii_float - name: collections_12_mths_ex_med - description: Number of collections in the last 12 months excluding medical collections + description: Number of collections in the last 12 months excluding medical + collections - name: collection_recovery_fee description: Post charge off collection fee - name: delinq_2yrs - description: The number of 30+ days past-due incidences of delinquency in the borrower's credit file for the past 2 years + description: The number of 30+ days past-due incidences of delinquency in + the borrower's credit file for the past 2 years - name: desc description: Loan description provided by the borrower meta: @@ -35,11 +37,13 @@ models: meta: masking_policy: masking_policy_pii_string - name: emp_title - description: The job title supplied by the borrower when applying for the loan + description: The job title supplied by the borrower when applying for the + loan - name: funded_amnt description: The total amount committed to that loan at that point in time - name: funded_amnt_inv - description: The total amount committed by investors for that loan at that point in time + description: The total amount committed by investors for that loan at that + point in time - name: grade description: LC assigned loan grade - name: home_ownership @@ -47,7 +51,8 @@ models: - name: initial_list_status description: The initial listing status of the loan - name: inq_last_6mths - description: The number of inquiries in past 6 months (excluding auto and mortgage inquiries) + description: The number of inquiries in past 6 months (excluding auto and + mortgage inquiries) - name: installment description: The monthly payment owed by the borrower if the loan originates - name: int_rate @@ -81,7 +86,8 @@ models: - name: out_prncp description: Remaining outstanding principal for total amount funded - name: out_prncp_inv - description: Remaining outstanding principal for portion of total amount funded by investors + description: Remaining outstanding principal for portion of total amount funded + by investors - name: policy_code description: Publicly available - name: pub_rec @@ -95,21 +101,25 @@ models: - name: revol_bal description: Total credit revolving balance - name: revol_util - description: Revolving line utilization rate, or the amount of credit the borrower is using relative to all available revolving credit + description: Revolving line utilization rate, or the amount of credit the + borrower is using relative to all available revolving credit - name: sub_grade description: LC assigned loan subgrade - name: term - description: The number of 
payments on the loan. Values are in months and can be either 36 or 60 + description: The number of payments on the loan. Values are in months and + can be either 36 or 60 - name: title description: The loan title provided by the borrower - name: total_acc - description: The total num(#) of credit lines currently in the borrower's credit file + description: The total num(#) of credit lines currently in the borrower's + credit file - name: total_pymnt description: Payments received to date for total amount funded meta: masking_policy: masking_policy_pii_float - name: total_pymnt_inv - description: Payments received to date for portion of total amount funded by investors + description: Payments received to date for portion of total amount funded + by investors - name: total_rec_int description: Interest received to date - name: total_rec_late_fee @@ -119,6 +129,8 @@ models: - name: url description: URL for the LC page with listing data - name: verification_status - description: Indicates if income was verified by LC, not verified, or if the income source was verified + description: Indicates if income was verified by LC, not verified, or if the + income source was verified - name: zip_code - description: The first 3 numbers of the zip code provided by the borrower in the loan application + description: The first 3 numbers of the zip code provided by the borrower + in the loan application diff --git a/transform/models/L3_coves/loan_analytics/loans_by_state__dynamic.sql b/transform/models/L3_coves/loan_analytics/loans_by_state__dynamic.sql index dc54fc8a..4c7babab 100644 --- a/transform/models/L3_coves/loan_analytics/loans_by_state__dynamic.sql +++ b/transform/models/L3_coves/loan_analytics/loans_by_state__dynamic.sql @@ -1,8 +1,8 @@ -{# {{ config( +{{ config( materialized = 'dynamic_table', - snowflake_warehouse = 'wh_transforming', + snowflake_warehouse = 'wh_transforming_dynamic_tables', target_lag = '1 minute', -) }} #} +) }} select personal_loans.addr_state as state, diff --git a/visualize/streamlit/loans-example/example.sql b/visualize/streamlit/loans-example/example.sql index 2b96c37b..2909e00b 100644 --- a/visualize/streamlit/loans-example/example.sql +++ b/visualize/streamlit/loans-example/example.sql @@ -1,3 +1,16 @@ +/* +/config/workspace/visualize/streamlit/start_app.sh + +cd $DATACOVES__REPO_PATH/load/dlt +./loans/loans_data.py + +https://app.snowflake.com/datacoves/main/#/data/databases/BALBOA/schemas/L3_LOAN_ANALYTICS/dynamic-table/LOANS_BY_STATE__DYNAMIC + +dlt pipeline loans_data show + +cd $DATACOVES__REPO_PATH/transform +*/ + -- These are useful queries to run for demo purposes use role loader; @@ -6,19 +19,16 @@ use warehouse wh_loading; -- source table needs to have change tracking enabled alter table RAW.LOANS.PERSONAL_LOANS set CHANGE_TRACKING = true; --- see the rows in a table -select count(*) -from RAW.LOANS.PERSONAL_LOANS; - -- delete records from a table delete from RAW.LOANS.PERSONAL_LOANS where left(addr_state, 1)> 'A'; +-- see the rows in a table select count(*) from RAW.LOANS.PERSONAL_LOANS; - +select distinct addr_state from raw.loans.personal_loans limit 10; -- dropping dymanic table use role analyst; @@ -29,6 +39,11 @@ drop dynamic table balboa_dev.gomezn.loans_by_state__standard; drop schema balboa_dev.fivetran_centre_straighten_staging; ------ +use warehouse wh_orchestrating; +select distinct addr_state from balboa.l1_loans.personal_loans; +drop table balboa.l1_loans.personal_loans; +drop table balboa.l3_loan_analytics.loans_by_state__dynamic; + -- Creating 
Streamlit App use role transformer_dbt; From 25611302f3f7852ec7c5b79598f28ed845c03f3d Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Mon, 21 Oct 2024 15:54:05 -0700 Subject: [PATCH 20/25] exclude dynamic tables when running --empty --- .github/workflows/integration_dbt.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration_dbt.yml b/.github/workflows/integration_dbt.yml index f2dd77af..ddaecadf 100644 --- a/.github/workflows/integration_dbt.yml +++ b/.github/workflows/integration_dbt.yml @@ -78,13 +78,16 @@ jobs: ##### Governance Checks # this first runs dbt but creates enpty tables, this is enough to then run the hooks and fail fast + + # There is an issue with --empty and dynamic tables so need to exclude them - name: Governance run of dbt with EMPTY models using slim mode if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' && contains(github.event.pull_request.labels.*.name, 'full-refresh') != true }} - run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty" + run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty --exclude config.materialized:dynamic_table" + # There is an issue with --empty and dynamic tables so need to exclude - name: Governance run of dbt with EMPTY models using full run if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' || contains(github.event.pull_request.labels.*.name, 'full-refresh') }} - run: "dbt build --fail-fast --empty" + run: "dbt build --fail-fast --empty --exclude config.materialized:dynamic_table" - name: Generate Docs Combining Prod and branch catalog.json run: "dbt-coves generate docs --merge-deferred --state logs" From 35e3fda111cea09d5eedbd82f987f35452e1a124 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Mon, 21 Oct 2024 16:03:13 -0700 Subject: [PATCH 21/25] run dynamic tables before --empty --- .github/workflows/integration_dbt.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/integration_dbt.yml b/.github/workflows/integration_dbt.yml index ddaecadf..e31954ac 100644 --- a/.github/workflows/integration_dbt.yml +++ b/.github/workflows/integration_dbt.yml @@ -79,6 +79,10 @@ jobs: ##### Governance Checks # this first runs dbt but creates enpty tables, this is enough to then run the hooks and fail fast + # There is an issue with --empty and dynamic tables so need to exclude them + - name: Governance run of dynamic tables + run: "dbt build --fail-fast -s config.materialized:dynamic_table" + # There is an issue with --empty and dynamic tables so need to exclude them - name: Governance run of dbt with EMPTY models using slim mode if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' && contains(github.event.pull_request.labels.*.name, 'full-refresh') != true }} From 6c589d2c071b009d5d4d976486c8b50783878a26 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Mon, 21 Oct 2024 16:07:30 -0700 Subject: [PATCH 22/25] remove data test --- transform/models/L1_inlets/loans/personal_loans.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/transform/models/L1_inlets/loans/personal_loans.yml b/transform/models/L1_inlets/loans/personal_loans.yml index ade7e199..5cd69580 100644 --- a/transform/models/L1_inlets/loans/personal_loans.yml +++ b/transform/models/L1_inlets/loans/personal_loans.yml @@ -6,10 +6,6 @@ models: columns: - name: addr_state description: The state in which the borrower resides - data_tests: - - accepted_values: - values: - - "CA" meta: masking_policy: masking_policy_pii_string - name: 
annual_inc From 71dfb7668895604375e57e22ea8d83e0215450f4 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Mon, 21 Oct 2024 16:09:54 -0700 Subject: [PATCH 23/25] add deferral to dynamic tables --- .github/workflows/integration_dbt.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration_dbt.yml b/.github/workflows/integration_dbt.yml index e31954ac..58def4e8 100644 --- a/.github/workflows/integration_dbt.yml +++ b/.github/workflows/integration_dbt.yml @@ -81,7 +81,7 @@ jobs: # There is an issue with --empty and dynamic tables so need to exclude them - name: Governance run of dynamic tables - run: "dbt build --fail-fast -s config.materialized:dynamic_table" + run: "dbt build --fail-fast -s config.materialized:dynamic_table --defer --state logs" # There is an issue with --empty and dynamic tables so need to exclude them - name: Governance run of dbt with EMPTY models using slim mode From 785671637f6835d80e2d40305c1a9922baf45278 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Mon, 21 Oct 2024 16:31:41 -0700 Subject: [PATCH 24/25] fix col names in yml --- transform/models/L1_inlets/loans/personal_loans.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transform/models/L1_inlets/loans/personal_loans.yml b/transform/models/L1_inlets/loans/personal_loans.yml index 5cd69580..cab9bd55 100644 --- a/transform/models/L1_inlets/loans/personal_loans.yml +++ b/transform/models/L1_inlets/loans/personal_loans.yml @@ -17,7 +17,7 @@ models: collections - name: collection_recovery_fee description: Post charge off collection fee - - name: delinq_2yrs + - name: delinq_2_yrs description: The number of 30+ days past-due incidences of delinquency in the borrower's credit file for the past 2 years - name: desc @@ -46,7 +46,7 @@ models: description: The home ownership status provided by the borrower during registration - name: initial_list_status description: The initial listing status of the loan - - name: inq_last_6mths + - name: inq_last_6_mths description: The number of inquiries in past 6 months (excluding auto and mortgage inquiries) - name: installment From 5e7c24f3cc29bd6c6ad06e9fbad15eb586551c09 Mon Sep 17 00:00:00 2001 From: Noel Gomez Date: Mon, 21 Oct 2024 16:37:43 -0700 Subject: [PATCH 25/25] disable checkpoint check --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d2b8a488..2ecfc607 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: - id: check-source-table-has-description - id: check-script-semicolon - id: check-script-has-no-table-name - - id: check-script-ref-and-source + # - id: check-script-ref-and-source - id: check-model-has-description - id: check-model-has-properties-file - id: check-model-has-all-columns
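The single-file script pattern that PATCH 01 introduces and PATCH 04-06 tune (PEP 723 inline metadata plus a `uv run` shebang, with the cache location moved from the metadata block into the shebang flags) reduces to the minimal sketch below. The dependency pin and cache path are copied from load_csv_data.py; the script body is an illustrative stand-in, not code from this repo.

#!/usr/bin/env -S uv run --cache-dir /tmp/.uv_cache
# /// script
# dependencies = [
#     "pandas==2.2.2",
# ]
# ///
"""Minimal smoke test of the uv inline-metadata pattern (illustrative only)."""
import pandas as pd

# `uv run` resolves the dependencies declared above into an ephemeral
# environment, caching downloads under /tmp/.uv_cache; the flag lives in the
# shebang because setting `cache-dir` inside the metadata block (PATCH 04)
# was dropped in favor of the CLI option (PATCH 05/06).
print(pd.DataFrame({"ok": [True]}))

Because PATCH 01 also flips the file mode to 100755, the DAG changes in PATCH 02/03 can invoke the script as "./load/dlt/csv_to_snowflake/load_csv_data.py" instead of "python load/dlt/csv_to_snowflake/load_csv_data.py", letting the shebang pick the interpreter and environment.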
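PATCH 07-09 read the Airflow Variable `ng_test_var` and then enable `--check-variable-usage` in the CI DAG tests. The sketch below is an assumption about what that check guards, not repo code: a module-level `Variable.get` (as PATCH 09 adds to daily_loan_run.py) executes on every DagBag parse and queries the metadata database, whereas a call inside a task body runs only at execution time. The DAG and task names here are hypothetical.

from airflow.decorators import dag, task
from airflow.models import Variable


@dag(
    default_args={"start_date": "2021-01"},
    schedule_interval=None,
    catchup=False,
)
def variable_usage_example():
    @task
    def read_var():
        # Fetched at task runtime, so the scheduler does not hit the
        # metadata database each time it re-parses the DAG file.
        print(Variable.get("ng_test_var", default_var=None))

    read_var()


example_dag = variable_usage_example()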
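One asymmetry worth noting in PATCH 19's loans_data.py: `personal_loans` carries `@dlt.resource(write_disposition="replace")`, but `zip_coordinates` is defined and run as a bare generator, so it declares no write disposition of its own. A hedged sketch of the symmetric form, assuming replace behavior was intended for both tables:

import dlt
import pandas as pd


@dlt.resource(write_disposition="replace")
def zip_coordinates():
    # Same disposition as personal_loans, so reruns replace the target
    # table instead of appending to it.
    url = "https://datacoves-sample-data-public.s3.us-west-2.amazonaws.com/ZIP_COORDINATES.csv"
    yield pd.read_csv(url)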