diff --git a/dags/sources/tasks/airflow_logic/db_write_suggestion_task.py b/dags/sources/tasks/airflow_logic/db_write_type_action_suggestions_task.py similarity index 79% rename from dags/sources/tasks/airflow_logic/db_write_suggestion_task.py rename to dags/sources/tasks/airflow_logic/db_write_type_action_suggestions_task.py index f372d0e0b..cdc1e5e25 100644 --- a/dags/sources/tasks/airflow_logic/db_write_suggestion_task.py +++ b/dags/sources/tasks/airflow_logic/db_write_type_action_suggestions_task.py @@ -2,21 +2,23 @@ from airflow import DAG from airflow.operators.python import PythonOperator -from sources.tasks.business_logic.db_write_suggestion import db_write_suggestion +from sources.tasks.business_logic.db_write_type_action_suggestions import ( + db_write_type_action_suggestions, +) from utils import logging_utils as log logger = logging.getLogger(__name__) -def db_write_suggestion_task(dag: DAG) -> PythonOperator: +def db_write_type_action_suggestions_task(dag: DAG) -> PythonOperator: return PythonOperator( task_id="db_write_suggestion", - python_callable=db_write_suggestion_wrapper, + python_callable=db_write_type_action_suggestions_wrapper, dag=dag, ) -def db_write_suggestion_wrapper(**kwargs) -> None: +def db_write_type_action_suggestions_wrapper(**kwargs) -> None: dag_name = kwargs["dag"].dag_display_name or kwargs["dag"].dag_id run_id = kwargs["run_id"] dfs_acteur = kwargs["ti"].xcom_pull(task_ids="db_data_prepare") @@ -40,7 +42,7 @@ def db_write_suggestion_wrapper(**kwargs) -> None: kwargs["ti"].xcom_push(key="skip", value=True) return - return db_write_suggestion( + return db_write_type_action_suggestions( dag_name=dag_name, run_id=run_id, df_acteur_to_create=df_acteur_to_create, diff --git a/dags/sources/tasks/airflow_logic/operators.py b/dags/sources/tasks/airflow_logic/operators.py index a17e0052e..ad4cddfb6 100755 --- a/dags/sources/tasks/airflow_logic/operators.py +++ b/dags/sources/tasks/airflow_logic/operators.py @@ -8,7 +8,7 @@ db_read_propositions_max_id_task, ) from sources.tasks.airflow_logic.db_write_suggestion_task import ( - db_write_suggestion_task, + db_write_type_action_suggestions_task, ) from sources.tasks.airflow_logic.propose_acteur_changes_task import ( propose_acteur_changes_task, @@ -93,5 +93,5 @@ def eo_task_chain(dag: DAG) -> None: create_tasks, propose_services_sous_categories_task(dag), db_data_prepare_task(dag), - db_write_suggestion_task(dag), + db_write_type_action_suggestions_task(dag), ) diff --git a/dags/sources/tasks/business_logic/db_data_prepare.py b/dags/sources/tasks/business_logic/db_data_prepare.py index 98e804a49..c2e5e216e 100644 --- a/dags/sources/tasks/business_logic/db_data_prepare.py +++ b/dags/sources/tasks/business_logic/db_data_prepare.py @@ -23,12 +23,6 @@ def db_data_prepare( df_acteur_to_delete["suggestion"] = df_acteur_to_delete[ update_actors_columns ].apply(lambda row: json.dumps(row.to_dict(), default=str), axis=1) - # Created or updated Acteurs - # df_acteur_services = ( - # df_acteur_services - # if df_acteur_services is not None - # else pd.DataFrame(columns=["acteur_id", "acteurservice_id"]) - # ) if df_acteur.empty: raise ValueError("df_acteur est vide") diff --git a/dags/sources/tasks/business_logic/db_write_suggestion.py b/dags/sources/tasks/business_logic/db_write_type_action_suggestions.py similarity index 98% rename from dags/sources/tasks/business_logic/db_write_suggestion.py rename to dags/sources/tasks/business_logic/db_write_type_action_suggestions.py index 2e921599d..24d48931b 100644 --- a/dags/sources/tasks/business_logic/db_write_suggestion.py +++ b/dags/sources/tasks/business_logic/db_write_type_action_suggestions.py @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) -def db_write_suggestion( +def db_write_type_action_suggestions( dag_name: str, run_id: str, df_acteur_to_create: pd.DataFrame,