Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

📜 CLUSTERING: amélioration config #1231

Merged
merged 4 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions dags/cluster/config/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from pydantic import BaseModel, Field, field_validator, model_validator
from utils.airflow_params import airflow_params_dropdown_selected_to_ids


class ClusterConfig(BaseModel):

# ---------------------------------------
# Champs de base
# ---------------------------------------
# Les champs qu'on s'attend à retrouver
# dans les params airflow: on les consèrve
# dans l'ordre de la UI Airflow, ce qui veut
# dire qu'on ne peut pas mélanger valeurs par défaut
# et valeurs obligatoires, voir section validation
# pour toutes les règles
dry_run: bool
include_source_codes: list[str]
include_acteur_type_codes: list[str]
include_only_if_regex_matches_nom: str | None
include_if_all_fields_filled: list[str]
exclude_if_any_field_filled: list[str]
normalize_fields_basic: list[str]
normalize_fields_no_words_size1: list[str]
normalize_fields_no_words_size2_or_less: list[str]
normalize_fields_no_words_size3_or_less: list[str]
normalize_fields_order_unique_words: list[str]
cluster_intra_source_is_allowed: bool
cluster_fields_exact: list[str]
cluster_fields_fuzzy: list[str]
cluster_fuzzy_threshold: float = Field(0.5, ge=0, le=1)

# ---------------------------------------
# Listings & Mappings
# ---------------------------------------
fields_used: list[str]
fields_all: list[str]
mapping_source_ids_by_codes: dict[str, int]
mapping_acteur_type_ids_by_codes: dict[str, int]

# ---------------------------------------
# Champs calculés
# ---------------------------------------
# A partir des champs de base + logique métier
# + valeurs de la base de données
# Conversion des codes en ids
include_source_ids: list[int]
include_acteur_type_ids: list[int]
# Champs sur lesquels on sépare les clusters
cluster_fields_separate: list[str]

# ---------------------------------------
# Validation
# ---------------------------------------
# Champs isolés
@field_validator("dry_run", mode="before")
def check_dry_run(cls, v):
if v is None:
raise ValueError("dry_run à fournir")
return v

@field_validator("cluster_intra_source_is_allowed", mode="before")
def check_cluster_intra_source_is_allowed(cls, v):
if v is None:
return False
return v

@field_validator("exclude_if_any_field_filled", mode="before")
def check_exclude_if_any_field_filled(cls, v):
if v is None:
return []
return v

# Logique multi-champs
@model_validator(mode="before")
def check_model(cls, values):

# SOURCE CODES
# Si aucun code source fourni alors on inclut toutes les sources
if not values.get("include_source_codes"):
values["include_source_codes"] = []
values["include_source_ids"] = values[
"mapping_source_ids_by_codes"
].values()
else:
# Sinon on résout les codes sources en ids à partir de la sélection
values["include_source_ids"] = airflow_params_dropdown_selected_to_ids(
mapping_ids_by_codes=values["mapping_source_ids_by_codes"],
dropdown_selected=values["include_source_codes"],
)

# ACTEUR TYPE CODES
if not values.get("include_acteur_type_codes"):
raise ValueError("Au moins un type d'acteur doit être sélectionné")
values["include_acteur_type_ids"] = airflow_params_dropdown_selected_to_ids(
mapping_ids_by_codes=values["mapping_acteur_type_ids_by_codes"],
dropdown_selected=values["include_acteur_type_codes"],
)

# ACTEUR TYPE vs. INTRA-SOURCE
if (
len(values["include_source_ids"]) == 1
and not values["cluster_intra_source_is_allowed"]
):
raise ValueError("1 source sélectionnée mais intra-source désactivé")

# Par défaut on ne clusterise pas les acteurs d'une même source
# sauf si intra-source est activé
values["cluster_fields_separate"] = ["source_id"]
if values["cluster_intra_source_is_allowed"]:
values["cluster_fields_separate"] = []

# Fields avec default []
optionals = [
"normalize_fields_basic",
"normalize_fields_no_words_size1",
"normalize_fields_no_words_size2_or_less",
"normalize_fields_no_words_size3_or_less",
"normalize_fields_order_unique_words",
]
for k in optionals:
if not values.get(k):
values[k] = []

# Liste UNIQUE des champs utilisés
fields_used = ["source_id", "acteur_type_id", "identifiant_unique"]
for k, v in values.items():
if "fields" in k and k != "fields_all" and k != "source_id":
fields_used.extend(v)
values["fields_used"] = fields_used
values["fields_used"] = list(set(fields_used))

# Si aucun champ pour la normalisation basique = tous les champs
# utilisés seront normalisés
if values["normalize_fields_basic"]:
values["normalize_fields_basic"] = values["fields_used"]

return values
11 changes: 7 additions & 4 deletions dags/cluster/dags/cluster_acteurs_suggestions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from cluster.tasks.airflow_logic import (
cluster_acteurs_config_validate_task,
cluster_acteurs_config_create_task,
cluster_acteurs_db_data_read_acteurs_task,
cluster_acteurs_db_data_write_suggestions_task,
cluster_acteurs_normalize_task,
Expand Down Expand Up @@ -113,13 +113,16 @@
# TODO: permettre de ne sélectionner aucune source = toutes les sources
"include_source_codes": Param(
[],
type="array",
type=["null", "array"],
# La terminologie Airflow n'est pas très heureuse
# mais "examples" est bien la façon de faire des dropdowns
# voir https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html
examples=dropdown_sources,
description_md="""**➕ INCLUSION ACTEURS**: seuls ceux qui proviennent
de ces sources (opérateur **OU/OR**)""",
de ces sources (opérateur **OU/OR**)

💯 Si aucune valeur spécifiée = tous les acteurs sont inclus
""",
),
"include_acteur_type_codes": Param(
[],
Expand Down Expand Up @@ -261,7 +264,7 @@
schedule=None,
) as dag:
chain(
cluster_acteurs_config_validate_task(dag=dag),
cluster_acteurs_config_create_task(dag=dag),
cluster_acteurs_db_data_read_acteurs_task(dag=dag),
cluster_acteurs_normalize_task(dag=dag),
# TODO: besoin de refactoriser cette tâche:
Expand Down
4 changes: 2 additions & 2 deletions dags/cluster/tasks/airflow_logic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .cluster_acteurs_config_validate_task import ( # noqa
cluster_acteurs_config_validate_task,
from .cluster_acteurs_config_create_task import ( # noqa
cluster_acteurs_config_create_task,
)
from .cluster_acteurs_db_data_read_acteurs_task import ( # noqa
cluster_acteurs_db_data_read_acteurs_task,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
Tâche Airflow pour valider la configuration de clustering
"""

import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from cluster.config.model import ClusterConfig
from utils import logging_utils as log
from utils.django import django_model_fields_attributes_get, django_setup_full

django_setup_full()

from qfdmo.models import Acteur, ActeurType, Source # noqa: E402

logger = logging.getLogger(__name__)


def task_info_get():
return """


============================================================
Description de la tâche "cluster_acteurs_config_create"
============================================================

💡 quoi: valide la configuration fournie par la UI (+ défauts si il y en a)

🎯 pourquoi: échouer au plus tôt si il y a des problèmes de conf et ne pas
faire du traitement de données inutile

🏗️ comment: en comparant la config fournie avec des règles censées
s'aligner avec les besoins métier (ex: prérequis)
et la UI (ex: optionalité)
"""


def cluster_acteurs_config_create_wrapper(**kwargs):
"""Wrapper de la tâche Airflow pour créer une configuration à
partir des params du DAG + autre logique métier / valeurs DB."""
logger.info(task_info_get())
params = kwargs["params"]
extra = {
"fields_all": django_model_fields_attributes_get(Acteur),
"mapping_source_ids_by_codes": {x.code: x.id for x in Source.objects.all()},
"mapping_acteur_type_ids_by_codes": {
x.code: x.id for x in ActeurType.objects.all()
},
}
config = ClusterConfig(**(params | extra))
log.preview("Config", config)
kwargs["ti"].xcom_push(key="config", value=config)


def cluster_acteurs_config_create_task(dag: DAG) -> PythonOperator:
"""La tâche Airflow qui ne fait que appeler le wrapper"""
return PythonOperator(
task_id="cluster_acteurs_config_create",
python_callable=cluster_acteurs_config_create_wrapper,
dag=dag,
)

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from airflow import DAG
from airflow.operators.python import PythonOperator
from cluster.config.model import ClusterConfig
from cluster.tasks.business_logic.cluster_acteurs_db_data_read_acteurs import (
cluster_acteurs_db_data_read_acteurs,
)
Expand Down Expand Up @@ -39,30 +40,19 @@ def task_info_get():
def cluster_acteurs_db_data_read_acteurs_wrapper(**kwargs) -> None:
logger.info(task_info_get())

# use xcom to get the params from the previous task
params = kwargs["ti"].xcom_pull(
key="params", task_ids="cluster_acteurs_config_validate"
config: ClusterConfig = kwargs["ti"].xcom_pull(
key="config", task_ids="cluster_acteurs_config_create"
)

# Boucle pour automatiser l'affichage des paramètres de champs
# et la construction d'un set de tous les champs (pour requête SQL)
fields = ["source_id", "acteur_type_id", "nom"]
for key, value in params.items():
if key.startswith("include_") or key.startswith("exclude_"):
log.preview(key, value)
if "fields" in key:
fields.extend(value or [])
fields = sorted(list(set(fields)))
log.preview("Tous les champs reseignés", fields)
log.preview("Config reçue", config)

df, query = cluster_acteurs_db_data_read_acteurs(
model_class=DisplayedActeur,
include_source_ids=params["include_source_ids"],
include_acteur_type_ids=params["include_acteur_type_ids"],
include_only_if_regex_matches_nom=params["include_only_if_regex_matches_nom"],
include_if_all_fields_filled=params["include_if_all_fields_filled"] or [],
exclude_if_any_field_filled=params["exclude_if_any_field_filled"] or [],
extra_dataframe_fields=fields,
include_source_ids=config.include_source_ids,
include_acteur_type_ids=config.include_acteur_type_ids,
include_only_if_regex_matches_nom=config.include_only_if_regex_matches_nom,
include_if_all_fields_filled=config.include_if_all_fields_filled,
exclude_if_any_field_filled=config.exclude_if_any_field_filled,
extra_dataframe_fields=config.fields_used,
)
log.preview("requête SQL utilisée", query)
log.preview("acteurs sélectionnés", df)
Expand Down
Loading
Loading