diff --git a/docs/gitbook/python/changelog.md b/docs/gitbook/python/changelog.md index 35ca98a8eb..3e482472fe 100644 --- a/docs/gitbook/python/changelog.md +++ b/docs/gitbook/python/changelog.md @@ -2,6 +2,10 @@ +## v2.2.0 (2024-01-14) +### Feature +* **queue:** Add promoteJobs method [python] ([#2377](https://github.com/taskforcesh/bullmq/issues/2377)) ([`3b9de96`](https://github.com/taskforcesh/bullmq/commit/3b9de967efa34ea22cdab1fbc7ff65d49927d787)) + ## v2.1.0 (2024-01-12) ### Feature * **repeatable:** Allow saving custom key ([#1824](https://github.com/taskforcesh/bullmq/issues/1824)) ([`8ea0e1f`](https://github.com/taskforcesh/bullmq/commit/8ea0e1f76baf36dab94a66657c0f432492cb9999)) diff --git a/python/bullmq/__init__.py b/python/bullmq/__init__.py index b5d3a62db2..7792ac5fae 100644 --- a/python/bullmq/__init__.py +++ b/python/bullmq/__init__.py @@ -3,7 +3,7 @@ A background job processor and message queue for Python based on Redis. """ -__version__ = "2.1.0" +__version__ = "2.2.0" __author__ = 'Taskforce.sh Inc.' __credits__ = 'Taskforce.sh Inc.' diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py index 456fd102f2..32535cb870 100644 --- a/python/bullmq/queue.py +++ b/python/bullmq/queue.py @@ -1,6 +1,6 @@ import asyncio from bullmq.redis_connection import RedisConnection -from bullmq.types import QueueBaseOptions, RetryJobsOptions, JobOptions +from bullmq.types import QueueBaseOptions, RetryJobsOptions, JobOptions, PromoteJobsOptions from bullmq.utils import extract_result from bullmq.scripts import Scripts from bullmq.job import Job @@ -136,7 +136,7 @@ async def obliterate(self, force: bool = False): async def retryJobs(self, opts: RetryJobsOptions = {}): """ - Retry all the failed jobs. + Retry all the failed or completed jobs. """ while True: cursor = await self.scripts.retryJobs( @@ -147,6 +147,17 @@ async def retryJobs(self, opts: RetryJobsOptions = {}): if cursor is None or cursor == 0 or cursor == "0": break + async def promoteJobs(self, opts: PromoteJobsOptions = {}): + """ + Retry all the delayed jobs. + """ + while True: + cursor = await self.scripts.promoteJobs( + opts.get("count") + ) + if cursor is None or cursor == 0 or cursor == "0": + break + def trimEvents(self, maxLength: int): """ Trim the event stream to an approximately maxLength. diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index 3bee03bfe6..d45cf099bf 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -417,14 +417,30 @@ async def obliterate(self, count: int, force: bool = False): raise Exception("Cannot obliterate queue with active jobs") return result + def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int: + keys = self.getKeys( + ['', 'events', state, 'wait', 'paused', 'meta', 'marker']) + + args = [count or 1000, timestamp or round(time.time()*1000), state] + return (keys, args) + async def retryJobs(self, state: str, count: int, timestamp: int): """ - Remove a queue completely + Retry jobs that are in failed or completed state """ current_state = state or 'failed' - keys = self.getKeys( - ['', 'events', current_state, 'wait', 'paused', 'meta', 'marker']) - result = await self.commands["moveJobsToWait"](keys=keys, args=[count or 1000, timestamp or round(time.time()*1000), current_state]) + keys, args = self.moveJobsToWaitArgs(current_state, count, timestamp) + + result = await self.commands["moveJobsToWait"](keys=keys, args=args) + return result + + async def promoteJobs(self, count: int): + """ + Promote jobs in delayed state + """ + keys, args = self.moveJobsToWaitArgs('delayed', count, 1.7976931348623157e+308) + + result = await self.commands["moveJobsToWait"](keys=keys, args=args) return result async def moveToActive(self, token: str, opts: dict) -> list[Any]: diff --git a/python/bullmq/types/__init__.py b/python/bullmq/types/__init__.py index 241acd07b7..abad58121b 100644 --- a/python/bullmq/types/__init__.py +++ b/python/bullmq/types/__init__.py @@ -1,6 +1,7 @@ from bullmq.types.backoff_options import BackoffOptions from bullmq.types.keep_jobs import KeepJobs from bullmq.types.job_options import JobOptions +from bullmq.types.promote_jobs_options import PromoteJobsOptions from bullmq.types.queue_options import QueueBaseOptions from bullmq.types.worker_options import WorkerOptions -from bullmq.types.retry_job_options import RetryJobsOptions +from bullmq.types.retry_jobs_options import RetryJobsOptions diff --git a/python/bullmq/types/promote_jobs_options.py b/python/bullmq/types/promote_jobs_options.py new file mode 100644 index 0000000000..9542e6540c --- /dev/null +++ b/python/bullmq/types/promote_jobs_options.py @@ -0,0 +1,6 @@ + +from typing import TypedDict + + +class PromoteJobsOptions(TypedDict, total=False): + count: int diff --git a/python/bullmq/types/retry_job_options.py b/python/bullmq/types/retry_jobs_options.py similarity index 100% rename from python/bullmq/types/retry_job_options.py rename to python/bullmq/types/retry_jobs_options.py diff --git a/python/pyproject.toml b/python/pyproject.toml index 9f4ca4dfbd..45a1cc59fa 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "bullmq" -version = "2.1.0" +version = "2.2.0" description='BullMQ for Python' readme="README.md" authors = [ diff --git a/python/tests/queue_tests.py b/python/tests/queue_tests.py index 8234eb4fe7..8c05bb7550 100644 --- a/python/tests/queue_tests.py +++ b/python/tests/queue_tests.py @@ -80,7 +80,7 @@ async def test_is_paused(self): await queue.close() async def test_is_paused_with_custom_prefix(self): - queue = Queue(queueName, {}, {"prefix": "test"}) + queue = Queue(queueName, {"prefix": "test"}) await queue.pause() isPaused = await queue.isPaused() @@ -114,7 +114,7 @@ async def test_trim_events_manually(self): await queue.close() async def test_trim_events_manually_with_custom_prefix(self): - queue = Queue(queueName, {}, {"prefix": "test"}) + queue = Queue(queueName, {"prefix": "test"}) await queue.add("test", data={}, opts={}) await queue.add("test", data={}, opts={}) await queue.add("test", data={}, opts={}) @@ -368,6 +368,50 @@ def failing(job: Job, result): await queue.close() await worker.close() + async def test_promote_all_delayed_jobs(self): + queue = Queue(queueName) + job_count = 8 + + for index in range(job_count): + data = {"idx": index} + await queue.add("test", data=data, opts={ "delay": 5000 }) + + delayed_count = await queue.getJobCounts('delayed') + self.assertEqual(delayed_count['delayed'], job_count) + + await queue.promoteJobs(); + + waiting_count = await queue.getJobCounts('waiting') + self.assertEqual(waiting_count['waiting'], job_count) + + async def process(job: Job, token: str): + await asyncio.sleep(0.1) + return + order = 0 + + worker = Worker(queueName, process) + + completed_events = Future() + + def completing(job: Job, result): + nonlocal order + if order == (job_count - 1): + completed_events.set_result(None) + order += 1 + + worker.on("completed", completing) + + await completed_events + + worker.off('completed', completing) + + delayed_count = await queue.getJobCounts('delayed') + + self.assertEqual(delayed_count['delayed'], 0) + + await queue.close() + await worker.close() + async def test_remove_job(self): queue = Queue(queueName) job = await queue.add("test", {"foo": "bar"}, {}) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 413120ac37..cee7af045c 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -475,7 +475,7 @@ export class Queue< } /** - * Retry all the failed jobs. + * Retry all the failed or completed jobs. * * @param opts: { count: number; state: FinishedStatus; timestamp: number} * - count number to limit how many jobs will be moved to wait status per iteration, diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index bd9b311f10..23e762bb1f 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -73,12 +73,12 @@ export class Scripts { return Number.isInteger(result); } - private async addDelayedJob( + protected addDelayedJob( client: RedisClient, job: JobJson, encodedOpts: any, args: (string | number | Record)[], - ): Promise { + ): Promise { const queueKeys = this.queue.keys; const keys: (string | Buffer)[] = [ queueKeys.marker, @@ -94,12 +94,12 @@ export class Scripts { return (client).addDelayedJob(keys); } - private async addPrioritizedJob( + protected addPrioritizedJob( client: RedisClient, job: JobJson, encodedOpts: any, args: (string | number | Record)[], - ): Promise { + ): Promise { const queueKeys = this.queue.keys; const keys: (string | Buffer)[] = [ queueKeys.marker, @@ -116,12 +116,12 @@ export class Scripts { return (client).addPrioritizedJob(keys); } - private async addParentJob( + protected addParentJob( client: RedisClient, job: JobJson, encodedOpts: any, args: (string | number | Record)[], - ): Promise { + ): Promise { const queueKeys = this.queue.keys; const keys: (string | Buffer)[] = [ queueKeys.meta, @@ -135,6 +135,28 @@ export class Scripts { return (client).addParentJob(keys); } + protected addStandardJob( + client: RedisClient, + job: JobJson, + encodedOpts: any, + args: (string | number | Record)[], + ): Promise { + const queueKeys = this.queue.keys; + const keys: (string | Buffer)[] = [ + queueKeys.wait, + queueKeys.paused, + queueKeys.meta, + queueKeys.id, + queueKeys.completed, + queueKeys.events, + queueKeys.marker, + ]; + + keys.push(pack(args), job.data, encodedOpts); + + return (client).addStandardJob(keys); + } + async addJob( client: RedisClient, job: JobJson, @@ -181,7 +203,7 @@ export class Scripts { encodedOpts = pack(opts); } - let result; + let result: string | number; if (parentOpts.waitChildrenKey) { result = await this.addParentJob(client, job, encodedOpts, args); @@ -190,24 +212,14 @@ export class Scripts { } else if (opts.priority) { result = await this.addPrioritizedJob(client, job, encodedOpts, args); } else { - const keys: (string | Buffer)[] = [ - queueKeys.wait, - queueKeys.paused, - queueKeys.meta, - queueKeys.id, - queueKeys.completed, - queueKeys.events, - queueKeys.marker, - ]; - keys.push(pack(args), job.data, encodedOpts); - result = await (client).addStandardJob(keys); + result = await this.addStandardJob(client, job, encodedOpts, args); } - if (result < 0) { - throw this.finishedErrors(result, parentOpts.parentKey, 'addJob'); + if (result < 0) { + throw this.finishedErrors(result, parentOpts.parentKey, 'addJob'); } - return result; + return result; } async pause(pause: boolean): Promise {