Data isolation
kolok committed Jan 9, 2025
1 parent 8b6c4c2 commit d0ab227
Showing 41 changed files with 863 additions and 351 deletions.
4 changes: 2 additions & 2 deletions .secrets.baseline
@@ -151,7 +151,7 @@
"filename": "core/settings.py",
"hashed_secret": "1ee34e26aeaf89c64ecc2c85efe6a961b75a50e9",
"is_verified": false,
"line_number": 214
"line_number": 215
}
],
"docker-compose.yml": [
@@ -207,5 +207,5 @@
}
]
},
"generated_at": "2025-01-06T12:01:37Z"
"generated_at": "2025-01-09T17:35:44Z"
}
1 change: 1 addition & 0 deletions core/settings.py
@@ -58,6 +58,7 @@
"core",
"qfdmd",
"qfdmo",
"data",
"corsheaders",
]

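Registering the new data app here is presumably what drives the qfdmo_* → data_* table renames visible in the DAGs below: by default, Django prefixes a model's table name with its app label. A minimal sketch of that mechanism; the model and field names are assumed from the table names in this commit, not taken from the repo:

from django.db import models

class SuggestionCohorte(models.Model):
    # hypothetical field, mirroring the "statut" column queried by the DAGs
    statut = models.CharField(max_length=50)

    class Meta:
        app_label = "data"  # default db_table becomes "data_suggestioncohorte"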
1 change: 1 addition & 0 deletions core/urls.py
@@ -60,6 +60,7 @@ class PaginatedSitemap(GenericSitemap):
path("dsfr/", include(("dsfr_hacks.urls", "dsfr_hacks"), namespace="dsfr_hacks")),
path("", include(("qfdmo.urls", "qfdmo"), namespace="qfdmo")),
path("", include(("qfdmd.urls", "qfdmd"), namespace="qfdmd")),
path("", include(("data.urls", "data"), namespace="data")),
path("docs/", TemplateView.as_view(template_name="techdocs.html"), name="techdocs"),
]

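The new include expects a data/urls.py exposing routes under the "data" namespace; that file is not shown in this diff. A minimal sketch of what it might contain, with a hypothetical views module and route name:

from django.urls import path

from data import views  # hypothetical views module inside the new app

app_name = "data"

urlpatterns = [
    # hypothetical route; the real routes live in the (unshown) data/urls.py
    path("suggestions/", views.suggestions, name="suggestions"),
]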
6 changes: 3 additions & 3 deletions dags/ingest_validated_dataset_to_db.py
@@ -33,7 +33,7 @@ def _get_first_suggetsioncohorte_to_insert():
hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
row = hook.get_first(
f"""
- SELECT * FROM qfdmo_suggestioncohorte
+ SELECT * FROM data_suggestioncohorte
WHERE statut = '{constants.SUGGESTION_ATRAITER}'
LIMIT 1
"""
@@ -54,7 +54,7 @@ def fetch_and_parse_data(**context):

df_sql = pd.read_sql_query(
f"""
- SELECT * FROM qfdmo_suggestionunitaire
+ SELECT * FROM data_suggestionunitaire
WHERE suggestion_cohorte_id = '{suggestion_cohorte_id}'
""",
engine,
@@ -91,7 +91,7 @@ def fetch_and_parse_data(**context):
normalized_dfs = df_acteur_to_delete["suggestion"].apply(pd.json_normalize)
df_actors_update_actor = pd.concat(normalized_dfs.tolist(), ignore_index=True)
status_repeated = (
df_acteur_to_delete["status"]
df_acteur_to_delete["statut"]
.repeat(df_acteur_to_delete["suggestion"].apply(len))
.reset_index(drop=True)
)
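The queries in this DAG interpolate the cohort status and id with f-strings. Both values come from trusted code (shared constants and a prior DB read), so this works, but a parameterized form keeps quoting concerns out of the SQL. A sketch of the first query in that style, assuming the same connection id; the status value below is a placeholder standing in for constants.SUGGESTION_ATRAITER:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
row = hook.get_first(
    """
    SELECT * FROM data_suggestioncohorte
    WHERE statut = %(statut)s
    LIMIT 1
    """,
    parameters={"statut": "A_TRAITER"},  # placeholder; the real value comes from shared_constants
)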
2 changes: 2 additions & 0 deletions dags/sources/config/airflow_params.py
@@ -25,6 +25,7 @@
clean_label_codes,
clean_siret_and_siren,
clean_telephone,
+ compute_location,
get_latlng_from_geopoint,
merge_and_clean_souscategorie_codes,
merge_sous_categories_columns,
@@ -63,6 +64,7 @@
"clean_url": clean_url,
"clean_souscategorie_codes_sinoe": clean_souscategorie_codes_sinoe,
"get_latlng_from_geopoint": get_latlng_from_geopoint,
"clean_location": compute_location,
}


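For context, this mapping is a string-to-callable registry: source DAG configs name a transformation (here "clean_location") and the framework resolves it to a function at run time. A self-contained sketch of the dispatch idea; the stand-in function and lookup helper are illustrative, not this repo's actual code:

def compute_location(latitude: float, longitude: float) -> dict:
    # stand-in for the real helper imported above
    return {"latitude": latitude, "longitude": longitude}

TRANSFORMATION_MAPPING = {
    "clean_location": compute_location,
}

def apply_transformation(name: str, *args):
    # resolve the configured name to a callable, failing loudly on unknown names
    if name not in TRANSFORMATION_MAPPING:
        raise ValueError(f"Unknown transformation: {name}")
    return TRANSFORMATION_MAPPING[name](*args)

print(apply_transformation("clean_location", 48.85, 2.35))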
5 changes: 5 additions & 0 deletions dags/sources/dags/source_pyreo.py
@@ -80,6 +80,11 @@
# "value": [],
# },
# 4. Transformation du dataframe
+ {
+     "origin": ["latitude", "longitude"],
+     "transformation": "clean_location",
+     "destination": ["location"],
+ },
{
"origin": ["labels_etou_bonus", "acteur_type_code"],
"transformation": "clean_label_codes",
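Each entry in this config is declarative: read the origin columns, apply the named transformation, write the destination columns. A hedged pandas sketch of how one entry could be applied row-wise; the helper is a stand-in and the repo's actual engine may apply transformations differently:

import pandas as pd

def compute_location(latitude, longitude):  # stand-in for the imported helper
    return {"latitude": latitude, "longitude": longitude}

entry = {
    "origin": ["latitude", "longitude"],
    "transformation": "clean_location",
    "destination": ["location"],
}

df = pd.DataFrame({"latitude": [48.85], "longitude": [2.35]})
# unpack the origin columns into the transformation, one row at a time
df[entry["destination"][0]] = df[entry["origin"]].apply(
    lambda row: compute_location(*row), axis=1
)
print(df)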
3 changes: 3 additions & 0 deletions dags/sources/tasks/airflow_logic/db_data_prepare_task.py
@@ -28,6 +28,7 @@ def db_data_prepare_wrapper(**kwargs):
df_pssc = kwargs["ti"].xcom_pull(task_ids="propose_services_sous_categories")
df_labels = kwargs["ti"].xcom_pull(task_ids="propose_labels")
df_acteur_services = kwargs["ti"].xcom_pull(task_ids="propose_acteur_services")
+ df_acteurs_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur")
source_id_by_code = read_mapping_from_postgres(table_name="qfdmo_source")
acteurtype_id_by_code = read_mapping_from_postgres(table_name="qfdmo_acteurtype")

@@ -37,6 +38,7 @@
log.preview("df_pssc", df_pssc)
log.preview("df_labels", df_labels)
log.preview("df_acteur_services", df_acteur_services)
log.preview("df_acteurs_from_db", df_acteurs_from_db)
log.preview("source_id_by_code", source_id_by_code)
log.preview("acteurtype_id_by_code", acteurtype_id_by_code)

@@ -47,6 +49,7 @@
df_pssc=df_pssc,
df_labels=df_labels,
df_acteur_services=df_acteur_services,
+ df_acteurs_from_db=df_acteurs_from_db,
source_id_by_code=source_id_by_code,
acteurtype_id_by_code=acteurtype_id_by_code,
)
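All of these wrappers share one pattern: whatever a task's callable returns is pushed to XCom under the default "return_value" key, and downstream wrappers pull it back by task id. A minimal sketch of that round-trip; the task ids and payload are illustrative, and passing full dataframes this way assumes an XCom backend sized for them:

def producer(**kwargs):
    # the return value lands in XCom under key "return_value"
    return {"df_acteur": "..."}

def consumer(**kwargs):
    # pull the producer's return value by its task id
    payload = kwargs["ti"].xcom_pull(task_ids="producer")
    print(payload["df_acteur"])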
49 changes: 49 additions & 0 deletions dags/sources/tasks/airflow_logic/db_write_suggestion_task.py
@@ -0,0 +1,49 @@
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from sources.tasks.business_logic.db_write_suggestion import db_write_suggestion
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def db_write_suggestion_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="db_write_suggestion",
python_callable=db_write_suggestion_wrapper,
dag=dag,
)


def db_write_suggestion_wrapper(**kwargs) -> None:
dag_name = kwargs["dag"].dag_display_name or kwargs["dag"].dag_id
run_id = kwargs["run_id"]
dfs_acteur = kwargs["ti"].xcom_pull(task_ids="db_data_prepare")
df_acteur_to_delete = dfs_acteur["df_acteur_to_delete"]
df_acteur_to_create = dfs_acteur["df_acteur_to_create"]
df_acteur_to_update = dfs_acteur["df_acteur_to_update"]

log.preview("dag_name", dag_name)
log.preview("run_id", run_id)
log.preview("df_acteur_to_delete", df_acteur_to_delete)
log.preview("df_acteur_to_create", df_acteur_to_create)
log.preview("df_acteur_to_update", df_acteur_to_update)

if (
df_acteur_to_create.empty
and df_acteur_to_delete.empty
and df_acteur_to_update.empty
):
logger.warning("!!! Aucune suggestion à traiter pour cette source !!!")
# set the task to airflow skip status
kwargs["ti"].xcom_push(key="skip", value=True)
return

return db_write_suggestion(
dag_name=dag_name,
run_id=run_id,
df_acteur_to_create=df_acteur_to_create,
df_acteur_to_delete=df_acteur_to_delete,
df_acteur_to_update=df_acteur_to_update,
)
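Note that the empty-dataframe branch above only pushes a "skip" flag to XCom and returns, so the task instance itself still ends in success. If the intent of the "# set the task to airflow skip status" comment is to actually mark the task skipped in the Airflow UI, raising AirflowSkipException is the usual mechanism; a hedged alternative sketch of that branch:

from airflow.exceptions import AirflowSkipException

if (
    df_acteur_to_create.empty
    and df_acteur_to_delete.empty
    and df_acteur_to_update.empty
):
    # marks this task instance as skipped instead of succeeded
    raise AirflowSkipException("Aucune suggestion à traiter pour cette source")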
6 changes: 4 additions & 2 deletions dags/sources/tasks/airflow_logic/operators.py
@@ -2,12 +2,14 @@

from airflow import DAG
from airflow.models.baseoperator import chain
- from shared.tasks.airflow_logic.write_data_task import write_data_task
from sources.tasks.airflow_logic.db_data_prepare_task import db_data_prepare_task
from sources.tasks.airflow_logic.db_read_acteur_task import db_read_acteur_task
from sources.tasks.airflow_logic.db_read_propositions_max_id_task import (
db_read_propositions_max_id_task,
)
+ from sources.tasks.airflow_logic.db_write_suggestion_task import (
+     db_write_suggestion_task,
+ )
from sources.tasks.airflow_logic.propose_acteur_changes_task import (
propose_acteur_changes_task,
)
@@ -91,5 +93,5 @@ def eo_task_chain(dag: DAG) -> None:
create_tasks,
propose_services_sous_categories_task(dag),
db_data_prepare_task(dag),
- write_data_task(dag),
+ db_write_suggestion_task(dag),
)
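chain() wires linear dependencies and also accepts lists for fan-out/fan-in, which is presumably how create_tasks (a list of per-source tasks) fits between the scalar tasks above. A self-contained illustration with throwaway operators; the dag id and task ids are invented for the demo:

from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="chain_demo", start_date=datetime(2025, 1, 1), schedule=None) as dag:
    t1, t2, t3, t4 = (EmptyOperator(task_id=f"t{i}") for i in range(1, 5))
    chain(t1, [t2, t3], t4)  # equivalent to: t1 >> [t2, t3] >> t4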
17 changes: 6 additions & 11 deletions dags/sources/tasks/airflow_logic/propose_acteur_changes_task.py
@@ -13,18 +13,13 @@ def propose_acteur_changes_task(dag: DAG) -> PythonOperator:


def propose_acteur_changes_wrapper(**kwargs):
- df = kwargs["ti"].xcom_pull(task_ids="source_data_normalize")
- df_acteurs = kwargs["ti"].xcom_pull(task_ids="db_read_acteur")
+ df_acteur = kwargs["ti"].xcom_pull(task_ids="source_data_normalize")
+ df_acteur_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur")

- params = kwargs["params"]
- column_to_drop = params.get("column_to_drop", [])
-
- log.preview("df (source_data_normalize)", df)
- log.preview("df_acteurs", df_acteurs)
- log.preview("column_to_drop", column_to_drop)
+ log.preview("df (source_data_normalize)", df_acteur)
+ log.preview("df_acteurs", df_acteur_from_db)

return propose_acteur_changes(
- df=df,
- df_acteurs=df_acteurs,
- column_to_drop=column_to_drop,
+ df_acteur=df_acteur,
+ df_acteur_from_db=df_acteur_from_db,
)
37 changes: 27 additions & 10 deletions dags/sources/tasks/business_logic/db_data_prepare.py
@@ -14,22 +14,26 @@ def db_data_prepare(
df_pssc: pd.DataFrame,
df_labels: pd.DataFrame,
df_acteur_services: pd.DataFrame,
+ df_acteurs_from_db: pd.DataFrame,
source_id_by_code: dict,
acteurtype_id_by_code: dict,
):

update_actors_columns = ["identifiant_unique", "statut", "cree_le"]
df_acteur_to_delete["suggestion"] = df_acteur_to_delete[
update_actors_columns
].apply(lambda row: json.dumps(row.to_dict(), default=str), axis=1)
# Created or updated Acteurs
- df_acteur_services = (
-     df_acteur_services
-     if df_acteur_services is not None
-     else pd.DataFrame(columns=["acteur_id", "acteurservice_id"])
- )
+ # df_acteur_services = (
+ #     df_acteur_services
+ #     if df_acteur_services is not None
+ #     else pd.DataFrame(columns=["acteur_id", "acteurservice_id"])
+ # )

if df_acteur.empty:
raise ValueError("df_actors est vide")
raise ValueError("df_acteur est vide")
if df_acteur_services.empty:
raise ValueError("df_acteur_services est vide")
if df_ps.empty:
raise ValueError("df_ps est vide")
if df_pssc.empty:
@@ -41,6 +45,8 @@
acteurtype_id_by_code
)

+ # FIXME: A bouger dans un tache compute_ps qui remplacera propose_services et
+ # propose_services_sous_categories
aggregated_pdsc = (
df_pssc.groupby("propositionservice_id")
.apply(lambda x: x.to_dict("records") if not x.empty else [])
@@ -57,11 +63,9 @@
df_pds_joined["propositionservice_id"] = df_pds_joined[
"propositionservice_id"
].astype(str)

df_pds_joined["pds_sous_categories"] = df_pds_joined["pds_sous_categories"].apply(
lambda x: x if isinstance(x, list) else []
)

df_pds_joined.drop("id", axis=1, inplace=True)

aggregated_pds = (
@@ -128,7 +132,20 @@
lambda row: json.dumps(row.to_dict(), default=str), axis=1
)
df_joined.drop_duplicates("identifiant_unique", keep="first", inplace=True)
log.preview("df_joined", df_joined)

df_acteur_to_create = df_joined[
~df_joined["identifiant_unique"].isin(df_acteurs_from_db["identifiant_unique"])
]
df_acteur_to_update = df_joined[
df_joined["identifiant_unique"].isin(df_acteurs_from_db["identifiant_unique"])
]

log.preview("df_acteur_to_create", df_acteur_to_create)
log.preview("df_acteur_to_update", df_acteur_to_update)
log.preview("df_acteur_to_delete", df_acteur_to_delete)

return {"all": {"df": df_joined}, "to_disable": {"df": df_acteur_to_delete}}
return {
"df_acteur_to_create": df_acteur_to_create,
"df_acteur_to_update": df_acteur_to_update,
"df_acteur_to_delete": df_acteur_to_delete,
}
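The new create/update split is a plain membership test on identifiant_unique against the acteurs already in the database. A tiny self-contained illustration of the same logic, with invented identifiers:

import pandas as pd

df_joined = pd.DataFrame({"identifiant_unique": ["a1", "b2", "c3"]})
df_acteurs_from_db = pd.DataFrame({"identifiant_unique": ["b2"]})

known = df_joined["identifiant_unique"].isin(df_acteurs_from_db["identifiant_unique"])
df_to_create = df_joined[~known]  # a1, c3 → not yet in DB, to be created
df_to_update = df_joined[known]   # b2 → already in DB, to be updated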
95 changes: 95 additions & 0 deletions dags/sources/tasks/business_logic/db_write_suggestion.py
@@ -0,0 +1,95 @@
import json
import logging
from datetime import datetime

import pandas as pd
from shared.tasks.database_logic.db_manager import PostgresConnectionManager
from sources.config import shared_constants as constants

logger = logging.getLogger(__name__)


def db_write_suggestion(
dag_name: str,
run_id: str,
df_acteur_to_create: pd.DataFrame,
df_acteur_to_delete: pd.DataFrame,
df_acteur_to_update: pd.DataFrame,
):

metadata = {}

run_name = run_id.replace("__", " - ")

insert_suggestion(
df=df_acteur_to_create,
metadata=metadata,
dag_name=f"{dag_name} - AJOUT",
run_name=run_name,
action_type=constants.SUGGESTION_SOURCE_AJOUT,
)
insert_suggestion(
df=df_acteur_to_delete,
metadata=metadata,
dag_name=f"{dag_name} - SUPRESSION",
run_name=run_name,
action_type=constants.SUGGESTION_SOURCE_SUPRESSION,
)
insert_suggestion(
df=df_acteur_to_update,
metadata=metadata,
dag_name=f"{dag_name} - MISES A JOUR",
run_name=run_name,
action_type=constants.SUGGESTION_SOURCE_MISESAJOUR,
)


def insert_suggestion(
df: pd.DataFrame, metadata: dict, dag_name: str, run_name: str, action_type: str
):
if df.empty:
return
engine = PostgresConnectionManager().engine
current_date = datetime.now()

with engine.connect() as conn:
# Insert a new suggestion
result = conn.execute(
"""
INSERT INTO data_suggestioncohorte
(
identifiant_action,
identifiant_execution,
type_action,
statut,
metadata,
cree_le,
modifie_le
)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING ID;
""",
(
dag_name,
run_name,
action_type, # FIXME: spécialiser les sources
constants.SUGGESTION_AVALIDER,
json.dumps(metadata),
current_date,
current_date,
),
)
suggestion_cohorte_id = result.fetchone()[0]

# Insert dag_run_change
df["type_action"] = action_type
df["suggestion_cohorte_id"] = suggestion_cohorte_id
df["statut"] = constants.SUGGESTION_AVALIDER
df[["suggestion", "suggestion_cohorte_id", "type_action", "statut"]].to_sql(
"data_suggestionunitaire",
engine,
if_exists="append",
index=False,
method="multi",
chunksize=1000,
)
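Taken together, insert_suggestion writes one data_suggestioncohorte row per (dag, run, action type) and then bulk-inserts the per-acteur rows into data_suggestionunitaire stamped with the returned cohort id. A hedged sketch of reading pending suggestions back, assuming the column names used above; the DSN and the literal status value (standing in for constants.SUGGESTION_AVALIDER) are placeholders:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:password@localhost:5432/qfdmo")  # placeholder DSN

df_pending = pd.read_sql_query(
    """
    SELECT u.*
    FROM data_suggestionunitaire AS u
    JOIN data_suggestioncohorte AS c ON c.id = u.suggestion_cohorte_id
    WHERE c.statut = 'AVALIDER'  -- placeholder for constants.SUGGESTION_AVALIDER
    """,
    engine,
)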