Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
kolok committed Jan 20, 2025
1 parent b8b16bc commit cd0f0f2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ def db_write_validsuggestions_task(dag: DAG) -> PythonOperator:


def db_write_validsuggestions_wrapper(**kwargs):
data_from_db = kwargs["ti"].xcom_pull(task_ids="db_normalize_suggestion")
data_acteurs_normalized = kwargs["ti"].xcom_pull(task_ids="db_normalize_suggestion")

log.preview("data_from_db acteur", data_from_db["actors"])
log.preview("data_from_db change_type", data_from_db["change_type"])
log.preview("data_acteurs_normalized acteur", data_acteurs_normalized["actors"])
log.preview(
"data_acteurs_normalized change_type", data_acteurs_normalized["change_type"]
)

return db_write_validsuggestions(data_from_db=data_from_db)
return db_write_validsuggestions(data_acteurs_normalized=data_acteurs_normalized)
20 changes: 10 additions & 10 deletions dags/suggestions/tasks/business_logic/db_write_validsuggestions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@
logger = logging.getLogger(__name__)


def db_write_validsuggestions(data_from_db: dict):
def db_write_validsuggestions(data_acteurs_normalized: dict):
# If data_set is empty, nothing to do
dag_run_id = data_from_db["dag_run_id"]
dag_run_id = data_acteurs_normalized["dag_run_id"]
engine = PostgresConnectionManager().engine
if "actors" not in data_from_db:
if "actors" not in data_acteurs_normalized:
with engine.begin() as connection:
update_suggestion_status(
connection, dag_run_id, constants.SUGGESTION_ENCOURS
)
return
df_actors = data_from_db["actors"]
df_labels = data_from_db.get("labels")
df_acteur_services = data_from_db.get("acteur_services")
df_pds = data_from_db.get("pds")
df_pdssc = data_from_db.get("pds_sous_categories")
dag_run_id = data_from_db["dag_run_id"]
change_type = data_from_db.get("change_type", "CREATE")
df_actors = data_acteurs_normalized["actors"]
df_labels = data_acteurs_normalized.get("labels")
df_acteur_services = data_acteurs_normalized.get("acteur_services")
df_pds = data_acteurs_normalized.get("pds")
df_pdssc = data_acteurs_normalized.get("pds_sous_categories")
dag_run_id = data_acteurs_normalized["dag_run_id"]
change_type = data_acteurs_normalized.get("change_type", "CREATE")

with engine.begin() as connection:
if change_type in [
Expand Down

0 comments on commit cd0f0f2

Please sign in to comment.