Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

👪 CLUSTERING: toujours ajouter parents existants des acteurs types #1268

Merged
merged 9 commits into from
Jan 30, 2025
24 changes: 24 additions & 0 deletions dags/cluster/config/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,27 @@ class ClusterConfig(BaseModel):
# et valeurs obligatoires, voir section validation
# pour toutes les règles
dry_run: bool

# SELECTION ACTEURS NON-PARENTS
include_source_codes: list[str]
include_acteur_type_codes: list[str]
include_only_if_regex_matches_nom: str | None
include_if_all_fields_filled: list[str]
exclude_if_any_field_filled: list[str]

# SELECTION PARENTS EXISTANTS
# Pas de champ source car par définition parents = 0 source
# Pas de champ acteur type = on prend tous les acteur type ci-dessus
include_parents_only_if_regex_matches_nom: str | None

# NORMALISATION
normalize_fields_basic: list[str]
normalize_fields_no_words_size1: list[str]
normalize_fields_no_words_size2_or_less: list[str]
normalize_fields_no_words_size3_or_less: list[str]
normalize_fields_order_unique_words: list[str]

# CLUSTERING
cluster_intra_source_is_allowed: bool
cluster_fields_exact: list[str]
cluster_fields_fuzzy: list[str]
Expand Down Expand Up @@ -96,12 +107,25 @@ def check_model(cls, values):
dropdown_selected=values["include_acteur_type_codes"],
)

"""
Logique supprimée le 2025-01-27 mais conservée pour référence:
- depuis l'ajout des parents indépendants des sources
via PR1265 on ne peut plus savoir si on héritera
uniquement d'une seule source au moment de la config, donc on
laisse passer au niveau de la sélection
TODO: on pourrait scinder la config en plusieurs sous-config:
- SelectionConfig
- NormalizationConfig
- ClusteringConfig
- EnrichmentConfig
Et ainsi avoir des validations plus fines à chaque étape
# ACTEUR TYPE vs. INTRA-SOURCE
if (
len(values["include_source_ids"]) == 1
and not values["cluster_intra_source_is_allowed"]
):
raise ValueError("1 source sélectionnée mais intra-source désactivé")
"""

# Par défaut on ne clusterise pas les acteurs d'une même source
# sauf si intra-source est activé
Expand Down
69 changes: 32 additions & 37 deletions dags/cluster/dags/cluster_acteurs_suggestions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from cluster.dags.ui import (
UI_PARAMS_SEPARATOR_CLUSTERING,
UI_PARAMS_SEPARATOR_NORMALIZATION,
UI_PARAMS_SEPARATOR_SELECTION_ACTEURS,
UI_PARAMS_SEPARATOR_SELECTION_PARENTS,
)
from cluster.tasks.airflow_logic import (
cluster_acteurs_config_create_task,
cluster_acteurs_normalize_task,
Expand Down Expand Up @@ -51,37 +57,6 @@

dropdown_acteur_columns = django_model_fields_attributes_get(Acteur)

UI_PARAMS_SEPARATOR_SELECTION = r"""

---

# Paramètres de sélection
Les paramètres suivants décident des acteurs à inclure
ou exclure comme candidats au clustering. Ce n'est pas
parce qu'un acteur est selectionné qu'il sera forcément clusterisé.
(ex: si il se retrouve tout seul sachant qu'on supprime
les clusters de taille 1)
"""

UI_PARAMS_SEPARATOR_NORMALIZATION = r"""

---

# Paramètres de normalisation
Les paramètres suivants définissent comment les valeurs
des champs vont être transformées avant le clustering.
"""

UI_PARAMS_SEPARATOR_CLUSTERING = r"""

---

# Paramètres de clustering
Les paramètres suivants définissent comment les acteurs
vont être regroupés en clusters.
"""


with DAG(
dag_id="cluster_acteurs_suggestions",
dag_display_name="Cluster - Acteurs - Suggestions",
Expand All @@ -107,9 +82,13 @@
"dry_run": Param(
True,
type="boolean",
description_md=f"""🚱 Si coché, seules les tâches qui ne modifient pas
la base de données seront exécutées.
{UI_PARAMS_SEPARATOR_SELECTION}""",
description_md=f"""
🚱 Si coché, aucune tâche d'écriture ne sera effectuée.
Ceci permet de tester le DAG rapidement sans peur de
casser quoi que ce soit (itérer plus vite)
(ex: pas d'écriture des suggestions en DB,
donc pas visible dans Django Admin).
{UI_PARAMS_SEPARATOR_SELECTION_ACTEURS}""",
),
# TODO: permettre de ne sélectionner aucune source = toutes les sources
"include_source_codes": Param(
Expand Down Expand Up @@ -142,7 +121,7 @@
champ avant l'application de la regex pour simplifier les expressions

0️⃣ Si aucune valeur spécifiée = cette option n'a PAS d'effet
""",
{UI_PARAMS_SEPARATOR_SELECTION_PARENTS}""",
maxcorbeau marked this conversation as resolved.
Show resolved Hide resolved
),
"include_if_all_fields_filled": Param(
["code_postal"],
Expand All @@ -164,9 +143,22 @@
exemple: travailler uniquement sur les acteurs SANS SIRET

0️⃣ Si aucune valeur spécifiée = cette option n'a PAS d'effet
{UI_PARAMS_SEPARATOR_NORMALIZATION}
{UI_PARAMS_SEPARATOR_SELECTION_PARENTS}
maxcorbeau marked this conversation as resolved.
Show resolved Hide resolved
""",
),
"include_parents_only_if_regex_matches_nom": Param(
"",
type=["null", "string"],
description_md=f"""**➕ INCLUSION PARENTS**: ceux dont le champ 'nom'
correspond à cette expression régulière ([voir recettes](https://www.notion.so/accelerateur-transition-ecologique-ademe/Expressions-r-guli-res-regex-1766523d57d780939a37edd60f367b75))

🧹 Note: la normalisation basique est appliquée à la volée sur ce
champ avant l'application de la regex pour simplifier les expressions

0️⃣ Si aucune valeur spécifiée = cette option n'a PAS d'effet

{UI_PARAMS_SEPARATOR_NORMALIZATION}""",
),
"normalize_fields_basic": Param(
[],
type=["null", "array"],
Expand Down Expand Up @@ -210,7 +202,10 @@
""",
),
"normalize_fields_no_words_size3_or_less": Param(
["nom"],
# Feedback métier: supprimer des mots de taille 3
# commence à devenir radical, donc on laisse ce champ
# vide par défaut
[],
type=["null", "array"],
examples=dropdown_acteur_columns,
description_md=r"""Les champs à normaliser en supprimant les mots
Expand Down
48 changes: 48 additions & 0 deletions dags/cluster/dags/ui.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""
Code nécessaire pour le formattage de la UI Airflow,
qu'on déplace dans ce fichier pour ne pas encombrer
le DAG
"""

UI_PARAMS_SEPARATOR_SELECTION_ACTEURS = r"""

---

# 🔎 Paramètres **sélection acteurs non-parent**
Les paramètres suivants décident des acteurs non-parent à inclure
ou exclure comme candidats au clustering. Ce n'est pas
parce qu'un acteur est selectionné qu'il sera forcément clusterisé
(ex: si il se retrouve tout seul sachant qu'on supprime
les clusters de taille 1)
"""

UI_PARAMS_SEPARATOR_SELECTION_PARENTS = r"""

---

# 🔎 Paramètres **sélection parents existants**
Les paramètres suivants décident des parents à inclure
ou exclure comme candidats au clustering.

- 💯 Par défault on prend **TOUS les parents** des **mêmes acteur-types
utilisés pour les acteurs**
- Et on filtre ces parents avec les paramètres suivants:
"""

UI_PARAMS_SEPARATOR_NORMALIZATION = r"""

---

# 🧹 Paramètres de **normalisation acteurs + parents**
Les paramètres suivants définissent comment les valeurs
des champs vont être transformées avant le clustering.
"""

UI_PARAMS_SEPARATOR_CLUSTERING = r"""

---

# 📦 Paramètres de **clustering acteurs + parents**
Les paramètres suivants définissent comment les acteurs
vont être regroupés en clusters.
"""
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import logging

import numpy as np
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from cluster.config.model import ClusterConfig
from cluster.tasks.business_logic.cluster_acteurs_df_sort import cluster_acteurs_df_sort
from cluster.tasks.business_logic.cluster_acteurs_selection_from_db import (
from cluster.tasks.business_logic import (
cluster_acteurs_df_sort,
cluster_acteurs_selection_acteur_type_parents,
cluster_acteurs_selection_from_db,
)
from utils import logging_utils as log
Expand Down Expand Up @@ -45,7 +48,14 @@ def cluster_acteurs_selection_from_db_wrapper(**kwargs) -> None:
)
log.preview("Config reçue", config)

df, query = cluster_acteurs_selection_from_db(
# --------------------------------
# 1) Sélection des acteurs
# --------------------------------
# Quels qu'ils soient (enfants ou parents) sur la
# base de tous les critères d'inclusion/exclusion
# fournis au niveau du DAG
logging.info(log.banner_string("Sélection des acteurs"))
df_acteurs, query = cluster_acteurs_selection_from_db(
model_class=DisplayedActeur,
include_source_ids=config.include_source_ids,
include_acteur_type_ids=config.include_acteur_type_ids,
Expand All @@ -54,10 +64,46 @@ def cluster_acteurs_selection_from_db_wrapper(**kwargs) -> None:
exclude_if_any_field_filled=config.exclude_if_any_field_filled,
extra_dataframe_fields=config.fields_used,
)
df_acteurs = cluster_acteurs_df_sort(df_acteurs)
log.preview("requête SQL utilisée", query)
log.preview("acteurs sélectionnés", df)
log.preview("# acteurs par source_id", df.groupby("source_id").size())
log.preview("# acteurs par acteur_type_id", df.groupby("acteur_type_id").size())
log.preview("# acteurs par source_id", df_acteurs.groupby("source_id").size())
log.preview(
"# acteurs par acteur_type_id", df_acteurs.groupby("acteur_type_id").size()
)
log.preview_df_as_markdown("acteurs sélectionnés", df_acteurs)

# --------------------------------
# 2) Sélection des parents uniquements
# --------------------------------
# Aujourd'hui (2025-01-27): la convention data veut qu'un parent
# soit attribué une source NULL. Donc si le métier choisit de
# clusteriser des sources en particulier à 1), on ne peut donc
# jamais récupérer les parents potentiels pour le même type d'acteur
# La logique ci-dessous vient palier à ce problème en sélectionnant
# TOUS les parents des acteurs types sélectionnés, et en ignorant
# les autres paramètres de sélection
logging.info(log.banner_string("Sélection des parents"))
df_parents = cluster_acteurs_selection_acteur_type_parents(
acteur_type_ids=config.include_acteur_type_ids,
fields=config.fields_used,
include_only_if_regex_matches_nom=config.include_parents_only_if_regex_matches_nom,
)
df_parents = cluster_acteurs_df_sort(df_parents)
log.preview_df_as_markdown("parents sélectionnés", df_parents)

# --------------------------------
# 3) Fusion acteurs + parents
# --------------------------------
logging.info(log.banner_string("Fusion acteurs + parents"))
ids = set()
ids.update(df_acteurs["identifiant_unique"].values)
ids.update(df_parents["identifiant_unique"].values)
log.preview("IDs avant la fusion", ids)
df = pd.concat([df_acteurs, df_parents], ignore_index=True).replace({np.nan: None})
df = df.drop_duplicates(subset="identifiant_unique", keep="first")
df = cluster_acteurs_df_sort(df)
log.preview("IDs après la fusion", df["identifiant_unique"].tolist())
log.preview_df_as_markdown("acteurs + parents sélectionnés", df)

if df.empty:
raise ValueError("Aucun acteur trouvé avec les critères de sélection")
Expand Down
4 changes: 4 additions & 0 deletions dags/cluster/tasks/business_logic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from .cluster_acteurs_parent_calculations import ( # noqa
cluster_acteurs_parent_calculations,
)
from .cluster_acteurs_selection_acteur_type_parents import ( # noqa
cluster_acteurs_selection_acteur_type_parents,
)
from .cluster_acteurs_selection_from_db import cluster_acteurs_selection_from_db # noqa
from .cluster_acteurs_suggestions import cluster_acteurs_suggestions # noqa
from .cluster_acteurs_suggestions_to_db import cluster_acteurs_suggestions_to_db # noqa
from .cluster_acteurs_suggestions_validate import ( # noqa
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import pandas as pd
from shared.tasks.business_logic import normalize
from utils.django import django_model_queryset_to_df, django_setup_full

django_setup_full()
from qfdmo.models import ActeurType, DisplayedActeur # noqa: E402
from qfdmo.models.acteur import ActeurStatus # noqa: E402


def cluster_acteurs_selection_acteur_type_parents(
acteur_type_ids: list[int],
fields: list[str],
include_only_if_regex_matches_nom: str | None = None,
) -> pd.DataFrame:
"""Sélectionne tous les parents des acteurs types donnés,
pour pouvoir notamment permettre de clusteriser avec
ces parents existant indépendemment des critères de sélection
des autres acteurs qu'on cherche à clusteriser (ex: si on cherche
à clusteriser les acteurs commerce de source A MAIS en essayant
de rattacher au maximum avec tous les parents commerce existants)"""

# Ajout des champs nécessaires au fonctionnement de la fonction
# si manquant
if "nom" not in fields:
fields.append("nom")

# Petite validation (on ne fait pas confiance à l'appelant)
ids_in_db = list(ActeurType.objects.values_list("id", flat=True))
ids_invalid = set(acteur_type_ids) - set(ids_in_db)
if ids_invalid:
raise ValueError(f"acteur_type_ids {ids_invalid} pas trouvés en DB")

# On récupère les parents des acteurs types donnés
# qui sont censés être des acteurs sans source
parents = DisplayedActeur.objects.filter(
acteur_type__id__in=acteur_type_ids,
statut=ActeurStatus.ACTIF,
source__id__isnull=True,
)

df = django_model_queryset_to_df(parents, fields)

# Si une regexp de nom est fournie, on l'applique
# pour filtrer la df, sinon on garde toute la df
if include_only_if_regex_matches_nom:
print(f"{include_only_if_regex_matches_nom=}")
print(df["nom"].map(normalize.string_basic).tolist())
df = df[
df["nom"]
# On applique la normalisation de base à la volée
# pour simplifier les regex
.map(normalize.string_basic).str.contains(
include_only_if_regex_matches_nom, na=False, regex=True
)
].copy()

return df
Loading
Loading