
feat: kedro-airflow DAG kwarg configuration #233

Merged: 8 commits, Aug 1, 2023
88 changes: 87 additions & 1 deletion kedro-airflow/README.md
@@ -46,7 +46,7 @@ Please visit the guide to [deploy Kedro as a Python package](https://kedro.readt

#### What if my DAG file is in a different directory to my project folder?

By default the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.
By default, the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.

#### What if I want to use a different Jinja2 template?

@@ -56,6 +56,92 @@ You can use the additional command line argument `--jinja-file` (alias `-j`) to
kedro airflow create --jinja-file=./custom/template.j2
```

#### How can I pass arguments to the Airflow DAGs dynamically?

`kedro-airflow` picks up configuration from an `airflow.yml` file in `conf/base` or `conf/local` of your Kedro project, or from any configuration file matching the default `airflow*`/`airflow/**` patterns.
The [parameters](https://docs.kedro.org/en/stable/configuration/parameters.html) are read by Kedro.
Arguments can be specified globally, or per pipeline:

```yaml
# Global parameters
default:
    start_date: [2023, 1, 1]
    max_active_runs: 3
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval: "@once"
    catchup: false
    # Default settings applied to all tasks
    owner: "airflow"
    depends_on_past: false
    email_on_failure: false
    email_on_retry: false
    retries: 1
    retry_delay: 5

# Arguments specific to the pipeline (overrides the parameters above)
data_science:
    owner: "airflow-ds"
```
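
Pipeline-specific blocks override the global `default` block key by key; the plugin's `_load_config` applies plain `dict.update` semantics. A minimal sketch of that merge behaviour (illustrative values only):

```python
# Sketch of the merge described above: pipeline-specific keys win.
default = {"owner": "airflow", "retries": 1, "catchup": False}
data_science = {"owner": "airflow-ds"}  # pipeline-specific override

dag_config = dict(default)
dag_config.update(data_science)
print(dag_config)  # {'owner': 'airflow-ds', 'retries': 1, 'catchup': False}
```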

Arguments can also be passed via `--params` in the command line:

```bash
kedro airflow create --params "schedule_interval='@weekly'"
```

These variables are passed to the Jinja2 template that creates an Airflow DAG from your pipeline.
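
Concretely, rendering works like ordinary Jinja2 templating; a minimal sketch, assuming a template line similar to the one in `airflow_dag_template.j2`:

```python
# Minimal sketch: a config value from airflow.yml (or --params) fills a template slot.
import jinja2

template = jinja2.Environment().from_string(
    "schedule_interval=\"{{ schedule_interval | default('@once') }}\""
)
print(template.render(schedule_interval="@weekly"))  # schedule_interval="@weekly"
print(template.render())                             # falls back to "@once"
```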

#### What if I want to use a configuration pattern other than `airflow*` and `airflow/**`?

In order to configure the config loader, update the `settings.py` file in your Kedro project.
For instance, if you would like to use the name `scheduler`, then change the file as follows:

```python
CONFIG_LOADER_ARGS = {
    "config_patterns": {"airflow": ["scheduler*", "scheduler/**"]}
}
```

Follow [Kedro's official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.

#### What if I want to pass different arguments?

In order to pass arguments other than those specified in the default template, simply pass a custom template (see: _"What if I want to use a different Jinja2 template?"_)

The syntax for arguments is:
```
{{ argument_name }}
```

In order to make arguments optional, one can use:
```
{{ argument_name | default("default_value") }}
```

For examples, please have a look at the default template (`airflow_dag_template.j2`).
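
If you would rather have a missing required argument fail at generation time than render as an empty string, Jinja2's `StrictUndefined` can be used in a custom setup (an assumption about your own tooling, not something the plugin configures for you):

```python
# Hedged sketch: StrictUndefined raises on missing values, while default() still works.
import jinja2

env = jinja2.Environment(undefined=jinja2.StrictUndefined)
print(env.from_string("{{ retries | default(1) }}").render())  # 1
env.from_string("{{ retries }}").render()  # raises jinja2.exceptions.UndefinedError
```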

#### What if I want to use a configuration file other than `airflow.yml`?

The default configuration pattern is `["airflow*", "airflow/**"]`.
In order to configure the `OmegaConfigLoader`, update the `settings.py` file in your Kedro project as follows:

```python
from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    # other args
    "config_patterns": {"airflow": ["airflow*", "airflow/**"]}  # configure the pattern for configuration files
}
```

Follow [Kedro's official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.
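
As a rough illustration of the interpolation mechanism behind that templating (a sketch of plain OmegaConf behaviour with made-up keys, not plugin functionality):

```python
# Hedged sketch: OmegaConf resolves ${...} references when the config is read.
from omegaconf import OmegaConf

cfg = OmegaConf.create(
    {"_retries": 2, "default": {"retries": "${_retries}"}}
)
print(OmegaConf.to_container(cfg, resolve=True)["default"])  # {'retries': 2}
```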

#### How can I use Airflow runtime parameters?

It is possible to pass parameters when triggering an Airflow DAG from the user interface.
In order to use this feature, create a custom template using the [Params syntax](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html).
See ["What if I want to use a different Jinja2 template?"](#what-if-i-want-to-use-a-different-jinja2-template) for instructions on using custom templates.

#### What if I want to use a different Airflow Operator?

Which Airflow Operator to use depends on the environment your project is running in.
11 changes: 10 additions & 1 deletion kedro-airflow/RELEASE.md
@@ -1,6 +1,15 @@
# Upcoming release 0.5.2
# Upcoming release 0.6.0
* Replace references to the `kedro.pipeline.Pipeline` object throughout the test suite with the `kedro.modular_pipeline.pipeline` factory.
* Migrate all project metadata to static `pyproject.toml`.
* Configure DAG kwargs via `airflow.yml`.
* The generated DAG file now contains the pipeline name.
* Included help for CLI arguments (see `kedro airflow create --help`).
* Added additional CLI argument `--params` to pass configuration to the Jinja2 template.

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:

* [sbrugman](https://github.com/sbrugman)

# Release 0.5.1
* Added additional CLI argument `--jinja-file` to provide a path to a custom Jinja2 template.
8 changes: 4 additions & 4 deletions kedro-airflow/features/steps/cli_steps.py
@@ -87,7 +87,7 @@ def install_kedro(context, version):
    if version == "latest":
        cmd = [context.pip, "install", "-U", "kedro[pandas]"]
    else:
        cmd = [context.pip, "install", "kedro[pandas]=={}".format(version)]
        cmd = [context.pip, "install", f"kedro[pandas]=={version}"]
    res = run(cmd, env=context.env)

    if res.returncode != OK_EXIT_CODE:
@@ -121,7 +121,7 @@ def check_message_printed(context, msg):
    stdout = context.result.stdout
    assert msg in stdout, (
        "Expected the following message segment to be printed on stdout: "
        "{exp_msg},\nbut got {actual_msg}".format(exp_msg=msg, actual_msg=stdout)
        f"{msg},\nbut got {stdout}"
    )


@@ -187,6 +187,6 @@ def check_status_code(context):
    if context.result.returncode != OK_EXIT_CODE:
        print(context.result.stdout)
        print(context.result.stderr)
        assert False, "Expected exit code {}" " but got {}".format(
            OK_EXIT_CODE, context.result.returncode
        raise AssertionError(
            f"Expected exit code {OK_EXIT_CODE} but got {context.result.returncode}"
        )
61 changes: 29 additions & 32 deletions kedro-airflow/kedro_airflow/airflow_dag_template.j2
@@ -1,3 +1,4 @@
from __future__ import annotations
from datetime import datetime, timedelta
from pathlib import Path

@@ -10,14 +11,13 @@ from kedro.framework.project import configure_project


class KedroOperator(BaseOperator):

    @apply_defaults
    def __init__(
        self,
        package_name: str,
        pipeline_name: str,
        node_name: str,
        project_path: str,
        project_path: str | Path,
        env: str,
        *args, **kwargs
    ) -> None:
@@ -35,46 +35,43 @@ class KedroOperator(BaseOperator):
                                 env=self.env) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])


# Kedro settings required to run your pipeline
env = "{{ env }}"
pipeline_name = "{{ pipeline_name }}"
project_path = Path.cwd()
package_name = "{{ package_name }}"

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG(
    "{{ dag_name | safe | slugify }}",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(minutes=30), # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    default_args=default_args,
    catchup=False # enable if you don't want historical dag runs to run
) as dag:

    tasks = {}
    {% for node in pipeline.nodes %}
    tasks["{{ node.name | safe | slugify }}"] = KedroOperator(
        task_id="{{ node.name | safe | slugify }}",
        package_name=package_name,
        pipeline_name=pipeline_name,
        node_name="{{ node.name | safe }}",
        project_path=project_path,
        env=env,
    dag_id="{{ dag_name | safe | slugify }}",
    start_date=datetime({{ start_date | default([2023, 1, 1]) | join(",")}}),
    max_active_runs={{ max_active_runs | default(3) }},
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval="{{ schedule_interval | default('@once') }}",
    catchup={{ catchup | default(False) }},
    # Default settings applied to all tasks
    default_args=dict(
        owner="{{ owner | default('airflow') }}",
        depends_on_past={{ depends_on_past | default(False) }},
        email_on_failure={{ email_on_failure | default(False) }},
        email_on_retry={{ email_on_retry | default(False) }},
        retries={{ retries | default(1) }},
        retry_delay=timedelta(minutes={{ retry_delay | default(5) }})
    )
    {% endfor %}
) as dag:
    tasks = {
    {% for node in pipeline.nodes %}    "{{ node.name | safe | slugify }}": KedroOperator(
        task_id="{{ node.name | safe | slugify }}",
        package_name=package_name,
        pipeline_name=pipeline_name,
        node_name="{{ node.name | safe }}",
        project_path=project_path,
        env=env,
    ),
    {% endfor %} }

{% for parent_node, child_nodes in dependencies.items() -%}
{% for child in child_nodes %}
    tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
{% for child in child_nodes %} tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
{% endfor %}
{%- endfor %}
76 changes: 70 additions & 6 deletions kedro-airflow/kedro_airflow/plugin.py
@@ -1,15 +1,25 @@
""" Kedro plugin for running a project with Airflow """
from __future__ import annotations

from collections import defaultdict
from pathlib import Path
from typing import Any

import click
import jinja2
from click import secho
from kedro.config import MissingConfigException
from kedro.framework.cli.project import PARAMS_ARG_HELP
from kedro.framework.cli.utils import ENV_HELP, KedroCliError, _split_params
from kedro.framework.context import KedroContext
from kedro.framework.project import pipelines
from kedro.framework.startup import ProjectMetadata
from kedro.framework.session import KedroSession
from kedro.framework.startup import ProjectMetadata, bootstrap_project
from slugify import slugify

PIPELINE_ARG_HELP = """Name of the registered pipeline to convert.
If not set, the '__default__' pipeline is used."""


@click.group(name="Kedro-Airflow")
def commands(): # pylint: disable=missing-function-docstring
@@ -22,15 +32,44 @@ def airflow_commands():
    pass


def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
    # Set the default pattern for `airflow` if not provided in `settings.py`
    if "airflow" not in context.config_loader.config_patterns.keys():
        context.config_loader.config_patterns.update(  # pragma: no cover
            {"airflow": ["airflow*", "airflow/**"]}
        )
Comment on lines +37 to +40 (Contributor): 👍🏼 I like this.
assert "airflow" in context.config_loader.config_patterns.keys()

# Load the config
try:
config_airflow = context.config_loader["airflow"]
except MissingConfigException:
# File does not exist
return {}

dag_config = {}
# Load the default config if specified
if "default" in config_airflow:
dag_config.update(config_airflow["default"])
# Update with pipeline-specific config if present
if pipeline_name in config_airflow:
dag_config.update(config_airflow[pipeline_name])
return dag_config


@airflow_commands.command()
@click.option("-p", "--pipeline", "pipeline_name", default="__default__")
@click.option("-e", "--env", default="local")
@click.option(
"-p", "--pipeline", "pipeline_name", default="__default__", help=PIPELINE_ARG_HELP
)
@click.option("-e", "--env", default="local", help=ENV_HELP)
@click.option(
"-t",
"--target-dir",
"target_path",
type=click.Path(writable=True, resolve_path=True, file_okay=False),
default="./airflow_dags/",
help="The directory path to store the generated Airflow dags",
)
@click.option(
    "-j",
@@ -39,6 +78,14 @@ def airflow_commands():
        exists=True, readable=True, resolve_path=True, file_okay=True, dir_okay=False
    ),
    default=Path(__file__).parent / "airflow_dag_template.j2",
    help="The template file for the generated Airflow dags",
)
@click.option(
    "--params",
    type=click.UNPROCESSED,
    default="",
    help=PARAMS_ARG_HELP,
    callback=_split_params,
)
@click.pass_obj
def create(
@@ -47,23 +94,39 @@ def create(
    env,
    target_path,
    jinja_file,
    params,
):  # pylint: disable=too-many-locals,too-many-arguments
"""Create an Airflow DAG for a project"""
project_path = Path.cwd().resolve()
bootstrap_project(project_path)
with KedroSession.create(project_path=project_path, env=env) as session:
sbrugman marked this conversation as resolved.
Show resolved Hide resolved
context = session.load_context()
dag_config = _load_config(context, pipeline_name)

# Update with params if provided
dag_config.update(params)

jinja_file = Path(jinja_file).resolve()
loader = jinja2.FileSystemLoader(jinja_file.parent)
jinja_env = jinja2.Environment(autoescape=True, loader=loader, lstrip_blocks=True)
jinja_env.filters["slugify"] = slugify
template = jinja_env.get_template(jinja_file.name)

package_name = metadata.package_name
dag_filename = f"{package_name}_dag.py"
dag_filename = (
f"{package_name}_dag.py"
if pipeline_name == "__default__"
else f"{package_name}_{pipeline_name}_dag.py"
)

target_path = Path(target_path)
target_path = target_path / dag_filename

target_path.parent.mkdir(parents=True, exist_ok=True)

pipeline = pipelines.get(pipeline_name)
if pipeline is None:
raise KedroCliError(f"Pipeline {pipeline_name} not found.")

dependencies = defaultdict(list)
for node, parent_nodes in pipeline.node_dependencies.items():
@@ -77,14 +140,16 @@ def create(
        pipeline_name=pipeline_name,
        package_name=package_name,
        pipeline=pipeline,
        **dag_config,
    ).dump(str(target_path))

    secho("")
    secho("An Airflow DAG has been generated in:", fg="green")
    secho(str(target_path))
    secho("This file should be copied to your Airflow DAG folder.", fg="yellow")
    secho(
        "The Airflow configuration can be customized by editing this file.", fg="green"
        "The Airflow configuration can be customized by editing this file.",
        fg="green",
    )
    secho("")
    secho(
@@ -101,4 +166,3 @@ def create(
        "And all local paths in both the data catalog and log config must be absolute paths.",
        fg="yellow",
    )
    secho("")