Skip to content

Commit

Permalink
amélioration parent=ACTIF, suppresion check 1 source
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcorbeau committed Jan 27, 2025
1 parent 1fa4b9b commit 0375e1d
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 19 deletions.
13 changes: 13 additions & 0 deletions dags/cluster/config/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,25 @@ def check_model(cls, values):
dropdown_selected=values["include_acteur_type_codes"],
)

"""
Logique supprimée le 2025-01-27 mais conservée pour référence:
- depuis l'ajout des parents indépendants des sources
via PR1265 on ne peut plus savoir si on héritera
uniquement d'une seule source au moment de la config, donc on
laisse passer au niveau de la sélection
TODO: on pourrait scinder la config en plusieurs sous-config:
- SelectionConfig
- NormalizationConfig
- ClusteringConfig
- EnrichmentConfig
Et ainsi avoir des validations plus fines à chaque étape
# ACTEUR TYPE vs. INTRA-SOURCE
if (
len(values["include_source_ids"]) == 1
and not values["cluster_intra_source_is_allowed"]
):
raise ValueError("1 source sélectionnée mais intra-source désactivé")
"""

# Par défaut on ne clusterise pas les acteurs d'une même source
# sauf si intra-source est activé
Expand Down
Empty file modified dags/cluster/dags/cluster_acteurs_suggestions.py
100644 → 100755
Empty file.
57 changes: 51 additions & 6 deletions dags/cluster/tasks/airflow_logic/cluster_acteurs_selection_from_db.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import logging

import numpy as np
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from cluster.config.model import ClusterConfig
from cluster.tasks.business_logic.cluster_acteurs_df_sort import cluster_acteurs_df_sort
from cluster.tasks.business_logic.cluster_acteurs_selection_from_db import (
from cluster.tasks.business_logic import (
cluster_acteurs_df_sort,
cluster_acteurs_selection_acteur_type_parents,
cluster_acteurs_selection_from_db,
)
from utils import logging_utils as log
Expand Down Expand Up @@ -45,7 +48,14 @@ def cluster_acteurs_selection_from_db_wrapper(**kwargs) -> None:
)
log.preview("Config reçue", config)

df, query = cluster_acteurs_selection_from_db(
# --------------------------------
# 1) Sélection des acteurs
# --------------------------------
# Quels qu'ils soient (enfants ou parents) sur la
# base de tous les critères d'inclusion/exclusion
# fournis au niveau du DAG
logging.info(log.banner_string("Sélection des acteurs"))
df_acteurs, query = cluster_acteurs_selection_from_db(
model_class=DisplayedActeur,
include_source_ids=config.include_source_ids,
include_acteur_type_ids=config.include_acteur_type_ids,
Expand All @@ -54,10 +64,45 @@ def cluster_acteurs_selection_from_db_wrapper(**kwargs) -> None:
exclude_if_any_field_filled=config.exclude_if_any_field_filled,
extra_dataframe_fields=config.fields_used,
)
df_acteurs = cluster_acteurs_df_sort(df_acteurs)
log.preview("requête SQL utilisée", query)
log.preview("acteurs sélectionnés", df)
log.preview("# acteurs par source_id", df.groupby("source_id").size())
log.preview("# acteurs par acteur_type_id", df.groupby("acteur_type_id").size())
log.preview("# acteurs par source_id", df_acteurs.groupby("source_id").size())
log.preview(
"# acteurs par acteur_type_id", df_acteurs.groupby("acteur_type_id").size()
)
log.preview_df_as_markdown("acteurs sélectionnés", df_acteurs)

# --------------------------------
# 2) Sélection des parents uniquements
# --------------------------------
# Aujourd'hui (2025-01-27): la convention data veut qu'un parent
# soit attribué une source NULL. Donc si le métier choisit de
# clusteriser des sources en particulier à 1), on ne peut donc
# jamais récupérer les parents potentiels pour le même type d'acteur
# La logique ci-dessous vient palier à ce problème en sélectionnant
# TOUS les parents des acteurs types sélectionnés, et en ignorant
# les autres paramètres de sélection
logging.info(log.banner_string("Sélection des parents"))
df_parents = cluster_acteurs_selection_acteur_type_parents(
acteur_type_ids=config.include_acteur_type_ids,
fields=config.fields_used,
)
df_parents = cluster_acteurs_df_sort(df_parents)
log.preview_df_as_markdown("parents sélectionnés", df_parents)

# --------------------------------
# 3) Fusion acteurs + parents
# --------------------------------
logging.info(log.banner_string("Fusion acteurs + parents"))
ids = set()
ids.update(df_acteurs["identifiant_unique"].values)
ids.update(df_parents["identifiant_unique"].values)
log.preview("IDs avant la fusion", ids)
df = pd.concat([df_acteurs, df_parents], ignore_index=True).replace({np.nan: None})
df = df.drop_duplicates(subset="identifiant_unique", keep="first")
df = cluster_acteurs_df_sort(df)
log.preview("IDs après la fusion", df["identifiant_unique"].tolist())
log.preview_df_as_markdown("acteurs + parents sélectionnés", df)

if df.empty:
raise ValueError("Aucun acteur trouvé avec les critères de sélection")
Expand Down
4 changes: 4 additions & 0 deletions dags/cluster/tasks/business_logic/__init__.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from .cluster_acteurs_parent_calculations import ( # noqa
cluster_acteurs_parent_calculations,
)
from .cluster_acteurs_selection_acteur_type_parents import ( # noqa
cluster_acteurs_selection_acteur_type_parents,
)
from .cluster_acteurs_selection_from_db import cluster_acteurs_selection_from_db # noqa
from .cluster_acteurs_suggestions import cluster_acteurs_suggestions # noqa
from .cluster_acteurs_suggestions_to_db import cluster_acteurs_suggestions_to_db # noqa
from .cluster_acteurs_suggestions_validate import ( # noqa
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

django_setup_full()
from qfdmo.models import ActeurType, DisplayedActeur # noqa: E402
from qfdmo.models.acteur import ActeurStatus # noqa: E402


def cluster_acteurs_selection_acteur_type_parents(
Expand All @@ -25,7 +26,9 @@ def cluster_acteurs_selection_acteur_type_parents(
# On récupère les parents des acteurs types donnés
# qui sont censés être des acteurs sans source
parents = DisplayedActeur.objects.filter(
acteur_type__id__in=acteur_type_ids
).filter(source__id__isnull=True)
acteur_type__id__in=acteur_type_ids,
statut=ActeurStatus.ACTIF,
source__id__isnull=True,
)

return django_model_queryset_to_df(parents, fields)
6 changes: 6 additions & 0 deletions dags_unit_tests/cluster/config/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ def test_default_dry_run_is_true(self, params_working):
with pytest.raises(ValueError, match="dry_run à fournir"):
ClusterConfig(**params_working)

"""
Test supprimé le 2025-01-27 mais conservé pour référence:
- depuis l'ajout des parents indépendant des sources
via PR1265 on ne peut plus savoir si on héritera
uniquement d'une seule source au moment de la config
def test_error_one_source_no_intra(self, params_working):
# Si on ne founit qu'une source alors il faut autoriser
# le clustering intra-source
Expand All @@ -142,6 +147,7 @@ def test_error_one_source_no_intra(self, params_working):
msg = "1 source sélectionnée mais intra-source désactivé"
with pytest.raises(ValueError, match=msg):
ClusterConfig(**params_working)
"""

def test_error_must_provide_acteur_type(self, params_working):
# Si aucun type d'acteur fourni alors on lève une erreur
Expand Down
42 changes: 31 additions & 11 deletions ..._tests/cluster/tasks/business_logic/test_cluster_acteurs_selection_acteur_type_parents.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

import pandas as pd
import pytest
from cluster.tasks.business_logic.cluster_acteurs_selection_acteur_type_parents import (
cluster_acteurs_selection_acteur_type_parents,
)
from cluster.tasks.business_logic import cluster_acteurs_selection_acteur_type_parents

from qfdmo.models import DisplayedActeur
from unit_tests.qfdmo.acteur_factory import ActeurTypeFactory, SourceFactory
Expand All @@ -17,6 +15,7 @@ class TestClusterActeursSelectionActeurTypeParents:

@pytest.fixture
def db_testdata_write(self) -> dict:
print("db_testdata_write")
"""Création des donnéees de test en DB"""
data = {}
data["at1"] = ActeurTypeFactory(code="at1")
Expand All @@ -28,11 +27,12 @@ def db_testdata_write(self) -> dict:
# On fait exprès d'utiliser des UUIDs partout, y compris
# pour les non-parents, pour démontrer que la requête ne
# se base pas sur l'anatomie des IDs
data["id_at2_pas_parent"] = "10000000-0000-0000-0000-000000000000"
data["id_at2_pas_parent"] = "20000000-0000-0000-0000-000000000000"
data["id_at2_parent_a"] = "20000000-0000-0000-0000-00000000000a"
data["id_at2_parent_b"] = "20000000-0000-0000-0000-00000000000b"
data["id_at3_pas_parent"] = "30000000-0000-0000-0000-000000000000"
data["id_at4_parent"] = "40000000-0000-0000-0000-000000000000"
data["id_at4_parent_inactif"] = "40000000-0000-0000-0000-00000inactif"

# at1
# Parent MAIS d'un acteur type non sélectionné (at1)
Expand Down Expand Up @@ -70,11 +70,17 @@ def db_testdata_write(self) -> dict:
acteur_type=data["at4"],
identifiant_unique=data["id_at4_parent"],
)
# On test le cas où il y a 1 parent mais inactif
DisplayedActeur.objects.create(
acteur_type=data["at4"],
identifiant_unique=data["id_at4_parent_inactif"],
statut="INACTIF",
)

return data

@pytest.fixture
def df(self, db_testdata_write) -> pd.DataFrame:
def df_working(self, db_testdata_write) -> pd.DataFrame:
"""On génère et retourne la df pour les tests"""
data = db_testdata_write
acteur_type_ids = [data["at2"].id, data["at3"].id, data["at4"].id]
Expand All @@ -84,23 +90,37 @@ def df(self, db_testdata_write) -> pd.DataFrame:
fields=fields,
)

def test_df_shape(self, df):
def test_df_shape(self, df_working):
# 3 parents (2 parents pour at2 + 0 pour at3 + 1 pour at4)
# 3 champs
assert df.shape == (3, 3)
assert df_working.shape == (3, 3)

def test_df_columns(self, df):
def test_df_columns(self, df_working):
# Seules les colonnes demandées sont retournées
assert df.columns.tolist() == ["identifiant_unique", "statut", "latitude"]
assert sorted(df_working.columns.tolist()) == sorted(
[
"identifiant_unique",
"statut",
"latitude",
]
)

def test_parents_are_valid(self, df, db_testdata_write):
def test_parents_are_valid(self, df_working, db_testdata_write):
# Seuls les parents des acteurs types demandés
# sont retournés
data = db_testdata_write
assert sorted(df["identifiant_unique"].tolist()) == sorted(
assert sorted(df_working["identifiant_unique"].tolist()) == sorted(
[
data["id_at2_parent_a"],
data["id_at2_parent_b"],
data["id_at4_parent"],
]
)

def test_parents_not_actif_excluded(self, df_working, db_testdata_write):
# Les parents inactifs ne sont pas retournés
data = db_testdata_write
assert (
data["id_at4_parent_inactif"]
not in df_working["identifiant_unique"].tolist()
)

0 comments on commit 0375e1d

Please sign in to comment.