feature: kedro-airflow DAG kwarg configuration
Signed-off-by: Simon Brugman <sfbbrugman@gmail.com>
sbrugman committed Jul 13, 2023
1 parent 884c1fe commit 3025dc2
Showing 7 changed files with 425 additions and 71 deletions.
58 changes: 57 additions & 1 deletion kedro-airflow/README.md
@@ -46,7 +46,7 @@ Please visit the guide to [deploy Kedro as a Python package](https://kedro.readt

#### What if my DAG file is in a different directory to my project folder?

By default the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.
By default, the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.

#### What if I want to use a different Jinja2 template?

@@ -55,3 +55,59 @@ You can use the additional command line argument `--jinja-file` (alias `-j`) to
```bash
kedro airflow create --jinja-file=./custom/template.j2
```

#### How can I pass arguments to the Airflow DAGs dynamically?

`kedro-airflow` picks up DAG configuration from `airflow.yml` files, while Kedro [parameters](https://docs.kedro.org/en/stable/configuration/parameters.html) are read by Kedro itself.
Arguments can be specified globally or per pipeline:

```yaml
# Global parameters
default:
    start_date: [2023, 1, 1]
    max_active_runs: 3
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval: "@once"
    catchup: false
    # Default settings applied to all tasks
    owner: "airflow"
    depends_on_past: false
    email_on_failure: false
    email_on_retry: false
    retries: 1
    retry_delay: 5

# Arguments specific to the pipeline (overrides the parameters above)
data_science:
    owner: "airflow-ds"
```
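Pipeline-specific keys override the `default` block. The merge can be sketched as follows (the helper name here is hypothetical; the plugin performs this merge in its `_load_config` function):

```python
# Sketch of the config merge: apply the `default` block first,
# then let pipeline-specific keys override it.
config_airflow = {
    "default": {"owner": "airflow", "retries": 1},
    "data_science": {"owner": "airflow-ds"},
}

def merge_dag_config(config: dict, pipeline_name: str) -> dict:
    dag_config: dict = {}
    dag_config.update(config.get("default", {}))
    dag_config.update(config.get(pipeline_name, {}))
    return dag_config

print(merge_dag_config(config_airflow, "data_science"))
# {'owner': 'airflow-ds', 'retries': 1}
```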
Arguments can also be passed via `--params` in the command line:

```bash
kedro airflow create --params "schedule_interval='@weekly'"
```

These variables are passed to the Jinja2 template.
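For instance, a value such as `max_active_runs` taken from `airflow.yml` (or `--params`) is substituted when the template is rendered. An illustrative rendering of a single template line (the rendering code below is a sketch, not the plugin's own):

```python
# Illustrative: how one configuration value reaches the generated DAG file.
import jinja2

template_line = "max_active_runs={{ max_active_runs }}"
template = jinja2.Environment().from_string(template_line)

print(template.render(max_active_runs=3))
# max_active_runs=3
```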

#### What if I want to pass different arguments?

To pass arguments other than those in the default template, provide a custom template (see: _"What if I want to use a different Jinja2 template?"_).

The syntax for arguments is:
```
{{ argument_name }}
```

To make an argument optional, use Jinja2's `default` filter:
```
{{ argument_name | default("default_value") }}
```
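The `default` filter falls back only when the variable is absent, as this illustrative snippet (not part of the plugin) shows:

```python
# Jinja2's `default` filter: fallback when missing, value wins when present.
import jinja2

template = jinja2.Environment().from_string("retries = {{ retries | default(1) }}")

print(template.render())           # variable absent -> fallback
# retries = 1
print(template.render(retries=3))  # variable present -> value wins
# retries = 3
```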

For examples, please have a look at the default template (`airflow_dag_template.j2`).

#### How can I use Airflow runtime parameters?

It is possible to pass parameters when triggering an Airflow DAG from the user interface.
In order to use this feature, create a custom template using the [Params syntax](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html).
See _"What if I want to use a different Jinja2 template?"_ for instructions on using custom templates.
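As a hypothetical illustration (the DAG id expression mirrors the default template; `example_param` is an invented name), a custom template could declare Params on the DAG using `airflow.models.param.Param`, making the value editable in the trigger form:

```
from airflow.models.param import Param

with DAG(
    dag_id="{{ dag_name | safe | slugify }}",
    params={
        # editable in the Airflow UI when triggering the DAG
        "example_param": Param(5, type="integer"),
    },
) as dag:
    ...
```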
5 changes: 5 additions & 0 deletions kedro-airflow/RELEASE.md
@@ -1,6 +1,11 @@
# Upcoming release 0.5.2
* Replace references to the `kedro.pipeline.Pipeline` object with the `kedro.modular_pipeline.pipeline` factory throughout the test suite.
* Migrate all project metadata to static `pyproject.toml`.
* Configure DAG kwargs via `airflow.yml`.
* The generated DAG file now contains the pipeline name.
* Included help for CLI arguments (see `kedro airflow create --help`).
* Raise an error when the pipeline does not exist.
* Added tests for CLI arguments.

# Release 0.5.1
* Added additional CLI argument `--jinja-file` to provide a path to a custom Jinja2 template.
61 changes: 29 additions & 32 deletions kedro-airflow/kedro_airflow/airflow_dag_template.j2
@@ -1,5 +1,6 @@
from datetime import datetime, timedelta
from pathlib import Path
from typing import Union

from airflow import DAG
from airflow.models import BaseOperator
@@ -10,14 +11,13 @@ from kedro.framework.project import configure_project


class KedroOperator(BaseOperator):

@apply_defaults
def __init__(
self,
package_name: str,
pipeline_name: str,
node_name: str,
project_path: str,
project_path: Union[str, Path],
env: str,
*args, **kwargs
) -> None:
@@ -35,46 +35,43 @@ class KedroOperator:
env=self.env) as session:
session.run(self.pipeline_name, node_names=[self.node_name])


# Kedro settings required to run your pipeline
env = "{{ env }}"
pipeline_name = "{{ pipeline_name }}"
project_path = Path.cwd()
package_name = "{{ package_name }}"

# Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}

# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG(
"{{ dag_name | safe | slugify }}",
start_date=datetime(2019, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=30), # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
default_args=default_args,
catchup=False # enable if you don't want historical dag runs to run
) as dag:

tasks = {}
{% for node in pipeline.nodes %}
tasks["{{ node.name | safe | slugify }}"] = KedroOperator(
task_id="{{ node.name | safe | slugify }}",
package_name=package_name,
pipeline_name=pipeline_name,
node_name="{{ node.name | safe }}",
project_path=project_path,
env=env,
dag_id="{{ dag_name | safe | slugify }}",
start_date=datetime({{ start_date | default([2023, 1, 1]) | join(",")}}),
max_active_runs={{ max_active_runs | default(3) }},
# https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
schedule_interval="{{ schedule_interval | default('@once') }}",
catchup={{ catchup | default(False) }},
# Default settings applied to all tasks
default_args=dict(
owner="{{ owner | default('airflow') }}",
depends_on_past={{ depends_on_past | default(False) }},
email_on_failure={{ email_on_failure | default(False) }},
email_on_retry={{ email_on_retry | default(False) }},
retries={{ retries | default(1) }},
retry_delay=timedelta(minutes={{ retry_delay | default(5) }})
)
{% endfor %}
) as dag:
tasks = {
{% for node in pipeline.nodes %} "{{ node.name | safe | slugify }}": KedroOperator(
task_id="{{ node.name | safe | slugify }}",
package_name=package_name,
pipeline_name=pipeline_name,
node_name="{{ node.name | safe }}",
project_path=project_path,
env=env,
),
{% endfor %} }

{% for parent_node, child_nodes in dependencies.items() -%}
{% for child in child_nodes %}
tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
{% for child in child_nodes %} tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
{% endfor %}
{%- endfor %}
72 changes: 66 additions & 6 deletions kedro-airflow/kedro_airflow/plugin.py
@@ -1,15 +1,25 @@
""" Kedro plugin for running a project with Airflow """
from __future__ import annotations

from collections import defaultdict
from pathlib import Path
from typing import Any

import click
import jinja2
from click import secho
from kedro.config import MissingConfigException
from kedro.framework.cli.project import PARAMS_ARG_HELP
from kedro.framework.cli.utils import ENV_HELP, KedroCliError, _split_params
from kedro.framework.context import KedroContext
from kedro.framework.project import pipelines
from kedro.framework.startup import ProjectMetadata
from kedro.framework.session import KedroSession
from kedro.framework.startup import ProjectMetadata, bootstrap_project
from slugify import slugify

PIPELINE_ARG_HELP = """Name of the registered pipeline to convert.
If not set, the '__default__' pipeline is used."""


@click.group(name="Kedro-Airflow")
def commands(): # pylint: disable=missing-function-docstring
@@ -22,15 +32,35 @@ def airflow_commands():
pass


def _load_config(
context: KedroContext, pipeline_name: str, patterns: list[str]
) -> dict[str, Any]:
try:
config_airflow = context.config_loader.get(*patterns)
dag_config = {}
# Load the default config if specified
if "default" in config_airflow:
dag_config.update(config_airflow["default"])
# Update with pipeline-specific config if present
if pipeline_name in config_airflow:
dag_config.update(config_airflow[pipeline_name])
except MissingConfigException:
dag_config = {}
return dag_config


@airflow_commands.command()
@click.option("-p", "--pipeline", "pipeline_name", default="__default__")
@click.option("-e", "--env", default="local")
@click.option(
"-p", "--pipeline", "pipeline_name", default="__default__", help=PIPELINE_ARG_HELP
)
@click.option("-e", "--env", default="local", help=ENV_HELP)
@click.option(
"-t",
"--target-dir",
"target_path",
type=click.Path(writable=True, resolve_path=True, file_okay=False),
default="./airflow_dags/",
help="The directory path to store the generated Airflow dags",
)
@click.option(
"-j",
@@ -39,6 +69,22 @@ def airflow_commands():
exists=True, readable=True, resolve_path=True, file_okay=True, dir_okay=False
),
default=Path(__file__).parent / "airflow_dag_template.j2",
help="The template file for the generated Airflow dags",
)
@click.option(
"--params",
type=click.UNPROCESSED,
default="",
help=PARAMS_ARG_HELP,
callback=_split_params,
)
@click.option(
"-c",
"--config-patterns",
multiple=True,
type=click.STRING,
default=["airflow*", "airflow/**"],
help="Config pattern for airflow.yml",
)
@click.pass_obj
def create(
@@ -47,23 +93,36 @@ def create(
env,
target_path,
jinja_file,
params,
config_patterns,
): # pylint: disable=too-many-locals,too-many-arguments
"""Create an Airflow DAG for a project"""
project_path = Path().cwd()
bootstrap_project(project_path)
with KedroSession.create(project_path=project_path, env=env) as session:
context = session.load_context()
dag_config = _load_config(context, pipeline_name, config_patterns)

# Update with params if provided
dag_config.update(params)

jinja_file = Path(jinja_file).resolve()
loader = jinja2.FileSystemLoader(jinja_file.parent)
jinja_env = jinja2.Environment(autoescape=True, loader=loader, lstrip_blocks=True)
jinja_env.filters["slugify"] = slugify
template = jinja_env.get_template(jinja_file.name)

package_name = metadata.package_name
dag_filename = f"{package_name}_dag.py"
dag_filename = f"{package_name}_{pipeline_name}_dag.py"

target_path = Path(target_path)
target_path = target_path / dag_filename

target_path.parent.mkdir(parents=True, exist_ok=True)

pipeline = pipelines.get(pipeline_name)
if pipeline is None:
raise KedroCliError(f"Pipeline {pipeline_name} not found.")

dependencies = defaultdict(list)
for node, parent_nodes in pipeline.node_dependencies.items():
@@ -77,14 +136,16 @@
pipeline_name=pipeline_name,
package_name=package_name,
pipeline=pipeline,
**dag_config,
).dump(str(target_path))

secho("")
secho("An Airflow DAG has been generated in:", fg="green")
secho(str(target_path))
secho("This file should be copied to your Airflow DAG folder.", fg="yellow")
secho(
"The Airflow configuration can be customized by editing this file.", fg="green"
"The Airflow configuration can be customized by editing this file.",
fg="green",
)
secho("")
secho(
@@ -101,4 +162,3 @@
"And all local paths in both the data catalog and log config must be absolute paths.",
fg="yellow",
)
secho("")
2 changes: 2 additions & 0 deletions kedro-airflow/test_requirements.txt
@@ -2,12 +2,14 @@ apache-airflow<3.0
bandit>=1.6.2, <2.0
behave
black~=22.0
cookiecutter>=2.1.1, <3.0
flake8
pre-commit>=1.17.0, <2.0
pylint>=2.5.2, <3.0
pytest
pytest-cov
pytest-mock
pytest-xdist
pyyaml
trufflehog>=2.1.0, <3.0
wheel
