Skip to content

Commit

Permalink
👁️‍🗨️ Vue matérialisée pour tous les acteurs (#1063)
Browse files Browse the repository at this point in the history
Co-authored-by: Nicolas Oudard <[email protected]>
  • Loading branch information
maxcorbeau and kolok authored Nov 26, 2024
1 parent ff8c76a commit 06e065f
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 24 deletions.
26 changes: 2 additions & 24 deletions dags/sources/dags/source_sinoe.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,10 @@
import os

import pandas as pd
from airflow import DAG
from sources.config.airflow_params import (
get_mapping_config,
source_sinoe_dechet_mapping_get,
)
from sources.tasks.airflow_logic.operators import default_args, eo_task_chain

# ------------------------------------------
# PARAMÈTRES ET DOCUMENTATION DU DAG
# ------------------------------------------
PRODUCT_MAPPING = get_mapping_config("sous_categories_sinoe")
PM_TABLE = pd.DataFrame(list(PRODUCT_MAPPING.items()), columns=["Source", "LVAO"])
PM_TABLE["Associations "] = PM_TABLE["LVAO"].apply(
lambda x: "1 <-> N" if isinstance(x, list) else "1 <-> 1"
)
PM_TABLE = PM_TABLE[["Source", "Associations ", "LVAO"]]
DAG_DOC_MD = f"""
- **fichier**: {os.path.abspath(__file__)}
# Définition des sous-catégories
{PM_TABLE.to_markdown(index=False)}
"""
DAG_DOC_MD = ""
DAG_TAGS = ["source", "ademe", "sinoe", "déchèteries"]

default_args["retries"] = 0
with DAG(
dag_id="eo-sinoe",
Expand All @@ -34,8 +13,7 @@
description=(
"DAG pour télécharger, standardiser, et charger dans notre base la source SINOE"
),
doc_md=DAG_DOC_MD,
tags=DAG_TAGS,
tags=["source", "ademe", "sinoe", "déchèteries"],
params={
"endpoint": (
"https://data.ademe.fr/data-fair/api/v1/datasets/"
Expand Down Expand Up @@ -94,7 +72,7 @@
"dechet_mapping": source_sinoe_dechet_mapping_get(),
"ignore_duplicates": False,
"validate_address_with_ban": False,
"product_mapping": PRODUCT_MAPPING,
"product_mapping": get_mapping_config("sous_categories_sinoe"),
},
schedule=None,
) as dag:
Expand Down
15 changes: 15 additions & 0 deletions dags/views/sql/generators/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Générateur SQL

- **QUOI**: solution pour gérer le SQL des vues crées par les DAGs
- **POURQUOI**: pour des raisons de fiablité et séparation des préoccupations:
- Les DAG ne doivent pas générer le SQL des vues, mais utiliser du SQL prégénéré
- Pour que les DAGs reposent sur du SQL testé et versionné
(ce qu'on ne peut pas avoir si on est en full dynamique Python -> SQL -> CREATE dans le DAG)
- **COMMENT**: en suivant les étapes suivantes:
1. Fichier générateur SQL: `/views/sql/generators/{dag}_sql_generate.py`
2. Fichier SQL: `/views/sql/{dag}.sql`
3. Fichier DAG: `/views/{dag}.py` qui vient lire le SQL ci-dessus
note: techniquement il faut d'abord définir la constante `VIEW_NAME` dans `/views/{dag}.py` avant pour la réutiliser dans l'étape 1.


Cette solution custom en attendant de voir si cela vaut le coup d'introduire des frameworks dédiés à la gestion des vues (ex: `dbt`)
136 changes: 136 additions & 0 deletions dags/views/sql/generators/view_acteur_tous_sql_generate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from dags.views.view_acteur_tous import VIEW_NAME


def view_acteur_tous_sql_generate() -> str:
"""Génère le code SQL de la vue matérialisé
en faisant python dags/views/view_acteur_tous.py
Pour tester et finaliser la vue:
- placer le code dans le fichier dédié ./sql/view_acteur_tous.sql
- la formatter et pouvoir la relire plus facilement
- tester avec un client SQL
Au final on à le code SQL définitif dans notre repo pour le versionning
"""
# Champs communs à toutes les tables, on prend au mieux avec COALESCE
fields_on_all = [
# IDs
"identifiant_unique",
"identifiant_externe",
"acteur_type_id",
"source_id",
"action_principale_id",
# Noms
"nom",
"nom_commercial",
"nom_officiel",
# GEO
"adresse",
"adresse_complement",
"code_postal",
"location",
"ville",
# Contact
"telephone",
"email",
"url",
"horaires_description",
"horaires_osm",
"uniquement_sur_rdv",
# MISC
"statut",
"naf_principal",
"siret",
"public_accueilli",
"exclusivite_de_reprisereparation",
]
selected = ",\n".join(
[f"COALESCE(da.{x}, ra.{x}, a.{x}) AS {x}" for x in fields_on_all]
)
# dernier champ sélectionné: parent_id qui n'existe que dans revisionacteur
selected += ",\nra.parent_id AS parent_id,\n"
selected += """-- Si l'identifiant est dans parent_ids, alors c'est un parent
CASE
WHEN COALESCE(
da.identifiant_unique,
ra.identifiant_unique,
a.identifiant_unique)
IN (SELECT id FROM parent_ids) THEN TRUE
ELSE FALSE
END AS est_parent,\n"""
selected += """da.identifiant_unique IS NOT NULL AS est_dans_displayedacteur,
ra.identifiant_unique IS NOT NULL AS est_dans_revisionacteur,
a.identifiant_unique IS NOT NULL AS est_dans_acteur"""

query = f"""
-- Il n'existe pas aujourdhui de CREATE OR REPLACE MATERIALIZED VIEW
-- d'où l'autre requête DROP IF EXISTS avant de créer
CREATE MATERIALIZED VIEW {VIEW_NAME} AS (
WITH
parent_ids AS (
SELECT DISTINCT
parent_id AS id
FROM
qfdmo_revisionacteur
),
parent_ids_to_enfants AS (
SELECT
parent_id,
ARRAY_AGG (identifiant_unique) AS enfants_liste,
CARDINALITY(ARRAY_AGG (identifiant_unique)) AS enfants_nombre
FROM
qfdmo_revisionacteur AS ra
WHERE
parent_id IS NOT NULL
GROUP BY
1
ORDER BY
3 DESC
),
acteur_all AS (
SELECT
{selected}
FROM qfdmo_displayedacteur AS da
FULL OUTER JOIN qfdmo_revisionacteur AS ra
ON da.identifiant_unique = ra.identifiant_unique
FULL OUTER JOIN qfdmo_acteur AS a
ON da.identifiant_unique = a.identifiant_unique
)
SELECT
-- ne pas faire un lazy * car ceci sélectionne des champs génériques
-- présents sur plusieurs tables (ex: url) ce qui cause des erreurs
acteur_all.*,
ST_Y(acteur_all.location) AS location_lat,
ST_X(acteur_all.location) AS location_long,
-- Infos enfants pour les parents
CASE
WHEN est_parent THEN (SELECT
enfants_nombre
FROM parent_ids_to_enfants
WHERE parent_id = acteur_all.identifiant_unique)
ELSE NULL
END AS enfants_nombre,
CASE
WHEN est_parent THEN (SELECT
enfants_liste
FROM parent_ids_to_enfants
WHERE parent_id = acteur_all.identifiant_unique)
ELSE NULL
END AS enfants_liste,
-- Les codes pour être plus pratique ques les ids
s.code AS source_code,
atype.code AS acteur_type_code
FROM acteur_all
LEFT JOIN qfdmo_source AS s ON s.id = acteur_all.source_id
LEFT JOIN qfdmo_acteurtype AS atype ON atype.id = acteur_all.acteur_type_id
)"""
return query


# Pour générer le code SQL de la vue
# python dags/views/generators/view_acteur_tous.py
# voir commentaire de la fonction sql_view_generate_code
# pour les étapes suivantes
if __name__ == "__main__":
print("Requête SQL générée:")
print(view_acteur_tous_sql_generate())
152 changes: 152 additions & 0 deletions dags/views/sql/view_acteur_tous.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
CREATE MATERIALIZED VIEW IF NOT EXISTS qfdmo_vue_acteur_tous AS (
WITH
parent_ids AS (
SELECT DISTINCT
parent_id AS id
FROM
qfdmo_revisionacteur
),
parent_ids_to_enfants AS (
SELECT
parent_id,
ARRAY_AGG (identifiant_unique) AS enfants_liste,
CARDINALITY(ARRAY_AGG (identifiant_unique)) AS enfants_nombre
FROM
qfdmo_revisionacteur AS ra
WHERE
parent_id IS NOT NULL
GROUP BY
1
ORDER BY
3 DESC
),
acteur_all AS (
SELECT
COALESCE(
da.identifiant_unique,
ra.identifiant_unique,
a.identifiant_unique
) AS identifiant_unique,
COALESCE(
da.identifiant_externe,
ra.identifiant_externe,
a.identifiant_externe
) AS identifiant_externe,
COALESCE(
da.acteur_type_id,
ra.acteur_type_id,
a.acteur_type_id
) AS acteur_type_id,
COALESCE(da.source_id, ra.source_id, a.source_id) AS source_id,
COALESCE(
da.action_principale_id,
ra.action_principale_id,
a.action_principale_id
) AS action_principale_id,
COALESCE(da.nom, ra.nom, a.nom) AS nom,
COALESCE(
da.nom_commercial,
ra.nom_commercial,
a.nom_commercial
) AS nom_commercial,
COALESCE(da.nom_officiel, ra.nom_officiel, a.nom_officiel) AS nom_officiel,
COALESCE(da.adresse, ra.adresse, a.adresse) AS adresse,
COALESCE(
da.adresse_complement,
ra.adresse_complement,
a.adresse_complement
) AS adresse_complement,
COALESCE(da.code_postal, ra.code_postal, a.code_postal) AS code_postal,
COALESCE(da.location, ra.location, a.location) AS location,
COALESCE(da.ville, ra.ville, a.ville) AS ville,
COALESCE(da.telephone, ra.telephone, a.telephone) AS telephone,
COALESCE(da.email, ra.email, a.email) AS email,
COALESCE(da.url, ra.url, a.url) AS url,
COALESCE(
da.horaires_description,
ra.horaires_description,
a.horaires_description
) AS horaires_description,
COALESCE(da.horaires_osm, ra.horaires_osm, a.horaires_osm) AS horaires_osm,
COALESCE(
da.uniquement_sur_rdv,
ra.uniquement_sur_rdv,
a.uniquement_sur_rdv
) AS uniquement_sur_rdv,
COALESCE(da.statut, ra.statut, a.statut) AS statut,
COALESCE(
da.naf_principal,
ra.naf_principal,
a.naf_principal
) AS naf_principal,
COALESCE(da.siret, ra.siret, a.siret) AS siret,
COALESCE(
da.public_accueilli,
ra.public_accueilli,
a.public_accueilli
) AS public_accueilli,
COALESCE(
da.exclusivite_de_reprisereparation,
ra.exclusivite_de_reprisereparation,
a.exclusivite_de_reprisereparation
) AS exclusivite_de_reprisereparation,
ra.parent_id AS parent_id,
-- Si l'identifiant est dans la liste des parent_ids, alors c'est un parent
CASE
WHEN COALESCE(
da.identifiant_unique,
ra.identifiant_unique,
a.identifiant_unique
) IN (
SELECT
id
FROM
parent_ids
) THEN TRUE
ELSE FALSE
END AS est_parent,
da.identifiant_unique IS NOT NULL AS est_dans_displayedacteur,
ra.identifiant_unique IS NOT NULL AS est_dans_revisionacteur,
a.identifiant_unique IS NOT NULL AS est_dans_acteur
FROM
qfdmo_displayedacteur AS da
FULL OUTER JOIN qfdmo_revisionacteur AS ra ON da.identifiant_unique = ra.identifiant_unique
FULL OUTER JOIN qfdmo_acteur AS a ON da.identifiant_unique = a.identifiant_unique
)
SELECT
-- ne pas faire un lazy * car ceci sélectionne des champs génériques
-- présents sur plusieurs tables (ex: url) ce qui cause des erreurs
acteur_all.*,
ST_Y (acteur_all.location) AS location_lat,
ST_X (acteur_all.location) AS location_long,
-- Info enfants pour les parents
CASE
WHEN est_parent THEN (
SELECT
enfants_nombre
FROM
parent_ids_to_enfants
WHERE
parent_id = acteur_all.identifiant_unique
)
ELSE NULL
END AS enfants_nombre,
CASE
WHEN est_parent THEN (
SELECT
enfants_liste
FROM
parent_ids_to_enfants
WHERE
parent_id = acteur_all.identifiant_unique
)
ELSE NULL
END AS enfants_liste,
-- Les codes pour être plus pratique ques les ids
s.code AS source_code,
atype.code AS acteur_type_code
FROM
acteur_all
LEFT JOIN qfdmo_source AS s ON s.id = acteur_all.source_id
LEFT JOIN qfdmo_acteurtype AS atype ON atype.id = acteur_all.acteur_type_id
);
Loading

0 comments on commit 06e065f

Please sign in to comment.