Skip to content

Commit

Permalink
📜 CLUSTERING: amélioration config (#1231)
Browse files Browse the repository at this point in the history
* config DAG cluster en pydantic + 0 sources = toutes sources

* supression commentaires inutiles
  • Loading branch information
maxcorbeau authored Jan 22, 2025
1 parent 7235e74 commit 1e9d79f
Show file tree
Hide file tree
Showing 15 changed files with 427 additions and 264 deletions.
137 changes: 137 additions & 0 deletions dags/cluster/config/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from pydantic import BaseModel, Field, field_validator, model_validator
from utils.airflow_params import airflow_params_dropdown_selected_to_ids


class ClusterConfig(BaseModel):

# ---------------------------------------
# Champs de base
# ---------------------------------------
# Les champs qu'on s'attend à retrouver
# dans les params airflow: on les consèrve
# dans l'ordre de la UI Airflow, ce qui veut
# dire qu'on ne peut pas mélanger valeurs par défaut
# et valeurs obligatoires, voir section validation
# pour toutes les règles
dry_run: bool
include_source_codes: list[str]
include_acteur_type_codes: list[str]
include_only_if_regex_matches_nom: str | None
include_if_all_fields_filled: list[str]
exclude_if_any_field_filled: list[str]
normalize_fields_basic: list[str]
normalize_fields_no_words_size1: list[str]
normalize_fields_no_words_size2_or_less: list[str]
normalize_fields_no_words_size3_or_less: list[str]
normalize_fields_order_unique_words: list[str]
cluster_intra_source_is_allowed: bool
cluster_fields_exact: list[str]
cluster_fields_fuzzy: list[str]
cluster_fuzzy_threshold: float = Field(0.5, ge=0, le=1)

# ---------------------------------------
# Listings & Mappings
# ---------------------------------------
fields_used: list[str]
fields_all: list[str]
mapping_source_ids_by_codes: dict[str, int]
mapping_acteur_type_ids_by_codes: dict[str, int]

# ---------------------------------------
# Champs calculés
# ---------------------------------------
# A partir des champs de base + logique métier
# + valeurs de la base de données
# Conversion des codes en ids
include_source_ids: list[int]
include_acteur_type_ids: list[int]
# Champs sur lesquels on sépare les clusters
cluster_fields_separate: list[str]

# ---------------------------------------
# Validation
# ---------------------------------------
# Champs isolés
@field_validator("dry_run", mode="before")
def check_dry_run(cls, v):
if v is None:
raise ValueError("dry_run à fournir")
return v

@field_validator("cluster_intra_source_is_allowed", mode="before")
def check_cluster_intra_source_is_allowed(cls, v):
if v is None:
return False
return v

@field_validator("exclude_if_any_field_filled", mode="before")
def check_exclude_if_any_field_filled(cls, v):
if v is None:
return []
return v

# Logique multi-champs
@model_validator(mode="before")
def check_model(cls, values):

# SOURCE CODES
# Si aucun code source fourni alors on inclut toutes les sources
if not values.get("include_source_codes"):
values["include_source_codes"] = []
values["include_source_ids"] = values[
"mapping_source_ids_by_codes"
].values()
else:
# Sinon on résout les codes sources en ids à partir de la sélection
values["include_source_ids"] = airflow_params_dropdown_selected_to_ids(
mapping_ids_by_codes=values["mapping_source_ids_by_codes"],
dropdown_selected=values["include_source_codes"],
)

# ACTEUR TYPE CODES
if not values.get("include_acteur_type_codes"):
raise ValueError("Au moins un type d'acteur doit être sélectionné")
values["include_acteur_type_ids"] = airflow_params_dropdown_selected_to_ids(
mapping_ids_by_codes=values["mapping_acteur_type_ids_by_codes"],
dropdown_selected=values["include_acteur_type_codes"],
)

# ACTEUR TYPE vs. INTRA-SOURCE
if (
len(values["include_source_ids"]) == 1
and not values["cluster_intra_source_is_allowed"]
):
raise ValueError("1 source sélectionnée mais intra-source désactivé")

# Par défaut on ne clusterise pas les acteurs d'une même source
# sauf si intra-source est activé
values["cluster_fields_separate"] = ["source_id"]
if values["cluster_intra_source_is_allowed"]:
values["cluster_fields_separate"] = []

# Fields avec default []
optionals = [
"normalize_fields_basic",
"normalize_fields_no_words_size1",
"normalize_fields_no_words_size2_or_less",
"normalize_fields_no_words_size3_or_less",
"normalize_fields_order_unique_words",
]
for k in optionals:
if not values.get(k):
values[k] = []

# Liste UNIQUE des champs utilisés
fields_used = ["source_id", "acteur_type_id", "identifiant_unique"]
for k, v in values.items():
if "fields" in k and k != "fields_all" and k != "source_id":
fields_used.extend(v)
values["fields_used"] = fields_used
values["fields_used"] = list(set(fields_used))

# Si aucun champ pour la normalisation basique = tous les champs
# utilisés seront normalisés
if values["normalize_fields_basic"]:
values["normalize_fields_basic"] = values["fields_used"]

return values
11 changes: 7 additions & 4 deletions dags/cluster/dags/cluster_acteurs_suggestions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from cluster.tasks.airflow_logic import (
cluster_acteurs_config_validate_task,
cluster_acteurs_config_create_task,
cluster_acteurs_db_data_read_acteurs_task,
cluster_acteurs_db_data_write_suggestions_task,
cluster_acteurs_normalize_task,
Expand Down Expand Up @@ -113,13 +113,16 @@
# TODO: permettre de ne sélectionner aucune source = toutes les sources
"include_source_codes": Param(
[],
type="array",
type=["null", "array"],
# La terminologie Airflow n'est pas très heureuse
# mais "examples" est bien la façon de faire des dropdowns
# voir https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html
examples=dropdown_sources,
description_md="""**➕ INCLUSION ACTEURS**: seuls ceux qui proviennent
de ces sources (opérateur **OU/OR**)""",
de ces sources (opérateur **OU/OR**)
💯 Si aucune valeur spécifiée = tous les acteurs sont inclus
""",
),
"include_acteur_type_codes": Param(
[],
Expand Down Expand Up @@ -261,7 +264,7 @@
schedule=None,
) as dag:
chain(
cluster_acteurs_config_validate_task(dag=dag),
cluster_acteurs_config_create_task(dag=dag),
cluster_acteurs_db_data_read_acteurs_task(dag=dag),
cluster_acteurs_normalize_task(dag=dag),
# TODO: besoin de refactoriser cette tâche:
Expand Down
4 changes: 2 additions & 2 deletions dags/cluster/tasks/airflow_logic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .cluster_acteurs_config_validate_task import ( # noqa
cluster_acteurs_config_validate_task,
from .cluster_acteurs_config_create_task import ( # noqa
cluster_acteurs_config_create_task,
)
from .cluster_acteurs_db_data_read_acteurs_task import ( # noqa
cluster_acteurs_db_data_read_acteurs_task,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
Tâche Airflow pour valider la configuration de clustering
"""

import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from cluster.config.model import ClusterConfig
from utils import logging_utils as log
from utils.django import django_model_fields_attributes_get, django_setup_full

django_setup_full()

from qfdmo.models import Acteur, ActeurType, Source # noqa: E402

logger = logging.getLogger(__name__)


def task_info_get():
return """
============================================================
Description de la tâche "cluster_acteurs_config_create"
============================================================
💡 quoi: valide la configuration fournie par la UI (+ défauts si il y en a)
🎯 pourquoi: échouer au plus tôt si il y a des problèmes de conf et ne pas
faire du traitement de données inutile
🏗️ comment: en comparant la config fournie avec des règles censées
s'aligner avec les besoins métier (ex: prérequis)
et la UI (ex: optionalité)
"""


def cluster_acteurs_config_create_wrapper(**kwargs):
"""Wrapper de la tâche Airflow pour créer une configuration à
partir des params du DAG + autre logique métier / valeurs DB."""
logger.info(task_info_get())
params = kwargs["params"]
extra = {
"fields_all": django_model_fields_attributes_get(Acteur),
"mapping_source_ids_by_codes": {x.code: x.id for x in Source.objects.all()},
"mapping_acteur_type_ids_by_codes": {
x.code: x.id for x in ActeurType.objects.all()
},
}
config = ClusterConfig(**(params | extra))
log.preview("Config", config)
kwargs["ti"].xcom_push(key="config", value=config)


def cluster_acteurs_config_create_task(dag: DAG) -> PythonOperator:
"""La tâche Airflow qui ne fait que appeler le wrapper"""
return PythonOperator(
task_id="cluster_acteurs_config_create",
python_callable=cluster_acteurs_config_create_wrapper,
dag=dag,
)

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from airflow import DAG
from airflow.operators.python import PythonOperator
from cluster.config.model import ClusterConfig
from cluster.tasks.business_logic.cluster_acteurs_db_data_read_acteurs import (
cluster_acteurs_db_data_read_acteurs,
)
Expand Down Expand Up @@ -39,30 +40,19 @@ def task_info_get():
def cluster_acteurs_db_data_read_acteurs_wrapper(**kwargs) -> None:
logger.info(task_info_get())

# use xcom to get the params from the previous task
params = kwargs["ti"].xcom_pull(
key="params", task_ids="cluster_acteurs_config_validate"
config: ClusterConfig = kwargs["ti"].xcom_pull(
key="config", task_ids="cluster_acteurs_config_create"
)

# Boucle pour automatiser l'affichage des paramètres de champs
# et la construction d'un set de tous les champs (pour requête SQL)
fields = ["source_id", "acteur_type_id", "nom"]
for key, value in params.items():
if key.startswith("include_") or key.startswith("exclude_"):
log.preview(key, value)
if "fields" in key:
fields.extend(value or [])
fields = sorted(list(set(fields)))
log.preview("Tous les champs reseignés", fields)
log.preview("Config reçue", config)

df, query = cluster_acteurs_db_data_read_acteurs(
model_class=DisplayedActeur,
include_source_ids=params["include_source_ids"],
include_acteur_type_ids=params["include_acteur_type_ids"],
include_only_if_regex_matches_nom=params["include_only_if_regex_matches_nom"],
include_if_all_fields_filled=params["include_if_all_fields_filled"] or [],
exclude_if_any_field_filled=params["exclude_if_any_field_filled"] or [],
extra_dataframe_fields=fields,
include_source_ids=config.include_source_ids,
include_acteur_type_ids=config.include_acteur_type_ids,
include_only_if_regex_matches_nom=config.include_only_if_regex_matches_nom,
include_if_all_fields_filled=config.include_if_all_fields_filled,
exclude_if_any_field_filled=config.exclude_if_any_field_filled,
extra_dataframe_fields=config.fields_used,
)
log.preview("requête SQL utilisée", query)
log.preview("acteurs sélectionnés", df)
Expand Down
Loading

0 comments on commit 1e9d79f

Please sign in to comment.