Skip to content

Commit

Permalink
handle missing records due to last_sync_date
Browse files Browse the repository at this point in the history
  • Loading branch information
david-loe committed Feb 25, 2025
1 parent 6034f8f commit dc2b4c4
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 21 deletions.
26 changes: 24 additions & 2 deletions sync/bidirectional.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

class BidirectionalSyncTask(SyncTaskBase[BidirectionalTaskConfig]):
def sync(self, last_sync_date_utc: datetime | None = None):
frappe_dict = self.get_frappe_key_record_dict(last_sync_date_utc)
db_dict = self.get_db_key_record_dict(last_sync_date_utc)
frappe_dict = self.get_frappe_key_record_dict(self.get_frappe_records(last_sync_date_utc))
db_dict = self.get_db_key_record_dict(self.get_db_records(last_sync_date_utc))

# check for same types in key
if len(frappe_dict) > 0 and len(db_dict) > 0:
Expand All @@ -18,8 +18,30 @@ def sync(self, last_sync_date_utc: datetime | None = None):
if not self.compare_key_tuple_structure(first_frappe_key, first_db_key):
raise ValueError("Die Schlüssel-Tupel haben einen unterschiedlichen Typaufbau!")

# Falls nur die letzten Änderungen synchronisiert werden, muss geprüft werden, ob die Gegenseite nicht doch Einträge enthält
if last_sync_date_utc:
missing_db_keys = frappe_dict.keys() - db_dict.keys()
missing_db_ids = [
frappe_dict[key].get(self.config.frappe.fk_id_field)
for key in missing_db_keys
if frappe_dict[key].get(self.config.frappe.fk_id_field)
]
missing_frappe_keys = db_dict.keys() - frappe_dict.keys()
missing_frappe_ids = [
db_dict[key].get(self.config.db.fk_id_field)
for key in missing_frappe_keys
if db_dict[key].get(self.config.db.fk_id_field)
]
if missing_frappe_ids:
additional_frappe_records = self.get_frappe_records_by_ids(missing_frappe_ids)
frappe_dict.update(self.get_frappe_key_record_dict(additional_frappe_records))
if missing_db_ids:
additional_db_records = self.get_db_records_by_ids(missing_db_ids)
db_dict.update(self.get_db_key_record_dict(additional_db_records))

# Alle vorhandenen Schlüssel zusammenführen
all_keys = set(frappe_dict.keys()).union(db_dict.keys())

for key in all_keys:
frappe_rec = frappe_dict.get(key)
db_rec = db_dict.get(key)
Expand Down
62 changes: 43 additions & 19 deletions sync/task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from datetime import datetime
import json
import logging
from typing import TypeVar, Generic
from api.database import DatabaseConnection, format_query, get_time_zone
Expand Down Expand Up @@ -57,7 +58,7 @@ def _execute_select_query(self, sql: str, params: list = []):
self.db_conn.rollback()
finally:
cursor.close()

logging.debug(f"Insgesamt {len(db_records)} Datensätze gefunden.")
return db_records

def map_frappe_to_db(self, record: dict, warns=True) -> dict:
Expand Down Expand Up @@ -117,6 +118,23 @@ def split_frappe_in_data_and_keys(self, frappe_rec: dict):
data[k] = v
return data, keys

def _cast_frappe_record(self, record: dict):
for field in self.config.frappe.datetime_fields:
if field in record and isinstance(record[field], str):
try:
record[field] = datetime.fromisoformat(record[field])
except ValueError:
# Falls der String kein gültiges ISO-Datum ist, bleibt der Wert unverändert.
pass
for field in self.config.frappe.int_fields:
if field in record and isinstance(record[field], str):
try:
record[field] = int(record[field])
except ValueError:
# Falls der String kein gültiges ISO-Datum ist, bleibt der Wert unverändert.
pass
return record

def get_frappe_records(self, last_sync_date_utc: datetime | None = None) -> list:
"""
Frappe-Datensätze abrufen
Expand All @@ -128,24 +146,18 @@ def get_frappe_records(self, last_sync_date_utc: datetime | None = None) -> list
frappe_response = self.frappe_api.get_all_data(self.config.doc_type, filters)
records = frappe_response.get("data", [])
for rec in records:
for field in self.config.frappe.datetime_fields:
if field in rec and isinstance(rec[field], str):
try:
rec[field] = datetime.fromisoformat(rec[field])
except ValueError:
# Falls der String kein gültiges ISO-Datum ist, bleibt der Wert unverändert.
pass
for field in self.config.frappe.int_fields:
if field in rec and isinstance(rec[field], str):
try:
rec[field] = int(rec[field])
except ValueError:
# Falls der String kein gültiges ISO-Datum ist, bleibt der Wert unverändert.
pass
self._cast_frappe_record(rec)
return records

def get_frappe_records_by_ids(self, ids: list[str | int]):
filters = [f'["name", "in", {json.dumps(ids)}]']
frappe_response = self.frappe_api.get_all_data(self.config.doc_type, filters)
records = frappe_response.get("data", [])
for rec in records:
self._cast_frappe_record(rec)
return records

def get_frappe_key_record_dict(self, last_sync_date_utc: datetime | None = None):
frappe_records = self.get_frappe_records(last_sync_date_utc)
def get_frappe_key_record_dict(self, frappe_records: list[dict[str, any]]):
frappe_dict: dict[tuple, dict[str, any]] = {}
for rec in frappe_records:
key = self.extract_key_from_frappe(rec)
Expand Down Expand Up @@ -175,8 +187,20 @@ def get_db_records(self, last_sync_date_utc: datetime | None = None):

return self._execute_select_query(select_sql, params)

def get_db_key_record_dict(self, last_sync_date_utc: datetime | None = None):
db_records = self.get_db_records(last_sync_date_utc)
def get_db_records_by_ids(self, ids: list[str | int]):
select_sql = f"SELECT * FROM {self.config.table_name}"
id_selector = f"{self.config.db.id_field} IN ({', '.join(['?']*len(ids))})"

if self.config.query:
select_sql = self.config.query

if "WHERE" in select_sql:
select_sql = select_sql + f" AND {id_selector}"
else:
select_sql = select_sql + f" WHERE {id_selector}"
return self._execute_select_query(select_sql, ids)

def get_db_key_record_dict(self, db_records: list[dict[str, any]]):
db_dict: dict[tuple, dict[str, any]] = {}
for rec in db_records:
key = self.extract_key_from_db(rec)
Expand Down

0 comments on commit dc2b4c4

Please sign in to comment.