Skip to content

Commit

Permalink
renomme fonction db_write_type_action_suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
kolok committed Jan 19, 2025
1 parent 5017db0 commit f839697
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

from airflow import DAG
from airflow.operators.python import PythonOperator
from sources.tasks.business_logic.db_write_suggestion import db_write_suggestion
from sources.tasks.business_logic.db_write_type_action_suggestions import (
db_write_type_action_suggestions,
)
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def db_write_suggestion_task(dag: DAG) -> PythonOperator:
def db_write_type_action_suggestions_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="db_write_suggestion",
python_callable=db_write_suggestion_wrapper,
python_callable=db_write_type_action_suggestions_wrapper,
dag=dag,
)


def db_write_suggestion_wrapper(**kwargs) -> None:
def db_write_type_action_suggestions_wrapper(**kwargs) -> None:
dag_name = kwargs["dag"].dag_display_name or kwargs["dag"].dag_id
run_id = kwargs["run_id"]
dfs_acteur = kwargs["ti"].xcom_pull(task_ids="db_data_prepare")
Expand All @@ -40,7 +42,7 @@ def db_write_suggestion_wrapper(**kwargs) -> None:
kwargs["ti"].xcom_push(key="skip", value=True)
return

return db_write_suggestion(
return db_write_type_action_suggestions(
dag_name=dag_name,
run_id=run_id,
df_acteur_to_create=df_acteur_to_create,
Expand Down
4 changes: 2 additions & 2 deletions dags/sources/tasks/airflow_logic/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
db_read_propositions_max_id_task,
)
from sources.tasks.airflow_logic.db_write_suggestion_task import (
db_write_suggestion_task,
db_write_type_action_suggestions_task,
)
from sources.tasks.airflow_logic.propose_acteur_changes_task import (
propose_acteur_changes_task,
Expand Down Expand Up @@ -93,5 +93,5 @@ def eo_task_chain(dag: DAG) -> None:
create_tasks,
propose_services_sous_categories_task(dag),
db_data_prepare_task(dag),
db_write_suggestion_task(dag),
db_write_type_action_suggestions_task(dag),
)
6 changes: 0 additions & 6 deletions dags/sources/tasks/business_logic/db_data_prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ def db_data_prepare(
df_acteur_to_delete["suggestion"] = df_acteur_to_delete[
update_actors_columns
].apply(lambda row: json.dumps(row.to_dict(), default=str), axis=1)
# Created or updated Acteurs
# df_acteur_services = (
# df_acteur_services
# if df_acteur_services is not None
# else pd.DataFrame(columns=["acteur_id", "acteurservice_id"])
# )

if df_acteur.empty:
raise ValueError("df_acteur est vide")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
logger = logging.getLogger(__name__)


def db_write_suggestion(
def db_write_type_action_suggestions(
dag_name: str,
run_id: str,
df_acteur_to_create: pd.DataFrame,
Expand Down

0 comments on commit f839697

Please sign in to comment.