Skip to content

Commit

Permalink
feat(queue): add getCountsPerPriority method [python] (#2607)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jun 12, 2024
1 parent 74e7cce commit 02b8338
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 2 deletions.
15 changes: 15 additions & 0 deletions docs/gitbook/guide/jobs/prioritized.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,23 @@ const jobs = await queue.getJobs(['prioritized']);
const jobs2 = await queue.getPrioritized();
```

## Get Counts per Priority

If you want to get the `count` of jobs in `prioritized` status (priorities higher than 0) or in `waiting` status (priority 0), use the **`getCountsPerPriority`** method. For example, let's say that you want to get counts for `priority` `1` and `0`:

```typescript
const counts = await queue.getCountsPerPriority([1, 0]);
/*
{
'1': 11,
'0': 10
}
*/
```

## Read more:

* 📋 [Faster Priority jobs](https://bullmq.io/news/062123/faster-priority-jobs/)
* 💡 [Change Priority API Reference](https://api.docs.bullmq.io/classes/v5.Job.html#changePriority)
* 💡 [Get Prioritized API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getPrioritized)
* 💡 [Get Counts per Priority API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getCountsPerPriority)
17 changes: 17 additions & 0 deletions python/bullmq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,23 @@ async def getJobCounts(self, *types):
counts[current_types[index]] = val or 0
return counts

async def getCountsPerPriority(self, priorities):
"""
Returns the number of jobs per priority.
@returns: An object, key (priority) and value (count)
"""
set_priorities = set(priorities)
unique_priorities = (list(set_priorities))

responses = await self.scripts.getCountsPerPriority(unique_priorities)

counts = {}

for index, val in enumerate(responses):
counts[f"{unique_priorities[index]}"] = val or 0
return counts

async def clean(self, grace: int, limit: int, type: str):
"""
Cleans jobs from a queue. Similar to drain but keeps jobs within a certain
Expand Down
18 changes: 16 additions & 2 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")),
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")), #
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")), #
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
"getCountsPerPriority": self.redisClient.register_script(self.getScript("getCountsPerPriority-2.lua")),
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")),
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
Expand Down Expand Up @@ -322,6 +323,19 @@ def getCounts(self, types):

return self.commands["getCounts"](keys=keys, args=transformed_types)

def getCountsPerPriorityArgs(self, priorities):
keys = [self.keys['wait'],
self.keys['prioritized']]

args = priorities

return (keys, args)

def getCountsPerPriority(self, priorities):
keys, args = self.getCountsPerPriorityArgs(priorities)

return self.commands["getCountsPerPriority"](keys=keys, args=args)

async def getState(self, job_id: str):
keys = self.getKeys(['completed', 'failed', 'delayed', 'active', 'wait',
'paused', 'waiting-children', 'prioritized'])
Expand Down
20 changes: 20 additions & 0 deletions python/tests/queue_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,26 @@ async def test_remove_job(self):

await queue.close()

async def test_get_counts_per_priority(self):
queue = Queue(queueName)
jobs = [{
"name": "test",
"data": {},
"opts": {
"priority": index % 4
}
} for index in range(42)]
await queue.addBulk(jobs)
counts = await queue.getCountsPerPriority([0, 1, 2, 3])
self.assertEqual(counts, {
"0": 11,
"1": 11,
"2": 10,
"3": 10
})

await queue.close()

async def test_reusable_redis(self):
conn = redis.Redis(decode_responses=True, host="localhost", port="6379", db=0)
queue = Queue(queueName, {"connection": conn})
Expand Down

0 comments on commit 02b8338

Please sign in to comment.