Skip to content

Commit

Permalink
chore(pipeline) : use Sentry as main notifier
Browse files Browse the repository at this point in the history
Sentry is a much better tool than our little code executor, at least in
terms of not spamming our alerting channel. We can precisely configure
how and when we want to be notified and what is important, discuss
issues, etc.

It also now directly links every issue to a particular commit, making it
easier to identify regressions.

The flow changes a little bit as the logs are in the Sentry issue, not
directly linked in the Slack message, but I hope we'll be able to triage
much better.
  • Loading branch information
vperron committed Jan 15, 2025
1 parent 8693d93 commit 728b6ae
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 68 deletions.
2 changes: 2 additions & 0 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ x-airflow-common:
AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: 0
AIRFLOW__SENTRY__SENTRY_DSN: ${AIRFLOW__SENTRY__SENTRY_DSN}
AIRFLOW__SENTRY__SENTRY_ON: 'true'
AIRFLOW__SENTRY__ENABLE_TRACING: 'true'
AIRFLOW__SENTRY__RELEASE: ${AIRFLOW__SENTRY__RELEASE}
AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW__WEBSERVER__BASE_URL}

# Connections
Expand Down
1 change: 1 addition & 0 deletions deployment/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ resource "null_resource" "up" {
AIRFLOW_WWW_USER_PASSWORD='${var.airflow_admin_password}'
AIRFLOW__CORE__FERNET_KEY='${var.airflow__core__fernet_key}'
AIRFLOW__SENTRY__SENTRY_DSN='${var.airflow__sentry__sentry_dsn}'
AIRFLOW__SENTRY__RELEASE='${var.stack_version}'
AIRFLOW__WEBSERVER__BASE_URL='https://${local.airflow_hostname}'
# Airflow connections
Expand Down
51 changes: 0 additions & 51 deletions pipeline/dags/dag_utils/notifications.py

This file was deleted.

36 changes: 36 additions & 0 deletions pipeline/dags/dag_utils/sentry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os
from datetime import datetime

from sentry_sdk import configure_scope


def task_failure_callback(context):
    """Attach Airflow task metadata to the current Sentry scope on task failure.

    Registered as a DAG-level ``on_failure_callback`` (see
    :func:`notify_failure_args`), so the Sentry issue created for the failing
    task carries the DAG/task identifiers as tags plus a link back to the
    Airflow logs and to the deployed commit on GitHub.

    Args:
        context: the Airflow callback context dict; this function reads the
            ``dag``, ``task_instance`` and ``execution_date`` entries.
    """
    github_base_url = "https://github.com/gip-inclusion/data-inclusion"

    with configure_scope() as scope:
        # Fetch the task instance once and reuse it for both the task id
        # and the log URL (the original looked it up twice).
        task_instance = context.get("task_instance")
        dag_id = context.get("dag").dag_id
        task_id = task_instance.task_id
        execution_date = context.get("execution_date")
        # The release is set at deploy time to the stack version / commit SHA
        # (AIRFLOW__SENTRY__RELEASE, see deployment/main.tf).
        commit_sha = os.getenv("AIRFLOW__SENTRY__RELEASE")

        if isinstance(execution_date, datetime):
            execution_date = execution_date.isoformat()

        # Tags are indexed by Sentry and allow filtering issues per DAG/task.
        scope.set_tag("dag_id", dag_id)
        scope.set_tag("task_id", task_id)

        custom_context = {
            "airflow_logs_url": task_instance.log_url,
            "execution_date": execution_date,
        }
        # Only link to a commit when the release is known; otherwise the URL
        # would point to the broken ".../commit/None".
        if commit_sha is not None:
            custom_context["github_commit_url"] = (
                f"{github_base_url}/commit/{commit_sha}"
            )

        scope.set_context("custom", custom_context)


def notify_failure_args():
    """Build the ``default_args`` entries wiring the Sentry failure callback.

    Spread this into a DAG's ``default_args`` so that every task reports its
    failures to Sentry via :func:`task_failure_callback`.
    """
    return dict(on_failure_callback=task_failure_callback)
4 changes: 2 additions & 2 deletions pipeline/dags/import_brevo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import airflow
from airflow.operators import empty, python

from dag_utils import date, notifications
from dag_utils import date, sentry
from dag_utils.dbt import dbt_operator_factory
from dag_utils.virtualenvs import PYTHON_BIN_PATH

Expand Down Expand Up @@ -52,7 +52,7 @@ def _load_rgpd_contacts(run_id: str, stream_id: str, source_id: str, logical_dat
with airflow.DAG(
dag_id="import_brevo",
start_date=pendulum.datetime(2023, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule="@daily",
catchup=False,
concurrency=1,
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/import_data_inclusion_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from airflow.operators import empty
from airflow.utils.trigger_rule import TriggerRule

from dag_utils import date, notifications
from dag_utils import date, sentry
from dag_utils.dbt import dbt_operator_factory
from dag_utils.virtualenvs import PYTHON_BIN_PATH

Expand Down Expand Up @@ -85,7 +85,7 @@ def import_data_inclusion_api():
with airflow.DAG(
dag_id="import_data_inclusion_api",
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule=HOURLY_AT_FIFTEEN,
catchup=False,
) as dag:
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/import_decoupage_administratif.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from airflow.decorators import dag, task
from airflow.operators import empty

from dag_utils import date, dbt, notifications
from dag_utils import date, dbt, sentry
from dag_utils.virtualenvs import PYTHON_BIN_PATH


Expand Down Expand Up @@ -89,7 +89,7 @@ def extract_and_load():

@dag(
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule="@monthly",
catchup=False,
)
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/import_monenfant.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from airflow.decorators import dag, task

from dag_utils import date, notifications
from dag_utils import date, sentry
from dag_utils.virtualenvs import PYTHON_BIN_PATH


Expand Down Expand Up @@ -130,7 +130,7 @@ def load(

@dag(
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule="@monthly",
catchup=False,
tags=["source"],
Expand Down
6 changes: 2 additions & 4 deletions pipeline/dags/import_odspep.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
import airflow
from airflow.operators import empty, python

from dag_utils import date
from dag_utils import date, sentry
from dag_utils.virtualenvs import PYTHON_BIN_PATH

logger = logging.getLogger(__name__)

default_args = {}


def _import_dataset(
run_id: str,
Expand Down Expand Up @@ -58,7 +56,7 @@ def _import_dataset(
with airflow.DAG(
dag_id="import_odspep",
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=default_args,
default_args=sentry.notify_failure_args(),
schedule="@once",
catchup=False,
tags=["source"],
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/import_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.operators import empty, python
from airflow.utils.task_group import TaskGroup

from dag_utils import date, notifications, sources
from dag_utils import date, sentry, sources
from dag_utils.dbt import dbt_operator_factory
from dag_utils.virtualenvs import PYTHON_BIN_PATH

Expand Down Expand Up @@ -120,7 +120,7 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date):
with airflow.DAG(
dag_id=dag_id,
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule=source_config["schedule"],
catchup=False,
tags=["source"],
Expand Down
2 changes: 1 addition & 1 deletion pipeline/dags/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
get_intermediate_tasks,
get_staging_tasks,
)
from dag_utils.notifications import notify_failure_args
from dag_utils.sentry import notify_failure_args

with airflow.DAG(
dag_id="main",
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/notify_rgpd_contacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import airflow
from airflow.operators import empty, python

from dag_utils import notifications
from dag_utils import sentry
from dag_utils.virtualenvs import PYTHON_BIN_PATH

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -72,7 +72,7 @@ def _wait_for_brevo():
dag_id="notify_rgpd_contacts",
description="Sends RGPD notifications to DI users",
start_date=pendulum.datetime(2023, 11, 1),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule="@monthly",
catchup=False,
) as dag:
Expand Down

0 comments on commit 728b6ae

Please sign in to comment.