Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(pipeline) : use Sentry as main notifier #369

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ x-airflow-common:
AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: 0
AIRFLOW__SENTRY__SENTRY_DSN: ${AIRFLOW__SENTRY__SENTRY_DSN}
AIRFLOW__SENTRY__SENTRY_ON: 'true'
AIRFLOW__SENTRY__ENABLE_TRACING: 'true'
AIRFLOW__SENTRY__RELEASE: ${AIRFLOW__SENTRY__RELEASE}
AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW__WEBSERVER__BASE_URL}

# Connections
Expand Down
1 change: 1 addition & 0 deletions deployment/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ resource "null_resource" "up" {
AIRFLOW_WWW_USER_PASSWORD='${var.airflow_admin_password}'
AIRFLOW__CORE__FERNET_KEY='${var.airflow__core__fernet_key}'
AIRFLOW__SENTRY__SENTRY_DSN='${var.airflow__sentry__sentry_dsn}'
AIRFLOW__SENTRY__RELEASE='${var.stack_version}'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pas fan du nommage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c'est parce que la variable release de la conf de sentry permet de lier une version du soft en question:
https://docs.sentry.io/product/sentry-basics/integrate-backend/configuration-options/

C'est peut-être un peu overkill, on peut juste mettre un tag avec le SHA et une URL vers le commit, mais je me disais qu'utiliser dès maintenant la notion de "release" est un plus (les issues sont automatiquement liées au commit suspect, etc.)

image

AIRFLOW__WEBSERVER__BASE_URL='https://${local.airflow_hostname}'

# Airflow connections
Expand Down
51 changes: 0 additions & 51 deletions pipeline/dags/dag_utils/notifications.py

This file was deleted.

28 changes: 28 additions & 0 deletions pipeline/dags/dag_utils/sentry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import os

from sentry_sdk import configure_scope


def fill_sentry_scope(context):
    """Enrich the current Sentry scope with Airflow task metadata.

    Intended to be used as an Airflow ``on_failure_callback`` (see
    ``notify_failure_args``): *context* is the Airflow task context dict,
    which provides the ``dag`` and ``task_instance`` objects.

    Tags the event with ``dag_id`` / ``task_id`` and attaches a ``custom``
    context holding the Airflow log URL and, when available, a link to the
    suspect commit on GitHub.
    """
    github_base_url = "https://github.com/gip-inclusion/data-inclusion"

    # Set at deployment time (deployment/main.tf) to the stack version;
    # may be absent in local/dev environments.
    commit_sha = os.getenv("AIRFLOW__SENTRY__RELEASE")

    # Fetch the task instance once — it is needed for both the tag and the URL.
    task_instance = context.get("task_instance")
    dag_id = context.get("dag").dag_id

    custom_context = {"airflow_logs_url": task_instance.log_url}
    if commit_sha is not None:
        # Only build the commit link when the release is known, to avoid
        # emitting a dead ".../commit/None" URL.
        custom_context["github_commit_url"] = f"{github_base_url}/commit/{commit_sha}"

    with configure_scope() as scope:
        scope.set_tag("dag_id", dag_id)
        scope.set_tag("task_id", task_instance.task_id)
        scope.set_context("custom", custom_context)


def notify_failure_args():
    """Return DAG ``default_args`` wiring Sentry scope enrichment on failure.

    Spread the result into a DAG's ``default_args`` so every task reports
    its metadata to Sentry when it fails.
    """
    default_args = {}
    default_args["on_failure_callback"] = fill_sentry_scope
    return default_args
4 changes: 2 additions & 2 deletions pipeline/dags/import_brevo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import airflow
from airflow.operators import empty, python

from dag_utils import date, notifications
from dag_utils import date, sentry
from dag_utils.dbt import dbt_operator_factory
from dag_utils.virtualenvs import PYTHON_BIN_PATH

Expand Down Expand Up @@ -52,7 +52,7 @@ def _load_rgpd_contacts(run_id: str, stream_id: str, source_id: str, logical_dat
with airflow.DAG(
dag_id="import_brevo",
start_date=pendulum.datetime(2023, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule="@daily",
catchup=False,
concurrency=1,
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/import_data_inclusion_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from airflow.operators import empty
from airflow.utils.trigger_rule import TriggerRule

from dag_utils import date, notifications
from dag_utils import date, sentry
from dag_utils.dbt import dbt_operator_factory
from dag_utils.virtualenvs import PYTHON_BIN_PATH

Expand Down Expand Up @@ -85,7 +85,7 @@ def import_data_inclusion_api():
with airflow.DAG(
dag_id="import_data_inclusion_api",
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule=HOURLY_AT_FIFTEEN,
catchup=False,
) as dag:
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/import_decoupage_administratif.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from airflow.decorators import dag, task
from airflow.operators import empty

from dag_utils import date, dbt, notifications
from dag_utils import date, dbt, sentry
from dag_utils.virtualenvs import PYTHON_BIN_PATH


Expand Down Expand Up @@ -89,7 +89,7 @@ def extract_and_load():

@dag(
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule="@monthly",
catchup=False,
)
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/import_monenfant.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from airflow.decorators import dag, task

from dag_utils import date, notifications
from dag_utils import date, sentry
from dag_utils.virtualenvs import PYTHON_BIN_PATH


Expand Down Expand Up @@ -130,7 +130,7 @@ def load(

@dag(
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule="@monthly",
catchup=False,
tags=["source"],
Expand Down
6 changes: 2 additions & 4 deletions pipeline/dags/import_odspep.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
import airflow
from airflow.operators import empty, python

from dag_utils import date
from dag_utils import date, sentry
from dag_utils.virtualenvs import PYTHON_BIN_PATH

logger = logging.getLogger(__name__)

default_args = {}


def _import_dataset(
run_id: str,
Expand Down Expand Up @@ -58,7 +56,7 @@ def _import_dataset(
with airflow.DAG(
dag_id="import_odspep",
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=default_args,
default_args=sentry.notify_failure_args(),
schedule="@once",
catchup=False,
tags=["source"],
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/import_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.operators import empty, python
from airflow.utils.task_group import TaskGroup

from dag_utils import date, notifications, sources
from dag_utils import date, sentry, sources
from dag_utils.dbt import dbt_operator_factory
from dag_utils.virtualenvs import PYTHON_BIN_PATH

Expand Down Expand Up @@ -120,7 +120,7 @@ def load_from_s3_to_data_warehouse(source_id, stream_id, run_id, logical_date):
with airflow.DAG(
dag_id=dag_id,
start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule=source_config["schedule"],
catchup=False,
tags=["source"],
Expand Down
2 changes: 1 addition & 1 deletion pipeline/dags/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
get_intermediate_tasks,
get_staging_tasks,
)
from dag_utils.notifications import notify_failure_args
from dag_utils.sentry import notify_failure_args

with airflow.DAG(
dag_id="main",
Expand Down
4 changes: 2 additions & 2 deletions pipeline/dags/notify_rgpd_contacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import airflow
from airflow.operators import empty, python

from dag_utils import notifications
from dag_utils import sentry
from dag_utils.virtualenvs import PYTHON_BIN_PATH

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -72,7 +72,7 @@ def _wait_for_brevo():
dag_id="notify_rgpd_contacts",
description="Sends RGPD notifications to DI users",
start_date=pendulum.datetime(2023, 11, 1),
default_args=notifications.notify_failure_args(),
default_args=sentry.notify_failure_args(),
schedule="@monthly",
catchup=False,
) as dag:
Expand Down
Loading