Add annuaire entreprise datasets and siretisation (#780)
* add columns for writing to displayed acteur

* dataset integration

* fix conditions

* fix conditions

* fix job

* fix removed create final actor

* comments

* add db mapping

* make siretisation more generic

* fix comments

* make more generic

* put back content

* add retry and r.raw

* naf none handling

* working code
Hazelmat authored Aug 28, 2024
1 parent 8e0c496 commit 47d6182
Showing 5 changed files with 336 additions and 40 deletions.
181 changes: 146 additions & 35 deletions dags/annuaire_entreprise_checks.py
@@ -30,7 +30,16 @@
dag = DAG(
utils.get_dag_name(__file__, "annuaire_entreprise_checks"),
default_args=default_args,
params={"limit": Param(0, type="integer", description="Limit for data processed")},
params={
"limit": Param(0, type="integer", description="Limit for data processed"),
"source_code_naf": {
"Ordre National Des Pharmaciens": "47.73Z",
"Bibliothèques - Ministère de la culture": "84.11Z,91.01Z",
"Lunettes de Zac": "47.78A",
"Association des Ludothèques Françaises": "91.01Z,93.29Z",
"ALIAPUR": "45.31Z",
},
},
description=(
"A pipeline to apply checks using annuaire entreprise "
"API and output data to validation tool"
@@ -43,25 +52,46 @@ def fetch_and_parse_data(**context):
limit = context["params"]["limit"]
pg_hook = PostgresHook(postgres_conn_id=utils.get_db_conn_id(__file__))
engine = pg_hook.get_sqlalchemy_engine()
df_acteur = pd.read_sql("qfdmo_displayedacteur", engine)
df_acteur["full_adresse"] = (
df_acteur["adresse"]
.fillna("")
.str.cat(df_acteur["code_postal"].fillna(""), sep=" ")
.str.cat(df_acteur["ville"].fillna(""), sep=" ")
)
active_actors_query = """
SELECT
da.*,
s.code AS source_code
FROM
qfdmo_displayedacteur da
JOIN
qfdmo_source s
ON
da.source_id = s.id
WHERE
da.statut = 'ACTIF';
"""
df_acteur = pd.read_sql(active_actors_query, engine)
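# NB: the statut filter and the source-code join now happen in SQL (above)
# rather than in pandas; the in-memory ACTIF filter that follows is the old
# line this query replaces.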

df_acteur = df_acteur[df_acteur["statut"] == "ACTIF"]
if limit > 1:
df_acteur = df_acteur.head(limit)

good_siret_condition = (
df_acteur["siret"].notna()
& (df_acteur["siret"] != "")
& (df_acteur["siret"] != "None")
& (df_acteur["siret"] != " ")
& (df_acteur["siret"] != "ZZZ")
)
good_siret_closed_query = """
SELECT
a.*,
e.etat_administratif,
s.code AS source_code
FROM
qfdmo_displayedacteur a
LEFT JOIN
etablissements e
ON
a.siret = e.siret
LEFT JOIN
qfdmo_source s
ON
a.source_id = s.id
WHERE
a.statut = 'ACTIF'
AND LENGTH(a.siret) = 14
AND e.etat_administratif = 'F'
"""
df_good_siret_closed = pd.read_sql(good_siret_closed_query, engine)
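# NB: in the Sirene data joined here, etat_administratif is "A" (actif) or
# "F" (fermé), so this selects active acteurs whose 14-digit SIRET points to
# an establishment that has since closed.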

good_address_condition = (
df_acteur["adresse"].notna()
& (df_acteur["ville"].notnull())
@@ -71,53 +101,77 @@ def fetch_and_parse_data(**context):
& (df_acteur["code_postal"].str.len() == 5)
)

df1 = df_acteur[good_siret_condition & good_address_condition]

df2 = df_acteur[~good_siret_condition & good_address_condition]
df3 = df_acteur[good_siret_condition & ~good_address_condition]

df_a_siretiser = df_acteur[
(
~df_acteur["identifiant_unique"].isin(
df_good_siret_closed["identifiant_unique"]
)
)
& (good_address_condition)
& (df_acteur["siret"].str.len() != 14)
& (df_acteur["siret"].str.len() != 9)
]
print("df_good_siret_closed size:", df_good_siret_closed.shape)
print("df_a_siretiser size:", df_a_siretiser.shape)
return {
"ok_siret_ok_adresse": df1,
"nok_siret_ok_adresse": df2,
"ok_siret_nok_adresse": df3,
"closed_ok_siret": df_good_siret_closed,
"nok_siret_ok_adresse": df_a_siretiser,
}
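
The length tests above encode the Sirene identifier shapes: a SIREN is 9 digits, and a SIRET is the SIREN plus a 5-digit NIC, 14 digits in all. A hedged sketch of the same predicate as a standalone helper (the name is illustrative, and the isdigit check is slightly stricter than the pure length test used above):

def needs_siretisation(siret) -> bool:
    """True when the stored value is neither a SIRET (14 digits) nor a SIREN (9)."""
    s = (siret or "").strip()
    return not (s.isdigit() and len(s) in (9, 14))

assert needs_siretisation("ZZZ")
assert needs_siretisation(None)
assert not needs_siretisation("13002526500013")  # a well-formed 14-digit SIRET
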


def check_actor_with_adresse(**kwargs):
data = kwargs["ti"].xcom_pull(task_ids="load_and_filter_actors_data")
df_ok_siret_ok_adresse = data["ok_siret_ok_adresse"]
source_code_naf = kwargs["params"]["source_code_naf"]
df_ok_siret_ok_adresse = data["closed_ok_siret"]
df_nok_siret_ok_adresse = data["nok_siret_ok_adresse"]
df = pd.concat([df_ok_siret_ok_adresse, df_nok_siret_ok_adresse])
df["full_adresse"] = mapping_utils.create_full_adresse(df)
df["naf_code"] = df["source_code"].map(source_code_naf)

df["ae_result"] = df.apply(
lambda x: utils.check_siret_using_annuaire_entreprise(
x, col="full_adresse", adresse_query_flag=True
x, query_col="full_adresse", naf_col="naf_code", adresse_query_flag=True
),
axis=1,
)

return df[
["identifiant_unique", "siret", "nom", "statut", "ae_result", "full_adresse"]
[
"identifiant_unique",
"siret",
"nom",
"statut",
"ae_result",
"full_adresse",
"source_code",
"naf_code",
]
]
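
For the address path, the check queries the public Recherche d'entreprises API (the API behind annuaire-entreprises.data.gouv.fr) with the acteur's full address, narrowed by the mapped NAF code when one exists. A hedged sketch of what utils.check_siret_using_annuaire_entreprise plausibly does here; the endpoint is the public one, but the parameter choices are assumptions rather than this repo's code, and the retries mentioned in the commit log are omitted:

import requests

def search_candidates(full_adresse, naf_code=None):
    # activite_principale accepts comma-separated NAF codes, matching the
    # format of the source_code_naf param above.
    params = {"q": full_adresse, "page": 1, "per_page": 5}
    if naf_code:
        params["activite_principale"] = naf_code
    r = requests.get(
        "https://recherche-entreprises.api.gouv.fr/search", params=params, timeout=10
    )
    r.raise_for_status()
    return r.json().get("results", [])
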


def check_actor_with_siret(**kwargs):
data = kwargs["ti"].xcom_pull(task_ids="load_and_filter_actors_data")
df_ok_siret_ok_adresse = data["ok_siret_ok_adresse"]
df_ok_siret_nok_adresse = data["ok_siret_nok_adresse"]
df_acteur = pd.concat([df_ok_siret_ok_adresse, df_ok_siret_nok_adresse])

df_acteur = data["closed_ok_siret"]
df_acteur["full_adresse"] = mapping_utils.create_full_adresse(df_acteur)
df_acteur["ae_result"] = df_acteur.apply(
utils.check_siret_using_annuaire_entreprise, axis=1
)
return df_acteur[
["identifiant_unique", "siret", "nom", "statut", "ae_result", "full_adresse"]
[
"identifiant_unique",
"siret",
"nom",
"statut",
"ae_result",
"full_adresse",
"source_code",
]
]


def enrich_row(row):
enriched_ae_result = []
for item in row["ae_result"]:
for item in row.get("ae_result", []):
latitude = item.get("latitude_candidat")
longitude = item.get("longitude_candidat")
if latitude is not None and longitude is not None:
@@ -138,26 +192,82 @@ def combine_actors(**kwargs):
df = pd.merge(
df_acteur_with_siret,
df_acteur_with_adresse,
on=["identifiant_unique", "nom", "statut", "siret", "full_adresse"],
on=[
"identifiant_unique",
"nom",
"statut",
"siret",
"full_adresse",
"source_code",
],
how="outer",
suffixes=("_siret", "_adresse"),
)

cohort_dfs = {}

df_bad_siret = df[
df["ae_result_siret"].isnull()
& df["full_adresse"].notnull()
& (df["siret"].str.len() != 14)
& (df["siret"].str.len() != 9)
]

if not df_bad_siret.empty:
df_bad_siret["ae_result"] = df_bad_siret.apply(
siret_control_utils.combine_ae_result_dicts, axis=1
)
df_non_empty_ae_results = df_bad_siret[
df_bad_siret["ae_result"].apply(
lambda x: len(
[
candidate
for candidate in x
if candidate["etat_admin_candidat"] == "A"
]
)
> 0
)
]
df_non_empty_ae_results["naf_code"] = df_non_empty_ae_results[
"naf_code_adresse"
]
df_empty_ae_results = df_bad_siret[
df_bad_siret["ae_result"].apply(lambda x: len(x) == 0)
]
for (source_code, source_code_naf), group in df_non_empty_ae_results.groupby(
[
df_non_empty_ae_results["source_code"].fillna("None"),
df_non_empty_ae_results["naf_code"].fillna("None"),
]
):
cohort_name = (
f"siretitsation_with_adresse_bad_siret_source_"
f"{source_code}_naf_{source_code_naf}"
)
cohort_dfs[cohort_name] = group

cohort_dfs["siretitsation_with_adresse_bad_siret_empty"] = df_empty_ae_results

df = df[~df["identifiant_unique"].isin(df_bad_siret["identifiant_unique"])]

df["ae_result"] = df.apply(siret_control_utils.combine_ae_result_dicts, axis=1)
df[["statut", "categorie_naf", "ae_adresse"]] = df.apply(
siret_control_utils.update_statut, axis=1
)
df = df[df["statut"] == "SUPPRIME"]
if len(df) > 0:

df["cohort_id"] = df.apply(siret_control_utils.set_cohort_id, axis=1)
else:
return cohort_dfs

for cohort_id in df["cohort_id"].unique():
cohort_dfs[cohort_id] = df[df["cohort_id"] == cohort_id]

for cohort_id, cohort_df in cohort_dfs.items():
print(f"Cohort ID: {cohort_id} - Number of rows: {len(cohort_df)}")

return cohort_dfs
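
The grouping above emits one cohort per (source_code, naf_code) pair, rendering missing values as the literal string "None". A standalone sketch of the naming on hypothetical rows:

import pandas as pd

df = pd.DataFrame({
    "source_code": ["ALIAPUR", None],
    "naf_code": ["45.31Z", None],
})
for (source, naf), group in df.groupby(
    [df["source_code"].fillna("None"), df["naf_code"].fillna("None")]
):
    print(f"siretisation_with_adresse_bad_siret_source_{source}_naf_{naf}")
# siretisation_with_adresse_bad_siret_source_ALIAPUR_naf_45.31Z
# siretisation_with_adresse_bad_siret_source_None_naf_None
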


@@ -235,7 +345,8 @@ def serialize_to_json(**kwargs):

(
load_and_filter_data_task
>> [check_siret_task, check_adresse_task]
>> check_siret_task
>> check_adresse_task
>> combine_candidates
>> get_location_task
>> serialize_to_json_task
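
The wiring change above replaces the old fan-out (load >> [check_siret_task, check_adresse_task]) with a strict sequence, so the two annuaire entreprise passes no longer run in parallel. A minimal Airflow sketch of the two shapes, with placeholder operators and an illustrative dag_id:

import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG("wiring_sketch", start_date=pendulum.datetime(2024, 1, 1), schedule=None):
    load, siret, adresse, combine = (
        EmptyOperator(task_id=t)
        for t in ("load", "check_siret", "check_adresse", "combine")
    )
    # Before: load >> [siret, adresse] >> combine  (parallel checks)
    load >> siret >> adresse >> combine  # After: strictly sequential
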
