diff --git a/dags/cluster/config/model.py b/dags/cluster/config/model.py new file mode 100644 index 000000000..04a66ae34 --- /dev/null +++ b/dags/cluster/config/model.py @@ -0,0 +1,137 @@ +from pydantic import BaseModel, Field, field_validator, model_validator +from utils.airflow_params import airflow_params_dropdown_selected_to_ids + + +class ClusterConfig(BaseModel): + + # --------------------------------------- + # Champs de base + # --------------------------------------- + # Les champs qu'on s'attend à retrouver + # dans les params airflow: on les consèrve + # dans l'ordre de la UI Airflow, ce qui veut + # dire qu'on ne peut pas mélanger valeurs par défaut + # et valeurs obligatoires, voir section validation + # pour toutes les règles + dry_run: bool + include_source_codes: list[str] + include_acteur_type_codes: list[str] + include_only_if_regex_matches_nom: str | None + include_if_all_fields_filled: list[str] + exclude_if_any_field_filled: list[str] + normalize_fields_basic: list[str] + normalize_fields_no_words_size1: list[str] + normalize_fields_no_words_size2_or_less: list[str] + normalize_fields_no_words_size3_or_less: list[str] + normalize_fields_order_unique_words: list[str] + cluster_intra_source_is_allowed: bool + cluster_fields_exact: list[str] + cluster_fields_fuzzy: list[str] + cluster_fuzzy_threshold: float = Field(0.5, ge=0, le=1) + + # --------------------------------------- + # Listings & Mappings + # --------------------------------------- + fields_used: list[str] + fields_all: list[str] + mapping_source_ids_by_codes: dict[str, int] + mapping_acteur_type_ids_by_codes: dict[str, int] + + # --------------------------------------- + # Champs calculés + # --------------------------------------- + # A partir des champs de base + logique métier + # + valeurs de la base de données + # Conversion des codes en ids + include_source_ids: list[int] + include_acteur_type_ids: list[int] + # Champs sur lesquels on sépare les clusters + cluster_fields_separate: list[str] + + # --------------------------------------- + # Validation + # --------------------------------------- + # Champs isolés + @field_validator("dry_run", mode="before") + def check_dry_run(cls, v): + if v is None: + raise ValueError("dry_run à fournir") + return v + + @field_validator("cluster_intra_source_is_allowed", mode="before") + def check_cluster_intra_source_is_allowed(cls, v): + if v is None: + return False + return v + + @field_validator("exclude_if_any_field_filled", mode="before") + def check_exclude_if_any_field_filled(cls, v): + if v is None: + return [] + return v + + # Logique multi-champs + @model_validator(mode="before") + def check_model(cls, values): + + # SOURCE CODES + # Si aucun code source fourni alors on inclut toutes les sources + if not values.get("include_source_codes"): + values["include_source_codes"] = [] + values["include_source_ids"] = values[ + "mapping_source_ids_by_codes" + ].values() + else: + # Sinon on résout les codes sources en ids à partir de la sélection + values["include_source_ids"] = airflow_params_dropdown_selected_to_ids( + mapping_ids_by_codes=values["mapping_source_ids_by_codes"], + dropdown_selected=values["include_source_codes"], + ) + + # ACTEUR TYPE CODES + if not values.get("include_acteur_type_codes"): + raise ValueError("Au moins un type d'acteur doit être sélectionné") + values["include_acteur_type_ids"] = airflow_params_dropdown_selected_to_ids( + mapping_ids_by_codes=values["mapping_acteur_type_ids_by_codes"], + dropdown_selected=values["include_acteur_type_codes"], + ) + + # ACTEUR TYPE vs. INTRA-SOURCE + if ( + len(values["include_source_ids"]) == 1 + and not values["cluster_intra_source_is_allowed"] + ): + raise ValueError("1 source sélectionnée mais intra-source désactivé") + + # Par défaut on ne clusterise pas les acteurs d'une même source + # sauf si intra-source est activé + values["cluster_fields_separate"] = ["source_id"] + if values["cluster_intra_source_is_allowed"]: + values["cluster_fields_separate"] = [] + + # Fields avec default [] + optionals = [ + "normalize_fields_basic", + "normalize_fields_no_words_size1", + "normalize_fields_no_words_size2_or_less", + "normalize_fields_no_words_size3_or_less", + "normalize_fields_order_unique_words", + ] + for k in optionals: + if not values.get(k): + values[k] = [] + + # Liste UNIQUE des champs utilisés + fields_used = ["source_id", "acteur_type_id", "identifiant_unique"] + for k, v in values.items(): + if "fields" in k and k != "fields_all" and k != "source_id": + fields_used.extend(v) + values["fields_used"] = fields_used + values["fields_used"] = list(set(fields_used)) + + # Si aucun champ pour la normalisation basique = tous les champs + # utilisés seront normalisés + if values["normalize_fields_basic"]: + values["normalize_fields_basic"] = values["fields_used"] + + return values diff --git a/dags/cluster/dags/cluster_acteurs_suggestions.py b/dags/cluster/dags/cluster_acteurs_suggestions.py index cba958064..a4ddd8007 100644 --- a/dags/cluster/dags/cluster_acteurs_suggestions.py +++ b/dags/cluster/dags/cluster_acteurs_suggestions.py @@ -12,7 +12,7 @@ from airflow.models.baseoperator import chain from airflow.models.param import Param from cluster.tasks.airflow_logic import ( - cluster_acteurs_config_validate_task, + cluster_acteurs_config_create_task, cluster_acteurs_db_data_read_acteurs_task, cluster_acteurs_db_data_write_suggestions_task, cluster_acteurs_normalize_task, @@ -113,13 +113,16 @@ # TODO: permettre de ne sélectionner aucune source = toutes les sources "include_source_codes": Param( [], - type="array", + type=["null", "array"], # La terminologie Airflow n'est pas très heureuse # mais "examples" est bien la façon de faire des dropdowns # voir https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html examples=dropdown_sources, description_md="""**➕ INCLUSION ACTEURS**: seuls ceux qui proviennent - de ces sources (opérateur **OU/OR**)""", + de ces sources (opérateur **OU/OR**) + + 💯 Si aucune valeur spécifiée = tous les acteurs sont inclus + """, ), "include_acteur_type_codes": Param( [], @@ -261,7 +264,7 @@ schedule=None, ) as dag: chain( - cluster_acteurs_config_validate_task(dag=dag), + cluster_acteurs_config_create_task(dag=dag), cluster_acteurs_db_data_read_acteurs_task(dag=dag), cluster_acteurs_normalize_task(dag=dag), # TODO: besoin de refactoriser cette tâche: diff --git a/dags/cluster/tasks/airflow_logic/__init__.py b/dags/cluster/tasks/airflow_logic/__init__.py index ff5836a8f..e0956254a 100644 --- a/dags/cluster/tasks/airflow_logic/__init__.py +++ b/dags/cluster/tasks/airflow_logic/__init__.py @@ -1,5 +1,5 @@ -from .cluster_acteurs_config_validate_task import ( # noqa - cluster_acteurs_config_validate_task, +from .cluster_acteurs_config_create_task import ( # noqa + cluster_acteurs_config_create_task, ) from .cluster_acteurs_db_data_read_acteurs_task import ( # noqa cluster_acteurs_db_data_read_acteurs_task, diff --git a/dags/cluster/tasks/airflow_logic/cluster_acteurs_config_create_task.py b/dags/cluster/tasks/airflow_logic/cluster_acteurs_config_create_task.py new file mode 100644 index 000000000..9c18cf482 --- /dev/null +++ b/dags/cluster/tasks/airflow_logic/cluster_acteurs_config_create_task.py @@ -0,0 +1,62 @@ +""" +Tâche Airflow pour valider la configuration de clustering +""" + +import logging + +from airflow import DAG +from airflow.operators.python import PythonOperator +from cluster.config.model import ClusterConfig +from utils import logging_utils as log +from utils.django import django_model_fields_attributes_get, django_setup_full + +django_setup_full() + +from qfdmo.models import Acteur, ActeurType, Source # noqa: E402 + +logger = logging.getLogger(__name__) + + +def task_info_get(): + return """ + + + ============================================================ + Description de la tâche "cluster_acteurs_config_create" + ============================================================ + + 💡 quoi: valide la configuration fournie par la UI (+ défauts si il y en a) + + 🎯 pourquoi: échouer au plus tôt si il y a des problèmes de conf et ne pas + faire du traitement de données inutile + + 🏗️ comment: en comparant la config fournie avec des règles censées + s'aligner avec les besoins métier (ex: prérequis) + et la UI (ex: optionalité) + """ + + +def cluster_acteurs_config_create_wrapper(**kwargs): + """Wrapper de la tâche Airflow pour créer une configuration à + partir des params du DAG + autre logique métier / valeurs DB.""" + logger.info(task_info_get()) + params = kwargs["params"] + extra = { + "fields_all": django_model_fields_attributes_get(Acteur), + "mapping_source_ids_by_codes": {x.code: x.id for x in Source.objects.all()}, + "mapping_acteur_type_ids_by_codes": { + x.code: x.id for x in ActeurType.objects.all() + }, + } + config = ClusterConfig(**(params | extra)) + log.preview("Config", config) + kwargs["ti"].xcom_push(key="config", value=config) + + +def cluster_acteurs_config_create_task(dag: DAG) -> PythonOperator: + """La tâche Airflow qui ne fait que appeler le wrapper""" + return PythonOperator( + task_id="cluster_acteurs_config_create", + python_callable=cluster_acteurs_config_create_wrapper, + dag=dag, + ) diff --git a/dags/cluster/tasks/airflow_logic/cluster_acteurs_config_validate_task.py b/dags/cluster/tasks/airflow_logic/cluster_acteurs_config_validate_task.py deleted file mode 100644 index 005f9c848..000000000 --- a/dags/cluster/tasks/airflow_logic/cluster_acteurs_config_validate_task.py +++ /dev/null @@ -1,84 +0,0 @@ -""" -Tâche Airflow pour valider la configuration de clustering -""" - -import logging - -from airflow import DAG -from airflow.operators.python import PythonOperator -from cluster.tasks.business_logic.cluster_acteurs_config_validate import ( - cluster_acteurs_config_validate, -) -from sources.tasks.business_logic.read_mapping_from_postgres import ( - read_mapping_from_postgres, -) -from utils import logging_utils as log - -logger = logging.getLogger(__name__) - - -mapping_source_id_by_code = read_mapping_from_postgres(table_name="qfdmo_source") -mapping_acteur_type_id_by_code = read_mapping_from_postgres( - table_name="qfdmo_acteurtype" -) - - -def task_info_get(): - return """ - - - ============================================================ - Description de la tâche "cluster_acteurs_config_validate" - ============================================================ - - 💡 quoi: valide la configuration fournie par la UI (+ défauts si il y en a) - - 🎯 pourquoi: échouer au plus tôt si il y a des problèmes de conf et ne pas - faire du traitement de données inutile - - 🏗️ comment: en comparant la config fournie avec des règles censées - s'aligner avec les besoins métier (ex: prérequis) - et la UI (ex: optionalité) - """ - - -def cluster_acteurs_config_validate_wrapper(**kwargs) -> None: - """Wrapper de la tâche Airflow pour échanger les paramètres - et pouvoir tester la tâche sans mock.""" - logger.info(task_info_get()) - - params = kwargs["params"] - - for key, value in params.items(): - log.preview(f"param: {key}", value) - - # TODO: ceci devrait être déplacer dans la fonction de validation - # qui devrait être reprise de fond en comble avec du pydantic - # pour pas réinventer la roue - if ( - len(params["include_source_codes"]) == 1 - and not params["cluster_intra_source_is_allowed"] - ): - raise ValueError("Clustering intra-source désactivé mais une 1 source incluse") - - include_source_ids, include_acteur_type_ids = cluster_acteurs_config_validate( - mapping_source_id_by_code=mapping_source_id_by_code, - mapping_acteur_type_id_by_code=mapping_acteur_type_id_by_code, - include_source_codes=params["include_source_codes"] or [], - include_acteur_type_codes=params["include_acteur_type_codes"] or [], - include_if_all_fields_filled=params["include_if_all_fields_filled"] or [], - exclude_if_any_field_filled=params["exclude_if_any_field_filled"] or [], - ) - params["include_source_ids"] = include_source_ids - params["include_acteur_type_ids"] = include_acteur_type_ids - - kwargs["ti"].xcom_push(key="params", value=params) - - -def cluster_acteurs_config_validate_task(dag: DAG) -> PythonOperator: - """La tâche Airflow qui ne fait que appeler le wrapper""" - return PythonOperator( - task_id="cluster_acteurs_config_validate", - python_callable=cluster_acteurs_config_validate_wrapper, - dag=dag, - ) diff --git a/dags/cluster/tasks/airflow_logic/cluster_acteurs_db_data_read_acteurs_task.py b/dags/cluster/tasks/airflow_logic/cluster_acteurs_db_data_read_acteurs_task.py index 2abd24f48..00b29b6c8 100644 --- a/dags/cluster/tasks/airflow_logic/cluster_acteurs_db_data_read_acteurs_task.py +++ b/dags/cluster/tasks/airflow_logic/cluster_acteurs_db_data_read_acteurs_task.py @@ -2,6 +2,7 @@ from airflow import DAG from airflow.operators.python import PythonOperator +from cluster.config.model import ClusterConfig from cluster.tasks.business_logic.cluster_acteurs_db_data_read_acteurs import ( cluster_acteurs_db_data_read_acteurs, ) @@ -39,30 +40,19 @@ def task_info_get(): def cluster_acteurs_db_data_read_acteurs_wrapper(**kwargs) -> None: logger.info(task_info_get()) - # use xcom to get the params from the previous task - params = kwargs["ti"].xcom_pull( - key="params", task_ids="cluster_acteurs_config_validate" + config: ClusterConfig = kwargs["ti"].xcom_pull( + key="config", task_ids="cluster_acteurs_config_create" ) - - # Boucle pour automatiser l'affichage des paramètres de champs - # et la construction d'un set de tous les champs (pour requête SQL) - fields = ["source_id", "acteur_type_id", "nom"] - for key, value in params.items(): - if key.startswith("include_") or key.startswith("exclude_"): - log.preview(key, value) - if "fields" in key: - fields.extend(value or []) - fields = sorted(list(set(fields))) - log.preview("Tous les champs reseignés", fields) + log.preview("Config reçue", config) df, query = cluster_acteurs_db_data_read_acteurs( model_class=DisplayedActeur, - include_source_ids=params["include_source_ids"], - include_acteur_type_ids=params["include_acteur_type_ids"], - include_only_if_regex_matches_nom=params["include_only_if_regex_matches_nom"], - include_if_all_fields_filled=params["include_if_all_fields_filled"] or [], - exclude_if_any_field_filled=params["exclude_if_any_field_filled"] or [], - extra_dataframe_fields=fields, + include_source_ids=config.include_source_ids, + include_acteur_type_ids=config.include_acteur_type_ids, + include_only_if_regex_matches_nom=config.include_only_if_regex_matches_nom, + include_if_all_fields_filled=config.include_if_all_fields_filled, + exclude_if_any_field_filled=config.exclude_if_any_field_filled, + extra_dataframe_fields=config.fields_used, ) log.preview("requête SQL utilisée", query) log.preview("acteurs sélectionnés", df) diff --git a/dags/cluster/tasks/airflow_logic/cluster_acteurs_db_data_write_suggestions_task.py b/dags/cluster/tasks/airflow_logic/cluster_acteurs_db_data_write_suggestions_task.py index 548db5fe1..f4083a92b 100644 --- a/dags/cluster/tasks/airflow_logic/cluster_acteurs_db_data_write_suggestions_task.py +++ b/dags/cluster/tasks/airflow_logic/cluster_acteurs_db_data_write_suggestions_task.py @@ -4,6 +4,7 @@ from airflow import DAG from airflow.exceptions import AirflowSkipException from airflow.operators.python import PythonOperator +from cluster.config.model import ClusterConfig from utils import logging_utils as log logger = logging.getLogger(__name__) @@ -31,19 +32,24 @@ def cluster_acteurs_db_data_write_suggestions_wrapper(**kwargs) -> None: logger.info(task_info_get()) # use xcom to get the params from the previous task - params = kwargs["ti"].xcom_pull( - key="params", task_ids="cluster_acteurs_config_validate" + config: ClusterConfig = kwargs["ti"].xcom_pull( + key="config", task_ids="cluster_acteurs_config_create" ) df: pd.DataFrame = kwargs["ti"].xcom_pull( key="df", task_ids="cluster_acteurs_suggestions" ) - log.preview("paramètres reçus", params) + log.preview("config reçue", config) log.preview("suggestions de clustering", df) - if params["dry_run"]: + # "is not False" est plus sur que "is True" car on peut avoir None + # par erreur dans la config et on ne veut pas prendre celoa pour + # un signal de modifier la DB + if config.dry_run is not False: raise AirflowSkipException( - log.banner_string("Dry run activé, suggestions pas écrites en base") + log.banner_string( + f"Dry run ={config.dry_run} activé, suggestions pas écrites en base" + ) ) raise NotImplementedError( diff --git a/dags/cluster/tasks/airflow_logic/cluster_acteurs_info_size1_task.py b/dags/cluster/tasks/airflow_logic/cluster_acteurs_info_size1_task.py index b57b47989..bab3f0b24 100644 --- a/dags/cluster/tasks/airflow_logic/cluster_acteurs_info_size1_task.py +++ b/dags/cluster/tasks/airflow_logic/cluster_acteurs_info_size1_task.py @@ -3,6 +3,7 @@ import pandas as pd from airflow import DAG from airflow.operators.python import PythonOperator +from cluster.config.model import ClusterConfig from cluster.tasks.business_logic.cluster_acteurs_info_size1 import ( cluster_info_size1_exact_fields, ) @@ -35,20 +36,19 @@ def task_info_get(): def cluster_info_size1_exact_fields_wrapper(**kwargs) -> None: logger.info(task_info_get()) - # use xcom to get the params from the previous task - params = kwargs["ti"].xcom_pull( - key="params", task_ids="cluster_acteurs_config_validate" + config: ClusterConfig = kwargs["ti"].xcom_pull( + key="config", task_ids="cluster_acteurs_config_create" ) df: pd.DataFrame = kwargs["ti"].xcom_pull( key="df", task_ids="cluster_acteurs_db_data_read_acteurs" ) - log.preview("paramètres reçus", params) + log.preview("config reçue", config) log.preview("acteurs sélectionnés", df) results = cluster_info_size1_exact_fields( df=df, - cluster_fields_exact=params["cluster_fields_exact"], + cluster_fields_exact=config.cluster_fields_exact, ) for group, result in results.items(): msg = log.banner_string(f"📦 Groupage sur: {group}") diff --git a/dags/cluster/tasks/airflow_logic/cluster_acteurs_normalize_task.py b/dags/cluster/tasks/airflow_logic/cluster_acteurs_normalize_task.py index 447e9ddf9..059920594 100644 --- a/dags/cluster/tasks/airflow_logic/cluster_acteurs_normalize_task.py +++ b/dags/cluster/tasks/airflow_logic/cluster_acteurs_normalize_task.py @@ -3,6 +3,7 @@ import pandas as pd from airflow import DAG from airflow.operators.python import PythonOperator +from cluster.config.model import ClusterConfig from cluster.tasks.business_logic.cluster_acteurs_df_sort import cluster_acteurs_df_sort from cluster.tasks.business_logic.cluster_acteurs_normalize import ( cluster_acteurs_normalize, @@ -35,8 +36,8 @@ def cluster_acteurs_normalize_wrapper(**kwargs) -> None: logger.info(task_info_get()) # use xcom to get the params from the previous task - params = kwargs["ti"].xcom_pull( - key="params", task_ids="cluster_acteurs_config_validate" + config: ClusterConfig = kwargs["ti"].xcom_pull( + key="config", task_ids="cluster_acteurs_config_create" ) df: pd.DataFrame = kwargs["ti"].xcom_pull( key="df", task_ids="cluster_acteurs_db_data_read_acteurs" @@ -44,28 +45,16 @@ def cluster_acteurs_normalize_wrapper(**kwargs) -> None: if df.empty: raise ValueError("Pas de données acteurs récupérées") - log.preview("paramètres reçus", params) + log.preview("config reçue", config) log.preview("acteurs sélectionnés", df) df_norm = cluster_acteurs_normalize( df, - # Par défaut si on ne précise pas de champs, - # on applique la normalisation basique à tous les champs - normalize_fields_basic=params["normalize_fields_basic"] or [], - normalize_fields_no_words_size1=params["normalize_fields_no_words_size1"] or [], - normalize_fields_no_words_size2_or_less=params[ - "normalize_fields_no_words_size2_or_less" - ] - or [], - normalize_fields_no_words_size3_or_less=params[ - "normalize_fields_no_words_size3_or_less" - ] - or [], - # Pareil, par défaut on applique à tous les champs - normalize_fields_order_unique_words=params[ - "normalize_fields_order_unique_words" - ] - or [], + normalize_fields_basic=config.normalize_fields_basic, + normalize_fields_no_words_size1=config.normalize_fields_no_words_size1, + normalize_fields_no_words_size2_or_less=config.normalize_fields_no_words_size2_or_less, + normalize_fields_no_words_size3_or_less=config.normalize_fields_no_words_size3_or_less, + normalize_fields_order_unique_words=config.normalize_fields_order_unique_words, ) # TODO: shows # uniques before and after per field diff --git a/dags/cluster/tasks/airflow_logic/cluster_acteurs_suggestions_task.py b/dags/cluster/tasks/airflow_logic/cluster_acteurs_suggestions_task.py index 1091c7aea..f88f0c407 100644 --- a/dags/cluster/tasks/airflow_logic/cluster_acteurs_suggestions_task.py +++ b/dags/cluster/tasks/airflow_logic/cluster_acteurs_suggestions_task.py @@ -4,6 +4,7 @@ from airflow import DAG from airflow.exceptions import AirflowSkipException from airflow.operators.python import PythonOperator +from cluster.config.model import ClusterConfig from cluster.tasks.business_logic.cluster_acteurs_df_sort import cluster_acteurs_df_sort from cluster.tasks.business_logic.cluster_acteurs_suggestions import ( cluster_acteurs_suggestions, @@ -33,9 +34,8 @@ def task_info_get(): def cluster_acteurs_suggestions_wrapper(**kwargs) -> None: logger.info(task_info_get()) - # use xcom to get the params from the previous task - params = kwargs["ti"].xcom_pull( - key="params", task_ids="cluster_acteurs_config_validate" + config: ClusterConfig = kwargs["ti"].xcom_pull( + key="config", task_ids="cluster_acteurs_config_create" ) df: pd.DataFrame = kwargs["ti"].xcom_pull( key="df", task_ids="cluster_acteurs_normalize" @@ -43,20 +43,15 @@ def cluster_acteurs_suggestions_wrapper(**kwargs) -> None: if df.empty: raise ValueError("Pas de données acteurs normalisées récupérées") - log.preview("paramètres reçus", params) + log.preview("config reçue", config) log.preview("acteurs normalisés", df) - # Par défaut on ne clusterise pas les acteurs d'une même source - cluster_fields_separate = ["source_id"] - if params["cluster_intra_source_is_allowed"]: - cluster_fields_separate = [] - df_suggestions = cluster_acteurs_suggestions( df, - cluster_fields_exact=params["cluster_fields_exact"], - cluster_fields_fuzzy=params["cluster_fields_fuzzy"] or [], - cluster_fields_separate=cluster_fields_separate, - cluster_fuzzy_threshold=params["cluster_fuzzy_threshold"], + cluster_fields_exact=config.cluster_fields_exact, + cluster_fields_fuzzy=config.cluster_fields_fuzzy, + cluster_fields_separate=config.cluster_fields_separate, + cluster_fuzzy_threshold=config.cluster_fuzzy_threshold, ) if df_suggestions.empty: raise AirflowSkipException( @@ -65,9 +60,11 @@ def cluster_acteurs_suggestions_wrapper(**kwargs) -> None: df_suggestions = cluster_acteurs_df_sort( df_suggestions, - cluster_fields_exact=params["cluster_fields_exact"], - cluster_fields_fuzzy=params["cluster_fields_fuzzy"] or [], + cluster_fields_exact=config.cluster_fields_exact, + cluster_fields_fuzzy=config.cluster_fields_fuzzy, ) + + logging.info(log.banner_string("🏁 Résultat final de cette tâche")) log.preview_df_as_markdown("suggestions de clusters", df_suggestions) # On pousse les suggestions dans xcom pour les tâches suivantes diff --git a/dags/cluster/tasks/business_logic/cluster_acteurs_config_validate.py b/dags/cluster/tasks/business_logic/cluster_acteurs_config_validate.py deleted file mode 100644 index 72b60ade0..000000000 --- a/dags/cluster/tasks/business_logic/cluster_acteurs_config_validate.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -Fonction pour valider la configuration de clustering -""" - -from utils.airflow_params import airflow_params_dropdown_selected_to_ids - - -def cluster_acteurs_config_validate( - mapping_source_id_by_code: dict[str, int], - mapping_acteur_type_id_by_code: dict[str, int], - include_source_codes: list[str], - include_acteur_type_codes: list[str], - include_if_all_fields_filled: list[str], - exclude_if_any_field_filled: list[str], -) -> tuple: - """Fonction de validation de la config de clustering""" - include_source_ids = airflow_params_dropdown_selected_to_ids( - mapping_ids_by_codes=mapping_source_id_by_code, - dropdown_selected=include_source_codes, - ) - include_acteur_type_ids = airflow_params_dropdown_selected_to_ids( - mapping_ids_by_codes=mapping_acteur_type_id_by_code, - dropdown_selected=include_acteur_type_codes, - ) - fields_incl_excl = set(include_if_all_fields_filled) & set( - exclude_if_any_field_filled - ) - - if not include_source_ids: - raise ValueError("Au moins une source doit être sélectionnée") - - if not include_acteur_type_ids: - raise ValueError("Au moins un type d'acteur doit être sélectionné") - - if not include_if_all_fields_filled: - raise ValueError("Au moins un champ non vide doit être sélectionné") - - if fields_incl_excl: - raise ValueError(f"Champs à la fois à inclure et à exclure: {fields_incl_excl}") - - return include_source_ids, include_acteur_type_ids diff --git a/dags/cluster/tasks/business_logic/cluster_acteurs_db_data_read_acteurs.py b/dags/cluster/tasks/business_logic/cluster_acteurs_db_data_read_acteurs.py index 5d5cbf2ce..d45e68912 100644 --- a/dags/cluster/tasks/business_logic/cluster_acteurs_db_data_read_acteurs.py +++ b/dags/cluster/tasks/business_logic/cluster_acteurs_db_data_read_acteurs.py @@ -19,7 +19,7 @@ def cluster_acteurs_db_data_read_acteurs( model_class: type[Model], include_source_ids: list[int], include_acteur_type_ids: list[int], - include_only_if_regex_matches_nom: str, + include_only_if_regex_matches_nom: str | None, include_if_all_fields_filled: list[str], exclude_if_any_field_filled: list[str], extra_dataframe_fields: list[str], diff --git a/dags/utils/logging_utils.py b/dags/utils/logging_utils.py index b793a6b4a..2d08a62ff 100755 --- a/dags/utils/logging_utils.py +++ b/dags/utils/logging_utils.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd +from pydantic import BaseModel # TODO: improve by moving instantiation inside the functions # and using inspect to get the parent caller's name @@ -31,7 +32,7 @@ def default(self, obj): def json_dumps(data: Any) -> str: """Fonction pour convertir un objet en JSON""" - return json.dumps(data, cls=CustomJSONEncoder, indent=2, ensure_ascii=False) + return json.dumps(data, cls=CustomJSONEncoder, indent=4, ensure_ascii=False) def size_info_get(value: Any) -> Any: @@ -45,6 +46,8 @@ def size_info_get(value: Any) -> Any: return len(value) # Number of entries elif isinstance(value, dict): return len(value.keys()) # Number of keys + elif isinstance(value, BaseModel): + return f"{len(value.model_fields.keys())} propriétés" elif isinstance(value, (str, bytes)): return len(value) # Length of string or bytes else: @@ -82,6 +85,8 @@ def preview(value_name: str, value: Any) -> None: log.info(json_dumps(value)) elif isinstance(value, set): log.info(json_dumps(list(value))) + elif isinstance(value, BaseModel): + log.info(json_dumps(value.model_dump(mode="json"))) else: log.info(str(value)) log.info("::endgroup::") diff --git a/dags_unit_tests/cluster/config/test_model.py b/dags_unit_tests/cluster/config/test_model.py new file mode 100644 index 000000000..f0081e57b --- /dev/null +++ b/dags_unit_tests/cluster/config/test_model.py @@ -0,0 +1,165 @@ +import pytest + +from dags.cluster.config.model import ClusterConfig + + +class TestClusterConfigModel: + + @pytest.fixture + def params_working(self) -> dict: + # Paramètres pour créer une config qui fonctionne + return { + "dry_run": True, + "include_source_codes": ["source1 (id=1)", "source3 (id=3)"], + "include_acteur_type_codes": ["atype2 (id=2)", "atype3 (id=3)"], + "include_only_if_regex_matches_nom": "mon nom", + "include_if_all_fields_filled": ["f1_incl", "f2_incl"], + "exclude_if_any_field_filled": ["f3_excl", "f4_excl"], + "normalize_fields_basic": ["basic1", "basic2"], + "normalize_fields_no_words_size1": ["size1"], + "normalize_fields_no_words_size2_or_less": ["size2"], + "normalize_fields_no_words_size3_or_less": ["size3"], + "normalize_fields_order_unique_words": ["order1", "order2"], + "cluster_intra_source_is_allowed": False, + "cluster_fields_exact": ["exact1", "exact2"], + "cluster_fields_fuzzy": ["fuzzy1", "fuzzy2"], + "cluster_fuzzy_threshold": 0.5, + "fields_all": [ + "f1_incl", + "f2_incl", + "f3_excl", + "f4_excl", + "basic1", + "basic2", + "size1", + "size2", + "size3", + "order1", + "order2", + "exact1", + "exact2", + "fuzzy1", + "fuzzy2", + "extra_to_ignore", + ], + "mapping_source_ids_by_codes": {"source1": 1, "source2": 2, "source3": 3}, + "mapping_acteur_type_ids_by_codes": {"atype1": 1, "atype2": 2, "atype3": 3}, + } + + @pytest.fixture + def config_working(self, params_working) -> ClusterConfig: + # Config qui fonctionne + return ClusterConfig(**params_working) + + def test_working_source_ids_resolved(self, config_working): + # Avec un gap au milieu et != total que le mapping_ + # pour démontrer qu'on à bien pioché + assert config_working.include_source_ids == [1, 3] + + def test_working_acteur_type_ids_resolved(self, config_working): + # Avec un gap en début et != total que le mapping_ + # pour démontrer qu'on à bien pioché + assert config_working.include_acteur_type_ids == [2, 3] + + def test_working_no_sources_equals_all_sources(self, params_working): + # Si aucun code source fourni alors on inclut toutes les sources + params_working["include_source_codes"] = None + config = ClusterConfig(**params_working) + assert config.include_source_ids == [1, 2, 3] + + def test_working_cluster_fields_separate(self, config_working, params_working): + # Par défaut on ne clusterise pas sur la même source + assert config_working.cluster_intra_source_is_allowed is False + assert config_working.cluster_fields_separate == ["source_id"] + # En revanche si on autorise le clustering intra-source + # on ne sépare pas sur la source + params_working["cluster_intra_source_is_allowed"] = True + config = ClusterConfig(**params_working) + assert config.cluster_fields_separate == [] + + def test_optional_include_only_if_regex_matches_nom(self, params_working): + # On peut ne pas fournir de regex + params_working["include_only_if_regex_matches_nom"] = None + config = ClusterConfig(**params_working) + assert config.include_only_if_regex_matches_nom is None + + def test_optional_exclude_if_any_field_filled(self, params_working): + # On peut ne pas fournir de champs à exclure + params_working["exclude_if_any_field_filled"] = None + config = ClusterConfig(**params_working) + assert config.exclude_if_any_field_filled == [] + + def test_optional_normalize_fields_basic(self, params_working): + # On peut ne pas fournir de champs à normaliser + # et tous les champs présent dans les champs "fields" + # (sauf field_all) seront rajoutés à la liste + params_working["normalize_fields_basic"] = None + config = ClusterConfig(**params_working) + expected = config.fields_used + diff = set(config.normalize_fields_basic) - set(expected) + assert not diff, f"Différence: {diff}" + + def test_fields_used_always_has_internal_fields(self, params_working): + # Les champs utilisés contiennent toujours les champs + # internes (ex: source_id, acteur_type_id) + config = ClusterConfig(**params_working) + assert "source_id" in config.fields_used + assert "acteur_type_id" in config.fields_used + assert "identifiant_unique" in config.fields_used + + def test_fields_used_has_no_duplicates(self, params_working): + # Les champs utilisés ne doivent pas contenir de doublons + params_working["normalize_fields_basic"] = ["basic1", "basic1"] + params_working["normalize_fields_no_words_size1"] = ["basic1", "basic1"] + config = ClusterConfig(**params_working) + assert len(config.fields_used) == len(set(config.fields_used)) + + def test_optinoal_normalize_fields_order_unique_words(self, params_working): + # On peut ne pas fournir de champs à normaliser + params_working["normalize_fields_order_unique_words"] = None + config = ClusterConfig(**params_working) + assert config.normalize_fields_order_unique_words == [] + + def test_default_dry_run_is_true(self, params_working): + # On veut forcer l'init du dry_run pour limiter + # les risques de faux positifs (ex: valeur None + # qui fait échouer if config.dry_run et entraine + # des modifications) + params_working["dry_run"] = None + with pytest.raises(ValueError, match="dry_run à fournir"): + ClusterConfig(**params_working) + + def test_error_one_source_no_intra(self, params_working): + # Si on ne founit qu'une source alors il faut autoriser + # le clustering intra-source + params_working["cluster_intra_source_is_allowed"] = False + params_working["include_source_codes"] = ["source1 (id=1)"] + msg = "1 source sélectionnée mais intra-source désactivé" + with pytest.raises(ValueError, match=msg): + ClusterConfig(**params_working) + + def test_error_must_provide_acteur_type(self, params_working): + # Si aucun type d'acteur fourni alors on lève une erreur + # car on ne veut pas clustering sur tous les types d'acteurs + # à la fois = trop de risques de faux positifs + params_working["include_acteur_type_codes"] = [] + with pytest.raises(ValueError, match="Au moins un type d'acteur"): + ClusterConfig(**params_working) + + def test_error_must_provide_acteur_type_none(self, params_working): + # Variation ci-dessus avec None + params_working["include_acteur_type_codes"] = None + with pytest.raises(ValueError, match="Au moins un type d'acteur"): + ClusterConfig(**params_working) + + def test_error_source_codes_invalid(self, params_working): + # Erreur si un code source n'existe pas dans le mapping + params_working["include_source_codes"] = ["MAUVAISE SOURCE (id=666)"] + with pytest.raises(ValueError, match="Codes non trouvés dans le mapping"): + ClusterConfig(**params_working) + + def test_error_acteur_type_codes_invalid(self, params_working): + # Erreur si un code acteur type n'existe pas dans le mapping + params_working["include_acteur_type_codes"] = ["MAUVAIS TYPE (id=666)"] + with pytest.raises(ValueError, match="Codes non trouvés dans le mapping"): + ClusterConfig(**params_working) diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_config_validate.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_config_validate.py deleted file mode 100644 index fac6ef0e1..000000000 --- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_config_validate.py +++ /dev/null @@ -1,66 +0,0 @@ -import pytest -from cluster.tasks.business_logic.cluster_acteurs_config_validate import ( - cluster_acteurs_config_validate, -) - - -class TestClusteringActeurConfigValidate: - - @pytest.fixture(scope="session") - def map_source_codes(self): - return { - "source1": 1, - "source2": 2, - "source3": 3, - } - - @pytest.fixture(scope="session") - def map_atype_codes(self): - return { - "type1": 1, - "type2": 2, - "type3": 3, - } - - @pytest.fixture - def params(self, map_source_codes, map_atype_codes): - return { - "mapping_source_id_by_code": map_source_codes, - "mapping_acteur_type_id_by_code": map_atype_codes, - "include_source_codes": ["source1 (id=1)"], - "include_acteur_type_codes": ["type1 (id=1)"], - "include_if_all_fields_filled": ["nom", "code_postal"], - "exclude_if_any_field_filled": ["siret"], - } - - def test_include_source_ids_invalid(self, params): - params["include_source_codes"] = [] - with pytest.raises( - ValueError, match="Au moins une source doit être sélectionnée" - ): - cluster_acteurs_config_validate(**params) - - def test_include_acteur_type_ids_invalid(self, params): - params["include_acteur_type_codes"] = [] - with pytest.raises( - ValueError, match="Au moins un type d'acteur doit être sélectionné" - ): - cluster_acteurs_config_validate(**params) - - def test_include_if_fields_not_empty_invalid(self, params): - params["include_if_all_fields_filled"] = [] - with pytest.raises( - ValueError, match="Au moins un champ non vide doit être sélectionné" - ): - cluster_acteurs_config_validate(**params) - - def test_fields_incl_excl_invalid(self, params): - params["include_if_all_fields_filled"] = ["nom"] - params["exclude_if_any_field_filled"] = ["nom"] - with pytest.raises( - ValueError, match="Champs à la fois à inclure et à exclure: {'nom'}" - ): - cluster_acteurs_config_validate(**params) - - def test_valid(self, params): - cluster_acteurs_config_validate(**params)