From 02b83380334879cc2434043141566f2a375db958 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Wed, 12 Jun 2024 08:36:10 -0600 Subject: [PATCH] feat(queue): add getCountsPerPriority method [python] (#2607) --- docs/gitbook/guide/jobs/prioritized.md | 15 +++++++++++++++ python/bullmq/queue.py | 17 +++++++++++++++++ python/bullmq/scripts.py | 18 ++++++++++++++++-- python/tests/queue_tests.py | 20 ++++++++++++++++++++ 4 files changed, 68 insertions(+), 2 deletions(-) diff --git a/docs/gitbook/guide/jobs/prioritized.md b/docs/gitbook/guide/jobs/prioritized.md index a504c3322d..c7b7aa4c76 100644 --- a/docs/gitbook/guide/jobs/prioritized.md +++ b/docs/gitbook/guide/jobs/prioritized.md @@ -59,8 +59,23 @@ const jobs = await queue.getJobs(['prioritized']); const jobs2 = await queue.getPrioritized(); ``` +## Get Counts per Priority + +If you want to get the `count` of jobs in `prioritized` status (priorities higher than 0) or in `waiting` status (priority 0), use the **`getCountsPerPriority`** method. For example, let's say that you want to get counts for `priority` `1` and `0`: + +```typescript +const counts = await queue.getCountsPerPriority([1, 0]); +/* +{ + '1': 11, + '0': 10 +} +*/ +``` + ## Read more: * 📋 [Faster Priority jobs](https://bullmq.io/news/062123/faster-priority-jobs/) * 💡 [Change Priority API Reference](https://api.docs.bullmq.io/classes/v5.Job.html#changePriority) * 💡 [Get Prioritized API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getPrioritized) +* 💡 [Get Counts per Priority API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getCountsPerPriority) diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py index 5489db08f9..bd8868bd07 100644 --- a/python/bullmq/queue.py +++ b/python/bullmq/queue.py @@ -220,6 +220,23 @@ async def getJobCounts(self, *types): counts[current_types[index]] = val or 0 return counts + async def getCountsPerPriority(self, priorities): + """ + Returns the number of jobs per priority. + + @returns: An object, key (priority) and value (count) + """ + set_priorities = set(priorities) + unique_priorities = (list(set_priorities)) + + responses = await self.scripts.getCountsPerPriority(unique_priorities) + + counts = {} + + for index, val in enumerate(responses): + counts[f"{unique_priorities[index]}"] = val or 0 + return counts + async def clean(self, grace: int, limit: int, type: str): """ Cleans jobs from a queue. Similar to drain but keeps jobs within a certain diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index 9dd1af7e47..de5f4f4827 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -38,8 +38,9 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")), "cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")), "extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")), - "getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")), # - "getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")), # + "getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")), + "getCountsPerPriority": self.redisClient.register_script(self.getScript("getCountsPerPriority-2.lua")), + "getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")), "getState": self.redisClient.register_script(self.getScript("getState-8.lua")), "getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")), "isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")), @@ -322,6 +323,19 @@ def getCounts(self, types): return self.commands["getCounts"](keys=keys, args=transformed_types) + def getCountsPerPriorityArgs(self, priorities): + keys = [self.keys['wait'], + self.keys['prioritized']] + + args = priorities + + return (keys, args) + + def getCountsPerPriority(self, priorities): + keys, args = self.getCountsPerPriorityArgs(priorities) + + return self.commands["getCountsPerPriority"](keys=keys, args=args) + async def getState(self, job_id: str): keys = self.getKeys(['completed', 'failed', 'delayed', 'active', 'wait', 'paused', 'waiting-children', 'prioritized']) diff --git a/python/tests/queue_tests.py b/python/tests/queue_tests.py index 5441da446b..e3d8b578c6 100644 --- a/python/tests/queue_tests.py +++ b/python/tests/queue_tests.py @@ -422,6 +422,26 @@ async def test_remove_job(self): await queue.close() + async def test_get_counts_per_priority(self): + queue = Queue(queueName) + jobs = [{ + "name": "test", + "data": {}, + "opts": { + "priority": index % 4 + } + } for index in range(42)] + await queue.addBulk(jobs) + counts = await queue.getCountsPerPriority([0, 1, 2, 3]) + self.assertEqual(counts, { + "0": 11, + "1": 11, + "2": 10, + "3": 10 + }) + + await queue.close() + async def test_reusable_redis(self): conn = redis.Redis(decode_responses=True, host="localhost", port="6379", db=0) queue = Queue(queueName, {"connection": conn})