Skip to content

Commit

Permalink
Ajout de constantes partagées (#1085)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolok authored Dec 10, 2024
1 parent 50aff0d commit dbdb5e8
Show file tree
Hide file tree
Showing 24 changed files with 92 additions and 56 deletions.
5 changes: 3 additions & 2 deletions dags/ingest_validated_dataset_to_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago
from shared.tasks.database_logic.db_manager import PostgresConnectionManager
from utils import dag_ingest_validated_utils, shared_constants
from sources.config import shared_constants as constants
from utils import dag_ingest_validated_utils

default_args = {
"owner": "airflow",
Expand All @@ -32,7 +33,7 @@ def _get_first_dagrun_to_insert():
hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
# get first row from table qfdmo_dagrun with status TO_INSERT
row = hook.get_first(
f"SELECT * FROM qfdmo_dagrun WHERE status = '{shared_constants.TO_INSERT}'"
f"SELECT * FROM qfdmo_dagrun WHERE status = '{constants.DAGRUN_TOINSERT}'"
" LIMIT 1"
)
return row
Expand Down
20 changes: 20 additions & 0 deletions dags/sources/config/shared_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# DagRun statuts
DAGRUN_TOVALIDATE = "TO_VALIDATE"
DAGRUN_TOINSERT = "TO_INSERT"
DAGRUN_REJECTED = "REJECTED"
DAGRUN_FINISHED = "FINISHED"

# Public accueilli
PUBLIC_PAR = "Particuliers"
PUBLIC_PRO = "Professionnels"
PUBLIC_PRO_ET_PAR = "Particuliers et professionnels"
PUBLIC_AUCUN = "Aucun"

# Acteur Statuts
ACTEUR_ACTIF = "ACTIF"
ACTEUR_INACTIF = "INACTIF"
ACTEUR_SUPPRIME = "SUPPRIME"

# Reprise
REPRISE_1POUR0 = "1 pour 0"
REPRISE_1POUR1 = "1 pour 1"
3 changes: 2 additions & 1 deletion dags/sources/dags/source_aliapur.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -31,7 +32,7 @@
"donnees-eo-aliapur/lines?size=10000"
),
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"ignore_duplicates": False,
"validate_address_with_ban": False,
Expand Down
4 changes: 2 additions & 2 deletions dags/sources/dags/source_cma.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain
from utils import shared_constants as constants

with DAG(
dag_id="like-eo-from-api-cma",
Expand Down Expand Up @@ -39,7 +39,7 @@
"reparactor_hours": "horaires_description",
},
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
"labels_etou_bonus": "reparacteur",
"acteur_type_id": "artisan, commerce indépendant",
"point_de_reparation": True,
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_corepile.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -32,7 +33,7 @@
"donnees-eo-corepile/lines?size=10000"
),
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"ignore_duplicates": False,
"validate_address_with_ban": False,
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_ecodds.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand All @@ -24,7 +25,7 @@
"perimetre_dintervention",
],
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_ecologic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand All @@ -24,7 +25,7 @@
"donnees-eo-ecologic/lines?size=10000"
),
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"ignore_duplicates": False,
"validate_address_with_ban": False,
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_ecomaison.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -29,7 +30,7 @@
"latitudewgs84": "latitude",
},
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_ecosystem.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -44,7 +45,7 @@
"siret",
],
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_ocab.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -27,7 +28,7 @@
"latitudewgs84": "latitude",
},
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_ocad3e.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -38,7 +39,7 @@
"adresse_format_ban": "adresse_format_ban",
},
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_pyreo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -31,7 +32,7 @@
"magasin / franchise, enseigne commerciale / distributeur /"
" point de vente"
),
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_refashion.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -32,7 +33,7 @@
"consignes_dacces": "commentaires",
},
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_soren.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -27,7 +28,7 @@
"latitudewgs84": "latitude",
},
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
3 changes: 2 additions & 1 deletion dags/sources/dags/source_valdelia.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from airflow import DAG
from sources.config import shared_constants as constants
from sources.config.airflow_params import get_mapping_config
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

Expand Down Expand Up @@ -43,7 +44,7 @@
"latitudewgs84": "latitude",
},
"columns_to_add_by_default": {
"statut": "ACTIF",
"statut": constants.ACTEUR_ACTIF,
},
"endpoint": (
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

import pandas as pd
from sources.config import shared_constants as constants

logger = logging.getLogger(__name__)

Expand All @@ -11,7 +12,7 @@ def propose_acteur_to_delete(
):

df_acteurs_from_db_actifs = df_acteurs_from_db[
df_acteurs_from_db["statut"] == "ACTIF"
df_acteurs_from_db["statut"] == constants.ACTEUR_ACTIF
]

df_acteur_to_delete = df_acteurs_from_db_actifs[
Expand Down
32 changes: 16 additions & 16 deletions dags/sources/tasks/business_logic/source_data_normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pandas as pd
import requests
from shared.tasks.database_logic.db_manager import PostgresConnectionManager
from sources.config import shared_constants as constants
from sources.config.airflow_params import TRANSFORMATION_MAPPING
from sources.tasks.transform.transform_column import (
cast_eo_boolean_or_string_to_boolean,
Expand All @@ -15,7 +16,6 @@
from tenacity import retry, stop_after_attempt, wait_fixed
from utils import logging_utils as log
from utils import mapping_utils
from utils import shared_constants as constants
from utils.base_utils import extract_details, get_address

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -132,26 +132,26 @@ def source_data_normalize(
if "statut" in df.columns:
df["statut"] = df["statut"].map(
{
1: "ACTIF",
0: "SUPPRIME",
"ACTIF": "ACTIF",
"INACTIF": "INACTIF",
"SUPPRIME": "SUPPRIME",
1: constants.ACTEUR_ACTIF,
0: constants.ACTEUR_SUPPRIME,
constants.ACTEUR_ACTIF: constants.ACTEUR_ACTIF,
"INACTIF": constants.ACTEUR_INACTIF,
"SUPPRIME": constants.ACTEUR_SUPPRIME,
}
)
df["statut"] = df["statut"].fillna("ACTIF")
df["statut"] = df["statut"].fillna(constants.ACTEUR_ACTIF)
else:
df["statut"] = "ACTIF"
df["statut"] = constants.ACTEUR_ACTIF

if "public_accueilli" in df.columns:

df["public_accueilli"] = mapping_try_or_fallback_column_value(
df["public_accueilli"],
{
"particuliers et professionnels": ("Particuliers et professionnels"),
"professionnels": "Professionnels",
"particuliers": "Particuliers",
"aucun": "Aucun",
"particuliers et professionnels": constants.PUBLIC_PRO_ET_PAR,
"professionnels": constants.PUBLIC_PRO,
"particuliers": constants.PUBLIC_PAR,
"aucun": constants.PUBLIC_AUCUN,
},
)

Expand All @@ -168,10 +168,10 @@ def source_data_normalize(
df["reprise"] = mapping_try_or_fallback_column_value(
df["reprise"],
{
"1 pour 0": "1 pour 0",
"1 pour 1": "1 pour 1",
"non": "1 pour 0",
"oui": "1 pour 1",
"1 pour 0": constants.REPRISE_1POUR0,
"1 pour 1": constants.REPRISE_1POUR1,
"non": constants.REPRISE_1POUR0,
"oui": constants.REPRISE_1POUR1,
},
)

Expand Down
4 changes: 2 additions & 2 deletions dags/utils/dag_eo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime

from shared.tasks.database_logic.db_manager import PostgresConnectionManager
from utils import shared_constants as constants
from sources.config import shared_constants as constants

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -37,7 +37,7 @@ def insert_dagrun_and_process_df(df_acteur_updates, metadata, dag_name, run_name
# Insert dag_run_change
df_acteur_updates["change_type"] = df_acteur_updates["event"]
df_acteur_updates["dag_run_id"] = dag_run_id
df_acteur_updates["status"] = constants.TO_VALIDATE
df_acteur_updates["status"] = constants.DAGRUN_TOVALIDATE
df_acteur_updates[["row_updates", "dag_run_id", "change_type", "status"]].to_sql(
"qfdmo_dagrunchange",
engine,
Expand Down
9 changes: 6 additions & 3 deletions dags/utils/dag_ingest_validated_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from datetime import datetime

import pandas as pd
from utils import base_utils, mapping_utils, shared_constants
from sources.config import shared_constants
from utils import base_utils, mapping_utils

logging.basicConfig(level=logging.INFO)

Expand Down Expand Up @@ -75,7 +76,7 @@ def handle_update_actor_event(df_actors, dag_run_id):
]

current_time = datetime.now().astimezone().isoformat(timespec="microseconds")
df_actors = df_actors[df_actors["status"] == shared_constants.TO_INSERT]
df_actors = df_actors[df_actors["status"] == shared_constants.DAGRUN_TOINSERT]
df_actors = df_actors.apply(mapping_utils.replace_with_selected_candidat, axis=1)
df_actors[["adresse", "code_postal", "ville"]] = df_actors.apply(
lambda row: base_utils.extract_details(row, col="adresse_candidat"), axis=1
Expand Down Expand Up @@ -310,6 +311,8 @@ def handle_write_data_update_actor_event(connection, df_actors):
)


def update_dag_run_status(connection, dag_run_id, statut=shared_constants.FINISHED):
def update_dag_run_status(
connection, dag_run_id, statut=shared_constants.DAGRUN_FINISHED
):
query = f"UPDATE qfdmo_dagrun SET status = '{statut}' WHERE id = {dag_run_id}"
connection.execute(query)
3 changes: 2 additions & 1 deletion dags/utils/mapping_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Any

import pandas as pd
from sources.config import shared_constants as constants
from utils.formatter import format_libelle_to_code


Expand Down Expand Up @@ -190,7 +191,7 @@ def replace_with_selected_candidat(row):
best_candidat_index = row.get("best_candidat_index", None)
if best_candidat_index is not None and pd.notna(best_candidat_index):
selected_candidat = ae_result[int(best_candidat_index) - 1]
row["statut"] = "ACTIF"
row["statut"] = constants.ACTEUR_ACTIF
row["adresse_candidat"] = selected_candidat.get("adresse_candidat")
row["siret"] = selected_candidat.get("siret_candidat")
row["nom"] = selected_candidat.get("nom_candidat")
Expand Down
9 changes: 0 additions & 9 deletions dags/utils/shared_constants.py

This file was deleted.

Loading

0 comments on commit dbdb5e8

Please sign in to comment.