From ae07d7e1c70d3d31a40413814973b36ff4227e08 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Thu, 19 Dec 2024 23:22:57 -0500 Subject: [PATCH 1/3] feat(job): add moveToWait method for manual processing --- src/classes/job.ts | 10 ++++++++++ src/classes/worker.ts | 2 +- tests/test_job.ts | 15 +++++++++++++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index df39be9ba6..c7184b967b 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -695,6 +695,16 @@ export class Job< return result; } + /** + * Moves a job to the wait or prioritized state. + * + * @param token - Worker token used to acquire completed job. + * @returns Returns pttl. + */ + moveToWait(token: string): Promise { + return this.scripts.moveJobFromActiveToWait(this.id, token); + } + private async shouldRetryJob(err: Error): Promise<[boolean, number]> { if ( this.attemptsMade + 1 < this.opts.attempts && diff --git a/src/classes/worker.ts b/src/classes/worker.ts index b79ddf4648..ce45df96a3 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -1285,6 +1285,6 @@ will never work with more accuracy than 1ms. */ job: Job, token: string, ) { - return this.scripts.moveJobFromActiveToWait(job.id, token); + return job.moveToWait(token); } } diff --git a/tests/test_job.ts b/tests/test_job.ts index 0393fe006e..c4538b1b30 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -933,6 +933,21 @@ describe('Job', function () { }); }); + describe('.moveToWait', () => { + it('moves job to wait from active', async function () { + const worker = new Worker(queueName, null, { connection, prefix }); + const token = 'my-token'; + await Job.create(queue, 'test', { foo: 'bar' }); + const job = (await worker.getNextJob(token)) as Job; + const isWaiting = await job.isWaiting(); + expect(isWaiting).to.be.equal(false); + await job.moveToWait('0'); + const isisWaiting2 = await job.isWaiting(); + expect(isisWaiting2).to.be.equal(true); + await worker.close(); + }); + }); + describe('.changeDelay', () => { it('can change delay of a delayed job', async function () { this.timeout(8000); From 57472bd1dfbcd1d42a2912edb112cc69c291aa0d Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 21 Dec 2024 23:35:35 -0500 Subject: [PATCH 2/3] test: fix test case --- tests/test_job.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_job.ts b/tests/test_job.ts index c4538b1b30..d4a9ddc82f 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -941,7 +941,7 @@ describe('Job', function () { const job = (await worker.getNextJob(token)) as Job; const isWaiting = await job.isWaiting(); expect(isWaiting).to.be.equal(false); - await job.moveToWait('0'); + await job.moveToWait(token); const isisWaiting2 = await job.isWaiting(); expect(isisWaiting2).to.be.equal(true); await worker.close(); From 27f0813d2f6312411acbe595e1ccbc23ef0f8924 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 30 Dec 2024 11:20:35 -0500 Subject: [PATCH 3/3] docs(guide): add description for moveToWait method --- .../patterns/manually-fetching-jobs.md | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/gitbook/patterns/manually-fetching-jobs.md b/docs/gitbook/patterns/manually-fetching-jobs.md index dfdc5ab02f..9f8a450ef3 100644 --- a/docs/gitbook/patterns/manually-fetching-jobs.md +++ b/docs/gitbook/patterns/manually-fetching-jobs.md @@ -96,3 +96,24 @@ while (1) { } } ``` + +## Rate Limiting + +If you want to move a job back to wait because your queue is rate limited. + +```typescript +const worker = new Worker('my-queue', null, { connection, prefix }); +const token = 'my-token'; +await Job.create(queue, 'test', { foo: 'bar' }); +const job = (await worker.getNextJob(token)) as Job; + +await queue.rateLimit(60000); +await job.moveToWait(token); +``` + +## Read more: + +- 💡 [Get Next Job API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#getNextJob) +- 💡 [Move To Completed API Reference](https://api.docs.bullmq.io/classes/v5.Job.html#moveToCompleted) +- 💡 [Move To Failed API Reference](https://api.docs.bullmq.io/classes/v5.Job.html#moveToFailed) +- 💡 [Move To Wait API Reference](https://api.docs.bullmq.io/classes/v5.Job.html#moveToWait)