Skip to content

Commit

Permalink
🚀 CLUSTERING: DAG airflow - Sélection des acteurs (#1145)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcorbeau authored Jan 7, 2025
1 parent a6d51b9 commit db3ff84
Show file tree
Hide file tree
Showing 9 changed files with 576 additions and 0 deletions.
107 changes: 107 additions & 0 deletions dags/clustering/dags/clustering_acteur.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from clustering.tasks.airflow_logic.clustering_config_validate_task import (
clustering_acteur_config_validate_task,
)
from clustering.tasks.airflow_logic.clustering_db_data_read_acteurs_task import (
clustering_db_data_read_acteurs_task,
)
from shared.tasks.database_logic.db_manager import PostgresConnectionManager
from sources.tasks.airflow_logic.operators import default_args
from sources.tasks.business_logic.read_mapping_from_postgres import (
read_mapping_from_postgres,
)
from sqlalchemy import inspect
from utils.airflow_params import airflow_params_dropdown_from_mapping

default_args["retries"] = 0

# -------------------------------------------
# Gestion des dropdowns des paramètres
# -------------------------------------------
# A noter que ce design pattern est a éviter au maximum
# car le code est executé au parsing des fichiers DAG
# selon min_file_process_interval
# https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#config-scheduler-min-file-process-interval
# https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code
# En revanche sans cette approche, il va falloir:
# - soit laisser le métier rentrer des paramètres complexes à la main
# - soit maintenir des mapping statiques ici
# Ces deux options semblent pires que l'inconvénient du top-level code
# sachant que le code executé demeure assez légé

# Récupération données DB
mapping_source_id_by_code = read_mapping_from_postgres(table_name="qfdmo_source")
mapping_acteur_type_id_by_code = read_mapping_from_postgres(
table_name="qfdmo_acteurtype"
)
# Création des dropdowns
dropdown_sources = airflow_params_dropdown_from_mapping(mapping_source_id_by_code)
dropdown_acteur_types = airflow_params_dropdown_from_mapping(
mapping_acteur_type_id_by_code
)


def table_columns_get(table_name: str) -> list[str]:
"""
Récupère la liste des colonnes d'une table dans une base de données.
"""
engine = PostgresConnectionManager().engine
inspector = inspect(engine)
columns = inspector.get_columns(table_name) # type: ignore
return [column["name"] for column in columns]


dropdown_acteur_columns = table_columns_get("qfdmo_revisionacteur")


with DAG(
dag_id="clustering_acteur",
dag_display_name="Clustering - Acteur",
default_args=default_args,
description=("Un DAG pour générer des suggestions de clustering pour les acteurs"),
params={
"include_source_codes": Param(
[],
type="array",
# La terminologie Airflow n'est pas très heureuse
# mais "examples" est bien la façon de faire des dropdowns
# voir https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html
examples=dropdown_sources,
description_md="""**➕ INCLUSION ACTEURS**: seuls ceux qui proviennent
de ces sources
(opérateur **OU/OR**)""",
),
"include_acteur_type_codes": Param(
[],
type="array",
examples=dropdown_acteur_types,
description_md="""**➕ INCLUSION ACTEURS**: ceux qui sont de ces types
(opérateur **OU/OR**)""",
),
"include_if_all_fields_filled": Param(
["code_postal"],
type="array",
examples=dropdown_acteur_columns,
description_md="""**➕ INCLUSION ACTEURS**: ceux dont tous ces champs
sont **remplis**
exemple: travailler uniquement sur les acteurs avec SIRET
(opérateur **ET/AND**)""",
),
"exclude_if_any_field_filled": Param(
[],
type=["null", "array"],
examples=dropdown_acteur_columns,
description_md="""**🛑 EXCLUSION ACTEURS**: ceux dont n'importe quel
de ces champs est **rempli**
exemple: travailler uniquement sur les acteurs SANS SIRET
(opérateur **OU/OR**)""",
),
},
schedule=None,
) as dag:
chain(
clustering_acteur_config_validate_task(dag=dag),
clustering_db_data_read_acteurs_task(dag=dag),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
Tâche Airflow pour valider la configuration de clustering
"""

from airflow import DAG
from airflow.operators.python import PythonOperator
from clustering.tasks.business_logic.clustering_config_validate import (
clustering_acteur_config_validate,
)
from sources.tasks.business_logic.read_mapping_from_postgres import (
read_mapping_from_postgres,
)
from utils import logging_utils as log

mapping_source_id_by_code = read_mapping_from_postgres(table_name="qfdmo_source")
mapping_acteur_type_id_by_code = read_mapping_from_postgres(
table_name="qfdmo_acteurtype"
)


def clustering_acteur_config_validate_wrapper(**kwargs) -> None:
"""Wrapper de la tâche Airflow pour échanger les paramètres
et pouvoir tester la tâche sans mock."""
params = kwargs["params"]

log.preview("sources sélectionnées", params["include_source_codes"])
log.preview("acteur_types sélectionnés", params["include_acteur_type_codes"])
log.preview("inclure si champs non-vide", params["include_if_all_fields_filled"])
log.preview("exclude si champ vide", params["exclude_if_any_field_filled"])
log.preview(
"champs à la fois inclure/exclure", params["include_if_all_fields_filled"]
)

include_source_ids, include_acteur_type_ids = clustering_acteur_config_validate(
mapping_source_id_by_code=mapping_source_id_by_code,
mapping_acteur_type_id_by_code=mapping_acteur_type_id_by_code,
include_source_codes=params["include_source_codes"],
include_acteur_type_codes=params["include_acteur_type_codes"],
include_if_all_fields_filled=params["include_if_all_fields_filled"],
exclude_if_any_field_filled=params["exclude_if_any_field_filled"],
)
params["include_source_ids"] = include_source_ids
params["include_acteur_type_ids"] = include_acteur_type_ids

# Use xcom to pass the params to the next task
kwargs["ti"].xcom_push(key="params", value=params)


def clustering_acteur_config_validate_task(dag: DAG) -> PythonOperator:
"""La tâche Airflow qui ne fait que appeler le wrapper"""
return PythonOperator(
task_id="clustering_acteur_config_validate",
python_callable=clustering_acteur_config_validate_wrapper,
dag=dag,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from clustering.tasks.business_logic.clustering_db_data_read_acteurs import (
clustering_db_data_read_acteurs,
)
from shared.tasks.database_logic.db_manager import PostgresConnectionManager
from utils import logging_utils as log


def clustering_db_data_read_acteurs_wrapper(**kwargs) -> pd.DataFrame:

# use xcom to get the params from the previous task
params = kwargs["ti"].xcom_pull(
key="params", task_ids="clustering_acteur_config_validate"
)

log.preview("paramètres reçus", params)
log.preview("sources sélectionnées", params["include_source_ids"])
log.preview("acteur_types sélectionnés", params["include_acteur_type_ids"])
log.preview("inclure si champs remplis", params["include_if_all_fields_filled"])
log.preview("exclude si champs remplis", params["exclude_if_any_field_filled"])

df, query = clustering_db_data_read_acteurs(
include_source_ids=params["include_source_ids"],
include_acteur_type_ids=params["include_acteur_type_ids"],
include_if_all_fields_filled=params["include_if_all_fields_filled"],
exclude_if_any_field_filled=params["exclude_if_any_field_filled"],
engine=PostgresConnectionManager().engine,
)
log.preview("requête SQL utilisée", query)
log.preview("acteurs sélectionnés", df)
log.preview("# acteurs par source_id", df.groupby("source_id").size())
log.preview("# acteurs par acteur_type_id", df.groupby("acteur_type_id").size())

return df


def clustering_db_data_read_acteurs_task(dag: DAG) -> PythonOperator:
"""La tâche Airflow qui ne fait que appeler le wrapper"""
return PythonOperator(
task_id="clustering_db_data_read_acteurs",
python_callable=clustering_db_data_read_acteurs_wrapper,
dag=dag,
)
41 changes: 41 additions & 0 deletions dags/clustering/tasks/business_logic/clustering_config_validate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Fonction pour valider la configuration de clustering
"""

from utils.airflow_params import airflow_params_dropdown_selected_to_ids


def clustering_acteur_config_validate(
mapping_source_id_by_code: dict[str, int],
mapping_acteur_type_id_by_code: dict[str, int],
include_source_codes: list[str],
include_acteur_type_codes: list[str],
include_if_all_fields_filled: list[str],
exclude_if_any_field_filled: list[str],
) -> tuple:
"""Fonction de validation de la config de clustering"""
include_source_ids = airflow_params_dropdown_selected_to_ids(
mapping_ids_by_codes=mapping_source_id_by_code,
dropdown_selected=include_source_codes,
)
include_acteur_type_ids = airflow_params_dropdown_selected_to_ids(
mapping_ids_by_codes=mapping_acteur_type_id_by_code,
dropdown_selected=include_acteur_type_codes,
)
fields_incl_excl = set(include_if_all_fields_filled) & set(
exclude_if_any_field_filled
)

if not include_source_ids:
raise ValueError("Au moins une source doit être sélectionnée")

if not include_acteur_type_ids:
raise ValueError("Au moins un type d'acteur doit être sélectionné")

if not include_if_all_fields_filled:
raise ValueError("Au moins un champ non vide doit être sélectionné")

if fields_incl_excl:
raise ValueError(f"Champs à la fois à inclure et à exclure: {fields_incl_excl}")

return include_source_ids, include_acteur_type_ids
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import numpy as np
import pandas as pd
from sqlalchemy import Column, Integer, MetaData, String, Table, and_, or_
from sqlalchemy.dialects import postgresql

"""
TODO:
- initialiser django au sein du contexte airflow
puis dans le airflow_logic/*_task.py:
- charger le modèle DisplayedActeur
- appeler la fonction clustering_params_to_django_queryset
- convertir le queryset SQL puis en dataframe
- supprimer clustering_params_to_sql_query qui n'est plus utilisée
from django.db.models import Q
def clustering_params_to_django_queryset(
model,
include_source_ids,
include_acteur_type_ids,
include_if_fields_filled,
exclude_if_any_field_filled,
):
queryset = model.objects.all()
# Filter by source_id
if include_source_ids:
queryset = queryset.filter(source_id__in=include_source_ids)
# Filter by acteur_type_id
if include_acteur_type_ids:
queryset = queryset.filter(acteur_type_id__in=include_acteur_type_ids)
# Ensure fields in include_if_fields_filled are not empty
if include_if_fields_filled:
for field in include_if_fields_filled:
queryset = queryset.filter(
~Q(**{f"{field}__isnull": True}), ~Q(**{f"{field}": ""})
)
# Exclude entries where any field in exclude_if_any_field_filled is filled
if exclude_if_any_field_filled:
for field in exclude_if_any_field_filled:
queryset = queryset.exclude(
~Q(**{f"{field}__isnull": True}) & ~Q(**{f"{field}": ""})
)
return queryset
"""


def clustering_params_to_sql_query(
table_name,
include_source_ids,
include_acteur_type_ids,
include_if_all_fields_filled,
exclude_if_any_field_filled,
):
# Dynamically create metadata and table definition
metadata = MetaData()
table = Table(
table_name,
metadata,
Column("source_id", Integer),
Column("acteur_type_id", Integer),
*[
Column(field, String)
for field in include_if_all_fields_filled + exclude_if_any_field_filled
],
)

query = table.select()

# Add source_id filter
if include_source_ids:
query = query.where(table.c.source_id.in_(include_source_ids))

# Add acteur_type_id filter
if include_acteur_type_ids:
query = query.where(table.c.acteur_type_id.in_(include_acteur_type_ids))

# Add include_if_all_fields_filled filters
if include_if_all_fields_filled:
non_empty_conditions = [
and_(table.c[field] != "", table.c[field] is not None)
for field in include_if_all_fields_filled
]
query = query.where(and_(*non_empty_conditions))

# Add exclude_if_any_field_filled filters
if exclude_if_any_field_filled:
empty_conditions = [
or_(table.c[field] == "", table.c[field] is None)
for field in exclude_if_any_field_filled
]
query = query.where(~or_(*empty_conditions))

# Compile the query into a SQL string
compiled_query = str(
query.compile(
dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}
)
).replace(f"{table_name}.", "")
return compiled_query


def clustering_db_data_read_acteurs(
include_source_ids,
include_acteur_type_ids,
include_if_all_fields_filled,
exclude_if_any_field_filled,
engine,
) -> tuple[pd.DataFrame, str]:
query = clustering_params_to_sql_query(
"qfdmo_displayedacteur",
include_source_ids,
include_acteur_type_ids,
include_if_all_fields_filled,
exclude_if_any_field_filled,
)
df = pd.read_sql_query(query, engine).replace({np.nan: None})
return df, query
Loading

0 comments on commit db3ff84

Please sign in to comment.