diff --git a/dags/annuaire_entreprise_checks.py b/dags/annuaire_entreprise_checks.py
index 8152b8757..2a878c927 100755
--- a/dags/annuaire_entreprise_checks.py
+++ b/dags/annuaire_entreprise_checks.py
@@ -8,8 +8,8 @@
 from airflow import DAG
 from airflow.models.param import Param
 from airflow.operators.python import PythonOperator
-from airflow.providers.postgres.hooks.postgres import PostgresHook
 from shared.tasks.airflow_logic.write_data_task import write_data_task
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 
 pd.set_option("display.max_columns", None)
@@ -46,8 +46,7 @@ def fetch_and_parse_data(**context):
     limit = context["params"]["limit"]
 
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
 
     active_actors_query = """
         SELECT da.*,
diff --git a/dags/create_final_actors.py b/dags/create_final_actors.py
index 105ab3bf0..ebe1f5d4e 100755
--- a/dags/create_final_actors.py
+++ b/dags/create_final_actors.py
@@ -4,7 +4,7 @@
 import shortuuid
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from airflow.providers.postgres.hooks.postgres import PostgresHook
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils.db_tasks import read_data_from_postgres
 
@@ -218,8 +218,7 @@ def write_data_to_postgres(**kwargs):
         inplace=True,
     )
 
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
 
     original_table_name_actor = "qfdmo_displayedacteur"
     temp_table_name_actor = "qfdmo_displayedacteurtemp"
diff --git a/dags/ingest_validated_dataset_to_db.py b/dags/ingest_validated_dataset_to_db.py
index cd59edad0..b7887d95b 100755
--- a/dags/ingest_validated_dataset_to_db.py
+++ b/dags/ingest_validated_dataset_to_db.py
@@ -2,10 +2,11 @@
 import pandas as pd
 from airflow.models import DAG
-from airflow.operators.dagrun_operator import TriggerDagRunOperator
-from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
+from airflow.operators.python import BranchPythonOperator, PythonOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.providers.postgres.hooks.postgres import PostgresHook
 from airflow.utils.dates import days_ago
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils import dag_ingest_validated_utils, shared_constants
 
 default_args = {
@@ -51,8 +52,7 @@ def fetch_and_parse_data(**context):
     row = _get_first_dagrun_to_insert()
     dag_run_id = row[0]
 
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
 
     df_sql = pd.read_sql_query(
         f"SELECT * FROM qfdmo_dagrunchange WHERE dag_run_id = '{dag_run_id}'",
@@ -91,8 +91,7 @@ def write_data_to_postgres(**kwargs):
     data_dict = kwargs["ti"].xcom_pull(task_ids="fetch_and_parse_data")
     # If data_set is empty, nothing to do
     dag_run_id = data_dict["dag_run_id"]
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
     if "actors" not in data_dict:
         with engine.begin() as connection:
             dag_ingest_validated_utils.update_dag_run_status(connection, dag_run_id)
diff --git a/dags/shared/tasks/database_logic/db_manager.py b/dags/shared/tasks/database_logic/db_manager.py
new file mode 100644
index 000000000..6a21874b0
--- /dev/null
+++ b/dags/shared/tasks/database_logic/db_manager.py
@@ -0,0 +1,27 @@
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+from sqlalchemy.engine import Engine
+
+
+class PostgresConnectionManager:
+    """
+    Singleton class to manage the connection to the Postgres database.
+    Uses the connection qfdmo_django_db by default;
+    this connection is configured via the env variable AIRFLOW_CONN_QFDMO_DJANGO_DB
+    """
+
+    _instance = None
+
+    def __new__(cls, *args, **kwargs):
+        if cls._instance is None:
+            cls._instance = super(PostgresConnectionManager, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self, postgres_conn_id="qfdmo_django_db"):
+        if not hasattr(self, "initialized"):  # Pour éviter la réinitialisation
+            self.postgres_conn_id = postgres_conn_id
+            self.engine = self._create_engine()
+            self.initialized = True
+
+    def _create_engine(self) -> Engine:
+        pg_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
+        return pg_hook.get_sqlalchemy_engine()
diff --git a/dags/sources/config/airflow_params.py b/dags/sources/config/airflow_params.py
index 263f43cae..e2a241f8e 100644
--- a/dags/sources/config/airflow_params.py
+++ b/dags/sources/config/airflow_params.py
@@ -6,6 +6,7 @@
     clean_siren,
     clean_siret,
     convert_opening_hours,
+    strip_string,
 )
 
 PATH_NOMENCLARURE_DECHET = (
@@ -21,6 +22,7 @@
     "convert_opening_hours": convert_opening_hours,
     "clean_siren": clean_siren,
     "clean_siret": clean_siret,
+    "strip_string": strip_string,
 }
diff --git a/dags/sources/config/db_mapping.json b/dags/sources/config/db_mapping.json
index cf131c73d..0f834acad 100755
--- a/dags/sources/config/db_mapping.json
+++ b/dags/sources/config/db_mapping.json
@@ -29,6 +29,7 @@
     "literie": "Literie - Éléments d'ameublement",
     "machines et appareils motorisés thermiques": "Machines et appareils motorises thermiques",
     "matériels de bricolage, dont l’outillage à main": "outil de bricolage et jardinage",
+    "médicaments & dasri": "medicaments",
     "mélange d’inertes": "Mélange d’inertes (produits et matériaux de construction du bâtiment)",
     "membranes bitumineuses": "Membranes bitumineuses - PMCB (produits et matériaux de construction du bâtiment)",
     "meubles": "meuble",
diff --git a/dags/sources/dags/source_pharmacies.py b/dags/sources/dags/source_pharmacies.py
new file mode 100755
index 000000000..2857d8fdc
--- /dev/null
+++ b/dags/sources/dags/source_pharmacies.py
@@ -0,0 +1,56 @@
+from airflow import DAG
+from sources.config.airflow_params import get_mapping_config
+from sources.tasks.airflow_logic.operators import default_args, eo_task_chain
+
+with DAG(
+    dag_id="eo-pharmacies",
+    dag_display_name="Source - PHARMACIES",
+    default_args=default_args,
+    description=("Téléchargement des pharmacies (Ordre National Des Pharmaciens)"),
+    params={
+        "column_transformations": [
+            {
+                "origin": "Raison sociale",
+                "transformation": "strip_string",
+                "destination": "nom",
+            },
+            {
+                "origin": "Dénomination commerciale",
+                "transformation": "strip_string",
+                "destination": "nom_commercial",
+            },
+            {
+                "origin": "Adresse",
+                "transformation": "strip_string",
+                "destination": "adresse",
+            },
+            {
+                "origin": "Code postal",
+                "transformation": "strip_string",
+                "destination": "code_postal",
+            },
+            {
+                "origin": "Commune",
+                "transformation": "strip_string",
+                "destination": "ville",
+            },
+        ],
+        "column_mapping": {
+            "Numéro d'établissement": "identifiant_externe",
+            "Téléphone": "telephone",
+        },
+        "endpoint": "https://www.ordre.pharmacien.fr/download/annuaire_csv.zip",
+        "columns_to_add_by_default": {
+            "statut": "ACTIF",
"uniquement_sur_rdv": "non", + "public_accueilli": "Particuliers", + "produitsdechets_acceptes": "Médicaments & DASRI", + "acteur_type_id": "pharmacie", + "point_de_collecte_ou_de_reprise_des_dechets": True, + }, + "source_code": "ordredespharmaciens", + "product_mapping": get_mapping_config(), + }, + schedule=None, +) as dag: + eo_task_chain(dag) diff --git a/dags/sources/tasks/airflow_logic/db_read_propositions_max_id_task.py b/dags/sources/tasks/airflow_logic/db_read_propositions_max_id_task.py index a83c5b0f5..77c9d7885 100644 --- a/dags/sources/tasks/airflow_logic/db_read_propositions_max_id_task.py +++ b/dags/sources/tasks/airflow_logic/db_read_propositions_max_id_task.py @@ -1,6 +1,6 @@ from airflow import DAG from airflow.operators.python import PythonOperator -from airflow.providers.postgres.hooks.postgres import PostgresHook +from shared.tasks.database_logic.db_manager import PostgresConnectionManager from sqlalchemy import text @@ -14,8 +14,7 @@ def db_read_propositions_max_id_task(dag: DAG) -> PythonOperator: def db_read_propositions_max_id(): - pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db") - engine = pg_hook.get_sqlalchemy_engine() + engine = PostgresConnectionManager().engine # TODO : check if we need to manage the max id here displayedpropositionservice_max_id = engine.execute( diff --git a/dags/sources/tasks/airflow_logic/source_config_validate_task.py b/dags/sources/tasks/airflow_logic/source_config_validate_task.py index c0909b300..4cc9c39a7 100644 --- a/dags/sources/tasks/airflow_logic/source_config_validate_task.py +++ b/dags/sources/tasks/airflow_logic/source_config_validate_task.py @@ -3,8 +3,8 @@ import pandas as pd from airflow import DAG from airflow.operators.python import PythonOperator -from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.utils.trigger_rule import TriggerRule +from shared.tasks.database_logic.db_manager import PostgresConnectionManager from sources.tasks.business_logic.source_config_validate import source_config_validate from utils import logging_utils as log @@ -22,8 +22,9 @@ def source_config_validate_task(dag: DAG) -> PythonOperator: def source_config_validate_wrapper(**kwargs) -> None: + engine = PostgresConnectionManager().engine params = kwargs["params"] - engine = PostgresHook(postgres_conn_id="qfdmo_django_db").get_sqlalchemy_engine() + codes_sc_db = set( pd.read_sql_table("qfdmo_souscategorieobjet", engine, columns=["code"])[ "code" diff --git a/dags/sources/tasks/airflow_logic/source_data_download_task.py b/dags/sources/tasks/airflow_logic/source_data_download_task.py index 98d580df1..26ac34725 100644 --- a/dags/sources/tasks/airflow_logic/source_data_download_task.py +++ b/dags/sources/tasks/airflow_logic/source_data_download_task.py @@ -22,8 +22,8 @@ def source_data_download_task(dag: DAG) -> PythonOperator: def source_data_download_wrapper(**kwargs) -> pd.DataFrame: params = kwargs["params"] - api_url = params["endpoint"] + endpoint = params["endpoint"] - log.preview("API end point", api_url) + log.preview("API end point", endpoint) - return source_data_download(api_url=api_url) + return source_data_download(endpoint=endpoint) diff --git a/dags/sources/tasks/business_logic/db_read_acteur.py b/dags/sources/tasks/business_logic/db_read_acteur.py index 9ec92a331..be9c74cb2 100644 --- a/dags/sources/tasks/business_logic/db_read_acteur.py +++ b/dags/sources/tasks/business_logic/db_read_acteur.py @@ -1,7 +1,7 @@ import logging import pandas as pd -from airflow.providers.postgres.hooks.postgres 
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils import logging_utils as log
 
 logger = logging.getLogger(__name__)
@@ -14,8 +14,7 @@ def db_read_acteur(
         raise ValueError(
             "La colonne source_id est requise dans la dataframe normalisée"
         )
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
     unique_source_ids = df_normalized["source_id"].unique()
     joined_source_ids = ",".join([f"'{source_id}'" for source_id in unique_source_ids])
diff --git a/dags/sources/tasks/business_logic/source_data_download.py b/dags/sources/tasks/business_logic/source_data_download.py
index e727dfb5b..70d3c6c24 100755
--- a/dags/sources/tasks/business_logic/source_data_download.py
+++ b/dags/sources/tasks/business_logic/source_data_download.py
@@ -1,24 +1,104 @@
 import logging
+import tempfile
+import zipfile
+from pathlib import Path
 
 import numpy as np
 import pandas as pd
-from utils import api_utils
+import requests
 from utils import logging_utils as log
 
 logger = logging.getLogger(__name__)
 
 
-def source_data_download(api_url: str) -> pd.DataFrame:
+def source_data_download(endpoint: str) -> pd.DataFrame:
     """Téléchargement de la données source sans lui apporter de modification"""
     logger.info("Téléchargement données de l'API : début...")
     # TODO: changer de logique, plutôt que de tout charger en mémoire et se
     # trimballer des dataframes en XCOM, on devrait plutôt streamer les données
     # directement dans la base de données et déléguer le traitement à la DB
     # tant que possible
-    data = api_utils.fetch_data_from_url(api_url)
+    data = fetch_data_from_endpoint(endpoint)
     logger.info("Téléchargement données de l'API : ✅ succès.")
     df = pd.DataFrame(data).replace({pd.NA: None, np.nan: None})
     if df.empty:
         raise ValueError("Aucune donnée reçue de l'API")
     log.preview("df retournée par la tâche", df)
     return df
+
+
+def fetch_data_from_endpoint(endpoint):
+    if "pointsapport.ademe.fr" in endpoint or "data.ademe.fr" in endpoint:
+        return fetch_dataset_from_point_apport(endpoint)
+    elif "artisanat.fr" in endpoint:
+        return fetch_dataset_from_artisanat(endpoint)
+    elif "ordre.pharmacien.fr" in endpoint:
+        return fetch_dataset_from_pharmacies(endpoint)
+    # Le but de nos intégrations API est de récupérer des données.
+    # Si on ne récupére pas de données, on sait qu'on à un problème,
+    # et donc il faut échouer explicitement au plus tôt
+    raise NotImplementedError(f"Pas de fonction de récupération pour l'url {endpoint}")
+
+
+def fetch_dataset_from_point_apport(url):
+    all_data = []
+    while url:
+        logger.info(f"Récupération de données pour {url}")
+        response = requests.get(url, timeout=60)
+        response.raise_for_status()
+        data = response.json()
+        logger.info("Nombre de lignes récupérées: " + str(len(data["results"])))
+        all_data.extend(data["results"])
+        url = data.get("next", None)
+    logger.info("Plus d'URL à parcourir")
+    logger.info("Nombre total de lignes récupérées: " + str(len(all_data)))
+    return all_data
+
+
+def fetch_dataset_from_artisanat(base_url):
+    all_data = []
+    offset = 0
+    total_records = requests.get(base_url, params={"limit": 1, "offset": 0}).json()[
+        "total_count"
+    ]
+    records_per_request = 100
+    params = {"limit": records_per_request, "offset": 0}
+    while offset < total_records:
+        params.update({"offset": offset})
+        response = requests.get(base_url, params=params)
+        if response.status_code == 200:
+            data = response.json()
+            all_data.extend(data["results"])
+            offset += records_per_request
+        else:
+            response.raise_for_status()
+
+    return all_data
+
+
+def fetch_dataset_from_pharmacies(endpoint):
+    with tempfile.TemporaryDirectory() as temp_dir:
+        zip_file = _download_file(endpoint, temp_dir)
+        unzip_files = _extract_zip(zip_file, temp_dir)
+        etablissements_file = [f for f in unzip_files if "etablissements" in f][0]
+        df_etablissements = _read_csv(Path(temp_dir) / etablissements_file)
+    return df_etablissements
+
+
+def _download_file(url, dest_folder="."):
+    local_filename = Path(dest_folder) / url.split("/")[-1]
+    with requests.get(url) as r:
+        with open(local_filename, "wb") as f:
+            f.write(r.content)
+    return local_filename
+
+
+def _extract_zip(zip_file, dest_folder="."):
+    with zipfile.ZipFile(zip_file, "r") as zip_ref:
+        zip_ref.extractall(dest_folder)
+    return zip_ref.namelist()
+
+
+def _read_csv(csv_file):
+    df = pd.read_csv(csv_file, sep=";", encoding="utf-16-le", on_bad_lines="warn")
+    return df
diff --git a/dags/sources/tasks/business_logic/source_data_normalize.py b/dags/sources/tasks/business_logic/source_data_normalize.py
index 499a9d596..2d6b27e97 100755
--- a/dags/sources/tasks/business_logic/source_data_normalize.py
+++ b/dags/sources/tasks/business_logic/source_data_normalize.py
@@ -1,13 +1,18 @@
+import json
 import logging
 from typing import List
 
 import pandas as pd
+import requests
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from sources.config.airflow_params import TRANSFORMATION_MAPPING
 from sources.tasks.transform.transform_column import (
     cast_eo_boolean_or_string_to_boolean,
     mapping_try_or_fallback_column_value,
 )
 from sources.tasks.transform.transform_df import merge_duplicates
+from sqlalchemy import text
+from tenacity import retry, stop_after_attempt, wait_fixed
 from utils import logging_utils as log
 from utils import mapping_utils
 from utils import shared_constants as constants
@@ -178,6 +183,9 @@ def source_data_normalize(
         df["url"] = df["url"].map(mapping_utils.prefix_url)
 
     # Etapes de normalisation spécifiques aux sources
+    if source_code == "ordredespharmaciens":
+        df = df_normalize_pharmacie(df)
+
     if source_code == "ADEME_SINOE_Decheteries":
         df = df_normalize_sinoe(
             df,
@@ -222,6 +230,20 @@ def source_data_normalize(
     return df
 
 
+def df_normalize_pharmacie(df: pd.DataFrame) -> pd.DataFrame:
+    # controle des adresses et localisation des pharmacies
+    df = df.apply(enrich_from_ban_api, axis=1)
+    # On supprime les pharmacies sans localisation
+    nb_pharmacies_sans_loc = len(df[(df["latitude"] == 0) | (df["longitude"] == 0)])
+    nb_pharmacies = len(df)
+    logger.warning(
+        f"Nombre de pharmacies sans localisation: {nb_pharmacies_sans_loc}"
+        f" / {nb_pharmacies}"
+    )
+    df = df[(df["latitude"] != 0) & (df["longitude"] != 0)]
+    return df
+
+
 def df_normalize_sinoe(
     df: pd.DataFrame,
     product_mapping: dict,
@@ -290,3 +312,75 @@ def df_normalize_sinoe(
         raise ValueError(f"Sous-catégories invalides: {souscats_invalid}")
 
     return df
+
+
+@retry(wait=wait_fixed(5), stop=stop_after_attempt(5))
+def enrich_from_ban_api(row: pd.Series) -> pd.Series:
+    engine = PostgresConnectionManager().engine
+
+    adresse = row["adresse"] if row["adresse"] else ""
+    code_postal = row["code_postal"] if row["code_postal"] else ""
+    ville = row["ville"] if row["ville"] else ""
+
+    ban_cache_row = engine.execute(
+        text(
+            "SELECT * FROM qfdmo_bancache WHERE adresse = :adresse and code_postal = "
+            ":code_postal and ville = :ville and modifie_le > now() - interval '30 day'"
+            " order by modifie_le desc limit 1"
+        ),
+        adresse=adresse,
+        code_postal=str(code_postal),
+        ville=ville,
+    ).fetchone()
+
+    if ban_cache_row:
+        result = ban_cache_row["ban_returned"]
+    else:
+        ban_adresse = _compute_ban_adresse(row)
+        url = "https://api-adresse.data.gouv.fr/search/"
+        r = requests.get(url, params={"q": ban_adresse})
+        if r.status_code != 200:
+            raise Exception(f"Failed to get data from API: {r.status_code}")
+        result = r.json()
+        engine.execute(
+            text(
+                "INSERT INTO qfdmo_bancache"
+                " (adresse, code_postal, ville, ban_returned, modifie_le)"
+                " VALUES (:adresse, :code_postal, :ville, :result, NOW())"
+            ),
+            adresse=adresse,
+            code_postal=code_postal,
+            ville=ville,
+            result=json.dumps(result),
+        )
+
+    better_result = None
+    better_geo = None
+    if "features" in result and result["features"]:
+        better_geo = result["features"][0]["geometry"]["coordinates"]
+        better_result = result["features"][0]["properties"]
+    if better_geo and better_result and better_result["score"] > 0.5:
+        better_postcode = (
+            better_result["postcode"]
+            if "postcode" in better_result
+            else row["code_postal"]
+        )
+        better_city = better_result["city"] if "city" in better_result else row["ville"]
+        better_adresse = (
+            better_result["name"] if "name" in better_result else row["adresse"]
+        )
+        row["longitude"] = better_geo[0]
+        row["latitude"] = better_geo[1]
+        row["adresse"] = better_adresse
+        row["code_postal"] = better_postcode
+        row["ville"] = better_city
+    else:
+        row["longitude"] = 0
+        row["latitude"] = 0
+    return row
+
+
+def _compute_ban_adresse(row):
+    ban_adresse = [row["adresse"], row["code_postal"], row["ville"]]
+    ban_adresse = [str(x) for x in ban_adresse if x]
+    return " ".join(ban_adresse)
diff --git a/dags/sources/tasks/transform/transform_column.py b/dags/sources/tasks/transform/transform_column.py
index 40b81ed66..bdcdcd1ad 100644
--- a/dags/sources/tasks/transform/transform_column.py
+++ b/dags/sources/tasks/transform/transform_column.py
@@ -54,7 +54,7 @@ def process_entry(entry):
         hours_translated = process_schedule(hours)
         return f"du {day_range} {hours_translated}"
 
-    if not opening_hours or pd.isna(opening_hours):
+    if pd.isna(opening_hours) or not opening_hours:
         return ""
 
     return process_entry(opening_hours)
@@ -92,3 +92,7 @@ def clean_number(number: Any) -> str:
     # suppression de tous les caractères autre que digital
     number = re.sub(r"[^\d+]", "", number)
     return number
+
+
+def strip_string(value: str | None) -> str:
+    return str(value).strip() if not pd.isna(value) and value else ""
diff --git a/dags/utils/api_utils.py b/dags/utils/api_utils.py
index a3d986673..a8a66b19f 100755
--- a/dags/utils/api_utils.py
+++ b/dags/utils/api_utils.py
@@ -7,53 +7,6 @@
 logger = logging.getLogger(__name__)
 
 
-def fetch_dataset_from_point_apport(url):
-    all_data = []
-    while url:
-        logger.info(f"Récupération de données pour {url}")
-        response = requests.get(url, timeout=60)
-        response.raise_for_status()
-        data = response.json()
-        logger.info("Nombre de lignes récupérées: " + str(len(data["results"])))
-        all_data.extend(data["results"])
-        url = data.get("next", None)
-    logger.info("Plus d'URL à parcourir")
-    logger.info("Nombre total de lignes récupérées: " + str(len(all_data)))
-    return all_data
-
-
-def fetch_data_from_url(base_url):
-    if "pointsapport.ademe.fr" or "data.ademe.fr" in base_url:
-        return fetch_dataset_from_point_apport(base_url)
-    elif "artisanat.fr" in base_url:
-        return fetch_dataset_from_artisanat(base_url)
-    # Le but de nos intégrations API est de récupérer des données.
-    # Si on ne récupére pas de données, on sait qu'on à un problème,
-    # et donc il faut échouer explicitement au plus tôt
-    raise NotImplementedError(f"Pas de fonction de récupération pour l'url {base_url}")
-
-
-def fetch_dataset_from_artisanat(base_url):
-    all_data = []
-    offset = 0
-    total_records = requests.get(base_url, params={"limit": 1, "offset": 0}).json()[
-        "total_count"
-    ]
-    records_per_request = 100
-    params = {"limit": records_per_request, "offset": 0}
-    while offset < total_records:
-        params.update({"offset": offset})
-        response = requests.get(base_url, params=params)
-        if response.status_code == 200:
-            data = response.json()
-            all_data.extend(data["results"])
-            offset += records_per_request
-        else:
-            response.raise_for_status()
-
-    return all_data
-
-
 @sleep_and_retry
 @limits(calls=7, period=1)
 def call_annuaire_entreprises(query, adresse_query_flag=False, naf=None):
@@ -167,9 +120,3 @@ def get_lat_lon_from_address(address):
         return coords[1], coords[0]
 
     return None, None
-
-
-if __name__ == "__main__":
-    fetch_dataset_from_point_apport(
-        "https://data.ademe.fr/data-fair/api/v1/datasets/sinoe-(r)-annuaire-des-decheteries-dma/lines?size=10000&q_mode=simple&ANNEE_eq=2024"
-    )
diff --git a/dags/utils/dag_eo_utils.py b/dags/utils/dag_eo_utils.py
index b573d2efb..73ce3b111 100755
--- a/dags/utils/dag_eo_utils.py
+++ b/dags/utils/dag_eo_utils.py
@@ -2,7 +2,7 @@
 import logging
 from datetime import datetime
 
-from airflow.providers.postgres.hooks.postgres import PostgresHook
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils import shared_constants as constants
 
 logger = logging.getLogger(__name__)
@@ -11,8 +11,7 @@
 def insert_dagrun_and_process_df(df_acteur_updates, metadata, dag_name, run_name):
     if df_acteur_updates.empty:
         return
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
     current_date = datetime.now()
 
     with engine.connect() as conn:
diff --git a/dags/utils/db_tasks.py b/dags/utils/db_tasks.py
index 1aa7b2303..6d6b6fabf 100755
--- a/dags/utils/db_tasks.py
+++ b/dags/utils/db_tasks.py
@@ -1,12 +1,11 @@
 import pandas as pd
-from airflow.providers.postgres.hooks.postgres import PostgresHook
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils import logging_utils as log
 
 
 def read_data_from_postgres(**kwargs):
     table_name = kwargs["table_name"]
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
     df = pd.read_sql_table(table_name, engine).replace({pd.NA: None})
     if df.empty:
         raise ValueError(f"DB: pas de données pour table {table_name}")
diff --git a/dags/utils/mapping_utils.py b/dags/utils/mapping_utils.py
index 19e81275c..c610d8c5a 100755
--- a/dags/utils/mapping_utils.py
+++ b/dags/utils/mapping_utils.py
@@ -68,6 +68,7 @@ def transform_acteur_type_id(value, acteurtype_id_by_code):
         "association, entreprise de l'economie sociale et solidaire (ess)": "ess",
         "etablissement de sante": "ets_sante",
         "decheterie": "decheterie",
+        "pharmacie": "commerce",
         "point d'apport volontaire prive": "pav_prive",
         "plateforme inertes": "plateforme_inertes",
         "magasin / franchise, enseigne commerciale / distributeur / point de vente "
diff --git a/dags_unit_tests/sources/tasks/business_logic/test_source_data_normalize.py b/dags_unit_tests/sources/tasks/business_logic/test_source_data_normalize.py
index 66458fb99..4c0e73d54 100755
--- a/dags_unit_tests/sources/tasks/business_logic/test_source_data_normalize.py
+++ b/dags_unit_tests/sources/tasks/business_logic/test_source_data_normalize.py
@@ -1,11 +1,14 @@
+from unittest.mock import patch
+
 import numpy as np
 import pandas as pd
 import pytest
+from sources.config.airflow_params import TRANSFORMATION_MAPPING
 from sources.tasks.business_logic.source_data_normalize import (
+    df_normalize_pharmacie,
     df_normalize_sinoe,
     source_data_normalize,
 )
-from sources.tasks.transform.transform_column import convert_opening_hours
 
 """
 TODO:
@@ -260,6 +263,7 @@ class TestSourceDataNormalize:
     - [ ] test Suppresion des colonnes non voulues
     - [ ] test ignore_duplicates
     - [ ] test produitsdechets_acceptes vide ou None
+    - [ ] test transformation from column_transformations is called
     """
 
     @pytest.fixture
@@ -512,27 +516,74 @@ def test_exclusivite_de_reprisereparation(
             == expected_exclusivite_de_reprisereparation
         )
 
+    def test_column_transformations_is_called(self, source_data_normalize_kwargs):
-@pytest.mark.parametrize(
-    "input_value, expected_output",
-    [
-        # chaine vide ou Nulle
-        ("", ""),
-        (None, ""),
-        (np.nan, ""),
-        # chaines valides
-        ("Mo-Fr 09:00-16:00", "du lundi au vendredi de 09h00 à 16h00"),
-        (
-            "Mo-Fr 09:00-12:00,14:00-17:00",
-            "du lundi au vendredi de 09h00 à 12h00 et de 14h00 à 17h00",
-        ),
-        # TODO : à implémenter
-        # (
-        #     "Mo,Fr 09:00-12:00,15:00-17:00",
-        #     "le lundi et le vendredi de 09h00 à 12h00 et de 15h00 à 17h00"
-        # ),
-        # ("Mo,Tu,We 09:00-12:00", "le lundi, mardi et le mercredi de 09h00 à 12h00"),
-    ],
-)
-def test_convert_opening_hours(input_value, expected_output):
-    assert convert_opening_hours(input_value) == expected_output
+        source_data_normalize_kwargs["column_transformations"] = [
+            {
+                "origin": "nom origin",
+                "transformation": "test_fct",
+                "destination": "nom destination",
+            }
+        ]
+        source_data_normalize_kwargs["df_acteur_from_source"] = pd.DataFrame(
+            {
+                "identifiant_externe": ["1"],
+                "ecoorganisme": ["source1"],
+                "source_id": ["source_id1"],
+                "acteur_type_id": ["decheterie"],
+                "produitsdechets_acceptes": ["Plastic Box"],
+                "nom origin": ["nom"],
+            }
+        )
+
+        TRANSFORMATION_MAPPING["test_fct"] = lambda x: "success"
+        df = source_data_normalize(**source_data_normalize_kwargs)
+        assert "nom destination" in df.columns
+        assert df["nom destination"].iloc[0] == "success"
+
+
+class TestDfNormalizePharmacie:
+    """
+    Test de la fonction df_normalize_pharmacie
+    """
+
+    @patch(
+        "sources.tasks.business_logic.source_data_normalize.enrich_from_ban_api",
+        autospec=True,
+    )
+    def test_df_normalize_pharmacie(self, mock_enrich_from_ban_api):
+        def _enrich_from_ban_api(row):
+            if row["ville"] == "Paris":
+                row["latitude"] = 48.8566
+                row["longitude"] = 2.3522
+            else:
+                row["latitude"] = 0
+                row["longitude"] = 0
+            return row
+
+        mock_enrich_from_ban_api.side_effect = _enrich_from_ban_api
+
+        df = pd.DataFrame(
+            {
+                "adresse": ["123 Rue de Paris", "456 Avenue de Lyon"],
+                "code_postal": ["75001", "69000"],
+                "ville": ["Paris", "Lyon"],
+            }
+        )
+
+        # Appeler la fonction df_normalize_pharmacie
+        result_df = df_normalize_pharmacie(df)
+
+        assert mock_enrich_from_ban_api.call_count == len(df)
+        pd.testing.assert_frame_equal(
+            result_df,
+            pd.DataFrame(
+                {
+                    "adresse": ["123 Rue de Paris"],
+                    "code_postal": ["75001"],
+                    "ville": ["Paris"],
+                    "latitude": [48.8566],
+                    "longitude": [2.3522],
+                }
+            ),
+        )
diff --git a/dags_unit_tests/sources/tasks/transform/test_transform_column.py b/dags_unit_tests/sources/tasks/transform/test_transform_column.py
index 962a0affd..59af55728 100644
--- a/dags_unit_tests/sources/tasks/transform/test_transform_column.py
+++ b/dags_unit_tests/sources/tasks/transform/test_transform_column.py
@@ -1,7 +1,12 @@
 import numpy as np
 import pandas as pd
 import pytest
-from sources.tasks.transform.transform_column import clean_siren, clean_siret
+from sources.tasks.transform.transform_column import (
+    clean_siren,
+    clean_siret,
+    convert_opening_hours,
+    strip_string,
+)
 
 
 class TestCleanSiret:
@@ -37,3 +42,52 @@ class TestCleanSiren:
     )
     def test_clean_siren(self, siren, expected_siren):
         assert clean_siren(siren) == expected_siren
+
+
+class TestStripString:
+
+    @pytest.mark.parametrize(
+        "input, output",
+        [
+            (None, ""),
+            (pd.NA, ""),
+            (np.nan, ""),
+            ("", ""),
+            (" ", ""),
+            (75001, "75001"),
+            (" adresse postale ", "adresse postale"),
+        ],
+    )
+    def test_strip_string(self, input, output):
+        assert strip_string(input) == output
+
+
+class TestConvertOpeningHours:
+
+    @pytest.mark.parametrize(
+        "input_value, expected_output",
+        [
+            # chaine vide ou Nulle
+            ("", ""),
+            (None, ""),
+            (pd.NA, ""),
+            (np.nan, ""),
+            # chaines valides
+            ("Mo-Fr 09:00-16:00", "du lundi au vendredi de 09h00 à 16h00"),
+            (
+                "Mo-Fr 09:00-12:00,14:00-17:00",
+                "du lundi au vendredi de 09h00 à 12h00 et de 14h00 à 17h00",
+            ),
+            # TODO : à implémenter
+            # (
+            #     "Mo,Fr 09:00-12:00,15:00-17:00",
+            #     "le lundi et le vendredi de 09h00 à 12h00 et de 15h00 à 17h00"
+            # ),
+            # (
+            #     "Mo,Tu,We 09:00-12:00",
+            #     "le lundi, mardi et le mercredi de 09h00 à 12h00"
+            # ),
+        ],
+    )
+    def test_convert_opening_hours(self, input_value, expected_output):
+        assert convert_opening_hours(input_value) == expected_output
diff --git a/qfdmo/migrations/0104_recode_ordredespharmaciens.py b/qfdmo/migrations/0104_recode_ordredespharmaciens.py
new file mode 100644
index 000000000..f46809299
--- /dev/null
+++ b/qfdmo/migrations/0104_recode_ordredespharmaciens.py
@@ -0,0 +1,35 @@
+# Generated by Django 5.1.1 on 2024-12-03 07:25
+
+from django.db import migrations
+
+
+def rename_ordredespharmaciens(apps, schema_editor):
+    Source = apps.get_model("qfdmo", "Source")
+    ordredespharmaciens = Source.objects.filter(
+        code="Ordre National Des Pharmaciens"
+    ).first()
+    if ordredespharmaciens:
+        ordredespharmaciens.code = "ordredespharmaciens"
+        ordredespharmaciens.save()
+
+
+def rollback_rename_ordredespharmaciens(apps, schema_editor):
+    Source = apps.get_model("qfdmo", "Source")
apps.get_model("qfdmo", "Source") + ordredespharmaciens = Source.objects.filter(code="ordredespharmaciens").first() + if ordredespharmaciens: + ordredespharmaciens.code = "Ordre National Des Pharmaciens" + ordredespharmaciens.save() + + +class Migration(migrations.Migration): + + dependencies = [ + ("qfdmo", "0103_acteur_siren_displayedacteur_siren_and_more"), + ] + + operations = [ + migrations.RunPython( + rename_ordredespharmaciens, + rollback_rename_ordredespharmaciens, + ), + ] diff --git a/qfdmo/migrations/0105_bancache.py b/qfdmo/migrations/0105_bancache.py new file mode 100644 index 000000000..81bf0cd83 --- /dev/null +++ b/qfdmo/migrations/0105_bancache.py @@ -0,0 +1,53 @@ +# Generated by Django 5.1.1 on 2024-12-02 12:47 + +import django.contrib.gis.db.models.fields +import django.db.models.functions.datetime +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("qfdmo", "0104_recode_ordredespharmaciens"), + ] + + operations = [ + migrations.CreateModel( + name="BANCache", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("adresse", models.CharField(blank=True, max_length=255, null=True)), + ( + "code_postal", + models.CharField(blank=True, max_length=255, null=True), + ), + ("ville", models.CharField(blank=True, max_length=255, null=True)), + ( + "location", + django.contrib.gis.db.models.fields.PointField( + blank=True, null=True, srid=4326 + ), + ), + ("ban_returned", models.JSONField(blank=True, null=True)), + ( + "modifie_le", + models.DateTimeField( + auto_now=True, + db_default=django.db.models.functions.datetime.Now(), + ), + ), + ], + options={ + "verbose_name": "Cache BAN", + "verbose_name_plural": "Cache BAN", + }, + ), + ] diff --git a/qfdmo/models/data.py b/qfdmo/models/data.py index a300034ef..aaa3d3037 100644 --- a/qfdmo/models/data.py +++ b/qfdmo/models/data.py @@ -1,4 +1,5 @@ from django.contrib.gis.db import models +from django.db.models.functions import Now from dags.utils.shared_constants import FINISHED, REJECTED, TO_INSERT, TO_VALIDATE from qfdmo.models.acteur import ActeurType, Source @@ -63,39 +64,6 @@ class DagRunChange(models.Model): default=DagRunStatus.TO_VALIDATE, ) - # row_updates : JSON of acteur to update or create to store on row_updates - # { - # "nom": "NOM", - # "description": null, - # "identifiant_unique": "IDENTIFIANT_UNIQUE", - # "adresse": "ADRESSE", - # "adresse_complement": "…", - # "code_postal": "CODE_POSTAL", - # "ville": "VILLE", - # "url": "…", - # "email": "…", - # "telephone": "…", - # "nom_commercial": "…", - # "nom_officiel": "…", - # "labels": ['reparacteur], - # "siret": "49819433100019", - # "identifiant_externe": "144103", - # "statut": "ACTIF", - # "naf_principal": "62.02A", - # "commentaires": null, - # "horaires_osm": null, - # "horaires_description": null, - # "acteur_type_id": 3, - # "source_id": 4, - # "location": [2.9043527, 42.6949013], - # "proposition_services": [ - # { - # "action_id": 1, - # "acteur_service_id": 15, - # "sous_categories": [90] - # } - # ] - # } def display_acteur_details(self) -> dict: displayed_details = {} for field, field_value in { @@ -170,3 +138,16 @@ def update_row_update_candidate(self, status, index): def get_candidat(self, index): return self.row_updates["ae_result"][int(index) - 1] + + +class BANCache(models.Model): + class Meta: + verbose_name = "Cache BAN" + verbose_name_plural = "Cache BAN" + + adresse = models.CharField(max_length=255, 
+    code_postal = models.CharField(max_length=255, blank=True, null=True)
+    ville = models.CharField(max_length=255, blank=True, null=True)
+    location = models.PointField(blank=True, null=True)
+    ban_returned = models.JSONField(blank=True, null=True)
+    modifie_le = models.DateTimeField(auto_now=True, db_default=Now())