Meltano Orchestrating dbt using Airflow

This project is based on dbt's classic Jaffle shop example project, placed in a Meltano context, and builds on the singer_dbt_jaffle sample project in this repo.

The Airflow DAG generator code, files-airflow-dbt, that powers this functionality is ⚠️ Experimental 🚧 and is being reviewed in the Merge Request, with more in-depth discussion taking place in this issue.

We're looking for feedback so feel free to jump into the issue and leave comments!

What is this repo?

A Meltano project that demonstrates the benefits of running dbt Core within Meltano. It can also serve as an example of how to configure your own Meltano project.

The idea is to use dbt's Jaffle shop example project, but instead of using dbt seed to load the data into your warehouse (here, a local Postgres instance), you will use tap-csv and target-postgres to simulate using Singer as an EL tool.

What's contained in this project?

The Meltano project has the following plugins installed and configured:

  • EL
    • tap-csv (Singer)
    • target-postgres (Singer)
  • Transformation
    • dbt
  • Orchestration
    • Airflow

The feature we'll specifically explore is the ability to schedule dbt models to run in Airflow with model-level precision. We will set DAG definitions, like the one below, using dbt selection syntax; these automatically generate Airflow DAGs at the model level, including the Meltano EL sync that feeds the dbt source table.

The DAG definitions configuration file is a custom file that's not part of Airflow, dbt, or Meltano. A version of the dags key will likely be added to meltano.yml in the near future, at which point this configuration can be migrated there.

dags:
  dag_definitions:
    # Example DAGs to show how you might want to schedule different models in a more precise way
    full_dag:
      # Weekends - Full DAG once a day end to end
      generator: dbt
      interval: '0 0 * * 6,0'
      selection_rule: '*'
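
To make the interval above easier to read, here is a minimal Python sketch of how a five-field cron expression such as '0 0 * * 6,0' can be checked against a point in time. It is only an illustration: the helper names field_matches and cron_matches are hypothetical, it supports just '*', comma lists, and simple ranges, and it is not how Airflow itself evaluates schedules.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Check one cron field. Supports only '*', 'a-b' ranges, and comma lists."""
    for part in field.split(","):
        if part == "*":
            return True
        if "-" in part:
            low, high = part.split("-")
            if int(low) <= value <= int(high):
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, moment: datetime) -> bool:
    """Simplified check of 'minute hour day-of-month month day-of-week'.

    Real cron has extra rules (step values, names, and special handling when
    both day fields are restricted) that this sketch deliberately ignores.
    """
    minute, hour, day, month, weekday = expr.split()
    # Cron counts Sunday as 0; Python's weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day)
        and field_matches(month, moment.month)
        and field_matches(weekday, cron_weekday)
    )


# The full_dag interval: midnight on Saturdays (6) and Sundays (0).
FULL_DAG_INTERVAL = "0 0 * * 6,0"
print(cron_matches(FULL_DAG_INTERVAL, datetime(2024, 1, 6, 0, 0)))  # a Saturday
print(cron_matches(FULL_DAG_INTERVAL, datetime(2024, 1, 8, 0, 0)))  # a Monday
```

The first call prints True and the second prints False, which matches the "weekends, once a day" comment in the definition.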

The generator translates the dbt lineage graph from the Jaffle shop example, which looks like this:

dbt_docs_lineage

into Airflow DAGs in which each model is an Airflow task and the dbt tests for a model run right after that model:

airflow_full_dag
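
As a rough illustration of that translation, the sketch below turns a small dependency map into task edges, with one "run" task and one "test" task per model. Everything here is an assumption made for illustration: the MODEL_PARENTS map is a hand-written approximation of the Jaffle shop lineage, the task names and the build_task_edges helper are hypothetical, and the choice to make a downstream model wait for its parent's tests is a guess rather than a description of the actual generator.

```python
# Hand-written approximation of the Jaffle shop model lineage (illustrative only).
MODEL_PARENTS = {
    "stg_customers": [],
    "stg_orders": [],
    "stg_payments": [],
    "customers": ["stg_customers", "stg_orders", "stg_payments"],
    "orders": ["stg_orders", "stg_payments"],
}


def build_task_edges(model_parents):
    """Return (upstream_task, downstream_task) pairs for a run/test task layout."""
    edges = []
    for model, parents in model_parents.items():
        # A model's tests run immediately after the model itself.
        edges.append((f"run.{model}", f"test.{model}"))
        for parent in parents:
            # Assumption: a model starts only after its parent's tests finish.
            edges.append((f"test.{parent}", f"run.{model}"))
    return edges


for upstream, downstream in build_task_edges(MODEL_PARENTS):
    print(f"{upstream} >> {downstream}")
```

In a real Airflow DAG, each printed pair would correspond to a dependency declared between two operators.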

Further, we can use dbt graph operators like +orders to define what the Airflow DAG should include; in this case, the orders model and everything upstream of it:

dags:
  dag_definitions:
    orders:
      # Orders consumption model and all upstream, every 2 hrs only on weekdays
      generator: dbt
      interval: '0 */2 * * 1-5'
      selection_rule: '+orders'

The dbt lineage graph for this selection looks like this:

dbt_docs_lineage_orders

It becomes an Airflow DAG that looks like this:

airflow_orders
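
The effect of the + prefix can be sketched as a simple graph walk: start at the named model and collect every ancestor. The code below is a minimal illustration under assumptions, not dbt's selector implementation. The MODEL_PARENTS map is a hand-written approximation of the Jaffle shop lineage, and select_with_ancestors is a hypothetical helper name.

```python
# Hand-written approximation of the Jaffle shop model lineage (illustrative only).
MODEL_PARENTS = {
    "stg_customers": [],
    "stg_orders": [],
    "stg_payments": [],
    "customers": ["stg_customers", "stg_orders", "stg_payments"],
    "orders": ["stg_orders", "stg_payments"],
}


def select_with_ancestors(model, model_parents):
    """Return the model plus every model upstream of it, similar to '+model'."""
    selected = set()
    stack = [model]
    while stack:
        current = stack.pop()
        if current in selected:
            continue  # already visited through another path
        selected.add(current)
        stack.extend(model_parents.get(current, []))
    return selected


print(sorted(select_with_ancestors("orders", MODEL_PARENTS)))
# ['orders', 'stg_orders', 'stg_payments']
```

With this map, customers and stg_customers are left out of the result, which is why the orders DAG is smaller than the full DAG.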

Prerequisites

Meltano must be installed. See https://docs.meltano.com/guide/installation for more details.

How to run this project?

  1. Clone this repo:

    git clone https://github.com/pnadolny13/meltano_example_implementations.git
    cd meltano_example_implementations/meltano_projects/dbt_orchestration/
  2. Install the plugins defined in the Meltano project:

    meltano install
  3. Start a local Postgres docker instance. It will contain a database called warehouse that we'll send our raw data to.

    docker run --rm --name postgres -e POSTGRES_PASSWORD=meltano -e POSTGRES_USER=meltano -e POSTGRES_DB=warehouse -d -p 5432:5432 postgres
  4. Create a .env file and add database secrets. This is mostly to encourage best practices; since we're using a local Postgres instance, we don't have any sensitive credentials.

    touch .env
    echo PG_PASSWORD="meltano" >> .env
  5. Run the EL pipeline using Meltano:

    meltano run tap-csv target-postgres

    You should now see data in your Postgres database.

  6. Compile your dbt project so your manifest.json is available for informing how to generate Airflow DAGs.

    meltano run dbt:compile
  7. Start Airflow as a background process. This also creates an Airflow user called melty.

    meltano invoke airflow webserver -D
    meltano invoke airflow users create --username melty --firstname melty --lastname meltano --role Admin --password melty --email melty@meltano.com
    meltano invoke airflow scheduler -D

    You'll notice that a generator_cache.yml file gets created, which caches your selection criteria results, Meltano schedules, and dbt manifest.json content.

    To kill the background Airflow processes run:

    pkill -f airflow
  8. Generate and serve the dbt docs.

    meltano invoke dbt docs generate
    # Uses a custom port so it doesn't collide with the Airflow webserver
    meltano invoke dbt docs serve --port 8081
  9. Explore the UIs! Turn on the DAGs that you want to run and watch them create Postgres views.

    Try updating dag_definition.yml to add a DAG or change the dbt selection criteria. Remember to delete the generator_cache.yml file (links to the sample file) whenever you change the dbt project or schedules; it will be re-created automatically.

  10. Explore the analysis utilities. They analyze the selection criteria across all DAGs and let you know if any models are accidentally left unselected. You can try commenting out full_dag and orders in dag_definition.yml and re-running to see the output when models are not selected.

    python orchestrate/dags/dbt_analyze.py
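
The idea behind the coverage check in the last step can be sketched in a few lines of Python. This is not the code in dbt_analyze.py; it is a minimal illustration that assumes the dbt manifest has already been loaded into a dictionary (for example with json.load) and that each DAG's selection rule has already been resolved to a set of model names. The helper names model_names and unselected_models, and the tiny sample manifest, are hypothetical.

```python
def model_names(manifest):
    """Collect the names of all model nodes in a dbt manifest dictionary."""
    return {
        node["name"]
        for node in manifest.get("nodes", {}).values()
        if node.get("resource_type") == "model"
    }


def unselected_models(manifest, selections):
    """Return the models that no DAG's resolved selection covers."""
    covered = set().union(*selections.values())
    return model_names(manifest) - covered


# Tiny stand-in for manifest.json content (illustrative only).
sample_manifest = {
    "nodes": {
        "model.jaffle_shop.stg_orders": {"name": "stg_orders", "resource_type": "model"},
        "model.jaffle_shop.orders": {"name": "orders", "resource_type": "model"},
        "model.jaffle_shop.customers": {"name": "customers", "resource_type": "model"},
        "test.jaffle_shop.not_null_orders": {"name": "not_null_orders", "resource_type": "test"},
    }
}

# Assumed to be the already-resolved result of each DAG's selection_rule.
sample_selections = {"orders": {"orders", "stg_orders"}}

print(sorted(unselected_models(sample_manifest, sample_selections)))
# ['customers']
```

An empty result would mean every model is scheduled by at least one DAG.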