Dynamic tables Updates #216

Merged: 27 commits, Oct 21, 2024
File renamed without changes.
40 changes: 40 additions & 0 deletions .github/workflows/integration_airflow.yml
@@ -0,0 +1,40 @@
name: Test and Check on Pull Request

on:  # yamllint disable-line rule:truthy
  pull_request:
    paths:
      - orchestrate/*
      - orchestrate/**/*

  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

# This cancels a run if another change is pushed to the same branch
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  airflow:
    name: Pull Request Airflow Tests
    runs-on: ubuntu-latest

    container: datacoves/ci-airflow-dbt-snowflake:3.2

    env:
      AIRBYTE__EXTRACT_LOCATION: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/load
      AIRFLOW__CORE__DAGS_FOLDER: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/automate/airflow/dags
      AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 300

    steps:
      - name: Checkout branch
        uses: actions/checkout@v3.5.0
        with:
          fetch-depth: 0
          ref: ${{ github.event.pull_request.head.sha }}

      - name: Test DAG structure integrity (DagBag Loading)
        run: "python /usr/app/load_dagbag.py"

      - name: Test DBT Sources against DAGs' YAML files
        run: "python /usr/app/test_dags.py --dag-loadtime-threshold 1 --check-variable-usage"
@@ -78,13 +78,20 @@ jobs:

      ##### Governance Checks
      # this first runs dbt but creates empty tables; this is enough to then run the hooks and fail fast

      # There is an issue with --empty and dynamic tables, so they are built separately here without --empty
      - name: Governance run of dynamic tables
        run: "dbt build --fail-fast -s config.materialized:dynamic_table --defer --state logs"

      # There is an issue with --empty and dynamic tables so we need to exclude them
      - name: Governance run of dbt with EMPTY models using slim mode
        if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' && contains(github.event.pull_request.labels.*.name, 'full-refresh') != true }}
-       run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty"
+       run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty --exclude config.materialized:dynamic_table"

      # There is an issue with --empty and dynamic tables so we need to exclude them
      - name: Governance run of dbt with EMPTY models using full run
        if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' || contains(github.event.pull_request.labels.*.name, 'full-refresh') }}
-       run: "dbt build --fail-fast --empty"
+       run: "dbt build --fail-fast --empty --exclude config.materialized:dynamic_table"

      - name: Generate Docs Combining Prod and branch catalog.json
        run: "dbt-coves generate docs --merge-deferred --state logs"
@@ -110,27 +117,3 @@ jobs:
      - name: Drop PR database on Failure to grant security access
        if: always() && (env.DATACOVES__DROP_DB_ON_FAIL == 'true') && (steps.grant-access-to-database.outcome == 'failure')
        run: "dbt --no-write-json run-operation drop_recreate_db --args '{db_name: ${{env.DATACOVES__MAIN__DATABASE}}, recreate: False}'" # yamllint disable-line rule:line-length

-  airflow:
-    name: Pull Request Airflow Tests
-    runs-on: ubuntu-latest
-
-    container: datacoves/ci-airflow-dbt-snowflake:3
-
-    env:
-      AIRBYTE__EXTRACT_LOCATION: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/load
-      AIRFLOW__CORE__DAGS_FOLDER: /__w/${{ github.event.repository.name }}/${{ github.event.repository.name }}/automate/airflow/dags
-      AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 300
-
-    steps:
-      - name: Checkout branch
-        uses: actions/checkout@v3.5.0
-        with:
-          fetch-depth: 0
-          ref: ${{ github.event.pull_request.head.sha }}
-
-      - name: Test DAG structure integrity (DagBag Loading)
-        run: "python /usr/app/load_dagbag.py"
-
-      - name: Test DBT Sources against DAGs' YAML files
-        run: "python /usr/app/test_dags.py"
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -8,7 +8,7 @@ repos:
      - id: check-source-table-has-description
      - id: check-script-semicolon
      - id: check-script-has-no-table-name
-     - id: check-script-ref-and-source
+     # - id: check-script-ref-and-source
      - id: check-model-has-description
      - id: check-model-has-properties-file
      - id: check-model-has-all-columns
29 changes: 29 additions & 0 deletions load/dlt/loans/datacoves_snowflake.py
@@ -0,0 +1,29 @@
import os

import dlt
from dlt.common.configuration.exceptions import ConfigFieldMissingException


def set_config_value(key, config_key, env_var_prefix='DATACOVES__MAIN_LOAD__'):
    """Prefer an environment variable override; fall back to the dlt config value."""
    env_var = env_var_prefix + key.upper()

    value = os.getenv(env_var, dlt.config[config_key])

    if key != 'password':
        print(key + ": " + value)
    return value


config_keys = ["account", "database", "warehouse", "role", "user", "password"]

db_config = {}
for key in config_keys:
    config_key = "destination.snowflake.credentials." + key

    # Seed missing keys with an empty string so the lookup in set_config_value cannot raise
    try:
        dlt.config[config_key]
    except ConfigFieldMissingException:
        dlt.config[config_key] = ''

    db_config[key] = set_config_value(key, config_key)

# This is needed because by default dlt calls the snowflake account host
db_config['host'] = db_config['account']
db_config['username'] = db_config['user']
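A quick usage sketch, with a made-up account value, showing the precedence set_config_value implements: an environment variable such as DATACOVES__MAIN_LOAD__ACCOUNT wins over whatever the dlt config files contain. The import assumes the script runs from the same directory as datacoves_snowflake.py.

# Hypothetical values, for illustration only
import os

os.environ["DATACOVES__MAIN_LOAD__ACCOUNT"] = "xy12345.us-east-1"

# Module-level code in datacoves_snowflake runs at import time,
# so the environment variable must be set before this import
from datacoves_snowflake import db_config

print(db_config["account"])  # -> xy12345.us-east-1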
59 changes: 59 additions & 0 deletions load/dlt/loans/loans_data.py
@@ -0,0 +1,59 @@
#!/usr/bin/env -S uv run --cache-dir /tmp/.uv_cache
# /// script
# dependencies = [
#     "dlt[snowflake, parquet]==1.1.0",
#     "enlighten~=1.12.4",
#     "psutil~=6.0.0",
#     "pandas==2.2.2",
# ]
# ///
"""Loads a CSV file to Snowflake"""
import dlt
import pandas as pd
from datacoves_snowflake import db_config

# a resource is the individual endpoints or tables
@dlt.resource(write_disposition="replace")
# method name = table name
def personal_loans():
    personal_loans = "https://datacoves-sample-data-public.s3.us-west-2.amazonaws.com/PERSONAL_LOANS.csv"
    df = pd.read_csv(personal_loans)
    yield df

# same pattern as personal_loans; without this decorator dlt would default to append
@dlt.resource(write_disposition="replace")
def zip_coordinates():
    zip_coordinates = "https://datacoves-sample-data-public.s3.us-west-2.amazonaws.com/ZIP_COORDINATES.csv"
    df = pd.read_csv(zip_coordinates)
    yield df

# Source (corresponds to API or database)
@dlt.source
def personal_loans_source():
    return [personal_loans]

@dlt.source
def zip_coordinates_source():
    return [zip_coordinates]

if __name__ == "__main__":
    datacoves_snowflake = dlt.destinations.snowflake(
        db_config,
        destination_name="datacoves_snowflake"
    )

    pipeline = dlt.pipeline(
        progress="enlighten",
        pipeline_name="loans",
        destination=datacoves_snowflake,
        pipelines_dir="/tmp/",

        # dataset_name is the target schema name
        dataset_name="loans"
    )

    load_info = pipeline.run(personal_loans())

    print(load_info)

    load_info = pipeline.run(zip_coordinates())

    print(load_info)
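Since this script runs under orchestration, one hedged refinement (dlt's LoadInfo exposes a documented raise_on_failed_jobs method) would be to fail hard when any load job does not complete, rather than only printing the summary:

# Sketch: replace each run/print pair in the __main__ block above
load_info = pipeline.run(personal_loans())
print(load_info)
# Raise instead of exiting cleanly if any job in the load package failed
load_info.raise_on_failed_jobs()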
3 changes: 2 additions & 1 deletion orchestrate/dags/daily_loan_run.py
@@ -8,12 +8,13 @@
from operators.datacoves.bash import DatacovesBashOperator
from operators.datacoves.dbt import DatacovesDbtOperator

+my_var = Variable.get("ng_test_var")

@dag(
    default_args={"start_date": "2021-01"},
    description="Loan Run",
    schedule_interval="0 0 1 */12 *",
-   tags=["version_5"],
+   tags=["version_7"],
    catchup=False,
)
def daily_loan_run():
5 changes: 5 additions & 0 deletions orchestrate/dags/yaml_dbt_dag.py
@@ -1,6 +1,7 @@
import datetime

from airflow.decorators import dag
+from airflow.models import Variable
from operators.datacoves.dbt import DatacovesDbtOperator

@@ -17,6 +18,10 @@
    catchup=False,
)
def yaml_dbt_dag():
+   my_var = Variable.get("ng_test_var")
+
+   if my_var:
+       print(my_var)
    run_dbt = DatacovesDbtOperator(
        task_id="run_dbt", bash_command="dbt run -s personal_loans"
    )
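One caution, not part of this diff: Variable.get raises KeyError at parse time when the variable is not defined, which would fail the DagBag check in the new workflow. Airflow's documented default_var argument makes the lookup safe; a small sketch:

from airflow.models import Variable

# Returns None instead of raising KeyError when ng_test_var is not defined,
# so the DAG file still imports cleanly during DagBag loading
my_var = Variable.get("ng_test_var", default_var=None)

if my_var:
    print(my_var)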
@@ -0,0 +1,47 @@
description: "Sample DAG with Slack notification, custom image, and resource requests"
schedule_interval: "0 0 1 */12 *"
tags:
- version_2
- slack_notification
- blue_green
default_args:
start_date: 2023-01-01
owner: Noel Gomez
# Replace with the email of the recipient for failures
email: gomezn@example.com
email_on_failure: true
catchup: false

# Optional callbacks used to send Slack notifications
notifications:
on_success_callback:
notifier: airflow.providers.slack.notifications.slack.send_slack_notification
args:
text: "The DAG {{ dag.dag_id }} succeeded"
channel: "#general"
on_failure_callback:
notifier: airflow.providers.slack.notifications.slack.send_slack_notification
args:
text: "The DAG {{ dag.dag_id }} failed"
channel: "#general"

# DAG Tasks
nodes:
transform:
operator: operators.datacoves.dbt.DatacovesDbtOperator
type: task
config:
image: datacoves/airflow-pandas:latest
resources:
memory: 8Gi
cpu: 1000m

bash_command: "dbt run -s personal_loans"

# Sample failing task to test that notification is sent
# failing_task:
# operator: operators.datacoves.bash.DatacovesBashOperator
# type: task

# bash_command: "some_non_existant_command"
# dependencies: ["transform"]
@@ -0,0 +1,44 @@
description: "Sample DAG with MS Teams notification"
schedule_interval: "0 0 1 */12 *"
tags:
- version_2
- ms_teams_notification
- blue_green
default_args:
start_date: 2023-01-01
owner: Noel Gomez
# Replace with the email of the recipient for failures
email: gomezn@example.com
email_on_failure: true
catchup: false

# Optional callbacks used to send MS Teams notifications
notifications:
on_success_callback:
notifier: notifiers.datacoves.ms_teams.MSTeamsNotifier
args:
connection_id: DATACOVES_MS_TEAMS
# message: Custom success message
theme_color: 0000FF
on_failure_callback:
notifier: notifiers.datacoves.ms_teams.MSTeamsNotifier
args:
connection_id: DATACOVES_MS_TEAMS
# message: Custom error message
theme_color: 9900FF

# DAG Tasks
nodes:
transform:
operator: operators.datacoves.dbt.DatacovesDbtOperator
type: task

bash_command: "dbt run -s personal_loans"

# Sample failing task to test that notification is sent
# failing_task:
# operator: operators.datacoves.bash.DatacovesBashOperator
# type: task

# bash_command: "some_non_existant_command"
# dependencies: ["transform"]
8 changes: 7 additions & 1 deletion secure/roles.yml
@@ -39,6 +39,7 @@
          - z_stage_resources_read

          - z_wh_transforming
+         - z_wh_transforming_dynamic_tables

  - analyst:
      member_of:
@@ -83,6 +84,7 @@
          - z_tables_views_general
          - z_wh_integration
          - z_wh_orchestrating
+         - z_wh_transforming_dynamic_tables
          - z_stage_dbt_artifacts_write
      owns:
        databases:
@@ -118,7 +120,7 @@
          - z_tables_views_general
          - z_policy_row_region_all


##########################
# Global Roles
##########################
@@ -224,6 +226,10 @@
      warehouses:
        - wh_transforming

+  - z_wh_transforming_dynamic_tables:
+      warehouses:
+        - wh_transforming_dynamic_tables
+
  - z_wh_orchestrating:
      warehouses:
        - wh_orchestrating
6 changes: 6 additions & 0 deletions secure/warehouses.yml
@@ -36,6 +36,12 @@
      auto_resume: true
      initially_suspended: true

+  - wh_transforming_dynamic_tables:
+      size: x-small
+      auto_suspend: 60
+      auto_resume: true
+      initially_suspended: true
+
  - wh_transforming_sqlmesh:
      size: x-small
      auto_suspend: 60