Skip to content

Commit

Permalink
simplification et proposition de modification
Browse files Browse the repository at this point in the history
  • Loading branch information
kolok committed Jan 2, 2025
1 parent 35df905 commit a908b79
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 22 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ run-django:
rm -rf .parcel-cache
honcho start -f Procfile.dev

run-all:
docker compose --profile airflow up -d
rm -rf .parcel-cache
.venv/bin/python manage.py runserver 0.0.0.0:8000
npm run watch

# Local django operations
.PHONY: migrate
migrate:
Expand Down
29 changes: 7 additions & 22 deletions dags/ingest_validated_dataset_to_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pandas as pd
from airflow.models import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago
Expand Down Expand Up @@ -39,14 +39,10 @@ def _get_first_dagrun_to_insert():
return row


def check_for_validation(**kwargs):
def check_suggestion_to_process(**kwargs):
# get first row from table qfdmo_dagrun with status TO_INSERT
row = _get_first_dagrun_to_insert()

# Skip if row is None
if row is None:
return "skip_processing"
return "fetch_and_parse_data"
return bool(row)


def fetch_and_parse_data(**context):
Expand Down Expand Up @@ -125,19 +121,9 @@ def write_data_to_postgres(**kwargs):
)


def skip_processing(**kwargs):
print("No records to validate. DAG run completes successfully.")


skip_processing_task = PythonOperator(
task_id="skip_processing",
python_callable=skip_processing,
dag=dag,
)

branch_task = BranchPythonOperator(
task_id="branch_processing",
python_callable=check_for_validation,
check_suggestion_to_process_task = ShortCircuitOperator(
task_id="check_suggestion_to_process",
python_callable=check_suggestion_to_process,
dag=dag,
)

Expand All @@ -153,9 +139,8 @@ def skip_processing(**kwargs):
dag=dag,
)

branch_task >> skip_processing_task
(
branch_task
check_suggestion_to_process_task
>> fetch_parse_task
>> write_to_postgres_task
>> trigger_create_final_actors_dag
Expand Down
44 changes: 44 additions & 0 deletions docs/reference/303-systeme-de-suggestions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Système de suggestion

**Statut : ❓ À approuver**

Cette proposition de modification de l'architecture pour faire évoluer le système de suggestion est un travail itératif. Il est donc nessaire de garder en tête la cibe et le moyen d'y aller.

## Existant et problématique

il existe les tables `dagrun` et `dagrunchange`:

- `dagrun` représente un ensemble de suggestions produit par l'execution d'un DAG airflow
- `dagrinchange` représente la suggestion de modification pour une ligne donnée

On a quelques problème de lisibilité des ces tables:

- les types des évenements sont imprécis et utilisé pour plusieurs propos, par exemple, `UPDATE_ACTOR` est utilisé pour des propositions de siretisation et de suppression de acteurs lors de l'ingestion de la source
- les types des évenements sont définis au niveau de chaque ligne, pour connaitre le type de
- si une ligne est problématique, aucune ligne n'est mise à jour
- on n'à pas de vu sur les DAG qui on réussi ou se sont terminés en erreur

## Proposition d'amélioration

### Base de données

- Renommage des tables : `dagrun` -> `suggestion_cohorte` , `dagrunchange` -> `suggestion_ligne`
- Écrire les champs en français comme le reste des tables de l'application
- Revu des statuts de `suggestion_cohorte` : à traiter, en cours de traitement, fini avec succès, fini avec succès partiel, fini en erreur
- Ajout d'un type d'évenement à `suggestion_cohorte` : source, enrichissement
- Ajout d'un sous-type d'évenement à `suggestion_cohorte` : source - ajout acteur, source - suppression acteur, source - modification acteur, enrichissement - déménagement…

### Interface

Si possible, utiliser l'interface d'administration de Django pour gérer les suggestions (cela devrait bien fonctionner au mons pour la partie `ingestion des sources`).

- Division des interfaces de validation :
- `ingestion des sources` : nouvelles sources ou nouvelle version d'une source existante
- `enrichissements` : fermetures, démenagements, enrichissement avec annuaire-entrprise, l'API BAN ou d'autres API
- Ajout de filtre sur le statut (à traiter est sélectionné par défaut)
- Ajout de la pagination
- permettre de cocher les suggestions et d'executer une action our l'ensemble

### Pipeline

- Le DAG de validation des cohortes doit intégrer la même architecture que les autres DAGs

0 comments on commit a908b79

Please sign in to comment.