Skip to content

Commit

Permalink
Add management commands to move and flush tasks in queues
Browse files Browse the repository at this point in the history
  • Loading branch information
agoncharov-reef committed Jul 25, 2024
1 parent d29694f commit 208f725
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os

from celery import Celery
from more_itertools import chunked
{%- if cookiecutter.monitoring == "y" %}
from celery.signals import setup_logging, worker_process_shutdown
{% endif -%}
Expand Down Expand Up @@ -42,6 +43,24 @@ def get_num_tasks_in_queue(queue_name: str) -> int:
return conn.default_channel.client.llen(queue_name)


def move_tasks(source_queue: str, destination_queue: str, chunk_size: int = 100) -> None:
with app.pool.acquire(block=True) as conn:
client = conn.default_channel.client
tasks = client.lrange(source_queue, 0, -1)

for chunk in chunked(tasks, chunk_size):
with client.pipeline() as pipe:
for task in chunk:
client.rpush(destination_queue, task)
client.lrem(source_queue, 1, task)
pipe.execute()


def flush_tasks(queue_name: str) -> None:
with app.pool.acquire(block=True) as conn:
conn.default_channel.client.delete(queue_name)


{% if cookiecutter.monitoring == "y" %}

@worker_process_shutdown.connect
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from django.core.management.base import BaseCommand
from {{cookiecutter.django_project_name}}.celery import flush_tasks, get_num_tasks_in_queue


class Command(BaseCommand):
help = 'Flush task queue.'

def add_arguments(self, parser) -> None:
parser.add_argument('queue', type=str, help='Queue name to flush')

def handle(self, *args, **kwargs):
queue_name = kwargs['queue']

num_tasks = get_num_tasks_in_queue(queue_name)
self.stdout.write(f'Found {num_tasks} tasks in "{queue_name}" queue')
if not num_tasks:
return

flush_tasks(queue_name)
self.stdout.write('All done')
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from django.core.management.base import BaseCommand
from {{cookiecutter.django_project_name}}.celery import move_tasks, get_num_tasks_in_queue


class Command(BaseCommand):
help = 'Reschedule dead letter tasks.'

def add_arguments(self, parser) -> None:
parser.add_argument('source_queue', type=str, help='Source queue name')
parser.add_argument('destination_queue', type=str, help='Destination queue name')

def handle(self, *args, **kwargs):
source_queue = kwargs['source_queue']
destination_queue = kwargs['destination_queue']

num_tasks = get_num_tasks_in_queue(source_queue)
self.stdout.write(f'Found {num_tasks} tasks in "{source_queue}" queue')
if not num_tasks:
return

move_tasks(source_queue, destination_queue)
self.stdout.write('All done')
1 change: 1 addition & 0 deletions {{cookiecutter.repostory_name}}/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies = [
"sentry-sdk==1.3.0",
"ipython~=8.14.0",
"nox==2023.4.22",
"more-itertools~=10.3.0",
{% if cookiecutter.monitoring == "y" -%}
"psutil>=5.9.8",
"prometheus-client~=0.17.0",
Expand Down

0 comments on commit 208f725

Please sign in to comment.