Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Passing Data Between Assets Guide #23598

Merged
merged 11 commits into from
Aug 13, 2024
10 changes: 10 additions & 0 deletions .github/workflows/build-docs-revamp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ jobs:
vercel-token: ${{ secrets.VERCEL_TOKEN }}
vercel-org-id: ${{ secrets.VERCEL_ORG_ID }}
vercel-project-id: ${{ secrets.VERCEL_DOCS_NEXT_PROJECT_ID }}
github-token: ${{ secrets.GITHUB_TOKEN }}
scope: ${{ secrets.VERCEL_ORG_ID }}

- name: Publish to Vercel Production
uses: amondnet/vercel-action@v25
if: github.event_name == 'push' && github.ref == 'refs/heads/docs/revamp'
with:
vercel-token: ${{ secrets.VERCEL_TOKEN }}
vercel-org-id: ${{ secrets.VERCEL_ORG_ID }}
vercel-project-id: ${{ secrets.VERCEL_PROJECT_ID }}
vercel-args: "--prod"
github-token: ${{ secrets.GITHUB_TOKEN }}
scope: ${{ secrets.VERCEL_ORG_ID }}
Expand Down
2 changes: 2 additions & 0 deletions docs/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
sphinx/_build/
.env
next/.env
*.duckdb
*.sqlite
8 changes: 4 additions & 4 deletions docs/docs-next/docs/concepts/io-managers.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
title: "I/O managers"
---
## =======

# I/O managers
## title: "I/O managers"

# I/O managers
9 changes: 9 additions & 0 deletions docs/docs-next/docs/concepts/understanding-assets.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
title: Understanding Assets
description: Understanding the concept of assets in Dagster
last_update:
date: 2024-08-11
author: Pedram Navid
---


4 changes: 3 additions & 1 deletion docs/docs-next/docs/guides/data-assets.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
title: "Data assets"
---

# Data assets
# Data assets

TODO: fill in this section
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "Adding metadata to assets"
sidebar_position: 4
sidebar_position: 40
sidebar_label: "Adding metadata"
---

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "Creating asset factories"
sidebar_position: 5
sidebar_position: 50
sidebar_label: "Creating asset factories"
---

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "Creating data assets"
sidebar_position: 1
sidebar_position: 10
sidebar_label: "Creating data assets"
---

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "Creating dependencies between assets"
sidebar_position: 2
sidebar_position: 20
sidebar_label: "Creating asset dependencies"
---

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dagster import asset


# Warning! This is not the right way to create assets
@asset
def download_files():
# Download files from S3, the web, etc.
...


@asset
def unzip_files():
# Unzip files to local disk or persistent storage
...


@asset
def load_data():
# Read data previously written and store in a data warehouse
...
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import sqlite3
import tempfile

from dagster import AssetExecutionContext, Definitions, asset

database_file = tempfile.NamedTemporaryFile()


# highlight-start
@asset
def asset1():
with sqlite3.connect("database.sqlite") as conn:
conn.execute("CREATE OR REPLACE TABLE test (i INTEGER)")
conn.execute("INSERT INTO test VALUES (42)")


@asset(deps=[asset1])
def asset2(context: AssetExecutionContext):
with sqlite3.connect("database.sqlite") as conn:
result = conn.execute("SELECT * FROM test").fetchall()
context.log.info(result)
# highlight-end


defs = Definitions(assets=[asset1, asset2])

if __name__ == "__main__":
from dagster import materialize

materialize(assets=[asset1, asset2])
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pandas as pd
from dagster import Definitions, asset
from dagster_duckdb_pandas import DuckDBPandasIOManager

# highlight-start
duckdb_io_manager = DuckDBPandasIOManager(database="my_database.duckdb", schema="my_schema")


@asset
def people():
return pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})


@asset
def birds():
return pd.DataFrame({"id": [1, 2, 3], "name": ["Bluebird", "Robin", "Eagle"]})


@asset
def combined_data(people, birds):
return pd.concat([people, birds])
# highlight-end


defs = Definitions(
assets=[people, birds, combined_data],
resources={"io_manager": duckdb_io_manager},
)

if __name__ == "__main__":
from dagster import materialize

materialize(assets=[people, birds, combined_data])
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import List

from dagster import asset


def download_files() -> str:
# Download files from S3, the web, etc.
...


def unzip_files(zipfile: str) -> List[str]:
# Unzip files to local disk or persistent storage
...


def load_data(files: List[str]):
# Read data previously written and store in a data warehouse
...


@asset
def my_dataset():
zipped_files = download_files()
files = unzip_files(zipped_files)
load_data(files)
123 changes: 120 additions & 3 deletions docs/docs-next/docs/guides/data-assets/passing-data-between-assets.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,124 @@
---
title: "Passing data between assets"
sidebar_position: 3
title: How to Pass Data Between Assets
description: Learn how to pass data between assets in Dagster
sidebar_position: 30
sidebar_label: "Passing data between assets"
last_update:
date: 2024-08-11
author: Pedram Navid
---

# Passing data between assets
As you develop your data pipeline, you'll likely need to pass data between assets. By the end of this guide, you'll have a solid understanding of the different approaches to passing data between assets and when to use each one.

---

<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something I really appreciate when I read other docs sites is when they link to a page that has the information i need to have a basic understanding (so maybe the concept pages for us?). Then I can open it and be like "i've read this im good" or if it's something i know nothing about I am given the resource i need to learn it.

- A basic understanding of Dagster concepts such as assets and resources
- Dagster installed, as well as the `dagster-duckdb-pandas` package
</details>

---

## Overview

In Dagster, assets are the building blocks of your data pipeline and it's common to want to pass data between them. This guide will help you understand how to pass data between assets.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be useful to include a fourth "fake" case which is "you do not pass data between assets because your pipeline is not processing data directly"

This would be something like:

@asset 
def people(): 
 """ call the lambda function that loads people"""
 
@asset 
def birds(): 
 """ call the lambda function that loads birds"""
 
 
 @asset(
   deps = [people, birds]
 ) 
def people_and_birds(): 
 """ call the stored procedure that concats people and birds"""

There are three ways of passing data between assets:

- Explicitly managing data, by using external storage
- Implicitly managing data, using IO Managers
- Avoiding passing data between assets altogether by combining several tasks into a single asset

This guide walks through all three methods.

---

## Move Data Between Assets Explicitly Using External Storage

A common and recommended approach to passing data between assets is explicitly managing data using external storage. This example pipeline uses a SQLite database as external storage:

<CodeExample filePath="guides/data-assets/passing-data-assets/passing-data-explicit.py" language="python" title="Using External Storage" />

In this example, the first asset opens a connection to the SQLite database and writes data to it. The second asset opens a connection to the same database and reads data from it. The dependency between the first asset and the second asset is made explicit through the asset's `deps` argument.

The benefits of this approach are:
- It's explicit and easy to understand how data is stored and retrieved
- You have maximum flexibility in terms of how and where data is stored, for example, based on environment

The downsides of this approach are:
- You need to manage connections and transactions manually
PedramNavid marked this conversation as resolved.
Show resolved Hide resolved
- You need to handle errors and edge cases, for example, if the database is down or if a connection is closed

## Move Data Between Assets Implicitly Using IO Managers

Dagster's IO Managers are a powerful feature that manages data between assets by defining how data is read from and written to external storage. They help separate business logic from I/O operations, reducing boilerplate code and making it easier to change where data is stored.

I/O managers handle:
1. **Input**: Reading data from storage and loading it into memory for use by dependent assets.
2. **Output**: Writing data to the configured storage location.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: IO -> I/O (couple other places where this applies as well)

For a deeper understanding of IO Managers, check out the [Understanding IO Managers](/concepts/io-managers) guide.

<CodeExample filePath="guides/data-assets/passing-data-assets/passing-data-io-manager.py" language="python" title="Using IO Managers" />

In this example, a `DuckDBPandasIOManager` is instantiated to run using a local file. The IO manager handles both reading and writing to the database.

:::warning

This example works for local development, but in a production environment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Consider a cloud hosted environment for production purposes"

nit: is this referring to cloud hosted storage? using "environment" to refer to two different things (where the computation is performed and where the database is hosted) confused me a bit

each step would execute in a separate environment and would not have access to the same file system. Consider a cloud-hosted environment for production purposes.

:::

The `people()` and `birds()` assets both write their dataframes to DuckDB
for persistent storage. The `combined_data()` asset requests data from both assets by adding them as parameters to the function, and the IO Manager handles the reading them from DuckDB and making them available to the `combined_data` function as dataframes. Note that when you use IO Managers you do not need to manually add the asset's dependencies through the `deps` argument.

The benefits of this approach are:
- The reading and writing of data is handled by the IO Manager, reducing boilerplate code
- It's easy to swap out different IO Managers based on environments without changing the underlying asset computation

The downsides of this approach are:
- The IO Manager approach is less flexible should you need to customize how data is read or written to storage
- Some decisions may be made by the IO Manager for you, such as naming conventions that can be hard to override.

## Avoid Passing Data Between Assets by Combining Assets

In some cases, you may find that you can avoid passing data between assets by
carefully considering how you have modeled your pipeline:

Consider this example:

<CodeExample filePath="guides/data-assets/passing-data-assets/passing-data-avoid.py" language="python" title="Avoid Passing Data Between Assets" />

This example downloads a zip file from Google Drive, unzips it, and loads the data into a pandas DataFrame. It relies on each asset running on the same file system to perform these operations.

The assets are modeled as tasks, rather than as data assets. For more information on the difference between tasks and data assets, check out the [Thinking in Assets](/concepts/assets/thinking-in-assets) guide.

In this refactor, the `download_files`, `unzip_files`, and `load_data` assets are combined into a single asset, `my_dataset`. This asset downloads the files, unzips them, and loads the data into a data warehouse.

<CodeExample filePath="guides/data-assets/passing-data-assets/passing-data-rewrite-assets.py" language="python" title="Avoid Passing Data Between Assets" />

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a word missing in this sentence

This approach still handles passing data explicitly, but no longer does it across assets,
instead within a single asset. This pipeline still assumes enough disk and
memory available to handle the data, but for smaller datasets, it can work well.

The benefits of this approach are:
- All the computation that defines how an asset is created is contained within a single asset, making it easier to understand and maintain
- It can be faster than relying on external storage, and doesn't require the overhead of setting up additional compute instances.


The downsides of this approach are:
- It makes certain assumptions about how much data is being processed
- It can be difficult to reuse functions across assets, since they're tightly coupled to the data they produce
- It may not always be possible to swap functionality based on the environment you are running in. For example, if you are running in a cloud environment, you may not have access to the local file system.


---

## Related Resources

TODO: add links to relevant API documentation here.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "Selecting subsets of assets"
sidebar_position: 6
sidebar_position: 60
sidebar_label: "Selecting assets"
---

Expand Down
44 changes: 44 additions & 0 deletions docs/docs-next/src/components/CodeExample.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import React from 'react';
import CodeBlock from '@theme/CodeBlock';

interface CodeExampleProps {
filePath: string;
language?: string;
title?: string;
}

const CodeExample: React.FC<CodeExampleProps> = ({ filePath, language, title }) => {
const [content, setContent] = React.useState<string>('');
const [error, setError] = React.useState<string | null>(null);

React.useEffect(() => {
// Adjust the import path to start from the docs directory
import(`!!raw-loader!/docs/${filePath}`)
.then((module) => {
const lines = module.default.split('\n');
const mainIndex = lines.findIndex(line => line.trim().startsWith('if __name__ == '));
const strippedContent = mainIndex !== -1 ? lines.slice(0, mainIndex).join('\n') : module.default;
setContent(strippedContent);
setError(null);
})
.catch((error) => {
console.error(`Error loading file: ${filePath}`, error);
setError(`Failed to load file: ${filePath}. Please check if the file exists and the path is correct.`);
});
}, [filePath]);

if (error) {
return <div style={{ color: 'red', padding: '1rem', border: '1px solid red' }}>{error}</div>;
}

return (
<CodeBlock
language={language}
title={title || filePath}
>
{content || 'Loading...'}
</CodeBlock>
);
};

export default CodeExample;
2 changes: 1 addition & 1 deletion docs/docs-next/src/styles/custom.scss
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
--ifm-color-primary-lightest: var(--dagster-color-gray-50);

// modified base vars
--ifm-code-font-size: 90%;
--ifm-code-font-size: 80%;
--ifm-navbar-height: 70px;

// brand-specific colors
Expand Down
Loading
Loading