From 65f2240d8ec0491094f4322784ed0eeba2d98325 Mon Sep 17 00:00:00 2001
From: Nicolas Oudard
Date: Tue, 14 Jan 2025 14:30:30 +0100
Subject: [PATCH] isolation du dag d'ingestion des suggestions

---
 dags/ingest_validated_dataset_to_db.py        | 120 ++++++------
 .../db_normalize_suggestion_task.py           |  15 ++
 .../db_read_suggestiontoprocess_task.py       |  13 ++
 .../db_write_validsuggestions_task.py         |  23 +++
 .../launch_compute_carte_acteur_task.py       |  10 +
 .../business_logic/db_normalize_suggestion.py | 113 ++++++++++++
 .../db_read_suggestiontoprocess.py            |  18 ++
 .../db_write_validsuggestions.py              | 172 ++++++++++++++++++
 dags/suggestions/dags/apply_suggestions.py    |  42 +++++
 data/views.py                                 |   2 +-
 10 files changed, 459 insertions(+), 69 deletions(-)
 create mode 100644 dags/suggestions/airflow_logic/db_normalize_suggestion_task.py
 create mode 100644 dags/suggestions/airflow_logic/db_read_suggestiontoprocess_task.py
 create mode 100644 dags/suggestions/airflow_logic/db_write_validsuggestions_task.py
 create mode 100644 dags/suggestions/airflow_logic/launch_compute_carte_acteur_task.py
 create mode 100644 dags/suggestions/business_logic/db_normalize_suggestion.py
 create mode 100644 dags/suggestions/business_logic/db_read_suggestiontoprocess.py
 create mode 100644 dags/suggestions/business_logic/db_write_validsuggestions.py
 create mode 100755 dags/suggestions/dags/apply_suggestions.py

diff --git a/dags/ingest_validated_dataset_to_db.py b/dags/ingest_validated_dataset_to_db.py
index fb56f20ea..1ac93cca2 100755
--- a/dags/ingest_validated_dataset_to_db.py
+++ b/dags/ingest_validated_dataset_to_db.py
@@ -1,10 +1,12 @@
-# FIXME: intégrer ce dag dans l'architecture cible
+"""
+DEPRECATED : utiliser le dag apply_suggestions
+"""
 from datetime import timedelta
 
 import pandas as pd
 from airflow.models import DAG
-from airflow.operators.python import PythonOperator, ShortCircuitOperator
+from airflow.operators.python import BranchPythonOperator, PythonOperator
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.providers.postgres.hooks.postgres import PostgresHook
 from airflow.utils.dates import days_ago
@@ -21,104 +23,75 @@
 }
 
 dag = DAG(
-    dag_id="validate_and_process_suggestions",
-    dag_display_name="Traitement des cohortes de données validées",
+    dag_id="validate_and_process_dagruns",
+    dag_display_name="DEPRECATED : Traitement des cohortes de données validées",
     default_args=default_args,
-    description="traiter les suggestions à traiter",
+    description="""
+    DEPRECATED : Check for VALIDATE in qfdmo_dagrun and process qfdmo_dagrunchange
+    utile uniquement pour les cohortes de siretisations
+    """,
     schedule="*/5 * * * *",
     catchup=False,
     max_active_runs=1,
 )
 
 
-def _get_first_suggetsioncohorte_to_insert():
+def _get_first_dagrun_to_insert():
     hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
+    # get first row from table qfdmo_dagrun with status TO_INSERT
     row = hook.get_first(
-        f"""
-        SELECT * FROM data_suggestioncohorte
-        WHERE statut = '{constants.SUGGESTION_ATRAITER}'
-        LIMIT 1
-        """
+        f"SELECT * FROM qfdmo_dagrun WHERE status = '{constants.DAGRUN_TOINSERT}'"
+        " LIMIT 1"
     )
     return row
 
 
-def check_suggestion_to_process(**kwargs):
-    row = _get_first_suggetsioncohorte_to_insert()
-    return bool(row)
+def check_for_validation(**kwargs):
+    # get first row from table qfdmo_dagrun with status TO_INSERT
+    row = _get_first_dagrun_to_insert()
+
+    # Skip if row is None
+    if row is None:
+        return "skip_processing"
+    return "fetch_and_parse_data"
 
 
 def fetch_and_parse_data(**context):
-    row = _get_first_suggetsioncohorte_to_insert()
-    suggestion_cohorte_id = row[0]
+    row = _get_first_dagrun_to_insert()
+    dag_run_id = row[0]
 
     engine = PostgresConnectionManager().engine
 
     df_sql = pd.read_sql_query(
-        f"""
-        SELECT * FROM data_suggestionunitaire
-        WHERE suggestion_cohorte_id = '{suggestion_cohorte_id}'
-        """,
+        f"SELECT * FROM qfdmo_dagrunchange WHERE dag_run_id = '{dag_run_id}'",
         engine,
     )
 
-    df_acteur_to_create = df_sql[
-        df_sql["type_action"] == constants.SUGGESTION_SOURCE_AJOUT
-    ]
-    df_acteur_to_update = df_sql[
-        df_sql["type_action"] == constants.SUGGESTION_SOURCE_AJOUT
-    ]
-    df_acteur_to_delete = df_sql[
-        df_sql["type_action"] == constants.SUGGESTION_SOURCE_SUPRESSION
-    ]
-    df_acteur_to_enrich = df_sql[
-        df_sql["type_action"] == constants.SUGGESTION_ENRICHISSEMENT
-    ]
-
-    df_update_actor = df_sql[df_sql["type_action"] == "UPDATE_ACTOR"]
-
-    if not df_acteur_to_create.empty:
-        normalized_dfs = df_acteur_to_create["suggestion"].apply(pd.json_normalize)
-        df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True)
-        return dag_ingest_validated_utils.handle_create_event(
-            df_acteur, suggestion_cohorte_id, engine
-        )
-    if not df_acteur_to_update.empty:
-        normalized_dfs = df_acteur_to_update["suggestion"].apply(pd.json_normalize)
-        df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True)
-        return dag_ingest_validated_utils.handle_create_event(
-            df_acteur, suggestion_cohorte_id, engine
-        )
-    if not df_acteur_to_delete.empty:
-        normalized_dfs = df_acteur_to_delete["suggestion"].apply(pd.json_normalize)
-        df_actors_update_actor = pd.concat(normalized_dfs.tolist(), ignore_index=True)
-        status_repeated = (
-            df_acteur_to_delete["statut"]
-            .repeat(df_acteur_to_delete["suggestion"].apply(len))
-            .reset_index(drop=True)
-        )
-        df_actors_update_actor["status"] = status_repeated
+    df_create = df_sql[df_sql["change_type"] == "CREATE"]
+    df_update_actor = df_sql[df_sql["change_type"] == "UPDATE_ACTOR"]
 
-        return dag_ingest_validated_utils.handle_update_actor_event(
-            df_actors_update_actor, suggestion_cohorte_id
+    if not df_create.empty:
+        normalized_dfs = df_create["row_updates"].apply(pd.json_normalize)
+        df_actors_create = pd.concat(normalized_dfs.tolist(), ignore_index=True)
+        return dag_ingest_validated_utils.handle_create_event(
+            df_actors_create, dag_run_id, engine
         )
+    if not df_update_actor.empty:
 
-    if not df_acteur_to_enrich.empty:
-
-        normalized_dfs = df_update_actor["suggestion"].apply(pd.json_normalize)
+        normalized_dfs = df_update_actor["row_updates"].apply(pd.json_normalize)
         df_actors_update_actor = pd.concat(normalized_dfs.tolist(), ignore_index=True)
         status_repeated = (
             df_update_actor["status"]
-            .repeat(df_update_actor["suggestion"].apply(len))
+            .repeat(df_update_actor["row_updates"].apply(len))
             .reset_index(drop=True)
         )
         df_actors_update_actor["status"] = status_repeated
 
         return dag_ingest_validated_utils.handle_update_actor_event(
-            df_actors_update_actor, suggestion_cohorte_id
+            df_actors_update_actor, dag_run_id
         )
 
     return {
-        "dag_run_id": suggestion_cohorte_id,
+        "dag_run_id": dag_run_id,
     }
@@ -159,9 +132,19 @@ def write_data_to_postgres(**kwargs):
     )
 
 
-check_suggestion_to_process_task = ShortCircuitOperator(
-    task_id="check_suggestion_to_process",
-    python_callable=check_suggestion_to_process,
+def skip_processing(**kwargs):
+    print("No records to validate. DAG run completes successfully.")
+
+
+skip_processing_task = PythonOperator(
+    task_id="skip_processing",
+    python_callable=skip_processing,
+    dag=dag,
+)
+
+branch_task = BranchPythonOperator(
+    task_id="branch_processing",
+    python_callable=check_for_validation,
     dag=dag,
 )
 
@@ -177,8 +160,9 @@ def write_data_to_postgres(**kwargs):
     dag=dag,
 )
 
+branch_task >> skip_processing_task
 (
-    check_suggestion_to_process_task
+    branch_task
     >> fetch_parse_task
     >> write_to_postgres_task
     >> trigger_create_final_actors_dag
diff --git a/dags/suggestions/airflow_logic/db_normalize_suggestion_task.py b/dags/suggestions/airflow_logic/db_normalize_suggestion_task.py
new file mode 100644
index 000000000..908776b16
--- /dev/null
+++ b/dags/suggestions/airflow_logic/db_normalize_suggestion_task.py
@@ -0,0 +1,15 @@
+from airflow.models import DAG
+from airflow.operators.python import PythonOperator
+from suggestions.business_logic.db_normalize_suggestion import db_normalize_suggestion
+
+
+def db_normalize_suggestion_task(dag: DAG):
+    return PythonOperator(
+        task_id="db_normalize_suggestion",
+        python_callable=db_normalize_suggestion_wrapper,
+        dag=dag,
+    )
+
+
+def db_normalize_suggestion_wrapper(**kwargs):
+    return db_normalize_suggestion()
diff --git a/dags/suggestions/airflow_logic/db_read_suggestiontoprocess_task.py b/dags/suggestions/airflow_logic/db_read_suggestiontoprocess_task.py
new file mode 100644
index 000000000..3dcd36249
--- /dev/null
+++ b/dags/suggestions/airflow_logic/db_read_suggestiontoprocess_task.py
@@ -0,0 +1,13 @@
+from airflow.models import DAG
+from airflow.operators.python import ShortCircuitOperator
+from suggestions.business_logic.db_read_suggestiontoprocess import (
+    db_read_suggestiontoprocess,
+)
+
+
+def db_read_suggestiontoprocess_task(dag: DAG):
+    return ShortCircuitOperator(
+        task_id="check_suggestion_to_process",
+        python_callable=db_read_suggestiontoprocess,
+        dag=dag,
+    )
diff --git a/dags/suggestions/airflow_logic/db_write_validsuggestions_task.py b/dags/suggestions/airflow_logic/db_write_validsuggestions_task.py
new file mode 100644
index 000000000..80f097955
--- /dev/null
+++ b/dags/suggestions/airflow_logic/db_write_validsuggestions_task.py
@@ -0,0 +1,23 @@
+from airflow.models import DAG
+from airflow.operators.python import PythonOperator
+from suggestions.business_logic.db_write_validsuggestions import (
+    db_write_validsuggestions,
+)
+from utils import logging_utils as log
+
+
+def db_write_validsuggestions_task(dag: DAG) -> PythonOperator:
+    return PythonOperator(
+        task_id="db_write_validsuggestions",
+        python_callable=db_write_validsuggestions_wrapper,
+        dag=dag,
+    )
+
+
+def db_write_validsuggestions_wrapper(**kwargs):
+    data_from_db = kwargs["ti"].xcom_pull(task_ids="db_normalize_suggestion")
+
+    log.preview("data_from_db acteur", data_from_db["actors"])
+    log.preview("data_from_db change_type", data_from_db["change_type"])
+
+    return db_write_validsuggestions(data_from_db=data_from_db)
trigger_dag_id="compute_carte_acteur", + dag=dag, + ) diff --git a/dags/suggestions/business_logic/db_normalize_suggestion.py b/dags/suggestions/business_logic/db_normalize_suggestion.py new file mode 100644 index 000000000..7b33e281c --- /dev/null +++ b/dags/suggestions/business_logic/db_normalize_suggestion.py @@ -0,0 +1,113 @@ +import pandas as pd +from shared.tasks.database_logic.db_manager import PostgresConnectionManager +from sources.config import shared_constants as constants +from suggestions.business_logic.db_read_suggestiontoprocess import ( + get_first_suggetsioncohorte_to_insert, +) +from utils import logging_utils as log + + +def db_normalize_suggestion(): + row = get_first_suggetsioncohorte_to_insert() + suggestion_cohorte_id = row[0] + + engine = PostgresConnectionManager().engine + + df_sql = pd.read_sql_query( + f""" + SELECT * FROM data_suggestionunitaire + WHERE suggestion_cohorte_id = '{suggestion_cohorte_id}' + """, + engine, + ) + + df_acteur_to_create = df_sql[ + df_sql["type_action"] == constants.SUGGESTION_SOURCE_AJOUT + ] + df_acteur_to_update = df_sql[ + df_sql["type_action"] == constants.SUGGESTION_SOURCE_AJOUT + ] + df_acteur_to_delete = df_sql[ + df_sql["type_action"] == constants.SUGGESTION_SOURCE_SUPRESSION + ] + if not df_acteur_to_create.empty: + normalized_dfs = df_acteur_to_create["suggestion"].apply(pd.json_normalize) + df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True) + return normalize_acteur_update_for_db(df_acteur, suggestion_cohorte_id, engine) + if not df_acteur_to_update.empty: + normalized_dfs = df_acteur_to_update["suggestion"].apply(pd.json_normalize) + df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True) + return normalize_acteur_update_for_db(df_acteur, suggestion_cohorte_id, engine) + if not df_acteur_to_delete.empty: + normalized_dfs = df_acteur_to_delete["suggestion"].apply(pd.json_normalize) + df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True) + log.preview("df_acteur_to_delete", df_acteur) + return { + "actors": df_acteur, + "dag_run_id": suggestion_cohorte_id, + "change_type": constants.SUGGESTION_SOURCE_SUPRESSION, + } + + raise ValueError("No suggestion found") + + +def normalize_acteur_update_for_db(df_actors, dag_run_id, engine): + df_labels = process_many2many_df(df_actors, "labels") + df_acteur_services = process_many2many_df( + df_actors, "acteur_services", df_columns=["acteur_id", "acteurservice_id"] + ) + + max_id_pds = pd.read_sql_query( + "SELECT max(id) FROM qfdmo_propositionservice", engine + )["max"][0] + normalized_pds_dfs = df_actors["proposition_services"].apply(pd.json_normalize) + df_pds = pd.concat(normalized_pds_dfs.tolist(), ignore_index=True) + ids_range = range(max_id_pds + 1, max_id_pds + 1 + len(df_pds)) + + df_pds["id"] = ids_range + df_pds["pds_sous_categories"] = df_pds.apply( + lambda row: [ + {**d, "propositionservice_id": row["id"]} + for d in row["pds_sous_categories"] + ], + axis=1, + ) + + normalized_pdssc_dfs = df_pds["pds_sous_categories"].apply(pd.json_normalize) + df_pdssc = pd.concat(normalized_pdssc_dfs.tolist(), ignore_index=True) + + return { + "actors": df_actors, + "pds": df_pds[["id", "action_id", "acteur_id"]], + "pds_sous_categories": df_pdssc[ + ["propositionservice_id", "souscategorieobjet_id"] + ], + "dag_run_id": dag_run_id, + "labels": df_labels[["acteur_id", "labelqualite_id"]], + "acteur_services": df_acteur_services[["acteur_id", "acteurservice_id"]], + "change_type": constants.SUGGESTION_SOURCE, + } + + +def process_many2many_df(df, 
column_name, df_columns=["acteur_id", "labelqualite_id"]): + try: + # Attempt to process the 'labels' column if it exists and is not empty + normalized_df = df[column_name].dropna().apply(pd.json_normalize) + if normalized_df.empty: + return pd.DataFrame( + columns=df_columns + ) # Return empty DataFrame if no data to process + else: + return pd.concat(normalized_df.tolist(), ignore_index=True) + except KeyError: + # Handle the case where the specified column does not exist + return pd.DataFrame(columns=df_columns) + + +def normalize_acteur_delete_for_db(df_actors, dag_run_id): + + return { + "actors": df_actors, + "dag_run_id": dag_run_id, + "change_type": constants.SUGGESTION_SOURCE_SUPRESSION, + } diff --git a/dags/suggestions/business_logic/db_read_suggestiontoprocess.py b/dags/suggestions/business_logic/db_read_suggestiontoprocess.py new file mode 100644 index 000000000..02975f494 --- /dev/null +++ b/dags/suggestions/business_logic/db_read_suggestiontoprocess.py @@ -0,0 +1,18 @@ +from airflow.providers.postgres.hooks.postgres import PostgresHook +from sources.config import shared_constants as constants + + +def get_first_suggetsioncohorte_to_insert(): + hook = PostgresHook(postgres_conn_id="qfdmo_django_db") + row = hook.get_first( + f""" + SELECT * FROM data_suggestioncohorte + WHERE statut = '{constants.SUGGESTION_ATRAITER}' + LIMIT 1 + """ + ) + return row + + +def db_read_suggestiontoprocess(**kwargs): + return bool(get_first_suggetsioncohorte_to_insert()) diff --git a/dags/suggestions/business_logic/db_write_validsuggestions.py b/dags/suggestions/business_logic/db_write_validsuggestions.py new file mode 100644 index 000000000..52ff9e156 --- /dev/null +++ b/dags/suggestions/business_logic/db_write_validsuggestions.py @@ -0,0 +1,172 @@ +import logging + +from shared.tasks.database_logic.db_manager import PostgresConnectionManager +from sources.config import shared_constants as constants +from utils import logging_utils as log + + +def db_write_validsuggestions(data_from_db: dict): + # If data_set is empty, nothing to do + dag_run_id = data_from_db["dag_run_id"] + engine = PostgresConnectionManager().engine + if "actors" not in data_from_db: + with engine.begin() as connection: + update_suggestion_status( + connection, dag_run_id, constants.SUGGESTION_ENCOURS + ) + return + df_actors = data_from_db["actors"] + df_labels = data_from_db.get("labels") + df_acteur_services = data_from_db.get("acteur_services") + df_pds = data_from_db.get("pds") + df_pdssc = data_from_db.get("pds_sous_categories") + dag_run_id = data_from_db["dag_run_id"] + change_type = data_from_db.get("change_type", "CREATE") + + with engine.begin() as connection: + if change_type in [ + constants.SUGGESTION_SOURCE, + constants.SUGGESTION_SOURCE_AJOUT, + constants.SUGGESTION_SOURCE_MISESAJOUR, + ]: + db_write_acteurupdate( + connection, df_actors, df_labels, df_acteur_services, df_pds, df_pdssc + ) + elif change_type == constants.SUGGESTION_SOURCE_SUPRESSION: + db_write_acteurdelete(connection, df_actors) + else: + raise ValueError("Invalid change_type") + + update_suggestion_status(connection, dag_run_id, constants.SUGGESTION_SUCCES) + + +def db_write_acteurupdate( + connection, df_actors, df_labels, df_acteur_services, df_pds, df_pdssc +): + logging.warning("Création ou mise à jour des acteurs") + + df_actors[["identifiant_unique"]].to_sql( + "temp_actors", connection, if_exists="replace" + ) + + delete_queries = [ + """ + DELETE FROM qfdmo_propositionservice_sous_categories + WHERE propositionservice_id IN ( + 
diff --git a/dags/suggestions/business_logic/db_write_validsuggestions.py b/dags/suggestions/business_logic/db_write_validsuggestions.py
new file mode 100644
index 000000000..52ff9e156
--- /dev/null
+++ b/dags/suggestions/business_logic/db_write_validsuggestions.py
@@ -0,0 +1,172 @@
+import logging
+
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
+from sources.config import shared_constants as constants
+from utils import logging_utils as log
+
+
+def db_write_validsuggestions(data_from_db: dict):
+    # If data_set is empty, nothing to do
+    dag_run_id = data_from_db["dag_run_id"]
+    engine = PostgresConnectionManager().engine
+    if "actors" not in data_from_db:
+        with engine.begin() as connection:
+            update_suggestion_status(
+                connection, dag_run_id, constants.SUGGESTION_ENCOURS
+            )
+        return
+    df_actors = data_from_db["actors"]
+    df_labels = data_from_db.get("labels")
+    df_acteur_services = data_from_db.get("acteur_services")
+    df_pds = data_from_db.get("pds")
+    df_pdssc = data_from_db.get("pds_sous_categories")
+    dag_run_id = data_from_db["dag_run_id"]
+    change_type = data_from_db.get("change_type", "CREATE")
+
+    with engine.begin() as connection:
+        if change_type in [
+            constants.SUGGESTION_SOURCE,
+            constants.SUGGESTION_SOURCE_AJOUT,
+            constants.SUGGESTION_SOURCE_MISESAJOUR,
+        ]:
+            db_write_acteurupdate(
+                connection, df_actors, df_labels, df_acteur_services, df_pds, df_pdssc
+            )
+        elif change_type == constants.SUGGESTION_SOURCE_SUPRESSION:
+            db_write_acteurdelete(connection, df_actors)
+        else:
+            raise ValueError("Invalid change_type")
+
+        update_suggestion_status(connection, dag_run_id, constants.SUGGESTION_SUCCES)
+
+
+def db_write_acteurupdate(
+    connection, df_actors, df_labels, df_acteur_services, df_pds, df_pdssc
+):
+    logging.warning("Création ou mise à jour des acteurs")
+
+    df_actors[["identifiant_unique"]].to_sql(
+        "temp_actors", connection, if_exists="replace"
+    )
+
+    delete_queries = [
+        """
+        DELETE FROM qfdmo_propositionservice_sous_categories
+        WHERE propositionservice_id IN (
+            SELECT id FROM qfdmo_propositionservice
+            WHERE acteur_id IN ( SELECT identifiant_unique FROM temp_actors )
+        );
+        """,
+        """
+        DELETE FROM qfdmo_acteur_labels
+        WHERE acteur_id IN ( SELECT identifiant_unique FROM temp_actors );
+        """,
+        """
+        DELETE FROM qfdmo_acteur_acteur_services
+        WHERE acteur_id IN ( SELECT identifiant_unique FROM temp_actors );
+        """,
+        """
+        DELETE FROM qfdmo_propositionservice
+        WHERE acteur_id IN ( SELECT identifiant_unique FROM temp_actors );
+        """,
+        """
+        DELETE FROM qfdmo_acteur WHERE identifiant_unique
+        IN ( SELECT identifiant_unique FROM temp_actors);
+        """,
+    ]
+
+    for query in delete_queries:
+        connection.execute(query)
+
+    # Liste des colonnes souhaitées
+    collection = connection.execute(
+        "SELECT column_name FROM information_schema.columns WHERE table_name ="
+        " 'qfdmo_acteur';"
+    )
+    colonnes_souhaitees = [col[0] for col in collection]
+
+    # Filtrer les colonnes qui existent dans le DataFrame
+    colonnes_existantes = [
+        col for col in colonnes_souhaitees if col in df_actors.columns
+    ]
+
+    df_actors[colonnes_existantes].to_sql(
+        "qfdmo_acteur",
+        connection,
+        if_exists="append",
+        index=False,
+        method="multi",
+        chunksize=1000,
+    )
+
+    df_labels = df_labels[["acteur_id", "labelqualite_id"]]
+    df_labels.drop_duplicates(inplace=True)
+    df_labels[["acteur_id", "labelqualite_id"]].to_sql(
+        "qfdmo_acteur_labels",
+        connection,
+        if_exists="append",
+        index=False,
+        method="multi",
+        chunksize=1000,
+    )
+
+    df_acteur_services = df_acteur_services[["acteur_id", "acteurservice_id"]]
+    df_acteur_services.drop_duplicates(inplace=True)
+    df_acteur_services.to_sql(
+        "qfdmo_acteur_acteur_services",
+        connection,
+        if_exists="append",
+        index=False,
+        method="multi",
+        chunksize=1000,
+    )
+
+    df_pds[["id", "action_id", "acteur_id"]].to_sql(
+        "qfdmo_propositionservice",
+        connection,
+        if_exists="append",
+        index=False,
+        method="multi",
+        chunksize=1000,
+    )
+
+    df_pdssc[["propositionservice_id", "souscategorieobjet_id"]].to_sql(
+        "qfdmo_propositionservice_sous_categories",
+        connection,
+        if_exists="append",
+        index=False,
+        method="multi",
+        chunksize=1000,
+    )
+
+
+def db_write_acteurdelete(connection, df_acteur_to_delete):
+    # mettre le statut des acteurs à "SUPPRIMER" pour tous les acteurs à supprimer
+    logging.warning("Suppression des acteurs")
+    identifiant_uniques = list(
+        set(df_acteur_to_delete[["identifiant_unique"]].values.flatten())
+    )
+    quoted_identifiant_uniques = [
+        f"'{identifiant_unique}'" for identifiant_unique in identifiant_uniques
+    ]
+    query_acteur_to_delete = f"""
+        UPDATE qfdmo_acteur
+        SET statut='{constants.ACTEUR_SUPPRIME}'
+        WHERE identifiant_unique IN ({",".join(quoted_identifiant_uniques)});
+        UPDATE qfdmo_revisionacteur
+        SET statut='{constants.ACTEUR_SUPPRIME}'
+        WHERE identifiant_unique IN ({",".join(quoted_identifiant_uniques)});
+    """
+    log.preview("query_acteur_to_delete", query_acteur_to_delete)
+    connection.execute(query_acteur_to_delete)
+
+
+def update_suggestion_status(
+    connection, suggestion_id, statut=constants.SUGGESTION_ENCOURS
+):
+    query = f"""
+        UPDATE data_suggestioncohorte
+        SET statut = '{statut}'
+        WHERE id = {suggestion_id};
+    """
+    connection.execute(query)
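Reviewer note (not part of the patch): the helpers above and in db_read_suggestiontoprocess interpolate values into SQL with f-strings. Today those values come from trusted constants, but the same hooks accept bind parameters, which avoids any quoting issues if the values ever become dynamic. A minimal sketch using the APIs the patch already imports:

    import pandas as pd
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    from shared.tasks.database_logic.db_manager import PostgresConnectionManager
    from sources.config import shared_constants as constants

    hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
    # get_first forwards `parameters` to the driver, so the status is not interpolated
    row = hook.get_first(
        "SELECT * FROM data_suggestioncohorte WHERE statut = %s LIMIT 1",
        parameters=[constants.SUGGESTION_ATRAITER],
    )

    engine = PostgresConnectionManager().engine
    # read_sql_query does the same through `params`
    df_sql = pd.read_sql_query(
        "SELECT * FROM data_suggestionunitaire WHERE suggestion_cohorte_id = %(id)s",
        engine,
        params={"id": row[0]},
    )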
diff --git a/dags/suggestions/dags/apply_suggestions.py b/dags/suggestions/dags/apply_suggestions.py
new file mode 100755
index 000000000..39e14e97a
--- /dev/null
+++ b/dags/suggestions/dags/apply_suggestions.py
@@ -0,0 +1,42 @@
+from datetime import timedelta
+
+from airflow.models import DAG
+from airflow.utils.dates import days_ago
+from suggestions.airflow_logic.db_normalize_suggestion_task import (
+    db_normalize_suggestion_task,
+)
+from suggestions.airflow_logic.db_read_suggestiontoprocess_task import (
+    db_read_suggestiontoprocess_task,
+)
+from suggestions.airflow_logic.db_write_validsuggestions_task import (
+    db_write_validsuggestions_task,
+)
+from suggestions.airflow_logic.launch_compute_carte_acteur_task import (
+    launch_compute_carte_acteur_task,
+)
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": days_ago(1),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=5),
+}
+
+dag = DAG(
+    dag_id="apply_suggestions",
+    dag_display_name="Application des suggestions validées",
+    default_args=default_args,
+    description="traiter les suggestions à traiter",
+    schedule="*/5 * * * *",
+    catchup=False,
+    max_active_runs=1,
+)
+
+
+(
+    db_read_suggestiontoprocess_task(dag)
+    >> db_normalize_suggestion_task(dag)
+    >> db_write_validsuggestions_task(dag)
+    >> launch_compute_carte_acteur_task(dag)
+)
diff --git a/data/views.py b/data/views.py
index 259a073e7..7e9b0a7fe 100644
--- a/data/views.py
+++ b/data/views.py
@@ -24,7 +24,7 @@ def dispatch(self, request, *args, **kwargs):
 class SuggestionManagment(IsStaffMixin, FormView):
     form_class = SuggestionCohorteForm
     template_name = "data/dags_validations.html"
-    success_url = "/dags/validations"
+    success_url = "/data/suggestions"
 
     def form_valid(self, form):
         # MANAGE search and display suggestion_cohorte details
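Editor's sketch (not part of the patch): the deprecated DAG gates execution with a BranchPythonOperator plus an explicit skip_processing task, whereas apply_suggestions relies on a ShortCircuitOperator, which marks every downstream task as skipped when its callable returns a falsy value. A minimal standalone illustration of that pattern, with a made-up dag_id and callables:

    from airflow.models import DAG
    from airflow.operators.python import PythonOperator, ShortCircuitOperator
    from airflow.utils.dates import days_ago

    with DAG(
        dag_id="short_circuit_gating_example",  # illustrative, not a DAG from this patch
        start_date=days_ago(1),
        schedule=None,
        catchup=False,
    ) as dag:
        # Returning False skips all downstream tasks, so no explicit "skip" branch is needed.
        gate = ShortCircuitOperator(
            task_id="check_suggestion_to_process",
            python_callable=lambda: False,  # in the patch: bool(get_first_suggestioncohorte_to_insert())
        )
        process = PythonOperator(
            task_id="process_suggestions",
            python_callable=lambda: print("processing validated suggestions"),
        )
        gate >> process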