Commit
Isolation of the suggestion-ingestion dags
kolok committed Jan 16, 2025
1 parent b657043 commit 65f2240
Showing 10 changed files with 459 additions and 69 deletions.
120 changes: 52 additions & 68 deletions dags/ingest_validated_dataset_to_db.py
@@ -1,10 +1,12 @@
 # FIXME: integrate this dag into the target architecture
+"""
+DEPRECATED: use the apply_suggestions dag instead
+"""
 
 from datetime import timedelta
 
 import pandas as pd
 from airflow.models import DAG
-from airflow.operators.python import PythonOperator, ShortCircuitOperator
+from airflow.operators.python import BranchPythonOperator, PythonOperator
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.providers.postgres.hooks.postgres import PostgresHook
 from airflow.utils.dates import days_ago
@@ -21,104 +23,75 @@
 }
 
 dag = DAG(
-    dag_id="validate_and_process_suggestions",
-    dag_display_name="Processing of validated data cohorts",
+    dag_id="validate_and_process_dagruns",
+    dag_display_name="DEPRECATED: Processing of validated data cohorts",
     default_args=default_args,
-    description="process the pending suggestions",
+    description="""
+    DEPRECATED: Check for VALIDATE in qfdmo_dagrun and process qfdmo_dagrunchange
+    only useful for the siretisation cohorts
+    """,
     schedule="*/5 * * * *",
     catchup=False,
     max_active_runs=1,
 )
 
 
-def _get_first_suggetsioncohorte_to_insert():
+def _get_first_dagrun_to_insert():
     hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
     # get first row from table qfdmo_dagrun with status TO_INSERT
     row = hook.get_first(
-        f"""
-        SELECT * FROM data_suggestioncohorte
-        WHERE statut = '{constants.SUGGESTION_ATRAITER}'
-        LIMIT 1
-        """
+        f"SELECT * FROM qfdmo_dagrun WHERE status = '{constants.DAGRUN_TOINSERT}'"
+        " LIMIT 1"
     )
     return row
 
 
-def check_suggestion_to_process(**kwargs):
-    row = _get_first_suggetsioncohorte_to_insert()
-    return bool(row)
+def check_for_validation(**kwargs):
+    # get first row from table qfdmo_dagrun with status TO_INSERT
+    row = _get_first_dagrun_to_insert()
+
+    # Skip if row is None
+    if row is None:
+        return "skip_processing"
+    return "fetch_and_parse_data"
 
 
 def fetch_and_parse_data(**context):
-    row = _get_first_suggetsioncohorte_to_insert()
-    suggestion_cohorte_id = row[0]
+    row = _get_first_dagrun_to_insert()
+    dag_run_id = row[0]
 
     engine = PostgresConnectionManager().engine
 
     df_sql = pd.read_sql_query(
-        f"""
-        SELECT * FROM data_suggestionunitaire
-        WHERE suggestion_cohorte_id = '{suggestion_cohorte_id}'
-        """,
+        f"SELECT * FROM qfdmo_dagrunchange WHERE dag_run_id = '{dag_run_id}'",
         engine,
     )
 
-    df_acteur_to_create = df_sql[
-        df_sql["type_action"] == constants.SUGGESTION_SOURCE_AJOUT
-    ]
-    df_acteur_to_update = df_sql[
-        df_sql["type_action"] == constants.SUGGESTION_SOURCE_AJOUT
-    ]
-    df_acteur_to_delete = df_sql[
-        df_sql["type_action"] == constants.SUGGESTION_SOURCE_SUPRESSION
-    ]
-    df_acteur_to_enrich = df_sql[
-        df_sql["type_action"] == constants.SUGGESTION_ENRICHISSEMENT
-    ]
-
-    df_update_actor = df_sql[df_sql["type_action"] == "UPDATE_ACTOR"]
-
-    if not df_acteur_to_create.empty:
-        normalized_dfs = df_acteur_to_create["suggestion"].apply(pd.json_normalize)
-        df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True)
-        return dag_ingest_validated_utils.handle_create_event(
-            df_acteur, suggestion_cohorte_id, engine
-        )
-    if not df_acteur_to_update.empty:
-        normalized_dfs = df_acteur_to_update["suggestion"].apply(pd.json_normalize)
-        df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True)
-        return dag_ingest_validated_utils.handle_create_event(
-            df_acteur, suggestion_cohorte_id, engine
-        )
-    if not df_acteur_to_delete.empty:
-        normalized_dfs = df_acteur_to_delete["suggestion"].apply(pd.json_normalize)
-        df_actors_update_actor = pd.concat(normalized_dfs.tolist(), ignore_index=True)
-        status_repeated = (
-            df_acteur_to_delete["statut"]
-            .repeat(df_acteur_to_delete["suggestion"].apply(len))
-            .reset_index(drop=True)
-        )
-        df_actors_update_actor["status"] = status_repeated
+    df_create = df_sql[df_sql["change_type"] == "CREATE"]
+    df_update_actor = df_sql[df_sql["change_type"] == "UPDATE_ACTOR"]
 
-        return dag_ingest_validated_utils.handle_update_actor_event(
-            df_actors_update_actor, suggestion_cohorte_id
+    if not df_create.empty:
+        normalized_dfs = df_create["row_updates"].apply(pd.json_normalize)
+        df_actors_create = pd.concat(normalized_dfs.tolist(), ignore_index=True)
+        return dag_ingest_validated_utils.handle_create_event(
+            df_actors_create, dag_run_id, engine
        )
     if not df_update_actor.empty:
-
-    if not df_acteur_to_enrich.empty:
-
-        normalized_dfs = df_update_actor["suggestion"].apply(pd.json_normalize)
+        normalized_dfs = df_update_actor["row_updates"].apply(pd.json_normalize)
         df_actors_update_actor = pd.concat(normalized_dfs.tolist(), ignore_index=True)
         status_repeated = (
             df_update_actor["status"]
-            .repeat(df_update_actor["suggestion"].apply(len))
+            .repeat(df_update_actor["row_updates"].apply(len))
             .reset_index(drop=True)
         )
         df_actors_update_actor["status"] = status_repeated
 
         return dag_ingest_validated_utils.handle_update_actor_event(
-            df_actors_update_actor, suggestion_cohorte_id
+            df_actors_update_actor, dag_run_id
         )
     return {
-        "dag_run_id": suggestion_cohorte_id,
+        "dag_run_id": dag_run_id,
     }


@@ -159,9 +132,19 @@ def write_data_to_postgres(**kwargs):
     )
 
 
-check_suggestion_to_process_task = ShortCircuitOperator(
-    task_id="check_suggestion_to_process",
-    python_callable=check_suggestion_to_process,
+def skip_processing(**kwargs):
+    print("No records to validate. DAG run completes successfully.")
+
+
+skip_processing_task = PythonOperator(
+    task_id="skip_processing",
+    python_callable=skip_processing,
+    dag=dag,
+)
+
+branch_task = BranchPythonOperator(
+    task_id="branch_processing",
+    python_callable=check_for_validation,
     dag=dag,
 )
 
@@ -177,8 +160,9 @@ def write_data_to_postgres(**kwargs):
     dag=dag,
 )
 
+branch_task >> skip_processing_task
 (
-    check_suggestion_to_process_task
+    branch_task
     >> fetch_parse_task
     >> write_to_postgres_task
     >> trigger_create_final_actors_dag
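A note on the swap above: a ShortCircuitOperator callable returns a boolean, and a falsy result skips every downstream task, whereas a BranchPythonOperator callable returns the task_id of the branch to follow, which is why the rewrite adds an explicit skip_processing task as the "do nothing" branch. A minimal, self-contained sketch of the two contracts (illustrative dag_id and task names, not code from this commit):

from airflow.models import DAG
from airflow.operators.python import (
    BranchPythonOperator,
    PythonOperator,
    ShortCircuitOperator,
)
from airflow.utils.dates import days_ago

with DAG("operator_contracts_demo", start_date=days_ago(1), schedule=None) as dag:
    # Contract 1: a falsy return value skips everything downstream of the gate.
    gate = ShortCircuitOperator(task_id="gate", python_callable=lambda: True)

    # Contract 2: the returned string names the single downstream task_id that
    # will run; the other direct downstream tasks are skipped.
    branch = BranchPythonOperator(task_id="branch", python_callable=lambda: "work")

    work = PythonOperator(task_id="work", python_callable=lambda: print("working"))
    skip = PythonOperator(task_id="skip", python_callable=lambda: print("skipping"))

    gate >> branch >> [work, skip]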
15 changes: 15 additions & 0 deletions dags/suggestions/airflow_logic/db_normalize_suggestion_task.py
@@ -0,0 +1,15 @@
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from suggestions.business_logic.db_normalize_suggestion import db_normalize_suggestion


def db_normalize_suggestion_task(dag: DAG):
return PythonOperator(
task_id="db_normalize_suggestion",
python_callable=db_normalize_suggestion_wrapper,
dag=dag,
)


def db_normalize_suggestion_wrapper(**kwargs):
return db_normalize_suggestion()
13 changes: 13 additions & 0 deletions dags/suggestions/airflow_logic/db_read_suggestiontoprocess_task.py
@@ -0,0 +1,13 @@
from airflow.models import DAG
from airflow.operators.python import ShortCircuitOperator
from suggestions.business_logic.db_read_suggestiontoprocess import (
db_read_suggestiontoprocess,
)


def db_read_suggestiontoprocess_task(dag: DAG):
return ShortCircuitOperator(
task_id="check_suggestion_to_process",
python_callable=db_read_suggestiontoprocess,
dag=dag,
)
23 changes: 23 additions & 0 deletions dags/suggestions/airflow_logic/db_write_validsuggestions_task.py
@@ -0,0 +1,23 @@
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from suggestions.business_logic.db_write_validsuggestions import (
db_write_validsuggestions,
)
from utils import logging_utils as log


def db_write_validsuggestions_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="db_write_validsuggestions",
python_callable=db_write_validsuggestions_wrapper,
dag=dag,
)


def db_write_validsuggestions_wrapper(**kwargs):
data_from_db = kwargs["ti"].xcom_pull(task_ids="db_normalize_suggestion")

log.preview("data_from_db acteur", data_from_db["actors"])
log.preview("data_from_db change_type", data_from_db["change_type"])

return db_write_validsuggestions(data_from_db=data_from_db)
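The wrapper above leans on Airflow XComs: a PythonOperator pushes its return value to XCom automatically, and xcom_pull(task_ids=...) retrieves it in a later task, which is how db_normalize_suggestion's output reaches db_write_validsuggestions. A minimal sketch of that handoff (illustrative names, not code from this commit):

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def produce(**kwargs):
    # The return value is pushed to XCom under the key "return_value".
    return {"actors": ["a1", "a2"], "change_type": "SOURCE"}


def consume(**kwargs):
    # Pull the producer's return value by its task_id.
    data = kwargs["ti"].xcom_pull(task_ids="produce")
    print(data["change_type"])


with DAG("xcom_handoff_demo", start_date=days_ago(1), schedule=None) as dag:
    producer = PythonOperator(task_id="produce", python_callable=produce)
    consumer = PythonOperator(task_id="consume", python_callable=consume)
    producer >> consumer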
10 changes: 10 additions & 0 deletions dags/suggestions/airflow_logic/launch_compute_carte_acteur_task.py
@@ -0,0 +1,10 @@
from airflow.models import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def launch_compute_carte_acteur_task(dag: DAG) -> TriggerDagRunOperator:
return TriggerDagRunOperator(
task_id="launch_compute_carte_acteur",
trigger_dag_id="compute_carte_acteur",
dag=dag,
)
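The commit does not show the apply_suggestions DAG definition referenced in the deprecation notice, but the four factories above suggest a wiring along these lines (a hypothetical sketch; the dag_id comes from the deprecation notice, and the start date and scheduling options are assumptions):

from airflow.models import DAG
from airflow.utils.dates import days_ago
from suggestions.airflow_logic.db_normalize_suggestion_task import (
    db_normalize_suggestion_task,
)
from suggestions.airflow_logic.db_read_suggestiontoprocess_task import (
    db_read_suggestiontoprocess_task,
)
from suggestions.airflow_logic.db_write_validsuggestions_task import (
    db_write_validsuggestions_task,
)
from suggestions.airflow_logic.launch_compute_carte_acteur_task import (
    launch_compute_carte_acteur_task,
)

dag = DAG(
    dag_id="apply_suggestions",
    start_date=days_ago(1),  # assumption
    schedule="*/5 * * * *",  # assumption: same cadence as the deprecated dag
    catchup=False,
    max_active_runs=1,
)

# The ShortCircuitOperator comes first: with no cohort to process, the rest is skipped.
(
    db_read_suggestiontoprocess_task(dag)
    >> db_normalize_suggestion_task(dag)
    >> db_write_validsuggestions_task(dag)
    >> launch_compute_carte_acteur_task(dag)
)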
113 changes: 113 additions & 0 deletions dags/suggestions/business_logic/db_normalize_suggestion.py
@@ -0,0 +1,113 @@
import pandas as pd
from shared.tasks.database_logic.db_manager import PostgresConnectionManager
from sources.config import shared_constants as constants
from suggestions.business_logic.db_read_suggestiontoprocess import (
get_first_suggetsioncohorte_to_insert,
)
from utils import logging_utils as log


def db_normalize_suggestion():
row = get_first_suggetsioncohorte_to_insert()
suggestion_cohorte_id = row[0]

engine = PostgresConnectionManager().engine

df_sql = pd.read_sql_query(
f"""
SELECT * FROM data_suggestionunitaire
WHERE suggestion_cohorte_id = '{suggestion_cohorte_id}'
""",
engine,
)

df_acteur_to_create = df_sql[
df_sql["type_action"] == constants.SUGGESTION_SOURCE_AJOUT
]
df_acteur_to_update = df_sql[
df_sql["type_action"] == constants.SUGGESTION_SOURCE_AJOUT
]
df_acteur_to_delete = df_sql[
df_sql["type_action"] == constants.SUGGESTION_SOURCE_SUPRESSION
]
if not df_acteur_to_create.empty:
normalized_dfs = df_acteur_to_create["suggestion"].apply(pd.json_normalize)
df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True)
return normalize_acteur_update_for_db(df_acteur, suggestion_cohorte_id, engine)
if not df_acteur_to_update.empty:
normalized_dfs = df_acteur_to_update["suggestion"].apply(pd.json_normalize)
df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True)
return normalize_acteur_update_for_db(df_acteur, suggestion_cohorte_id, engine)
if not df_acteur_to_delete.empty:
normalized_dfs = df_acteur_to_delete["suggestion"].apply(pd.json_normalize)
df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True)
log.preview("df_acteur_to_delete", df_acteur)
return {
"actors": df_acteur,
"dag_run_id": suggestion_cohorte_id,
"change_type": constants.SUGGESTION_SOURCE_SUPRESSION,
}

raise ValueError("No suggestion found")


def normalize_acteur_update_for_db(df_actors, dag_run_id, engine):
df_labels = process_many2many_df(df_actors, "labels")
df_acteur_services = process_many2many_df(
df_actors, "acteur_services", df_columns=["acteur_id", "acteurservice_id"]
)

max_id_pds = pd.read_sql_query(
"SELECT max(id) FROM qfdmo_propositionservice", engine
)["max"][0]
normalized_pds_dfs = df_actors["proposition_services"].apply(pd.json_normalize)
df_pds = pd.concat(normalized_pds_dfs.tolist(), ignore_index=True)
ids_range = range(max_id_pds + 1, max_id_pds + 1 + len(df_pds))

df_pds["id"] = ids_range
df_pds["pds_sous_categories"] = df_pds.apply(
lambda row: [
{**d, "propositionservice_id": row["id"]}
for d in row["pds_sous_categories"]
],
axis=1,
)

normalized_pdssc_dfs = df_pds["pds_sous_categories"].apply(pd.json_normalize)
df_pdssc = pd.concat(normalized_pdssc_dfs.tolist(), ignore_index=True)

return {
"actors": df_actors,
"pds": df_pds[["id", "action_id", "acteur_id"]],
"pds_sous_categories": df_pdssc[
["propositionservice_id", "souscategorieobjet_id"]
],
"dag_run_id": dag_run_id,
"labels": df_labels[["acteur_id", "labelqualite_id"]],
"acteur_services": df_acteur_services[["acteur_id", "acteurservice_id"]],
"change_type": constants.SUGGESTION_SOURCE,
}


def process_many2many_df(df, column_name, df_columns=["acteur_id", "labelqualite_id"]):
try:
        # Attempt to process the given many-to-many column if it exists and is not empty
normalized_df = df[column_name].dropna().apply(pd.json_normalize)
if normalized_df.empty:
return pd.DataFrame(
columns=df_columns
) # Return empty DataFrame if no data to process
else:
return pd.concat(normalized_df.tolist(), ignore_index=True)
except KeyError:
# Handle the case where the specified column does not exist
return pd.DataFrame(columns=df_columns)


def normalize_acteur_delete_for_db(df_actors, dag_run_id):

return {
"actors": df_actors,
"dag_run_id": dag_run_id,
"change_type": constants.SUGGESTION_SOURCE_SUPRESSION,
}
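The apply(pd.json_normalize) / pd.concat pattern used throughout this module expands a column of JSON payloads into one flat DataFrame. A self-contained illustration with toy data (not rows from the real tables):

import pandas as pd

# Each row carries a list of JSON objects, as in the "suggestion" column above.
df = pd.DataFrame(
    {"suggestion": [[{"nom": "a", "ville": "Paris"}], [{"nom": "b", "ville": "Lyon"}]]}
)

# json_normalize flattens each payload into a small DataFrame...
normalized_dfs = df["suggestion"].apply(pd.json_normalize)
# ...and concat stitches those pieces into a single flat frame.
flat = pd.concat(normalized_dfs.tolist(), ignore_index=True)
print(flat)
#   nom  ville
# 0   a  Paris
# 1   b   Lyon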
18 changes: 18 additions & 0 deletions dags/suggestions/business_logic/db_read_suggestiontoprocess.py
@@ -0,0 +1,18 @@
from airflow.providers.postgres.hooks.postgres import PostgresHook
from sources.config import shared_constants as constants


def get_first_suggetsioncohorte_to_insert():
hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
row = hook.get_first(
f"""
SELECT * FROM data_suggestioncohorte
WHERE statut = '{constants.SUGGESTION_ATRAITER}'
LIMIT 1
"""
)
return row


def db_read_suggestiontoprocess(**kwargs):
return bool(get_first_suggetsioncohorte_to_insert())
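The f-string above only interpolates a module constant, so it is safe as written, but the hook can also bind the value through its parameters argument, which is the more defensive habit if the filter ever becomes dynamic (an alternative sketch with a hypothetical function name, not what this commit does):

from airflow.providers.postgres.hooks.postgres import PostgresHook
from sources.config import shared_constants as constants


def get_first_suggestioncohorte_to_insert_parameterized():
    hook = PostgresHook(postgres_conn_id="qfdmo_django_db")
    # Let the database driver bind the status value instead of formatting it in.
    return hook.get_first(
        "SELECT * FROM data_suggestioncohorte WHERE statut = %s LIMIT 1",
        parameters=[constants.SUGGESTION_ATRAITER],
    )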
