From ab05c24d2497eb045c1be6c7e6d0001174786b0c Mon Sep 17 00:00:00 2001
From: Victor Perron
Date: Wed, 15 Jan 2025 15:25:43 +0100
Subject: [PATCH] chore(pipeline) : use Sentry as main notifier

Sentry is a much better tool than our little homegrown webhook
notifier, if only because it does not spam our alerting channel. It
lets us configure precisely how and when we want to be notified and
what matters, discuss issues, and so on. It also links every issue
directly to a particular commit, making regressions easier to
identify.

The flow changes a little: the logs now live in the Sentry issue
rather than being linked directly from the Slack message, but I hope
this will let us triage much better.
---
 deployment/docker-compose.yml              |  2 +
 deployment/main.tf                         |  1 +
 pipeline/dags/dag_utils/notifications.py   | 51 ------------------
 pipeline/dags/dag_utils/sentry.py          | 28 ++++++++++
 pipeline/dags/import_brevo.py              |  4 +-
 pipeline/dags/import_data_inclusion_api.py |  4 +-
 .../dags/import_decoupage_administratif.py |  4 +-
 pipeline/dags/import_monenfant.py          |  4 +-
 pipeline/dags/import_odspep.py             |  6 +--
 pipeline/dags/import_sources.py            |  4 +-
 pipeline/dags/main.py                      |  2 +-
 pipeline/dags/notify_rgpd_contacts.py      |  4 +-
 12 files changed, 46 insertions(+), 68 deletions(-)
 delete mode 100644 pipeline/dags/dag_utils/notifications.py
 create mode 100644 pipeline/dags/dag_utils/sentry.py

diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml
index f28d0d8f5..8af75375c 100644
--- a/deployment/docker-compose.yml
+++ b/deployment/docker-compose.yml
@@ -18,6 +18,8 @@ x-airflow-common:
     AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: 0
     AIRFLOW__SENTRY__SENTRY_DSN: ${AIRFLOW__SENTRY__SENTRY_DSN}
     AIRFLOW__SENTRY__SENTRY_ON: 'true'
+    AIRFLOW__SENTRY__ENABLE_TRACING: 'true'
+    AIRFLOW__SENTRY__RELEASE: ${AIRFLOW__SENTRY__RELEASE}
     AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW__WEBSERVER__BASE_URL}
 
     # Connections
diff --git a/deployment/main.tf b/deployment/main.tf
index af33be347..cba5a9c43 100644
--- a/deployment/main.tf
+++ b/deployment/main.tf
@@ -229,6 +229,7 @@ resource "null_resource" "up" {
       AIRFLOW_WWW_USER_PASSWORD='${var.airflow_admin_password}'
       AIRFLOW__CORE__FERNET_KEY='${var.airflow__core__fernet_key}'
       AIRFLOW__SENTRY__SENTRY_DSN='${var.airflow__sentry__sentry_dsn}'
+      AIRFLOW__SENTRY__RELEASE='${var.stack_version}'
       AIRFLOW__WEBSERVER__BASE_URL='https://${local.airflow_hostname}'
 
       # Airflow connections
diff --git a/pipeline/dags/dag_utils/notifications.py b/pipeline/dags/dag_utils/notifications.py
deleted file mode 100644
index f0ee9d70f..000000000
--- a/pipeline/dags/dag_utils/notifications.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import logging
-import textwrap
-
-from airflow import exceptions
-from airflow.providers.http.hooks.http import HttpHook
-from airflow.utils.context import Context
-
-from dag_utils import date
-
-logger = logging.getLogger(__file__)
-
-
-def format_failure(context: Context) -> str:
-    dag = context["dag"]
-    task_name = context["task"].task_id
-    task_instance = context["task_instance"]
-    logical_date_ds = date.local_day_datetime_str(context["logical_date"])
-    start_date_ds = date.local_day_datetime_str(task_instance.start_date)
-
-    # AIRFLOW__WEBSERVER__BASE_URL should be set to the public url to be able
-    # to access logs
-
-    return textwrap.dedent(
-        f"""\
-        | Status    | DAG          | Task        | Logs                          | Logical date      | Start date      |
-        | --------- | ------------ | ----------- | ----------------------------- | ----------------- | --------------- |
-        | Failed 🔴 | {dag.dag_id} | {task_name} | [📜]({task_instance.log_url}) | {logical_date_ds} | {start_date_ds} |
-        """  # noqa: E501
-    )
-
-
-def notify_webhook(context: Context, conn_id: str, format_fn):
-    """Format context and notify the webhook identified by the given connection id"""
-
-    # if AIRFLOW_CONN_ not set, do nothing
-
-    try:
-        http_hook = HttpHook(http_conn_id=conn_id)
-        http_hook.run(json={"text": format_fn(context)})
-    except exceptions.AirflowNotFoundException:
-        logger.warning("Webhook notifier disabled.")
-        return
-
-
-# FIXME(vmttn) : This could be a DAG factory instead
-def notify_failure_args():
-    return {
-        "on_failure_callback": lambda context: notify_webhook(
-            context, "mattermost", format_failure
-        )
-    }
diff --git a/pipeline/dags/dag_utils/sentry.py b/pipeline/dags/dag_utils/sentry.py
new file mode 100644
index 000000000..bf6cbeb72
--- /dev/null
+++ b/pipeline/dags/dag_utils/sentry.py
@@ -0,0 +1,28 @@
+import os
+
+from sentry_sdk import configure_scope
+
+
+def fill_sentry_scope(context):
+    github_base_url = "https://github.com/gip-inclusion/data-inclusion"
+
+    with configure_scope() as scope:
+        dag_id = context.get("dag").dag_id
+        task_id = context.get("task_instance").task_id
+        task_instance = context.get("task_instance")
+        commit_sha = os.getenv("AIRFLOW__SENTRY__RELEASE", None)
+        scope.set_tag("dag_id", dag_id)
+        scope.set_tag("task_id", task_id)
+        scope.set_context(
+            "custom",
+            {
+                "airflow_logs_url": task_instance.log_url,
+                "github_commit_url": f"{github_base_url}/commit/{commit_sha}",
+            },
+        )
+
+
+def notify_failure_args():
+    return {
+        "on_failure_callback": fill_sentry_scope,
+    }
diff --git a/pipeline/dags/import_brevo.py b/pipeline/dags/import_brevo.py
index bc902c3e1..05ab8afa6 100644
--- a/pipeline/dags/import_brevo.py
+++ b/pipeline/dags/import_brevo.py
@@ -5,7 +5,7 @@
 import airflow
 from airflow.operators import empty, python
 
-from dag_utils import date, notifications
+from dag_utils import date, sentry
 from dag_utils.dbt import dbt_operator_factory
 from dag_utils.virtualenvs import PYTHON_BIN_PATH
 
@@ -52,7 +52,7 @@ def _load_rgpd_contacts(run_id: str, stream_id: str, source_id: str, logical_dat
 with airflow.DAG(
     dag_id="import_brevo",
     start_date=pendulum.datetime(2023, 1, 1, tz=date.TIME_ZONE),
-    default_args=notifications.notify_failure_args(),
+    default_args=sentry.notify_failure_args(),
     schedule="@daily",
     catchup=False,
     concurrency=1,
diff --git a/pipeline/dags/import_data_inclusion_api.py b/pipeline/dags/import_data_inclusion_api.py
index 9fcfe33b1..1b5904015 100644
--- a/pipeline/dags/import_data_inclusion_api.py
+++ b/pipeline/dags/import_data_inclusion_api.py
@@ -5,7 +5,7 @@
 from airflow.operators import empty
 from airflow.utils.trigger_rule import TriggerRule
 
-from dag_utils import date, notifications
+from dag_utils import date, sentry
 from dag_utils.dbt import dbt_operator_factory
 from dag_utils.virtualenvs import PYTHON_BIN_PATH
 
@@ -85,7 +85,7 @@ def import_data_inclusion_api():
 with airflow.DAG(
     dag_id="import_data_inclusion_api",
     start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
-    default_args=notifications.notify_failure_args(),
+    default_args=sentry.notify_failure_args(),
     schedule=HOURLY_AT_FIFTEEN,
     catchup=False,
 ) as dag:
diff --git a/pipeline/dags/import_decoupage_administratif.py b/pipeline/dags/import_decoupage_administratif.py
index 6c1797b8a..5f0e16e6a 100755
--- a/pipeline/dags/import_decoupage_administratif.py
+++ b/pipeline/dags/import_decoupage_administratif.py
@@ -3,7 +3,7 @@
 from airflow.decorators import dag, task
 from airflow.operators import empty
 
-from dag_utils import date, dbt, notifications
+from dag_utils import date, dbt, sentry
 from dag_utils.virtualenvs import PYTHON_BIN_PATH
 
 
@@ -89,7 +89,7 @@ def extract_and_load():
 
 @dag(
     start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
-    default_args=notifications.notify_failure_args(),
+    default_args=sentry.notify_failure_args(),
     schedule="@monthly",
     catchup=False,
 )
diff --git a/pipeline/dags/import_monenfant.py b/pipeline/dags/import_monenfant.py
index b3343b666..afeda8fda 100644
--- a/pipeline/dags/import_monenfant.py
+++ b/pipeline/dags/import_monenfant.py
@@ -2,7 +2,7 @@
 
 from airflow.decorators import dag, task
 
-from dag_utils import date, notifications
+from dag_utils import date, sentry
 from dag_utils.virtualenvs import PYTHON_BIN_PATH
 
 
@@ -130,7 +130,7 @@ def load(
 
 @dag(
     start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
-    default_args=notifications.notify_failure_args(),
+    default_args=sentry.notify_failure_args(),
     schedule="@monthly",
     catchup=False,
     tags=["source"],
diff --git a/pipeline/dags/import_odspep.py b/pipeline/dags/import_odspep.py
index 70b185228..983ae4305 100644
--- a/pipeline/dags/import_odspep.py
+++ b/pipeline/dags/import_odspep.py
@@ -5,13 +5,11 @@
 import airflow
 from airflow.operators import empty, python
 
-from dag_utils import date
+from dag_utils import date, sentry
 from dag_utils.virtualenvs import PYTHON_BIN_PATH
 
 logger = logging.getLogger(__name__)
 
-default_args = {}
-
 
 def _import_dataset(
     run_id: str,
@@ -58,7 +56,7 @@ def _import_dataset(
 with airflow.DAG(
     dag_id="import_odspep",
     start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
-    default_args=default_args,
+    default_args=sentry.notify_failure_args(),
     schedule="@once",
     catchup=False,
     tags=["source"],
diff --git a/pipeline/dags/import_sources.py b/pipeline/dags/import_sources.py
index 4c81bd850..0522669e3 100644
--- a/pipeline/dags/import_sources.py
+++ b/pipeline/dags/import_sources.py
@@ -4,7 +4,7 @@
 from airflow.operators import empty, python
 from airflow.utils.task_group import TaskGroup
 
-from dag_utils import date, notifications, sources
+from dag_utils import date, sentry, sources
 from dag_utils.dbt import dbt_operator_factory
 from dag_utils.virtualenvs import PYTHON_BIN_PATH
 
@@ -120,7 +120,7 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date):
 with airflow.DAG(
     dag_id=dag_id,
     start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
-    default_args=notifications.notify_failure_args(),
+    default_args=sentry.notify_failure_args(),
     schedule=source_config["schedule"],
     catchup=False,
     tags=["source"],
diff --git a/pipeline/dags/main.py b/pipeline/dags/main.py
index f8f388575..c4c148eb9 100644
--- a/pipeline/dags/main.py
+++ b/pipeline/dags/main.py
@@ -9,7 +9,7 @@
     get_intermediate_tasks,
     get_staging_tasks,
 )
-from dag_utils.notifications import notify_failure_args
+from dag_utils.sentry import notify_failure_args
 
 with airflow.DAG(
     dag_id="main",
diff --git a/pipeline/dags/notify_rgpd_contacts.py b/pipeline/dags/notify_rgpd_contacts.py
index f37458f5b..c7952a00c 100644
--- a/pipeline/dags/notify_rgpd_contacts.py
+++ b/pipeline/dags/notify_rgpd_contacts.py
@@ -5,7 +5,7 @@
 import airflow
 from airflow.operators import empty, python
 
-from dag_utils import notifications
+from dag_utils import sentry
 from dag_utils.virtualenvs import PYTHON_BIN_PATH
 
 logger = logging.getLogger(__name__)
@@ -72,7 +72,7 @@ def _wait_for_brevo():
 with airflow.DAG(
     dag_id="notify_rgpd_contacts",
     description="Sends RGPD notifications to DI users",
     start_date=pendulum.datetime(2023, 11, 1),
-    default_args=notifications.notify_failure_args(),
+    default_args=sentry.notify_failure_args(),
     schedule="@monthly",
     catchup=False,
 ) as dag:
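
Note for reviewers: Airflow's Sentry integration forwards extra keys from
the [sentry] config section on to sentry_sdk.init(), which is how the two
new environment variables take effect. Roughly, and only as an
illustrative sketch (this is not Airflow's actual code):

    # Approximate effect of the new configuration at scheduler/worker
    # startup; AIRFLOW__SENTRY__RELEASE is what links issues to commits.
    import os

    import sentry_sdk

    sentry_sdk.init(
        dsn=os.environ["AIRFLOW__SENTRY__SENTRY_DSN"],
        release=os.environ.get("AIRFLOW__SENTRY__RELEASE"),
        enable_tracing=True,
    )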
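
To exercise fill_sentry_scope without a full Airflow run, one can feed it
a minimal fake context. A sketch, assuming it is run from pipeline/dags
with sentry-sdk installed; the dag/task names, log URL, and release sha
below are made up for the example:

    import os
    from types import SimpleNamespace

    import sentry_sdk

    from dag_utils.sentry import fill_sentry_scope

    # An empty DSN initializes the SDK without sending events anywhere.
    sentry_sdk.init(dsn="")
    os.environ["AIRFLOW__SENTRY__RELEASE"] = "ab05c24"  # hypothetical sha

    # Fake only the attributes the callback actually reads.
    fake_context = {
        "dag": SimpleNamespace(dag_id="import_brevo"),
        "task_instance": SimpleNamespace(
            task_id="extract",
            log_url="https://airflow.example.org/log?try=1",
        ),
    }

    fill_sentry_scope(fake_context)
    # Any event captured from here on inherits the dag_id/task_id tags and
    # the "custom" context carrying the logs and commit URLs.
    sentry_sdk.capture_message("sentry scope smoke test")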
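
One caveat for future DAGs: notify_failure_args() returns a plain dict,
so a DAG that needs additional default_args should merge with it rather
than replace it, or the failure callback is silently lost. A sketch (the
retries value is just a hypothetical example):

    from dag_utils import sentry

    default_args = {
        **sentry.notify_failure_args(),
        "retries": 1,  # hypothetical DAG-specific default
    }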