🚧 This repo is meant for demo purposes - don't use in production.
Inspired by the dbt-Airflow provider, this is an exploratory effort to add pipeline support to Bacalhau. Find the related design doc here.
Tested on Bacalhau version >= v0.3.6.
Install Airflow:
```bash
conda create --name bacalhau python=3.9
conda activate bacalhau

AIRFLOW_VERSION=2.4.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

export AIRFLOW_HOME=~/airflow
airflow db init
```
To install this provider, place it within the ${AIRFLOW_HOME}/dags folder:
```bash
git clone https://github.com/enricorotundo/bacalhau-airflow-provider.git
ln -s <PATH_TO_THIS_REPO>/bacalhau-airflow-provider/ ${AIRFLOW_HOME}/dags
```
In a separate terminal:
```bash
export AIRFLOW_HOME=~/airflow
airflow standalone
airflow dags test bacalhau-image-processing
airflow dags test bacalhau-integer-sum
```
Head over to http://0.0.0.0:8080 to inspect the running DAGs.
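For a sense of what these pipelines look like, here is a minimal DAG sketch. The operator name, import path, and parameters are hypothetical placeholders, not this provider's actual API; see the DAG files in this repo for real usage.

```python
# Purely illustrative: BacalhauSubmitJobOperator and its parameters are
# hypothetical names, not this provider's actual API.
from datetime import datetime

from airflow import DAG
from bacalhau_airflow_provider.operators import BacalhauSubmitJobOperator  # hypothetical import path

with DAG(
    dag_id="bacalhau-hello-world",  # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
):
    BacalhauSubmitJobOperator(  # hypothetical operator
        task_id="submit_job",
        image="ubuntu:latest",      # container image the Bacalhau job runs (assumed parameter)
        command=["echo", "hello"],  # command executed inside the job (assumed parameter)
    )
```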
For data lineage, the Bacalhau operator implements the `get_openlineage_facets_on_start` and `get_openlineage_facets_on_complete` methods. This makes it compliant with the openlineage-airflow standard for collecting metadata.
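As a rough illustration, an operator can expose these hooks along the following lines. This is a minimal sketch assuming openlineage-airflow's `TaskMetadata` return convention; the class name, dataset namespaces, and CIDs are made up, and the actual implementation lives in this repo's operator code.

```python
# Illustrative sketch only: the class name, dataset namespaces, and CID values
# are assumptions, not this provider's actual code.
from airflow.models.baseoperator import BaseOperator
from openlineage.airflow.extractors.base import TaskMetadata
from openlineage.client.run import Dataset


class BacalhauLineageOperatorSketch(BaseOperator):
    """Toy operator showing the two OpenLineage hook methods."""

    def execute(self, context):
        # Submit the Bacalhau job here (omitted in this sketch).
        pass

    def get_openlineage_facets_on_start(self):
        # Called when the TaskInstance starts: report the datasets the job
        # reads and writes.
        return TaskMetadata(
            name="bacalhau.submit_job",
            inputs=[Dataset(namespace="ipfs", name="QmInputCid")],    # hypothetical CID
            outputs=[Dataset(namespace="ipfs", name="QmOutputCid")],  # hypothetical CID
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        # Called when the TaskInstance completes: attach run-level metadata
        # collected during execute().
        return TaskMetadata(
            name="bacalhau.submit_job",
            run_facets={},  # e.g. job ID / client ID facets would go here
        )
```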
Alongside an Airflow instance, a client needs an OpenLineage-compatible backend. In our case we focus on Marquez, an open source solution for the collection, aggregation, and visualization of a data ecosystem's metadata.
To set up a Marquez backend, follow the Quickstart from their README.
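At the time of writing, their quickstart boils down to the commands below; double-check the Marquez README in case it has changed.

```bash
git clone https://github.com/MarquezProject/marquez && cd marquez
./docker/up.sh
```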
Once the backend is running, the `OPENLINEAGE_URL` environment variable needs to be set for the Airflow components. This variable tells the openlineage-airflow library where to send execution metadata. For a complete list of the available variables, see the OpenLineage docs.
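For example, assuming Marquez is running locally on its default API port (5000), set the variable in each terminal that runs an Airflow component:

```bash
export OPENLINEAGE_URL=http://localhost:5000
```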
The openlineage-airflow library runs these methods as follows:
- On TaskInstance start, collect metadata for each task
- Collect task input/output metadata (source, schema, etc.) with the `get_openlineage_facets_on_start` method
- Collect task run-level metadata (execution time, state, parameters, etc.) with the `get_openlineage_facets_on_complete` method
- On TaskInstance complete, also mark the task as complete in Marquez
The information extracted through this operator is (see the sketch after this list):
- ID: Unique global ID of this job in the Bacalhau network.
- ClientID: ID of the client that created this job.
- Inputs: Data volumes read by the job.
- Outputs: Data volumes written by the job.
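As a sketch of how these fields could translate into OpenLineage metadata, the hypothetical helper below maps a job's data volumes to OpenLineage Dataset objects. The field names follow the job model listed above, while the helper itself and the namespace value are illustrative assumptions, not part of this provider.

```python
# Hypothetical helper for illustration only; not part of this provider.
from typing import List, Tuple

from openlineage.client.run import Dataset


def job_to_datasets(job: dict) -> Tuple[List[Dataset], List[Dataset]]:
    """Map a Bacalhau job's input/output volumes to OpenLineage Datasets."""

    def to_dataset(volume: dict) -> Dataset:
        # Identify a volume by its CID when available ("ipfs" namespace is an assumption).
        return Dataset(namespace="ipfs", name=volume.get("CID", "unknown"))

    inputs = [to_dataset(v) for v in job.get("Inputs", [])]
    outputs = [to_dataset(v) for v in job.get("Outputs", [])]
    return inputs, outputs
```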
In the future it would be nice to also support:
- APIVersion: APIVersion of the Job
- CreatedAt: Time the job was submitted to the Bacalhau network
- Spec fields in the Job model
Before contributing, refer to the OpenLineage docs to understand the data schemas that OpenLineage supports.
If you have any questions or feedback, please reach out to enricorotundo on the #bacalhau channel in the Filecoin Slack.