-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(pipeline): add imilo as a new data source #365
base: main
Are you sure you want to change the base?
Conversation
8350352
to
48297b3
Compare
48297b3
to
279e10d
Compare
279e10d
to
3aca683
Compare
J'ai rebased sur |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bon pour moi, faut peut etre le verifier sur staging avec Antoine et on merge ensuite ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
beau boulot !
je n'ai pas réussi à exécuter l'extraction :
...
[2025-01-13, 08:52:44 UTC] {process_utils.py:194} INFO - requests.exceptions.HTTPError: 401 Client Error: Unauthorized for url: https://api-ods.dsiml.org/get_token
...
Est-ce que l'url est la bonne ?
J'ai utilisé le token stocké sur bw, tu confirmes que c'est le bon ?
|
||
import requests | ||
|
||
from airflow.models import Variable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from airflow.models import Variable |
client = ImiloClient( | ||
base_url=Variable.get("IMILO_API_URL"), | ||
secret=Variable.get("IMILO_API_SECRET"), | ||
) | ||
|
||
|
||
def extract(id: str, url: str, token: str, **kwargs) -> bytes: | ||
data = getattr(client, f"list_{id}")() | ||
return json.dumps(data).encode() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fais l'instanciation du client au runtime (et non au chargement du fichier)
client = ImiloClient( | |
base_url=Variable.get("IMILO_API_URL"), | |
secret=Variable.get("IMILO_API_SECRET"), | |
) | |
def extract(id: str, url: str, token: str, **kwargs) -> bytes: | |
data = getattr(client, f"list_{id}")() | |
return json.dumps(data).encode() | |
def extract(id: str, url: str, token: str, **kwargs) -> bytes: | |
client = ImiloClient(base_url=url, secret=token) | |
data = getattr(client, f"list_{id}")() | |
return json.dumps(data).encode() | |
self.session.headers.update({"Content-Type": "application/json"}) | ||
self.secret = secret | ||
|
||
def _get_token(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
quelle est la durée de vie de ce token ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 heure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merci ! est-ce que tu peux ajouter un petit commentaire pour l'indiquer ?
class ImiloClient: | ||
def __init__(self, base_url: str, secret: str) -> None: | ||
self.base_url = base_url.rstrip("/") | ||
self.session = requests.Session() | ||
self.session.headers.update({"Content-Type": "application/json"}) | ||
self.secret = secret | ||
|
||
def _get_token(self): | ||
response = self.session.post( | ||
url=f"{self.base_url}/get_token", | ||
data=json.dumps( | ||
{ | ||
"client_secret": self.secret, | ||
} | ||
), | ||
) | ||
response.raise_for_status() | ||
self.session.headers.update( | ||
{"Authorization": f"Bearer {response.json()["access_token"]}"} | ||
) | ||
|
||
def _get_endpoint( | ||
self, | ||
url_path: str, | ||
) -> list: | ||
next_url = f"{self.base_url}{url_path}" | ||
response = self.session.get(next_url) | ||
if response.status_code == 401: | ||
self._get_token() | ||
response = self.session.get(next_url) | ||
response.raise_for_status() | ||
return response.json() | ||
|
||
def list_offres(self) -> list: | ||
return self._get_endpoint("/get_offres") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
si tu veux une version un peu plus compact :
- réutiliser notre
utils.BaseApiClient
qui s'occupe de faireraise_for_status
- pas besoin de mettre le header
Content-Type
. Pour le POST, il suffit d'utiliser le paramjson=
ce qui a pour effet de te mettre le header. - dans tous les cas, pas besoin d'utiliser
json.dumps
- renommer
secret
ensecret_token
ettoken
enaccess_token
qui est le nommage classique - j'encapsule la récupération de
access_token
dans une fonction (+ le decoratorproperty
pour simuler un attribut). Je récupére l'access token à li'nit, en partant du principe que la durée de vie de l'access_token est largement supérieure à nos besoins. - ça simplifie pas mal les choses. Tu peux écrirer tes fonctions
list_offres()
etc. très directement
class ImiloClient: | |
def __init__(self, base_url: str, secret: str) -> None: | |
self.base_url = base_url.rstrip("/") | |
self.session = requests.Session() | |
self.session.headers.update({"Content-Type": "application/json"}) | |
self.secret = secret | |
def _get_token(self): | |
response = self.session.post( | |
url=f"{self.base_url}/get_token", | |
data=json.dumps( | |
{ | |
"client_secret": self.secret, | |
} | |
), | |
) | |
response.raise_for_status() | |
self.session.headers.update( | |
{"Authorization": f"Bearer {response.json()["access_token"]}"} | |
) | |
def _get_endpoint( | |
self, | |
url_path: str, | |
) -> list: | |
next_url = f"{self.base_url}{url_path}" | |
response = self.session.get(next_url) | |
if response.status_code == 401: | |
self._get_token() | |
response = self.session.get(next_url) | |
response.raise_for_status() | |
return response.json() | |
def list_offres(self) -> list: | |
return self._get_endpoint("/get_offres") | |
from . import utils | |
class ImiloClient(utils.BaseApiClient:): | |
def __init__(self, base_url: str, secret: str) -> None: | |
super().__init__(base_url) | |
self.secret_token = secret | |
self.session.headers.update( | |
{"Authorization": f"Bearer {self.access_token]}"} | |
) | |
@property | |
def access_token(self): | |
response = self.session.post( | |
url=f"{self.base_url}/get_token", | |
data={"client_secret": self.secret_token} | |
), | |
) | |
return response.json()["access_token"] | |
def list_offres(self) -> list: | |
return self.session.get(f"{self.base_url}/get_offres").json() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ça j'ai repris ce que tu as fait mais je n'ai pas reussi a le faire tourner :(
@@ -0,0 +1,59 @@ | |||
import json |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import json |
- dbt_utils.accepted_range: | ||
min_value: "now() - interval '2 years'" | ||
config: | ||
severity: warn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
est-ce qu'ils nous ont indiqué que les données n'auraient pas plus de 2 ans ? si oui ok. si non je comprends ta démarche, mais en pratique ça ne fera que du bruit et je pense qu'on peut retirer :/ (same for the others)
structures_offres AS ( | ||
SELECT | ||
CAST((data -> 'structures_offres' ->> 'offre_id') AS INTEGER) AS "id_offre", | ||
CAST((data -> 'structures_offres' ->> 'missionlocale_id') AS INTEGER) AS "id_structure", | ||
CONCAT( | ||
(data -> 'structures_offres' ->> 'offre_id'), | ||
'_', | ||
(data -> 'structures_offres' ->> 'missionlocale_id') | ||
) AS "offre_structure_id" | ||
FROM {{ source('imilo', 'structures_offres') }} | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mieux vaut un model dédié pour structures_offres
qui matchent au plus près ce qu'ils nous exposent, avec des tests propres
le but de staging c'est de mettre les données au format tabulaire et de faire les nettoyages incontournables, sans changer le model de données à ce stade
le fait de faire le "produit" entre structures et offres, c'est déjà plus ce qu'on considère du mapping
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bien noté, j'ai fait le mapping dans intermediate du coup :)
config: | ||
severity: warn | ||
- dbt_utils.not_constant | ||
- name: thematiques |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pour tous les champs qui sont supposés réutilisés nos référentiels, on peut vérifier que c'est le cas.
e.g.
- name: thematiques | |
- name: thematiques | |
data_tests: | |
- relationships: | |
to: ref('thematiques') | |
field: value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mis à part les métadonnées, est-ce qu'il y a des champs qu'on peut considérer obligatoire chez eux ? dans ce cas on peut ajouter des not_null
NULLIF(TRIM(source.data -> 'offres' ->> 'description'), '') AS "presentation_resume", | ||
NULLIF(TRIM(source.data -> 'offres' ->> 'modes_accueil'), '') AS "modes_accueil", | ||
NULLIF(TRIM(source.data -> 'offres' ->> 'liste_des_profils'), '') AS "profils", | ||
NULLIF(TRIM(source.data -> 'offres' ->> 'modes_orientation_beneficiaire'), '') AS "modes_orientation_beneficiaire", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leur modes_orientation_beneficiaire
ne contient qu'une chaine de caractères (pas de liste) ??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pour l'instant oui
Pardon, j'avais oublié de le mettre à jour. La maintenant c'est bon, il faut mettre la clé secrète (qui est désormais dans la note appelée Secret Key API MILO) car leur token dure 1h. Merci pour tes retours je regarde ça demain aprèm ! |
Je l'ai écrit mais un peu vite, je trouve les remarques de @vmttn pertinentes, donc tu peux les prendre en compte ! Les commentaires que je vois sont plutôt d'ordre technique, je pense que ça vaut le coup de dployer sur staging pour préparer le terrain pour Antoine dès maintenant. Je mets donc le label "deploy-to-staging". |
c8c893d
to
42209d8
Compare
ça explique les tests qui cassent. Donc il faut de nouveau rebase ;) |
Yes, tu m'avais fait le meme coup l'autre jour ^^ Si tu veux je te refais un petit précis git pour refiger les choses ? |
addition of the nullif trim rename imilo services to match the source staging staging fix staging columns staging additions
int addition int additions int fix : add the necessary imilo annotations for quality + little bug in the int_quality_stats because - the iteration is not technically over (there still is imilo_offres_structures) - but the next one is not a qualifying source (no kind) so the iteration fails - switch to loop.first and prepend UNION ALL to avoid any further issue int additions
42209d8
to
d2b2f63
Compare
d2b2f63
to
acf2bee
Compare
46e1129
to
5c0c1ec
Compare
NULLIF(TRIM(source.data -> 'offres' ->> 'liste_des_profils'), '') AS "profils", | ||
NULLIF(TRIM(source.data -> 'offres' ->> 'modes_orientation_beneficiaire'), '') AS "modes_orientation_beneficiaire", | ||
CAST(structures_offres."id_structure" AS TEXT) AS "structure_id" | ||
CASE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vmttn En fait ici je me dis que je pars déjà peut etre trop déjà dans le mapping mais ça avait l'avantage de permettre de tester si, lorsque de nouveaux profils seront ajoutés, ils matcheront bien notre référentiel
END | ||
FROM UNNEST(STRING_TO_ARRAY(source.data -> 'offres' ->> 'liste_des_profils', ';')) AS profils | ||
) | ||
END AS "profils", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @hlecuyer y a-t-il une petite adaptation à faire pour faire rentrer un profils_precisions ici ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sieht für mich gut aus
Carte associée