diff --git a/{{cookiecutter.repostory_name}}/README.md b/{{cookiecutter.repostory_name}}/README.md index e877781..109f183 100644 --- a/{{cookiecutter.repostory_name}}/README.md +++ b/{{cookiecutter.repostory_name}}/README.md @@ -144,6 +144,30 @@ If an SSO provider supports the OIDC protocol, it can be set up as a generic OID 2. Allauth "social users" are just an extension to regular django users. When someone logs in via allauth, a django user model will also be created for them. 3. A "profile" page is available at `/accounts/` +{% endif %} + +{% if cookiecutter.use_celery == 'y' %} +# Background tasks with Celery + +## Dead letter queue + +
+
+There is a special queue named `dead_letter` that is used to store tasks
+that failed for some reason.
+
+A task should be annotated with `on_failure=send_to_dead_letter_queue`.
+Once the reason for the task's failure is fixed, the task can be re-processed
+by moving tasks from the dead letter queue to the main one ("celery"):
+
+    manage.py move_tasks "dead_letter" "celery"
+
+If a task fails again, it will be put back into the dead letter queue.
+
+To flush all tasks in a specific queue, use
+
+    manage.py flush_queue "dead_letter"
+
+ {% endif %} {% if cookiecutter.monitoring == 'y' %} # Monitoring diff --git a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/celery.py b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/celery.py index 3ea6334..62809c7 100644 --- a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/celery.py +++ b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/celery.py @@ -8,6 +8,7 @@ {% endif -%} from django.conf import settings from django_structlog.celery.steps import DjangoStructLogInitStep +from more_itertools import chunked {%- if cookiecutter.monitoring == "y" %} from prometheus_client import multiprocess {% endif %} @@ -16,7 +17,6 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "{{cookiecutter.django_project_name}}.settings") app = Celery("{{cookiecutter.django_project_name}}") - app.config_from_object("django.conf:settings", namespace="CELERY") app.steps["worker"].add(DjangoStructLogInitStep) app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @@ -33,10 +33,34 @@ def receiver_setup_logging(loglevel, logfile, format, colorize, **kwargs): # pr configure_structlog() -def route_task(name, args, kwargs, options, task=None, **kw): - return {"queue": "celery"} -{% if cookiecutter.monitoring == "y" %} +def get_tasks_in_queue(queue_name: str) -> list[bytes]: + with app.pool.acquire(block=True) as conn: + return conn.default_channel.client.lrange(queue_name, 0, -1) + + +def get_num_tasks_in_queue(queue_name: str) -> int: + with app.pool.acquire(block=True) as conn: + return conn.default_channel.client.llen(queue_name) + +def move_tasks(source_queue: str, destination_queue: str, chunk_size: int = 100) -> None: + with app.pool.acquire(block=True) as conn: + client = conn.default_channel.client + tasks = client.lrange(source_queue, 0, -1) + + for chunk in chunked(tasks, chunk_size): + with client.pipeline() as pipe: + for task in chunk: + 
pipe.rpush(destination_queue, task) +                    pipe.lrem(source_queue, 1, task) +            pipe.execute() + + +def flush_tasks(queue_name: str) -> None: +    with app.pool.acquire(block=True) as conn: +        conn.default_channel.client.delete(queue_name) + +{% if cookiecutter.monitoring == "y" %} @worker_process_shutdown.connect def child_exit(pid, **kw): multiprocess.mark_process_dead(pid) diff --git a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/settings.py b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/settings.py index 5f30e71..2130be5 100644 --- a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/settings.py +++ b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/settings.py @@ -10,14 +10,16 @@ from functools import wraps import environ +import structlog {% if cookiecutter.use_celery == "y" -%} # from celery.schedules import crontab +from kombu import Queue + {% endif %} {%- if cookiecutter.use_allauth == "y" -%} from django.urls import reverse_lazy {% endif -%} -import structlog root = environ.Path(__file__) - 2 @@ -337,7 +339,12 @@ def wrapped(*args, **kwargs): # 'options': {"time_limit": 300}, # }, } -CELERY_TASK_ROUTES = ["{{cookiecutter.django_project_name}}.celery.route_task"] +CELERY_TASK_CREATE_MISSING_QUEUES = False +CELERY_TASK_QUEUES = (Queue("celery"), Queue("worker"), Queue("dead_letter")) +CELERY_TASK_DEFAULT_EXCHANGE = "celery" +CELERY_TASK_DEFAULT_ROUTING_KEY = "celery" +CELERY_TASK_ANNOTATIONS = {"*": {"acks_late": True, "reject_on_worker_lost": True}} +CELERY_TASK_ROUTES = {"*": {"queue": "celery"}} CELERY_TASK_TIME_LIMIT = int(timedelta(minutes=5).total_seconds()) CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", default=False) CELERY_ACCEPT_CONTENT = ["json"] diff --git a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/management/commands/flush_queue.py 
b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/management/commands/flush_queue.py new file mode 100644 index 0000000..ea5f4db --- /dev/null +++ b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/management/commands/flush_queue.py @@ -0,0 +1,21 @@ +from django.core.management.base import BaseCommand + +from {{cookiecutter.django_project_name}}.celery import flush_tasks, get_num_tasks_in_queue + + +class Command(BaseCommand): + help = "Flush task queue." + + def add_arguments(self, parser) -> None: + parser.add_argument("queue", type=str, help="Queue name to flush") + + def handle(self, *args, **kwargs): + queue_name = kwargs["queue"] + + num_tasks = get_num_tasks_in_queue(queue_name) + self.stdout.write(f"Found {num_tasks} tasks in '{queue_name}' queue") + if not num_tasks: + return + + flush_tasks(queue_name) + self.stdout.write("All done") diff --git a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/management/commands/move_tasks.py b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/management/commands/move_tasks.py new file mode 100644 index 0000000..db23779 --- /dev/null +++ b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/management/commands/move_tasks.py @@ -0,0 +1,23 @@ +from django.core.management.base import BaseCommand + +from {{cookiecutter.django_project_name}}.celery import get_num_tasks_in_queue, move_tasks + + +class Command(BaseCommand): + help = "Reschedule dead letter tasks." 
+ + def add_arguments(self, parser) -> None: + parser.add_argument("source_queue", type=str, help="Source queue name") + parser.add_argument("destination_queue", type=str, help="Destination queue name") + + def handle(self, *args, **kwargs): + source_queue = kwargs["source_queue"] + destination_queue = kwargs["destination_queue"] + + num_tasks = get_num_tasks_in_queue(source_queue) + self.stdout.write(f"Found {num_tasks} tasks in '{source_queue}' queue") + if not num_tasks: + return + + move_tasks(source_queue, destination_queue) + self.stdout.write("All done") diff --git a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/metrics.py b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/metrics.py index 1a56081..5cb2841 100644 --- a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/metrics.py +++ b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/metrics.py @@ -1,13 +1,17 @@ {%- if cookiecutter.monitoring == "y" -%} import glob import os +from functools import partial import prometheus_client +from django.conf import settings from django.http import HttpResponse from django_prometheus.exports import ExportToDjangoView from django_prometheus.migrations import ExportMigrations from prometheus_client import multiprocess +from ..celery import get_num_tasks_in_queue + class RecursiveMultiProcessCollector(multiprocess.MultiProcessCollector): """A multiprocess collector that scans the directory recursively""" @@ -33,4 +37,14 @@ def metrics_view(request): ) else: return ExportToDjangoView(request) + + +num_tasks_in_queue = {} +for queue in settings.CELERY_TASK_QUEUES: + gauge = prometheus_client.Gauge( + f"celery_{queue.name}_queue_len", + f"How many tasks are there in '{queue.name}' queue", + ) + 
num_tasks_in_queue[queue.name] = gauge + gauge.set_function(partial(get_num_tasks_in_queue, queue.name)) {% endif %} \ No newline at end of file diff --git a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/tasks.py b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/tasks.py index 71fd4ce..4748704 100644 --- a/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/tasks.py +++ b/{{cookiecutter.repostory_name}}/app/src/{{cookiecutter.django_project_name}}/{{cookiecutter.django_default_app_name}}/tasks.py @@ -1,5 +1,6 @@ -{%- if cookiecutter.use_celery == 'y' -%} +{%- if cookiecutter.use_celery == "y" -%} import structlog +from celery import Task from celery.utils.log import get_task_logger from {{cookiecutter.django_project_name}}.celery import app @@ -7,7 +8,24 @@ logger = structlog.wrap_logger(get_task_logger(__name__)) -@app.task +def send_to_dead_letter_queue(task: Task, exc, task_id, args, kwargs, einfo): + """Hook to put a task into dead letter queue when it fails.""" + if task.app.conf.task_always_eager: + return # do not run failed task again in eager mode + + logger.warning( + "Sending failed task to dead letter queue", + task=task, + exc=exc, + task_id=task_id, + args=args, + kwargs=kwargs, + einfo=einfo, + ) + task.apply_async(args=args, kwargs=kwargs, queue="dead_letter") + + +@app.task(on_failure=send_to_dead_letter_queue) def demo_task(x, y): logger.info("adding two numbers", x=x, y=y) return x + y diff --git a/{{cookiecutter.repostory_name}}/pyproject.toml b/{{cookiecutter.repostory_name}}/pyproject.toml index 3a32592..ac39382 100644 --- a/{{cookiecutter.repostory_name}}/pyproject.toml +++ b/{{cookiecutter.repostory_name}}/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "sentry-sdk==1.3.0", "ipython~=8.14.0", "nox==2023.4.22", + 
"more-itertools~=10.3.0", {% if cookiecutter.monitoring == "y" -%} "psutil>=5.9.8", "prometheus-client~=0.17.0",