Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipeline): add imilo as a new data source #365

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ x-airflow-common:
AIRFLOW_VAR_EMPLOIS_API_TOKEN: ${AIRFLOW_VAR_EMPLOIS_API_TOKEN}
AIRFLOW_VAR_ENVIRONMENT: ${AIRFLOW_VAR_ENVIRONMENT}
AIRFLOW_VAR_FREDO_API_TOKEN: ${AIRFLOW_VAR_FREDO_API_TOKEN}
AIRFLOW_VAR_IMILO_API_SECRET: ${AIRFLOW_VAR_IMILO_API_SECRET}
AIRFLOW_VAR_FT_API_TOKEN: ${AIRFLOW_VAR_FT_API_TOKEN}
AIRFLOW_VAR_MES_AIDES_AIRTABLE_KEY: ${AIRFLOW_VAR_MES_AIDES_AIRTABLE_KEY}
AIRFLOW_VAR_SOLIGUIDE_API_TOKEN: ${AIRFLOW_VAR_SOLIGUIDE_API_TOKEN}
Expand Down
1 change: 1 addition & 0 deletions deployment/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ resource "null_resource" "up" {
AIRFLOW_VAR_EMPLOIS_API_TOKEN='${var.emplois_api_token}'
AIRFLOW_VAR_ENVIRONMENT='${var.environment}'
AIRFLOW_VAR_FREDO_API_TOKEN='${var.fredo_api_token}'
AIRFLOW_VAR_IMILO_API_SECRET='${var.imilo_api_secret}'
AIRFLOW_VAR_FT_API_TOKEN='${var.ft_api_token}'
AIRFLOW_VAR_MES_AIDES_AIRTABLE_KEY='${var.mes_aides_airtable_key}'
AIRFLOW_VAR_SOLIGUIDE_API_TOKEN='${var.soliguide_api_token}'
Expand Down
7 changes: 7 additions & 0 deletions deployment/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,10 @@ variable "fredo_api_token" {
sensitive = true
default = ""
}

# Secret for the i-milo API. It is forwarded to the Airflow containers as
# AIRFLOW_VAR_IMILO_API_SECRET and sent by the extractor as "client_secret"
# to obtain a bearer token. Marked sensitive; defaults to an empty string.
variable "imilo_api_secret" {
description = "Used in extraction tasks orchestrated by airflow"
type = string
sensitive = true
default = ""
}
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ x-airflow-common:
AIRFLOW_VAR_DORA_API_TOKEN: ${AIRFLOW_VAR_DORA_API_TOKEN}
AIRFLOW_VAR_FREDO_API_TOKEN: ${AIRFLOW_VAR_FREDO_API_TOKEN}
AIRFLOW_VAR_FT_API_TOKEN: ${AIRFLOW_VAR_FT_API_TOKEN}
AIRFLOW_VAR_IMILO_API_SECRET: ${AIRFLOW_VAR_IMILO_API_SECRET}
AIRFLOW_VAR_DORA_PREPROD_API_TOKEN: ${AIRFLOW_VAR_DORA_PREPROD_API_TOKEN}
AIRFLOW_VAR_EMPLOIS_API_TOKEN: ${AIRFLOW_VAR_EMPLOIS_API_TOKEN}
AIRFLOW_VAR_MES_AIDES_AIRTABLE_KEY: ${AIRFLOW_VAR_MES_AIDES_AIRTABLE_KEY}
Expand Down
23 changes: 23 additions & 0 deletions pipeline/dags/dag_utils/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
emplois_de_linclusion,
france_travail,
fredo,
imilo,
mediation_numerique,
mes_aides,
reseau_alpha,
Expand Down Expand Up @@ -221,6 +222,28 @@
},
"odspep": {},
"monenfant": {},
"imilo": {
"schedule": "@daily",
"snapshot": True,
"extractor": imilo.extract,
"streams": {
"offres": {
"filename": "offres.json",
"url": Variable.get("IMILO_API_URL", None),
"token": Variable.get("IMILO_API_SECRET", None),
},
"structures": {
"filename": "structures.json",
"url": Variable.get("IMILO_API_URL", None),
"token": Variable.get("IMILO_API_SECRET", None),
},
"structures_offres": {
"filename": "structures_offres.json",
"url": Variable.get("IMILO_API_URL", None),
"token": Variable.get("IMILO_API_SECRET", None),
},
},
},
}


Expand Down
53 changes: 53 additions & 0 deletions pipeline/dags/dag_utils/sources/imilo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import json
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import json


import requests


class ImiloClient:
    """Small HTTP client for the i-milo API.

    Authentication is lazy: a request is first sent with whatever headers
    the session currently holds, and a bearer token is fetched from
    ``/get_token`` only when the API answers with a 401. The same path
    transparently renews a token that has expired.
    """

    def __init__(
        self,
        base_url: str,
        secret: str,
        timeout: float = 120.0,
    ) -> None:
        """Prepare a session for the API rooted at ``base_url``.

        Args:
            base_url: Root URL of the API; trailing slashes are stripped.
            secret: Client secret exchanged for a bearer token.
            timeout: Timeout in seconds passed to every HTTP call, so that
                a stalled server cannot block the caller forever.
        """
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})
        self.secret = secret
        self.timeout = timeout

    def _get_token(self) -> None:
        """Fetch a bearer token and install it on the session headers.

        Raises:
            requests.HTTPError: If the token endpoint answers with an
                error status.
        """
        # The token lasts 1h
        response = self.session.post(
            url=f"{self.base_url}/get_token",
            json={"client_secret": self.secret},
            timeout=self.timeout,
        )
        response.raise_for_status()
        access_token = response.json()["access_token"]
        self.session.headers.update({"Authorization": f"Bearer {access_token}"})

    def _get_endpoint(
        self,
        url_path: str,
    ) -> list:
        """GET ``url_path`` and return the decoded JSON body.

        On a 401 the token is (re)fetched and the request is retried once.

        Raises:
            requests.HTTPError: If the final response has an error status.
        """
        next_url = f"{self.base_url}{url_path}"
        response = self.session.get(next_url, timeout=self.timeout)
        if response.status_code == 401:
            # Missing or expired token: authenticate, then retry once.
            self._get_token()
            response = self.session.get(next_url, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def list_offres(self) -> list:
        """Return the payload of the ``/get_offres`` endpoint."""
        return self._get_endpoint("/get_offres")

    def list_structures(self) -> list:
        """Return the payload of the ``/get_structures`` endpoint."""
        return self._get_endpoint("/get_structures")

    def list_structures_offres(self) -> list:
        """Return the payload of the ``/get_structures_offres`` endpoint."""
        return self._get_endpoint("/get_structures_offres")


def extract(id: str, url: str, token: str, **kwargs) -> bytes:
    """Download one i-milo stream and return it as UTF-8 encoded JSON.

    Args:
        id: Stream identifier; it selects the ``list_<id>`` method of
            ``ImiloClient``.
        url: Base URL of the API.
        token: Client secret handed to the client.
        **kwargs: Accepted and ignored.

    Returns:
        The fetched payload, serialized with ``json.dumps`` and encoded.
    """
    fetch_stream = getattr(ImiloClient(base_url=url, secret=token), f"list_{id}")
    payload = fetch_stream()
    serialized = json.dumps(payload)
    return serialized.encode()
13 changes: 13 additions & 0 deletions pipeline/dbt/models/_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,16 @@ sources:
- name: services
meta:
kind: service

- name: imilo
schema: imilo
meta:
is_provider: true
tables:
- name: structures
meta:
kind: structure
- name: offres
meta:
kind: service
- name: structures_offres
1 change: 1 addition & 0 deletions pipeline/dbt/models/intermediate/int__union_adresses.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ WITH adresses AS (
ref('int_finess__adresses'),
ref('int_france_travail__adresses'),
ref('int_fredo__adresses'),
ref('int_imilo__adresses'),
ref('int_mediation_numerique__adresses'),
ref('int_mes_aides__adresses'),
ref('int_monenfant__adresses'),
Expand Down
1 change: 1 addition & 0 deletions pipeline/dbt/models/intermediate/int__union_services.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ WITH services AS (
ref('int_dora__services'),
ref('int_france_travail__services'),
ref('int_fredo__services'),
ref('int_imilo__services'),
ref('int_mediation_numerique__services'),
ref('int_mes_aides__services'),
ref('int_monenfant__services'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ WITH structures AS (
ref('int_finess__structures'),
ref('int_france_travail__structures'),
ref('int_fredo__structures'),
ref('int_imilo__structures'),
ref('int_mediation_numerique__structures'),
ref('int_mes_aides__structures'),
ref('int_monenfant__structures'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ models:
- emplois_de_linclusion
- france_travail
- fredo
- imilo
- mediation_numerique
- mes_aides
- monenfant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
-- depends_on: {{ ref('int_france_travail__structures') }}
-- depends_on: {{ ref('stg_fredo__structures') }}
-- depends_on: {{ ref('int_fredo__structures') }}
-- depends_on: {{ ref('stg_imilo__offres') }}
-- depends_on: {{ ref('stg_imilo__structures') }}
-- depends_on: {{ ref('int_imilo__services') }}
-- depends_on: {{ ref('int_imilo__structures') }}
-- depends_on: {{ ref('stg_mediation_numerique__services') }}
-- depends_on: {{ ref('stg_mediation_numerique__structures') }}
-- depends_on: {{ ref('int_mediation_numerique__services') }}
Expand Down Expand Up @@ -100,15 +104,15 @@ final AS (
{% for source_node in graph.sources.values() if source_node.source_meta.is_provider %}

{% if source_node.meta.kind %}
{% if not loop.first %}
UNION ALL
{% endif %}


{% set source_name = source_node.source_name %}
{% set stream_name = source_node.name %}

SELECT * FROM {{ source_name }}__{{ stream_name }}__stats
{% if not loop.last %}
UNION ALL
{% endif %}

{% endif %}

{% endfor %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
version: 2

models:
- name: int_imilo__adresses
data_tests:
- check_adresse:
config:
severity: warn
columns:
- name: id
data_tests:
- unique
- not_null

- name: int_imilo__services
data_tests:
- check_service:
config:
severity: warn
columns:
- name: id
data_tests:
- unique
- not_null
- dbt_utils.not_empty_string
- name: structure_id
data_tests:
- not_null
- relationships:
to: ref('int_imilo__structures')
field: id

- name: int_imilo__structures
data_tests:
- check_structure:
config:
severity: warn
columns:
- name: id
data_tests:
- unique
- not_null
- dbt_utils.not_empty_string
- name: adresse_id
data_tests:
- not_null
- relationships:
to: ref('int_imilo__adresses')
field: id
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Address rows for the imilo source, derived from the staged structures.
-- One row is produced per staged structure, and the structure id doubles as
-- the address id.
WITH structures AS (
SELECT * FROM {{ ref('stg_imilo__structures') }}
),

final AS (
SELECT
id AS "id",
commune AS "commune",
code_postal AS "code_postal",
code_insee AS "code_insee",
adresse AS "adresse",
complement_adresse AS "complement_adresse",
-- No coordinates are read from the staging model: both are typed NULLs.
CAST(NULL AS FLOAT) AS "longitude",
CAST(NULL AS FLOAT) AS "latitude",
_di_source_id AS "source"
FROM structures
)

SELECT * FROM final
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
-- Service rows for the imilo source: staged offers joined to the
-- offer/structure link table, giving one row per (offer, structure) link.
-- Columns with no counterpart selected from staging are emitted as NULLs.
WITH services AS (
SELECT * FROM {{ ref('stg_imilo__offres') }}
),

final AS (
SELECT
services._di_source_id AS "source",
-- The service id combines the offer id and the structure id taken from the
-- link table, so the same offer attached to two structures gets two ids.
CONCAT(
structures_offres.id_offre,
'_',
structures_offres.id_structure
) AS "id",
CAST(structures_offres."id_structure" AS TEXT) AS "structure_id",
NULL AS "courriel",
CAST(NULL AS BOOLEAN) AS "cumulable",
CAST(NULL AS BOOLEAN) AS "contact_public",
NULL AS "contact_nom_prenom",
CAST(services.date_maj AS DATE) AS "date_maj",
CAST(services.date_creation AS DATE) AS "date_creation",
NULL AS "formulaire_en_ligne",
NULL AS "frais_autres",
CAST(NULL AS TEXT []) AS "justificatifs",
NULL AS "lien_source",
CAST(NULL AS TEXT []) AS "modes_accueil",
CAST(NULL AS TEXT []) AS "modes_orientation_accompagnateur",
NULL AS "modes_orientation_accompagnateur_autres",
-- The staged value is wrapped in a one-element array
-- (presumably a scalar in staging; TODO confirm).
ARRAY[services.modes_orientation_beneficiaire] AS "modes_orientation_beneficiaire",
NULL AS "modes_orientation_beneficiaire_autres",
services.nom AS "nom",
NULL AS "page_web",
NULL AS "presentation_detail",
services.presentation_resume AS "presentation_resume",
NULL AS "prise_rdv",
ARRAY[services.profils] AS "profils",
NULL AS "profils_precisions",
CAST(NULL AS TEXT []) AS "pre_requis",
NULL AS "recurrence",
ARRAY[services.thematiques] AS "thematiques",
CAST(NULL AS TEXT []) AS "types",
NULL AS "telephone",
CAST(NULL AS TEXT []) AS "frais",
NULL AS "zone_diffusion_type",
NULL AS "zone_diffusion_code",
NULL AS "zone_diffusion_nom",
CAST(NULL AS DATE) AS "date_suspension"
FROM services
-- NOTE(review): this is a LEFT JOIN, so an offer with no matching row in the
-- link table keeps its row with NULL id_offre / id_structure, hence a NULL
-- structure_id. Confirm whether an INNER JOIN is intended.
LEFT JOIN {{ ref('stg_imilo__structures_offres') }} AS structures_offres
ON services.id = structures_offres.id_offre
)

SELECT * FROM final
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Structure rows for the imilo source, mapped from the staged structures.
-- One row is produced per staged structure; columns with no counterpart
-- selected from staging are emitted as NULLs.
WITH structures AS (
SELECT * FROM {{ ref('stg_imilo__structures') }}
),

final AS (
SELECT
_di_source_id AS "source",
id AS "id",
siret AS "siret",
NULL AS "rna",
courriel AS "courriel",
CAST(NULL AS BOOLEAN) AS "antenne",
horaires_ouverture AS "horaires_ouverture",
site_web AS "site_web",
NULL AS "lien_source",
NULL AS "accessibilite",
telephone AS "telephone",
typologie AS "typologie",
nom AS "nom",
-- The staged value is wrapped in a one-element array
-- (presumably a scalar in staging; TODO confirm).
ARRAY[labels_nationaux] AS "labels_nationaux",
CAST(NULL AS TEXT []) AS "labels_autres",
presentation_resume AS "presentation_resume",
presentation_detail AS "presentation_detail",
-- The address id is the structure id itself.
id AS "adresse_id",
CAST(NULL AS TEXT []) AS "thematiques",
CAST(date_maj AS DATE) AS "date_maj"
FROM structures

)

SELECT * FROM final
Loading
Loading