Skip to content

Commit

Permalink
🧑‍🤝‍🧑 Dédup v2 amélioration data parent (#1160)
Browse files Browse the repository at this point in the history
* initial commit

* clarification de l'inclusion

* migration typing minuscule

* exclusion des champs cree_le et modifie_le

* ajout ECODDS à l'exclusion du nom_commercial
  • Loading branch information
maxcorbeau authored Dec 19, 2024
1 parent 744b73d commit 5283c1e
Show file tree
Hide file tree
Showing 4 changed files with 343 additions and 82 deletions.
10 changes: 7 additions & 3 deletions scripts/deduplication/deduplicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
from tasks.db_manage_cluster import db_manage_cluster
from tasks.source_data_get import source_data_get
from utils.cli import banner
from utils.db import db_source_ids_by_code_get

from qfdmo.models import RevisionActeur, Source
from qfdmo.models import RevisionActeur

install() # you can install globally to your env, see docs

Expand All @@ -36,7 +37,10 @@
# A adapter
RUN_SOURCES_PREFERRED_CODES = ["ALIAPUR", "COREPILE"]
RUN_ID = "dechetteries_202412"
RUN_CLUSTER_IDS_TO_SKIP = [] # Si besoin pour passer des erreurs
RUN_CLUSTER_IDS_TO_SKIP = [
"85250_3_1",
"93320_1_1",
] # Si besoin pour passer des erreurs

# Automatique par le script
RUN_CLUSTER_IDS_TO_CHANGES = getattr(
Expand Down Expand Up @@ -146,7 +150,7 @@ def main() -> None:
# ------------------------------------------
# Pour privilégier certaines sources au moment de la création des parents
banner("RECUPERATION IDS DES SOURCES PREFEREES")
source_ids_by_codes = dict(Source.objects.values_list("code", "id"))
source_ids_by_codes = db_source_ids_by_code_get()
sources_preferred_ids = [
source_ids_by_codes[x] for x in RUN_SOURCES_PREFERRED_CODES
]
Expand Down
136 changes: 110 additions & 26 deletions scripts/deduplication/tasks/db_manage_parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from qfdmo.models.acteur import RevisionActeur
from scripts.deduplication.models.acteur_map import ActeurMap
from scripts.deduplication.models.change import Change
from scripts.deduplication.utils.db import db_source_ids_by_code_get


def parent_id_generate(ids: list[str]) -> str:
Expand Down Expand Up @@ -102,8 +103,11 @@ def acteurs_dict_to_list_of_dicts(acteurs_maps: list[ActeurMap]) -> list[dict]:


def parent_get_data_from_acteurs(
acteurs: list[dict], sources_preferred_ids, validation_model: Model
) -> dict:
acteurs: list[dict],
source_ids_by_code: dict,
sources_preferred_ids,
validation_model: Model,
) -> tuple[dict, dict]:
"""
Récupération des données d'acteurs pour l'utilisation dans un parent. Pour l'instant
la logique est primitive de choisir selon une liste de sources préférées. A l'avenir
Expand All @@ -116,8 +120,88 @@ def parent_get_data_from_acteurs(
Returns:
parent_dict: donnée parent sous forme de dict
sources_codes_picked: mapping des champs et sources choisies
"""
# TODO: revoir les règles d'inclusion/exclusion pour la prochaine
# dédup pour le rendre plus flexible avec l'acteur_type_id
# - entrée de mapping = acteur_type_id x champ
# - sortie de mapping = liste de sources à inclure/exclure
if any(x["acteur_type_id"] not in (None, 7) for x in acteurs):
raise NotImplementedError(
"""Règles inclusion/exclusion pour la dédup déchetteries uniquement,
revoir avec Christian avec la prochaine dédup"""
)

source_codes_by_id = {v: k for k, v in source_ids_by_code.items()}
parent_dict = {}
sources_codes_picked = {}
# Mapping d'inclusion de source spécifique à certains champs
# TODO: les 2 mapping doivent être exclusif
field_source_codes_priority = {
"nom": ["COREPILE", "ALIAPUR"],
"location": ["REFASHION"],
}
# Mapping d'exclusion de source spécifique à certains champs
field_source_codes_exclusion = {
"nom": ["REFASHION"],
"nom_commercial": ["REFASHION", "ECODDS"],
"siret": ["REFASHION"],
"siren": ["REFASHION"],
"telephone": ["REFASHION"],
}
# Ignore les champs d'identifications acteurs
# car il doivent tous restés vides (générés ultérieurement)
keys_to_ignore = [
"identifiant_unique",
"identifiant_externe",
"parent_id",
"source_id",
# Déjà géré par Django
"modifie_le",
# Soit on garde la valeur historique, ou soit pour un nouveau
# parent c'est géré par Django
"cree_le",
]

def results_update(field, value, acteur):
"""utilitaire MAJ de parent_dict et sources_codes_picked
+ some debug prints pour éviter de se répéter sur les
2 boucles"""
source_id = acteur["source_id"]
source_code = source_codes_by_id.get(source_id)
acteur_id = acteur["identifiant_unique"]
parent_dict[field] = value
sources_codes_picked[field] = source_code
print(f"\t{field=} {value=}")
print(f"\t\t via {source_code=} {acteur_id=}")

# ------------------------------------
# Boucle 1: données via inclusion/exclusion champs/sources spécéfiques
# ------------------------------------
print("parent_get_data_from_acteurs: boucle 1")
for field, source_codes_inclusion in field_source_codes_priority.items():
source_codes_exlusion = field_source_codes_exclusion.get(field, [])
# acteurs candidats pour le champ
cands = sorted(
[
x
for x in acteurs
if x["source_id"] is not None
and source_codes_by_id[x["source_id"]] in source_codes_inclusion
and source_codes_by_id[x["source_id"]] not in source_codes_exlusion
and x[field] is not None
],
key=lambda x: source_codes_inclusion.index(
source_codes_by_id[x["source_id"]]
),
)
acteur = next(iter(cands), None)
if acteur is not None:
results_update(field, acteur[field], acteur)

# ------------------------------------
# Boucle 2: données restantes via les sources préférées
# ------------------------------------
# Ordonner acteurs sur source_id dans l'ordre de sources_preferred_ids
# et les autres acteurs par défaut après
acteurs = sorted(
Expand All @@ -132,47 +216,44 @@ def parent_get_data_from_acteurs(
else len(sources_preferred_ids)
),
)
# Ignore les champs d'identifications acteurs
# car il doivent tous restés vides (sauf identifiant_unique
# qui est généré avec un UUID séparément)
keys_to_ignore = [
"identifiant_unique",
"identifiant_externe",
"parent_id",
"source_id",
]
print("parent_get_data_from_acteurs:")
print("parent_get_data_from_acteurs: boucle 2")
email_field = forms.EmailField()
for acteur in acteurs:
for key, val in acteur.items():
if key in keys_to_ignore:
continue
if val is not None and parent_dict.get(key) is None:
source_code = source_codes_by_id.get(acteur["source_id"])
# Précédent parent non selectionné comme parent pour le nouveau
# cluster, on veut pas prendre le risque de réutiliser ces données
if source_code is None:
continue
for field, value in acteur.items():
source_codes_exlusion = field_source_codes_exclusion.get(field, [])
if (
field not in keys_to_ignore
and value is not None
and parent_dict.get(field) is None
and source_code not in source_codes_exlusion
):
# TODO: voir si il y aurait une façon plus élégante
# d'automatiquement supprimer toutes les données invalides
# plutôt que de le faire manuellement champ par champ.
# Ceci est en lien avec le problème de présence de mauvais
# emails dans la DB, qui empêche la réutilisation du
# modèle django car .save() appelle .full_clean() qui crash
if key == "email":
if field == "email":
try:
email_field.clean(val)
email_field.clean(value)
except forms.ValidationError:
continue
print(
f"\t{key=}, {val=}",
f"via {acteur['source_id']=} {acteur['identifiant_unique']=}",
)
parent_dict[key] = val
results_update(field, value, acteur)
# Et on met la source_id à None car c'est une création de notre
# part, donc il ne correspond pas à une source existante
parent_dict["source_id"] = None

# On s'assure que la donnée est compatible avec le modèle de révision
validation_model(**parent_dict) # type: ignore
# On retourne la données sous forme de dict pour qu'elle puisse
# être utilisée soit sur un parent existant (update) soit pour créer un
# nouveau parent (insert)
return parent_dict
return parent_dict, sources_codes_picked


def db_manage_parent(
Expand Down Expand Up @@ -202,8 +283,11 @@ def db_manage_parent(
identifiants_uniques = [x.identifiant_unique for x in acteurs_maps]
acteurs_dicts = acteurs_dict_to_list_of_dicts(acteurs_maps)
# print(f"{acteurs_list=}")
parent_data = parent_get_data_from_acteurs(
acteurs_dicts, sources_preferred_ids, validation_model=RevisionActeur # type: ignore
parent_data, _ = parent_get_data_from_acteurs(
acteurs=acteurs_dicts,
source_ids_by_code=db_source_ids_by_code_get(),
sources_preferred_ids=sources_preferred_ids,
validation_model=RevisionActeur, # type: ignore
)
print(f"{parent_data=}")

Expand Down
26 changes: 26 additions & 0 deletions scripts/deduplication/utils/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Utilitaires pour les interactions DB"""

from functools import cache

from qfdmo.models.acteur import Source


# A propos de @cache:
# Données de très petite taille donc unbounded cache est OK
# Raison pour le cache:
# - pouvoir utiliser le mapping dans des sous-tâches sans
# avoir à se trimballer des arguments partout
# (manage cluster -> manage parent)
# - gain de performance: au 2024-12-18 il nous faut 2h23min pour
# traiter n=4700 clusters, preuve que l'accès DB est assez lent
# et donc économiser n=requêtes (mapping utilisé à la fois dans
# db_manage_cluster et db_manage_parent) ne parait pas scandaleux
@cache
def db_source_ids_by_code_get() -> dict:
"""Récupération de la liste des sources par code pour les utiliser
dans la logique de priorisation des sources pour la fusion des données
Returns:
result: dict de sources code:id
"""
return dict(Source.objects.values_list("code", "id"))
Loading

0 comments on commit 5283c1e

Please sign in to comment.