Skip to content

Commit

Permalink
🔨 Clustering: bux fixes et amélioration logs Airflow (#1223)
Browse files Browse the repository at this point in the history
* clustering: bux fixes et amélioration logs Airflow

* clustering: TODO pour une prochaine PR

* clustering: amélioration fonction + tests pour l'affichage df

* remplacement des print par logger.info
  • Loading branch information
maxcorbeau authored Jan 16, 2025
1 parent ad883db commit 9a6f805
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 28 deletions.
1 change: 1 addition & 0 deletions dags/cluster/dags/cluster_acteurs_suggestions.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
la base de données seront exécutées.
{UI_PARAMS_SEPARATOR_SELECTION}""",
),
# TODO: permettre de ne sélectionner aucune source = toutes les sources
"include_source_codes": Param(
[],
type="array",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def cluster_acteurs_config_validate_wrapper(**kwargs) -> None:
for key, value in params.items():
log.preview(f"param: {key}", value)

# TODO: ceci devrait être déplacer dans la fonction de validation
# qui devrait être reprise de fond en comble avec du pydantic
# pour pas réinventer la roue
if (
len(params["include_source_codes"]) == 1
and not params["cluster_intra_source_is_allowed"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from cluster.tasks.business_logic.cluster_acteurs_db_data_read_acteurs import (
cluster_acteurs_db_data_read_acteurs,
)
from cluster.tasks.business_logic.cluster_acteurs_df_sort import cluster_acteurs_df_sort
from utils import logging_utils as log
from utils.django import django_setup_full

Expand Down Expand Up @@ -72,6 +73,7 @@ def cluster_acteurs_db_data_read_acteurs_wrapper(**kwargs) -> None:
raise ValueError("Aucun acteur trouvé avec les critères de sélection")

logging.info(log.banner_string("🏁 Résultat final de cette tâche"))
df = cluster_acteurs_df_sort(df)
log.preview_df_as_markdown("acteurs sélectionnés", df)

kwargs["ti"].xcom_push(key="df", value=df)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from cluster.tasks.business_logic.cluster_acteurs_df_sort import cluster_acteurs_df_sort
from cluster.tasks.business_logic.cluster_acteurs_normalize import (
cluster_acteurs_normalize,
)
Expand Down Expand Up @@ -72,6 +73,7 @@ def cluster_acteurs_normalize_wrapper(**kwargs) -> None:
log.preview("acteurs normalisés", df_norm)

logging.info(log.banner_string("🏁 Résultat final de cette tâche"))
df_norm = cluster_acteurs_df_sort(df_norm)
log.preview_df_as_markdown("acteurs normalisés", df_norm)

# Les XCOM étant spécifiques à une tâche on peut pousser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from cluster.tasks.business_logic.cluster_acteurs_df_sort import cluster_acteurs_df_sort
from cluster.tasks.business_logic.cluster_acteurs_suggestions import (
cluster_acteurs_suggestions,
)
Expand Down Expand Up @@ -62,6 +63,11 @@ def cluster_acteurs_suggestions_wrapper(**kwargs) -> None:
log.banner_string("Pas de suggestions de clusters générées")
)

df_suggestions = cluster_acteurs_df_sort(
df_suggestions,
cluster_fields_exact=params["cluster_fields_exact"],
cluster_fields_fuzzy=params["cluster_fields_fuzzy"] or [],
)
log.preview_df_as_markdown("suggestions de clusters", df_suggestions)

# On pousse les suggestions dans xcom pour les tâches suivantes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ def cluster_acteurs_db_data_read_acteurs(
=> donc on préfère simplifier l'approche en soulevant des exceptions dès
que possible via cette fonction
# TODO: une fois qu'on est confiant avec la pipeline (+ de tests unitaires
# et métiers):
# - enlever les exceptions
# - dans les tâches Airflow: raise AirflowSkipException si pas de données
# => ceci permettra à la pipeline de tourner en mode autopilot pour générer
# des suggestions en continues (et avoir du skip au lieu des erreurs si rien)
Args:
model_class (type[Model]): Le modèle Django à lire, le mettre en paramètre
nous permet de choisir Acteur, RevisionActeur ou DisplayedActeur
Expand Down
57 changes: 57 additions & 0 deletions dags/cluster/tasks/business_logic/cluster_acteurs_df_sort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import pandas as pd


def cluster_acteurs_df_sort(
df: pd.DataFrame,
cluster_fields_exact: list[str] = [],
cluster_fields_fuzzy: list[str] = [],
) -> pd.DataFrame:
"""Fonction de tri d'une dataframe acteurs
pour favoriser la visualisation des clusters.
La fonction peut fonctionner sur n'importe quel
état d'une dataframe acteurs (sélection, normalisation,
clusterisation).
De fait elle est utilisée tout au long du DAG
airflow de clustering pour mieux visualiser la
construction des clusters au fur et à mesure des tâches.
Args:
df (pd.DataFrame): DataFrame acteurs
cluster_fields_exact (list[str], optional): Liste des champs exacts
pour le clustering. Defaults to [].
cluster_fields_fuzzy (list[str], optional): Liste des champs flous
pour le clustering. Defaults to [].
Returns:
pd.DataFrame: DataFrame acteurs triée
"""

# On construit une liste de champs de tri
# avec des champs par défauts (ex: cluster_id)
# et des champs spécifiés dans la config du DAG
sort_ideal = ["cluster_id"] # la base du clustering

# pour déceler des erreurs de clustering rapidement (ex: intra-source)
# mais on le met pas pour les étapes de sélection et normalisation
# car cela casse notre ordre (on a pas de cluster_id et donc
# on préfère par sémantique business que des codes)
if cluster_fields_exact or cluster_fields_fuzzy:
sort_ideal += ["source_code", "acteur_type_code"]
sort_ideal += [x for x in cluster_fields_exact if x not in sort_ideal]
sort_ideal += [x for x in cluster_fields_fuzzy if x not in sort_ideal]
# défaut quand on n'a pas de champs de clustering (étape de sélection)
sort_ideal += [
x for x in ["code_postal", "ville", "adresse", "nom"] if x not in sort_ideal
]

# Puis on construit la liste actuelle des champs de tri
# vs. la réalité des champs présents dans la dataframe
# en prenant "au mieux" dans l'ordre idéale et en rajoutant
# ce qui reste de la df
sort_actual = [x for x in sort_ideal if x in df.columns]
sort_actual += [
x for x in df.columns if x not in cluster_fields_exact and x not in sort_actual
]
return df.sort_values(by=sort_actual, ascending=True)[sort_actual]
79 changes: 51 additions & 28 deletions dags/cluster/tasks/business_logic/cluster_acteurs_suggestions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Fonctions de clustering des acteurs
TODO: améliorer le debug: en mode CLI on avant des print()
TODO: améliorer le debug: en mode CLI on avant des logger.info()
qui étaient suffisants, mais en mode Airflow, on va faire
exploser la taille des logs, donc on a besoin de refactorer:
- les fonctions doivent retourner des valeurs de debug
Expand All @@ -10,16 +10,19 @@
qui afficheront une partie du debug
"""

import json
import logging
import re

import numpy as np
import pandas as pd
from rich import print
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from slugify import slugify
from unidecode import unidecode

logger = logging.getLogger(__name__)

COLS_GROUP_EXACT_ALWAYS = [
# "code_departement",
"code_postal",
Expand Down Expand Up @@ -120,28 +123,28 @@ def similarity_matrix_to_tuples(
tuples.sort(key=lambda x: x[2], reverse=True)
if indexes:
tuples = [(indexes[i], indexes[j], score) for i, j, score in tuples]
print(f"{tuples=}")
logger.info(f"{tuples=}")
return tuples


def score_tuples_to_clusters(
data: list[tuple[int, int, float]], threshold
tuples: list[tuple[int, int, float]], threshold
) -> list[list[int]]:
"""Convertit une liste de tuples (index_a, index_b, score) en clusters"""
"""Convertit une liste de tuples d'acteurs (index_a, index_b, score) en clusters"""

# On ne devrait jamais converver des clusters vides, donc si on appelle
# cette fonction avec une liste vide, c'est qu'on a un problème
# en amont
if not data:
if not tuples:
raise ValueError("Liste de tuples d'entrée vide, on ne devrait pas être ici")

# Trier la liste par score décroissant même si elle est déjà triée
# car cela n'est pas garanti par l'appelant ET nous avons
# une optimisation avec "break" dans la boucle
data.sort(key=lambda x: x[2], reverse=True)
tuples.sort(key=lambda x: x[2], reverse=True)

clusters = []
for index_a, index_b, score in data:
for index_a, index_b, score in tuples:
# Ayant trié la liste par score décroissant, on peut sortir
# de la boucle dès qu'on a un score inférieur au seuil
if score < threshold:
Expand Down Expand Up @@ -178,7 +181,7 @@ def cluster_to_subclusters(
) -> list[list[int]]:
"""Scinde un cluster en 1+ sous-clusters en fonction de la similarité des valeurs
d'une colonne donnée. On s'assure qu'il n'y a pas de sous-clusters dupliqués."""
print("\n\nrefine_cluster", f"{column=}, {cluster=}, {threshold=}")
logger.info(f"\n\nrefine_cluster, {column=}, {cluster=}, {threshold=}")
df_cluster = df.loc[cluster]
column_values = df_cluster[column].astype(str).values
similarity_matrix = values_to_similarity_matrix(column_values) # type: ignore
Expand All @@ -189,10 +192,10 @@ def cluster_to_subclusters(
symbol = "🟢" if score >= threshold else "🔴"
v_i = column_values[cluster.index(i)]
v_j = column_values[cluster.index(j)]
print(f"{symbol} {i=}, {j=}, {v_i=}, {v_j=} {score=}")
logger.info(f"{symbol} {i=}, {j=}, {v_i=}, {v_j=} {score=}")

sub_clusters = score_tuples_to_clusters(tuples, threshold)
print(f"{sub_clusters=}")
logger.info(f"{sub_clusters=}")
return sub_clusters


Expand All @@ -205,7 +208,8 @@ def cluster_cols_group_fuzzy(df_src, columns, threshold):
df = df.dropna(subset=[column])
df = df[df[column].astype(str).str.strip() != ""]

if df.empty:
# On ne considère que les clusters de taille 2+
if len(df) < 2:
return []

clusters = [list(df.index)]
Expand All @@ -214,7 +218,7 @@ def cluster_cols_group_fuzzy(df_src, columns, threshold):
clusters_ref = []
for cluster in clusters:
clusters_ref_new = cluster_to_subclusters(df, column, cluster, threshold)
print(f"{column=}, {cluster=}, {clusters_ref_new=}")
logger.info(f"{column=}, {cluster=}, {clusters_ref_new=}")
clusters_ref.extend(clusters_ref_new)

# Only continue with valid clusters for the next refinement step
Expand Down Expand Up @@ -284,7 +288,7 @@ def cluster_acteurs_suggestions(
+ ["nom"]
)
)
print(f"{cols_to_keep=}")
logger.info(f"{cols_to_keep=}")
df = df[cols_to_keep]

# On groupe par les colonnes exactes
Expand All @@ -293,15 +297,12 @@ def cluster_acteurs_suggestions(
for exact_keys, exact_rows in df.groupby(
COLS_GROUP_EXACT_ALWAYS + cluster_fields_exact
):
print("\n\nNouveau cluster potentiel avec exact match:")

# On ne cherche pas à grouper les clusters de taille 1
if len(exact_rows) == 1:
print(f"🔴 Ignoré: cluster de taille 1: {list(exact_keys)}")
# On ne considère que les clusters de taille 2+
if len(exact_rows) < 2:
logger.info(f"🔴 Ignoré: cluster de taille <2: {list(exact_keys)}")
clusters_size1.append(exact_keys)
continue
keys = list(exact_keys)
clusters_to_add = []

# TODO: à déplacer après la logique de clustering
# pour pouvoir sélectionner l'acteur avec le meilleur match
Expand All @@ -319,27 +320,46 @@ def cluster_acteurs_suggestions(
if len(exact_rows) < 2:
continue

# Liste des clusters à considérer, on commence avec rien
clusters_to_add = []

logger.info(f"🟡 Cluster potentiel avant fuzzy: taille {len(exact_rows)}")
fields_debug = (
cluster_fields_exact + cluster_fields_fuzzy + ["identifiant_unique"]
)
logger.info(
json.dumps(exact_rows[fields_debug].to_dict(orient="list"), indent=4)
)

# Si on a des champs fuzzy, on cherche à
# sous-clusteriser sur ces champs
if cluster_fields_fuzzy:
keys += cluster_fields_fuzzy
logger.info(f"fields_fuzzy={cluster_fields_fuzzy}")
logger.info(f"threshold={cluster_fuzzy_threshold}")

keys += cluster_fields_fuzzy
subclusters = cluster_cols_group_fuzzy(
exact_rows, cluster_fields_fuzzy, threshold=cluster_fuzzy_threshold
)
print(f"{subclusters=}")
logger.info(f"Sous-clusters après fuzzy: {len(subclusters)} sous-clusters")
for i, fuzzy_rows in enumerate(subclusters):
fuzzy_keys = keys + [str(i + 1)]
logger.info("🟢 Sous-cluster conservé:")
logger.info(json.dumps(fuzzy_rows.to_dict(orient="list"), indent=4))
clusters_to_add.append((fuzzy_keys, fuzzy_rows))

else:
logger.info(
"🟢 Cluster conservé (sur la base des champs exacts uniquement)"
)
logger.info(exact_rows)
clusters_to_add.append((keys, exact_rows))

for keys, rows in clusters_to_add:
cluster_id = cluster_id_from_strings(keys)
rows["cluster_id"] = cluster_id
values = rows[cluster_fields_fuzzy + ["nom"]]
print(f"\n🟢 CLUSTER: {cluster_id=}, {keys=}, {values=}")
logger.info(f"\n🟢 CLUSTER: {cluster_id=}, {keys=}, {values=}")
clusters.append(rows.copy())

if not clusters:
Expand All @@ -351,17 +371,20 @@ def cluster_acteurs_suggestions(
# On ne garde que les clusters de taille 2+
df_clusters = df_clusters.groupby("cluster_id").filter(lambda x: len(x) >= 2)

"""
# Debug pour les clusters de taille 1
print("🔴 clusters_size1", clusters_size1)
logger.info("🔴 clusters_size1", clusters_size1)
df_clusters_1 = pd.DataFrame(
clusters_size1, columns=COLS_GROUP_EXACT_ALWAYS + cluster_fields_exact
)
# Show entries grouped by code_postal and acteur_type_code which have >1 entries
issues_villes = df_clusters_1.groupby(["code_postal"]).filter(lambda x: len(x) >= 2)
print(f"🔴 {issues_villes=}")
print(f"🔴 {len(clusters_size1)=}")
print(f"🟢 {df_clusters["cluster_id"].nunique()=}")
print(f"🟢 {df_clusters["identifiant_unique"].nunique()=}")
logger.info(f"🔴 {issues_villes=}")
"""
logger.info(f"🟢 {len(clusters_size1)=}")
logger.info(f"🟢 {df_clusters["cluster_id"].nunique()=}")
logger.info(f"🟢 {df_clusters["identifiant_unique"].nunique()=}")

return df_clusters
Loading

0 comments on commit 9a6f805

Please sign in to comment.