From dbb0cde78b3b0bdcd444405cc4ac68ccf8835076 Mon Sep 17 00:00:00 2001 From: systemime Date: Tue, 2 Nov 2021 13:50:31 +0800 Subject: [PATCH] perf: respect django database routing --- django_celery_beat/schedulers.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index e2606866..ae63f252 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -15,7 +15,7 @@ from kombu.utils.json import dumps, loads from django.conf import settings -from django.db import transaction, close_old_connections +from django.db import transaction, close_old_connections, router, DEFAULT_DB_ALIAS from django.db.utils import DatabaseError, InterfaceError from django.core.exceptions import ObjectDoesNotExist @@ -258,7 +258,7 @@ def schedule_changed(self): # other transactions until the current transaction is # committed (Issue #41). try: - transaction.commit() + transaction.commit(using=self.target_db) except transaction.TransactionManagementError: pass # not in transaction management. @@ -287,7 +287,17 @@ def reserve(self, entry): self._dirty.add(new_entry.name) return new_entry - def sync(self): + @property + def target_db(self): + """Determine if there is a django route""" + if not settings.DATABASE_ROUTERS: + return DEFAULT_DB_ALIAS + # If the project does not actually implement this method, DEFAULT_DB_ALIAS will be automatically returned. + # The exception will be located to the django routing section + db = router.db_for_write(self.Model) + return db + + def _sync(self): if logger.isEnabledFor(logging.DEBUG): debug('Writing entries...') _tried = set() @@ -313,6 +323,10 @@ def sync(self): # retry later, only for the failed ones self._dirty |= _failed + def sync(self): + with transaction.atomic(using=self.target_db): + self._sync() + def update_from_dict(self, mapping): s = {} for name, entry_fields in mapping.items():