Skip to content

Commit

Permalink
Merge branch 'master' into fix-retry-jobs-marker
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jan 14, 2024
2 parents beb63bd + 379016e commit 2a0c3d5
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 33 deletions.
4 changes: 4 additions & 0 deletions docs/gitbook/python/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

<!--next-version-placeholder-->

## v2.2.0 (2024-01-14)
### Feature
* **queue:** Add promoteJobs method [python] ([#2377](https://github.com/taskforcesh/bullmq/issues/2377)) ([`3b9de96`](https://github.com/taskforcesh/bullmq/commit/3b9de967efa34ea22cdab1fbc7ff65d49927d787))

## v2.1.0 (2024-01-12)
### Feature
* **repeatable:** Allow saving custom key ([#1824](https://github.com/taskforcesh/bullmq/issues/1824)) ([`8ea0e1f`](https://github.com/taskforcesh/bullmq/commit/8ea0e1f76baf36dab94a66657c0f432492cb9999))
Expand Down
2 changes: 1 addition & 1 deletion python/bullmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
A background job processor and message queue for Python based on Redis.
"""
__version__ = "2.1.0"
__version__ = "2.2.0"
__author__ = 'Taskforce.sh Inc.'
__credits__ = 'Taskforce.sh Inc.'

Expand Down
15 changes: 13 additions & 2 deletions python/bullmq/queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from bullmq.redis_connection import RedisConnection
from bullmq.types import QueueBaseOptions, RetryJobsOptions, JobOptions
from bullmq.types import QueueBaseOptions, RetryJobsOptions, JobOptions, PromoteJobsOptions
from bullmq.utils import extract_result
from bullmq.scripts import Scripts
from bullmq.job import Job
Expand Down Expand Up @@ -136,7 +136,7 @@ async def obliterate(self, force: bool = False):

async def retryJobs(self, opts: RetryJobsOptions = {}):
"""
Retry all the failed jobs.
Retry all the failed or completed jobs.
"""
while True:
cursor = await self.scripts.retryJobs(
Expand All @@ -147,6 +147,17 @@ async def retryJobs(self, opts: RetryJobsOptions = {}):
if cursor is None or cursor == 0 or cursor == "0":
break

async def promoteJobs(self, opts: PromoteJobsOptions = {}):
"""
Retry all the delayed jobs.
"""
while True:
cursor = await self.scripts.promoteJobs(
opts.get("count")
)
if cursor is None or cursor == 0 or cursor == "0":
break

def trimEvents(self, maxLength: int):
"""
Trim the event stream to an approximately maxLength.
Expand Down
24 changes: 20 additions & 4 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,14 +417,30 @@ async def obliterate(self, count: int, force: bool = False):
raise Exception("Cannot obliterate queue with active jobs")
return result

def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
keys = self.getKeys(
['', 'events', state, 'wait', 'paused', 'meta', 'marker'])

args = [count or 1000, timestamp or round(time.time()*1000), state]
return (keys, args)

async def retryJobs(self, state: str, count: int, timestamp: int):
"""
Remove a queue completely
Retry jobs that are in failed or completed state
"""
current_state = state or 'failed'
keys = self.getKeys(
['', 'events', current_state, 'wait', 'paused', 'meta', 'marker'])
result = await self.commands["moveJobsToWait"](keys=keys, args=[count or 1000, timestamp or round(time.time()*1000), current_state])
keys, args = self.moveJobsToWaitArgs(current_state, count, timestamp)

result = await self.commands["moveJobsToWait"](keys=keys, args=args)
return result

async def promoteJobs(self, count: int):
"""
Promote jobs in delayed state
"""
keys, args = self.moveJobsToWaitArgs('delayed', count, 1.7976931348623157e+308)

result = await self.commands["moveJobsToWait"](keys=keys, args=args)
return result

async def moveToActive(self, token: str, opts: dict) -> list[Any]:
Expand Down
3 changes: 2 additions & 1 deletion python/bullmq/types/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from bullmq.types.backoff_options import BackoffOptions
from bullmq.types.keep_jobs import KeepJobs
from bullmq.types.job_options import JobOptions
from bullmq.types.promote_jobs_options import PromoteJobsOptions
from bullmq.types.queue_options import QueueBaseOptions
from bullmq.types.worker_options import WorkerOptions
from bullmq.types.retry_job_options import RetryJobsOptions
from bullmq.types.retry_jobs_options import RetryJobsOptions
6 changes: 6 additions & 0 deletions python/bullmq/types/promote_jobs_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

from typing import TypedDict


class PromoteJobsOptions(TypedDict, total=False):
count: int
File renamed without changes.
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "bullmq"
version = "2.1.0"
version = "2.2.0"
description='BullMQ for Python'
readme="README.md"
authors = [
Expand Down
48 changes: 46 additions & 2 deletions python/tests/queue_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def test_is_paused(self):
await queue.close()

async def test_is_paused_with_custom_prefix(self):
queue = Queue(queueName, {}, {"prefix": "test"})
queue = Queue(queueName, {"prefix": "test"})
await queue.pause()
isPaused = await queue.isPaused()

Expand Down Expand Up @@ -114,7 +114,7 @@ async def test_trim_events_manually(self):
await queue.close()

async def test_trim_events_manually_with_custom_prefix(self):
queue = Queue(queueName, {}, {"prefix": "test"})
queue = Queue(queueName, {"prefix": "test"})
await queue.add("test", data={}, opts={})
await queue.add("test", data={}, opts={})
await queue.add("test", data={}, opts={})
Expand Down Expand Up @@ -368,6 +368,50 @@ def failing(job: Job, result):
await queue.close()
await worker.close()

async def test_promote_all_delayed_jobs(self):
queue = Queue(queueName)
job_count = 8

for index in range(job_count):
data = {"idx": index}
await queue.add("test", data=data, opts={ "delay": 5000 })

delayed_count = await queue.getJobCounts('delayed')
self.assertEqual(delayed_count['delayed'], job_count)

await queue.promoteJobs();

waiting_count = await queue.getJobCounts('waiting')
self.assertEqual(waiting_count['waiting'], job_count)

async def process(job: Job, token: str):
await asyncio.sleep(0.1)
return
order = 0

worker = Worker(queueName, process)

completed_events = Future()

def completing(job: Job, result):
nonlocal order
if order == (job_count - 1):
completed_events.set_result(None)
order += 1

worker.on("completed", completing)

await completed_events

worker.off('completed', completing)

delayed_count = await queue.getJobCounts('delayed')

self.assertEqual(delayed_count['delayed'], 0)

await queue.close()
await worker.close()

async def test_remove_job(self):
queue = Queue(queueName)
job = await queue.add("test", {"foo": "bar"}, {})
Expand Down
2 changes: 1 addition & 1 deletion src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ export class Queue<
}

/**
* Retry all the failed jobs.
* Retry all the failed or completed jobs.
*
* @param opts: { count: number; state: FinishedStatus; timestamp: number}
* - count number to limit how many jobs will be moved to wait status per iteration,
Expand Down
54 changes: 33 additions & 21 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ export class Scripts {
return Number.isInteger(result);
}

private async addDelayedJob(
protected addDelayedJob(
client: RedisClient,
job: JobJson,
encodedOpts: any,
args: (string | number | Record<string, any>)[],
): Promise<string> {
): Promise<string | number> {
const queueKeys = this.queue.keys;
const keys: (string | Buffer)[] = [
queueKeys.marker,
Expand All @@ -94,12 +94,12 @@ export class Scripts {
return (<any>client).addDelayedJob(keys);
}

private async addPrioritizedJob(
protected addPrioritizedJob(
client: RedisClient,
job: JobJson,
encodedOpts: any,
args: (string | number | Record<string, any>)[],
): Promise<string> {
): Promise<string | number> {
const queueKeys = this.queue.keys;
const keys: (string | Buffer)[] = [
queueKeys.marker,
Expand All @@ -116,12 +116,12 @@ export class Scripts {
return (<any>client).addPrioritizedJob(keys);
}

private async addParentJob(
protected addParentJob(
client: RedisClient,
job: JobJson,
encodedOpts: any,
args: (string | number | Record<string, any>)[],
): Promise<string> {
): Promise<string | number> {
const queueKeys = this.queue.keys;
const keys: (string | Buffer)[] = [
queueKeys.meta,
Expand All @@ -135,6 +135,28 @@ export class Scripts {
return (<any>client).addParentJob(keys);
}

protected addStandardJob(
client: RedisClient,
job: JobJson,
encodedOpts: any,
args: (string | number | Record<string, any>)[],
): Promise<string | number> {
const queueKeys = this.queue.keys;
const keys: (string | Buffer)[] = [
queueKeys.wait,
queueKeys.paused,
queueKeys.meta,
queueKeys.id,
queueKeys.completed,
queueKeys.events,
queueKeys.marker,
];

keys.push(pack(args), job.data, encodedOpts);

return (<any>client).addStandardJob(keys);
}

async addJob(
client: RedisClient,
job: JobJson,
Expand Down Expand Up @@ -181,7 +203,7 @@ export class Scripts {
encodedOpts = pack(opts);
}

let result;
let result: string | number;

if (parentOpts.waitChildrenKey) {
result = await this.addParentJob(client, job, encodedOpts, args);
Expand All @@ -190,24 +212,14 @@ export class Scripts {
} else if (opts.priority) {
result = await this.addPrioritizedJob(client, job, encodedOpts, args);
} else {
const keys: (string | Buffer)[] = [
queueKeys.wait,
queueKeys.paused,
queueKeys.meta,
queueKeys.id,
queueKeys.completed,
queueKeys.events,
queueKeys.marker,
];
keys.push(pack(args), job.data, encodedOpts);
result = await (<any>client).addStandardJob(keys);
result = await this.addStandardJob(client, job, encodedOpts, args);
}

if (result < 0) {
throw this.finishedErrors(result, parentOpts.parentKey, 'addJob');
if (<number>result < 0) {
throw this.finishedErrors(<number>result, parentOpts.parentKey, 'addJob');
}

return result;
return <string>result;
}

async pause(pause: boolean): Promise<void> {
Expand Down

0 comments on commit 2a0c3d5

Please sign in to comment.