Add dead letter queue support #195

Merged

merged 5 commits on Oct 3, 2024
Changes from all commits
24 changes: 24 additions & 0 deletions {{cookiecutter.repostory_name}}/README.md
@@ -144,6 +144,30 @@ If an SSO provider supports the OIDC protocol, it can be set up as a generic OID
2. Allauth "social users" are just an extension to regular django users. When someone logs in via allauth, a django user model will also be created for them.
3. A "profile" page is available at `/accounts/`

{% endif %}

{% if cookiecutter.use_celery == 'y' %}
# Background tasks with Celery

## Dead letter queue

<details>
There is a special queue named `dead_letter` that is used to store tasks
that failed for some reason.

A task should be annotated with `on_failure=send_to_dead_letter_queue`.
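
For example (a sketch; `my_task` stands in for any task in the project):

    @app.task(on_failure=send_to_dead_letter_queue)
    def my_task(x, y):
        return x + y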
Once the cause of a task's failure is fixed, the tasks can be re-processed
by moving them from the dead letter queue back to the main one (`celery`):

    manage.py move_tasks "dead_letter" "celery"

If a task fails again, it will be put back into the dead letter queue.

To flush all tasks in a specific queue, use

    manage.py flush_tasks "dead_letter"
</details>

{% endif %}
{% if cookiecutter.monitoring == 'y' %}
# Monitoring
@@ -8,6 +8,7 @@
{% endif -%}
from django.conf import settings
from django_structlog.celery.steps import DjangoStructLogInitStep
from more_itertools import chunked
{%- if cookiecutter.monitoring == "y" %}
from prometheus_client import multiprocess
{% endif %}
@@ -16,7 +17,6 @@
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "{{cookiecutter.django_project_name}}.settings")

app = Celery("{{cookiecutter.django_project_name}}")

app.config_from_object("django.conf:settings", namespace="CELERY")
app.steps["worker"].add(DjangoStructLogInitStep)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@@ -33,10 +33,34 @@ def receiver_setup_logging(loglevel, logfile, format, colorize, **kwargs):  # pr
    configure_structlog()


def route_task(name, args, kwargs, options, task=None, **kw):
    return {"queue": "celery"}
{% if cookiecutter.monitoring == "y" %}
def get_tasks_in_queue(queue_name: str) -> list[bytes]:
    with app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.lrange(queue_name, 0, -1)


def get_num_tasks_in_queue(queue_name: str) -> int:
    with app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)


def move_tasks(source_queue: str, destination_queue: str, chunk_size: int = 100) -> None:
    with app.pool.acquire(block=True) as conn:
        client = conn.default_channel.client
        tasks = client.lrange(source_queue, 0, -1)

        # Move tasks in chunks, batching each chunk into one pipeline
        # round-trip; the commands must be queued on the pipeline, not the
        # client, so that pipe.execute() actually sends them.
        for chunk in chunked(tasks, chunk_size):
            with client.pipeline() as pipe:
                for task in chunk:
                    pipe.rpush(destination_queue, task)
                    pipe.lrem(source_queue, 1, task)
                pipe.execute()


def flush_tasks(queue_name: str) -> None:
    with app.pool.acquire(block=True) as conn:
        conn.default_channel.client.delete(queue_name)

{% if cookiecutter.monitoring == "y" %}
@worker_process_shutdown.connect
def child_exit(pid, **kw):
    multiprocess.mark_process_dead(pid)
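
As a quick sanity check, the queue helpers above can be exercised from `manage.py shell` (a sketch; the counts are illustrative):

    from {{cookiecutter.django_project_name}}.celery import (
        get_num_tasks_in_queue,
        get_tasks_in_queue,
        move_tasks,
    )

    get_num_tasks_in_queue("dead_letter")  # e.g. 3
    get_tasks_in_queue("dead_letter")      # raw message payloads, as bytes
    move_tasks("dead_letter", "celery")    # requeue everything for a retry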
@@ -10,14 +10,16 @@
from functools import wraps

import environ
import structlog

{% if cookiecutter.use_celery == "y" -%}
# from celery.schedules import crontab
from kombu import Queue

{% endif %}
{%- if cookiecutter.use_allauth == "y" -%}
from django.urls import reverse_lazy
{% endif -%}
import structlog

root = environ.Path(__file__) - 2

@@ -337,7 +339,12 @@ def wrapped(*args, **kwargs):
# 'options': {"time_limit": 300},
# },
}
CELERY_TASK_ROUTES = ["{{cookiecutter.django_project_name}}.celery.route_task"]
CELERY_TASK_CREATE_MISSING_QUEUES = False
CELERY_TASK_QUEUES = (Queue("celery"), Queue("worker"), Queue("dead_letter"))
CELERY_TASK_DEFAULT_EXCHANGE = "celery"
CELERY_TASK_DEFAULT_ROUTING_KEY = "celery"
CELERY_TASK_ANNOTATIONS = {"*": {"acks_late": True, "reject_on_worker_lost": True}}
CELERY_TASK_ROUTES = {"*": {"queue": "celery"}}
CELERY_TASK_TIME_LIMIT = int(timedelta(minutes=5).total_seconds())
CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", default=False)
CELERY_ACCEPT_CONTENT = ["json"]
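
As a rough illustration of the routing these settings produce (using the `demo_task` defined later in this PR):

    # The "*" route sends every task to the "celery" queue by default.
    demo_task.delay(2, 3)

    # An explicit queue overrides the route; with CELERY_TASK_CREATE_MISSING_QUEUES
    # set to False, it must be one of the declared queues
    # ("celery", "worker", "dead_letter").
    demo_task.apply_async(args=(2, 3), queue="dead_letter")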
@@ -0,0 +1,21 @@
from django.core.management.base import BaseCommand

from {{cookiecutter.django_project_name}}.celery import flush_tasks, get_num_tasks_in_queue


class Command(BaseCommand):
    help = "Flush task queue."

    def add_arguments(self, parser) -> None:
        parser.add_argument("queue", type=str, help="Queue name to flush")

    def handle(self, *args, **kwargs):
        queue_name = kwargs["queue"]

        num_tasks = get_num_tasks_in_queue(queue_name)
        self.stdout.write(f"Found {num_tasks} tasks in '{queue_name}' queue")
        if not num_tasks:
            return

        flush_tasks(queue_name)
        self.stdout.write("All done")
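
Invocation then looks like this (the task count is illustrative):

    $ manage.py flush_tasks "dead_letter"
    Found 3 tasks in 'dead_letter' queue
    All done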
@@ -0,0 +1,23 @@
from django.core.management.base import BaseCommand

from {{cookiecutter.django_project_name}}.celery import get_num_tasks_in_queue, move_tasks


class Command(BaseCommand):
    help = "Reschedule dead letter tasks."

    def add_arguments(self, parser) -> None:
        parser.add_argument("source_queue", type=str, help="Source queue name")
        parser.add_argument("destination_queue", type=str, help="Destination queue name")

    def handle(self, *args, **kwargs):
        source_queue = kwargs["source_queue"]
        destination_queue = kwargs["destination_queue"]

        num_tasks = get_num_tasks_in_queue(source_queue)
        self.stdout.write(f"Found {num_tasks} tasks in '{source_queue}' queue")
        if not num_tasks:
            return

        move_tasks(source_queue, destination_queue)
        self.stdout.write("All done")
@@ -1,13 +1,17 @@
{%- if cookiecutter.monitoring == "y" -%}
import glob
import os
from functools import partial

import prometheus_client
from django.conf import settings
from django.http import HttpResponse
from django_prometheus.exports import ExportToDjangoView
from django_prometheus.migrations import ExportMigrations
from prometheus_client import multiprocess

from ..celery import get_num_tasks_in_queue


class RecursiveMultiProcessCollector(multiprocess.MultiProcessCollector):
"""A multiprocess collector that scans the directory recursively"""
@@ -33,4 +37,14 @@ def metrics_view(request):
        )
    else:
        return ExportToDjangoView(request)


num_tasks_in_queue = {}
for queue in settings.CELERY_TASK_QUEUES:
    gauge = prometheus_client.Gauge(
f"celery_{queue.name}_queue_len",
f"How many tasks are there in '{queue.name}' queue",
)
num_tasks_in_queue[queue.name] = gauge
gauge.set_function(partial(get_num_tasks_in_queue, queue.name))
{% endif %}
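
Because each gauge's value comes from `set_function`, the queue length is read at scrape time; `/metrics` would then expose lines along these lines (value illustrative):

    # HELP celery_dead_letter_queue_len How many tasks are there in 'dead_letter' queue
    # TYPE celery_dead_letter_queue_len gauge
    celery_dead_letter_queue_len 3.0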
@@ -1,13 +1,31 @@
{%- if cookiecutter.use_celery == 'y' -%}
{%- if cookiecutter.use_celery == "y" -%}
import structlog
from celery import Task
from celery.utils.log import get_task_logger

from {{cookiecutter.django_project_name}}.celery import app

logger = structlog.wrap_logger(get_task_logger(__name__))


@app.task
def send_to_dead_letter_queue(task: Task, exc, task_id, args, kwargs, einfo):
"""Hook to put a task into dead letter queue when it fails."""
if task.app.conf.task_always_eager:
return # do not run failed task again in eager mode

logger.warning(
"Sending failed task to dead letter queue",
task=task,
exc=exc,
task_id=task_id,
args=args,
kwargs=kwargs,
einfo=einfo,
)
task.apply_async(args=args, kwargs=kwargs, queue="dead_letter")


@app.task(on_failure=send_to_dead_letter_queue)
def demo_task(x, y):
    logger.info("adding two numbers", x=x, y=y)
    return x + y
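
To see the hook fire, a failure can be forced deliberately (a sketch; the bad argument type makes `x + y` raise `TypeError` in the worker):

    demo_task.apply_async(args=(1, "oops"))
    # The task fails, send_to_dead_letter_queue logs a warning and re-publishes
    # the task to the "dead_letter" queue for later inspection or re-processing.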
1 change: 1 addition & 0 deletions {{cookiecutter.repostory_name}}/pyproject.toml
@@ -26,6 +26,7 @@ dependencies = [
"sentry-sdk==1.3.0",
"ipython~=8.14.0",
"nox==2023.4.22",
"more-itertools~=10.3.0",
{% if cookiecutter.monitoring == "y" -%}
"psutil>=5.9.8",
"prometheus-client~=0.17.0",