Add the Pharmacies source (#1094)
kolok authored Dec 5, 2024
1 parent 7ade97e commit 6fcb4f7
Showing 23 changed files with 524 additions and 144 deletions.
5 changes: 2 additions & 3 deletions dags/annuaire_entreprise_checks.py
@@ -8,8 +8,8 @@
 from airflow import DAG
 from airflow.models.param import Param
 from airflow.operators.python import PythonOperator
-from airflow.providers.postgres.hooks.postgres import PostgresHook
 from shared.tasks.airflow_logic.write_data_task import write_data_task
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 
 pd.set_option("display.max_columns", None)
 
@@ -46,8 +46,7 @@
 
 def fetch_and_parse_data(**context):
     limit = context["params"]["limit"]
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
     active_actors_query = """
         SELECT
             da.*,
5 changes: 2 additions & 3 deletions dags/create_final_actors.py
@@ -4,7 +4,7 @@
 import shortuuid
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from airflow.providers.postgres.hooks.postgres import PostgresHook
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils.db_tasks import read_data_from_postgres
 
 
@@ -218,8 +218,7 @@ def write_data_to_postgres(**kwargs):
         inplace=True,
     )
 
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
 
     original_table_name_actor = "qfdmo_displayedacteur"
     temp_table_name_actor = "qfdmo_displayedacteurtemp"
11 changes: 5 additions & 6 deletions dags/ingest_validated_dataset_to_db.py
@@ -2,10 +2,11 @@
 
 import pandas as pd
 from airflow.models import DAG
-from airflow.operators.dagrun_operator import TriggerDagRunOperator
-from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
+from airflow.operators.python import BranchPythonOperator, PythonOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.providers.postgres.hooks.postgres import PostgresHook
 from airflow.utils.dates import days_ago
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils import dag_ingest_validated_utils, shared_constants
 
 default_args = {
@@ -51,8 +52,7 @@ def fetch_and_parse_data(**context):
     row = _get_first_dagrun_to_insert()
     dag_run_id = row[0]
 
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
 
     df_sql = pd.read_sql_query(
         f"SELECT * FROM qfdmo_dagrunchange WHERE dag_run_id = '{dag_run_id}'",
@@ -91,8 +91,7 @@ def write_data_to_postgres(**kwargs):
     data_dict = kwargs["ti"].xcom_pull(task_ids="fetch_and_parse_data")
     # If data_set is empty, nothing to do
     dag_run_id = data_dict["dag_run_id"]
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
     if "actors" not in data_dict:
         with engine.begin() as connection:
             dag_ingest_validated_utils.update_dag_run_status(connection, dag_run_id)
27 changes: 27 additions & 0 deletions dags/shared/tasks/database_logic/db_manager.py
@@ -0,0 +1,27 @@
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+from sqlalchemy.engine import Engine
+
+
+class PostgresConnectionManager:
+    """
+    Singleton class to manage the connection to the Postgres database.
+    Uses the connection qfdmo_django_db by default; this connection is
+    configured via the env variable AIRFLOW_CONN_QFDMO_DJANGO_DB.
+    """
+
+    _instance = None
+
+    def __new__(cls, *args, **kwargs):
+        if cls._instance is None:
+            cls._instance = super(PostgresConnectionManager, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self, postgres_conn_id="qfdmo_django_db"):
+        if not hasattr(self, "initialized"):  # avoid re-initializing the singleton
+            self.postgres_conn_id = postgres_conn_id
+            self.engine = self._create_engine()
+            self.initialized = True
+
+    def _create_engine(self) -> Engine:
+        pg_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
+        return pg_hook.get_sqlalchemy_engine()
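
A minimal usage sketch (not part of the diff) of the singleton above: every call site that does PostgresConnectionManager().engine shares one SQLAlchemy engine per worker process, which is what lets the DAG files below drop their per-task PostgresHook boilerplate. Note that because __init__ is guarded by the initialized flag, a later call with a different postgres_conn_id silently returns the engine created first.

# Hypothetical sketch, assuming AIRFLOW_CONN_QFDMO_DJANGO_DB is configured.
import pandas as pd
from shared.tasks.database_logic.db_manager import PostgresConnectionManager

engine_a = PostgresConnectionManager().engine
engine_b = PostgresConnectionManager().engine
assert engine_a is engine_b  # same instance: one connection pool per process

df = pd.read_sql_query("SELECT 1 AS ok", engine_a)
print(df)
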
2 changes: 2 additions & 0 deletions dags/sources/config/airflow_params.py
@@ -6,6 +6,7 @@
     clean_siren,
     clean_siret,
     convert_opening_hours,
+    strip_string,
 )
 
 PATH_NOMENCLARURE_DECHET = (
@@ -21,6 +22,7 @@
     "convert_opening_hours": convert_opening_hours,
     "clean_siren": clean_siren,
     "clean_siret": clean_siret,
+    "strip_string": strip_string,
 }
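
For context, a sketch of how a name from a DAG's column_transformations config resolves to a callable through the dict above. The dict's name and strip_string's body are not shown in this diff, so both are assumptions here.

# Hypothetical sketch; TRANSFORMATION_MAPPING and strip_string's body are assumed.
def strip_string(value):
    # Assumed behavior: trim surrounding whitespace, pass non-strings through.
    return value.strip() if isinstance(value, str) else value

TRANSFORMATION_MAPPING = {"strip_string": strip_string}

transformation = {  # one entry from the eo-pharmacies column_transformations
    "origin": "Raison sociale",
    "transformation": "strip_string",
    "destination": "nom",
}
func = TRANSFORMATION_MAPPING[transformation["transformation"]]
print(func("  Pharmacie du Centre  "))  # -> "Pharmacie du Centre"
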
1 change: 1 addition & 0 deletions dags/sources/config/db_mapping.json
@@ -29,6 +29,7 @@
     "literie": "Literie - Éléments d'ameublement",
     "machines et appareils motorisés thermiques": "Machines et appareils motorises thermiques",
     "matériels de bricolage, dont l’outillage à main": "outil de bricolage et jardinage",
+    "médicaments & dasri": "medicaments",
     "mélange d’inertes": "Mélange d’inertes (produits et matériaux de construction du bâtiment)",
     "membranes bitumineuses": "Membranes bitumineuses - PMCB (produits et matériaux de construction du bâtiment)",
     "meubles": "meuble",
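
The new lowercased key suggests this mapping is applied after normalizing the source label; a sketch of the lookup the entry enables (the lowercasing step is an assumption, not shown in this commit):

# Hypothetical lookup sketch: resolve a source's waste label to the internal
# sous-categorie code via db_mapping.json.
import json

with open("dags/sources/config/db_mapping.json") as f:
    mapping = json.load(f)

label = "Médicaments & DASRI"  # value set by the eo-pharmacies DAG below
print(mapping[label.lower()])  # -> "medicaments" (lowercasing is assumed)
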
56 changes: 56 additions & 0 deletions dags/sources/dags/source_pharmacies.py
@@ -0,0 +1,56 @@
+from airflow import DAG
+from sources.config.airflow_params import get_mapping_config
+from sources.tasks.airflow_logic.operators import default_args, eo_task_chain
+
+with DAG(
+    dag_id="eo-pharmacies",
+    dag_display_name="Source - PHARMACIES",
+    default_args=default_args,
+    description=("Téléchargement des pharmacies (Ordre National Des Pharmaciens)"),
+    params={
+        "column_transformations": [
+            {
+                "origin": "Raison sociale",
+                "transformation": "strip_string",
+                "destination": "nom",
+            },
+            {
+                "origin": "Dénomination commerciale",
+                "transformation": "strip_string",
+                "destination": "nom_commercial",
+            },
+            {
+                "origin": "Adresse",
+                "transformation": "strip_string",
+                "destination": "adresse",
+            },
+            {
+                "origin": "Code postal",
+                "transformation": "strip_string",
+                "destination": "code_postal",
+            },
+            {
+                "origin": "Commune",
+                "transformation": "strip_string",
+                "destination": "ville",
+            },
+        ],
+        "column_mapping": {
+            "Numéro d'établissement": "identifiant_externe",
+            "Téléphone": "telephone",
+        },
+        "endpoint": "https://www.ordre.pharmacien.fr/download/annuaire_csv.zip",
+        "columns_to_add_by_default": {
+            "statut": "ACTIF",
+            "uniquement_sur_rdv": "non",
+            "public_accueilli": "Particuliers",
+            "produitsdechets_acceptes": "Médicaments & DASRI",
+            "acteur_type_id": "pharmacie",
+            "point_de_collecte_ou_de_reprise_des_dechets": True,
+        },
+        "source_code": "ordredespharmaciens",
+        "product_mapping": get_mapping_config(),
+    },
+    schedule=None,
+) as dag:
+    eo_task_chain(dag)
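
A quick local check (hypothetical, not part of the commit) that the new DAG file parses, assuming an Airflow dev environment with the dags/ folder on PYTHONPATH. With schedule=None the DAG only runs when triggered manually.

# Hypothetical sanity check of the new DAG definition.
from sources.dags.source_pharmacies import dag

assert dag.dag_id == "eo-pharmacies"
assert dag.schedule_interval is None  # runs only when triggered manually
print([t.task_id for t in dag.tasks])  # task chain built by eo_task_chain
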
@@ -1,6 +1,6 @@
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from airflow.providers.postgres.hooks.postgres import PostgresHook
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from sqlalchemy import text
 
 
@@ -14,8 +14,7 @@ def db_read_propositions_max_id_task(dag: DAG) -> PythonOperator:
 
 
 def db_read_propositions_max_id():
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
 
     # TODO : check if we need to manage the max id here
     displayedpropositionservice_max_id = engine.execute(
@@ -3,8 +3,8 @@
 import pandas as pd
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from airflow.providers.postgres.hooks.postgres import PostgresHook
 from airflow.utils.trigger_rule import TriggerRule
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from sources.tasks.business_logic.source_config_validate import source_config_validate
 from utils import logging_utils as log
 
@@ -22,8 +22,9 @@ def source_config_validate_task(dag: DAG) -> PythonOperator:
 
 
 def source_config_validate_wrapper(**kwargs) -> None:
+    engine = PostgresConnectionManager().engine
     params = kwargs["params"]
-    engine = PostgresHook(postgres_conn_id="qfdmo_django_db").get_sqlalchemy_engine()
+
     codes_sc_db = set(
         pd.read_sql_table("qfdmo_souscategorieobjet", engine, columns=["code"])[
             "code"
6 changes: 3 additions & 3 deletions dags/sources/tasks/airflow_logic/source_data_download_task.py
@@ -22,8 +22,8 @@ def source_data_download_task(dag: DAG) -> PythonOperator:
 
 def source_data_download_wrapper(**kwargs) -> pd.DataFrame:
     params = kwargs["params"]
-    api_url = params["endpoint"]
+    endpoint = params["endpoint"]
 
-    log.preview("API end point", api_url)
+    log.preview("API end point", endpoint)
 
-    return source_data_download(api_url=api_url)
+    return source_data_download(endpoint=endpoint)
5 changes: 2 additions & 3 deletions dags/sources/tasks/business_logic/db_read_acteur.py
@@ -1,7 +1,7 @@
 import logging
 
 import pandas as pd
-from airflow.providers.postgres.hooks.postgres import PostgresHook
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils import logging_utils as log
 
 logger = logging.getLogger(__name__)
@@ -14,8 +14,7 @@ def db_read_acteur(
         raise ValueError(
             "La colonne source_id est requise dans la dataframe normalisée"
         )
-    pg_hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
-    engine = pg_hook.get_sqlalchemy_engine()
+    engine = PostgresConnectionManager().engine
     unique_source_ids = df_normalized["source_id"].unique()
 
     joined_source_ids = ",".join([f"'{source_id}'" for source_id in unique_source_ids])
86 changes: 83 additions & 3 deletions dags/sources/tasks/business_logic/source_data_download.py
@@ -1,24 +1,104 @@
 import logging
+import tempfile
+import zipfile
+from pathlib import Path
 
 import numpy as np
 import pandas as pd
-from utils import api_utils
+import requests
 from utils import logging_utils as log
 
 logger = logging.getLogger(__name__)
 
 
-def source_data_download(api_url: str) -> pd.DataFrame:
+def source_data_download(endpoint: str) -> pd.DataFrame:
     """Download the source data without modifying it."""
     logger.info("Téléchargement données de l'API : début...")
     # TODO: change the approach: rather than loading everything into memory
     # and passing dataframes around via XCom, we should stream the data
     # directly into the database and delegate as much processing as
     # possible to the DB
-    data = api_utils.fetch_data_from_url(api_url)
+    data = fetch_data_from_endpoint(endpoint)
     logger.info("Téléchargement données de l'API : ✅ succès.")
     df = pd.DataFrame(data).replace({pd.NA: None, np.nan: None})
     if df.empty:
         raise ValueError("Aucune donnée reçue de l'API")
     log.preview("df retournée par la tâche", df)
     return df
 
 
+def fetch_data_from_endpoint(endpoint):
+    if "pointsapport.ademe.fr" in endpoint or "data.ademe.fr" in endpoint:
+        return fetch_dataset_from_point_apport(endpoint)
+    elif "artisanat.fr" in endpoint:
+        return fetch_dataset_from_artisanat(endpoint)
+    elif "ordre.pharmacien.fr" in endpoint:
+        return fetch_dataset_from_pharmacies(endpoint)
+    # The whole point of our API integrations is to retrieve data.
+    # If we cannot retrieve any, we know we have a problem,
+    # so fail explicitly as early as possible.
+    raise NotImplementedError(f"Pas de fonction de récupération pour l'url {endpoint}")
+
+
+def fetch_dataset_from_point_apport(url):
+    all_data = []
+    while url:
+        logger.info(f"Récupération de données pour {url}")
+        response = requests.get(url, timeout=60)
+        response.raise_for_status()
+        data = response.json()
+        logger.info("Nombre de lignes récupérées: " + str(len(data["results"])))
+        all_data.extend(data["results"])
+        url = data.get("next", None)
+    logger.info("Plus d'URL à parcourir")
+    logger.info("Nombre total de lignes récupérées: " + str(len(all_data)))
+    return all_data
+
+
+def fetch_dataset_from_artisanat(base_url):
+    all_data = []
+    offset = 0
+    records_per_request = 100
+    # First request only fetches the total record count to drive pagination.
+    response = requests.get(base_url, params={"limit": 1, "offset": 0}, timeout=60)
+    response.raise_for_status()
+    total_records = response.json()["total_count"]
+    params = {"limit": records_per_request, "offset": 0}
+    while offset < total_records:
+        params.update({"offset": offset})
+        response = requests.get(base_url, params=params, timeout=60)
+        response.raise_for_status()
+        data = response.json()
+        all_data.extend(data["results"])
+        offset += records_per_request
+
+    return all_data
+
+
+def fetch_dataset_from_pharmacies(endpoint):
+    with tempfile.TemporaryDirectory() as temp_dir:
+        zip_file = _download_file(endpoint, temp_dir)
+        unzip_files = _extract_zip(zip_file, temp_dir)
+        # The archive is expected to contain a CSV whose name includes
+        # "etablissements"
+        etablissements_file = [f for f in unzip_files if "etablissements" in f][0]
+        df_etablissements = _read_csv(Path(temp_dir) / etablissements_file)
+    return df_etablissements
+
+
+def _download_file(url, dest_folder="."):
+    local_filename = Path(dest_folder) / url.split("/")[-1]
+    with requests.get(url, timeout=60) as r:
+        r.raise_for_status()
+        with open(local_filename, "wb") as f:
+            f.write(r.content)
+    return local_filename
+
+
+def _extract_zip(zip_file, dest_folder="."):
+    with zipfile.ZipFile(zip_file, "r") as zip_ref:
+        zip_ref.extractall(dest_folder)
+    return zip_ref.namelist()
+
+
+def _read_csv(csv_file):
+    df = pd.read_csv(csv_file, sep=";", encoding="utf-16-le", on_bad_lines="warn")
+    return df
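
A quick way to exercise the new download path end to end (hypothetical, not part of the commit; requires network access to the Ordre des pharmaciens endpoint). The expected column names come from the eo-pharmacies DAG config above.

# Hypothetical local check of the pharmacies branch of fetch_data_from_endpoint.
df = source_data_download(
    endpoint="https://www.ordre.pharmacien.fr/download/annuaire_csv.zip"
)
print(len(df), "rows")
print(df.columns.tolist())  # expect "Raison sociale", "Adresse", etc.
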