From ac74dd8ac43a112be2fdd8641dd44dec9c621099 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Thu, 2 Jan 2025 11:37:03 +0100 Subject: [PATCH] simplification et proposition de modification --- Makefile | 6 +++ dags/ingest_validated_dataset_to_db.py | 29 ++++--------- docs/reference/303-systeme-de-suggestions.md | 44 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 22 deletions(-) create mode 100644 docs/reference/303-systeme-de-suggestions.md diff --git a/Makefile b/Makefile index 30c094a2d..7cd398dfc 100644 --- a/Makefile +++ b/Makefile @@ -62,6 +62,12 @@ run-django: rm -rf .parcel-cache honcho start -f Procfile.dev +run-all: + docker compose --profile airflow up -d + rm -rf .parcel-cache + .venv/bin/python manage.py runserver 0.0.0.0:8000 + npm run watch + # Local django operations .PHONY: migrate migrate: diff --git a/dags/ingest_validated_dataset_to_db.py b/dags/ingest_validated_dataset_to_db.py index d4a61e029..e8cf9a80b 100755 --- a/dags/ingest_validated_dataset_to_db.py +++ b/dags/ingest_validated_dataset_to_db.py @@ -2,7 +2,7 @@ import pandas as pd from airflow.models import DAG -from airflow.operators.python import BranchPythonOperator, PythonOperator +from airflow.operators.python import PythonOperator, ShortCircuitOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.utils.dates import days_ago @@ -39,14 +39,10 @@ def _get_first_dagrun_to_insert(): return row -def check_for_validation(**kwargs): +def check_suggestion_to_process(**kwargs): # get first row from table qfdmo_dagrun with status TO_INSERT row = _get_first_dagrun_to_insert() - - # Skip if row is None - if row is None: - return "skip_processing" - return "fetch_and_parse_data" + return bool(row) def fetch_and_parse_data(**context): @@ -125,19 +121,9 @@ def write_data_to_postgres(**kwargs): ) -def skip_processing(**kwargs): - print("No records to validate. DAG run completes successfully.") - - -skip_processing_task = PythonOperator( - task_id="skip_processing", - python_callable=skip_processing, - dag=dag, -) - -branch_task = BranchPythonOperator( - task_id="branch_processing", - python_callable=check_for_validation, +check_suggestion_to_process_task = ShortCircuitOperator( + task_id="check_suggestion_to_process", + python_callable=check_suggestion_to_process, dag=dag, ) @@ -153,9 +139,8 @@ def skip_processing(**kwargs): dag=dag, ) -branch_task >> skip_processing_task ( - branch_task + check_suggestion_to_process_task >> fetch_parse_task >> write_to_postgres_task >> trigger_create_final_actors_dag diff --git a/docs/reference/303-systeme-de-suggestions.md b/docs/reference/303-systeme-de-suggestions.md new file mode 100644 index 000000000..e6aa76f91 --- /dev/null +++ b/docs/reference/303-systeme-de-suggestions.md @@ -0,0 +1,44 @@ +# Système de suggestion + +**Statut : ❓ À approuver** + +Cette proposition de modification de l'architecture pour faire évoluer le système de suggestion est un travail itératif. Il est donc nessaire de garder en tête la cibe et le moyen d'y aller. + +## Existant et problématique + +il existe les tables `dagrun` et `dagrunchange`: + +- `dagrun` représente un ensemble de suggestions produit par l'execution d'un DAG airflow +- `dagrinchange` représente la suggestion de modification pour une ligne donnée + +On a quelques problème de lisibilité des ces tables: + +- les types des évenements sont imprécis et utilisé pour plusieurs propos, par exemple, `UPDATE_ACTOR` est utilisé pour des propositions de siretisation et de suppression de acteurs lors de l'ingestion de la source +- les types des évenements sont définis au niveau de chaque ligne, pour connaitre le type de +- si une ligne est problématique, aucune ligne n'est mise à jour +- on n'à pas de vu sur les DAG qui on réussi ou se sont terminés en erreur + +## Proposition d'amélioration + +### Base de données + +- Renommage des tables : `dagrun` -> `suggestion_cohorte` , `dagrunchange` -> `suggestion_ligne` +- Écrire les champs en français comme le reste des tables de l'application +- Revu des statuts de `suggestion_cohorte` : à traiter, en cours de traitement, fini avec succès, fini avec succès partiel, fini en erreur +- Ajout d'un type d'évenement à `suggestion_cohorte` : source, enrichissement +- Ajout d'un sous-type d'évenement à `suggestion_cohorte` : source - ajout acteur, source - suppression acteur, source - modification acteur, enrichissement - déménagement… + +### Interface + +Si possible, utiliser l'interface d'administration de Django pour gérer les suggestions (cela devrait bien fonctionner au mons pour la partie `ingestion des sources`). + +- Division des interfaces de validation : + - `ingestion des sources` : nouvelles sources ou nouvelle version d'une source existante + - `enrichissements` : fermetures, démenagements, enrichissement avec annuaire-entrprise, l'API BAN ou d'autres API +- Ajout de filtre sur le statut (à traiter est sélectionné par défaut) +- Ajout de la pagination +- permettre de cocher les suggestions et d'executer une action our l'ensemble + +### Pipeline + +- Le DAG de validation des cohortes doit intégrer la même architecture que les autres DAGs