diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index fa3416a74c..57d72f0275 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -25,6 +25,7 @@ * [Stalled Jobs](guide/workers/stalled-jobs.md) * [Sandboxed processors](guide/workers/sandboxed-processors.md) * [Pausing queues](guide/workers/pausing-queues.md) + * [Preserve Order](guide/workers/preserve-order.md) * [Jobs](guide/jobs/README.md) * [FIFO](guide/jobs/fifo.md) * [LIFO](guide/jobs/lifo.md) diff --git a/docs/gitbook/guide/workers/preserve-order.md b/docs/gitbook/guide/workers/preserve-order.md new file mode 100644 index 0000000000..7b8a65752a --- /dev/null +++ b/docs/gitbook/guide/workers/preserve-order.md @@ -0,0 +1,19 @@ +# Preserve Order + +BullMQ supports preserving execution order in jobs. When _preserveOrder_ option is provided as *true*, jobs will be processed in the same order, independently of retry strategies. If the current job fails and has a retry strategy, queue will be in rate limit state until the delay is accomplish. + +```typescript +const worker = new Worker('queueName', async (job: Job) => { + // do some work +}, { + preserveOrder: true + }); +``` + +{% hint style="info" %} +when using retries and backoffs, for instance, a failed job will keep the queue idle during the time the job is being backed off until it is picked again. +{% endhint %} + +{% hint style="warning" %} +This feature is only allowed when using concurrency 1, any greater value will throw an error. Make sure to also set a [global concurrency](https://docs.bullmq.io/guide/queues/global-concurrency) +{% endhint %} diff --git a/python/bullmq/job.py b/python/bullmq/job.py index 45a1995f33..c5b4c4caaa 100644 --- a/python/bullmq/job.py +++ b/python/bullmq/job.py @@ -150,7 +150,7 @@ async def moveToFailed(self, err, token:str, fetchNext:bool = False): ) if delay == -1: move_to_failed = True - elif delay: + elif delay and not self.queue.opts.get("preserveOrder", False): keys, args = self.scripts.moveToDelayedArgs( self.id, round(time.time() * 1000), @@ -161,7 +161,10 @@ async def moveToFailed(self, err, token:str, fetchNext:bool = False): await self.scripts.commands["moveToDelayed"](keys=keys, args=args, client=pipe) command = 'delayed' else: - keys, args = self.scripts.retryJobArgs(self.id, self.opts.get("lifo", False), token) + keys, args = self.scripts.retryJobArgs(self.id, self.opts.get("lifo", False), token, { + "preserveOrder": self.queue.opts.get("preserveOrder", False), + "pttl": delay + }) await self.scripts.commands["retryJob"](keys=keys, args=args, client=pipe) command = 'retryJob' diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index ef1851bb12..bae363db99 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -54,7 +54,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "promote": self.redisClient.register_script(self.getScript("promote-9.lua")), "removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")), "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-8.lua")), - "retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")), + "retryJob": self.redisClient.register_script(self.getScript("retryJob-12.lua")), "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")), "saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")), "updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")), @@ -252,13 +252,16 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}): keys.append(self.keys['delayed']) keys.append(self.keys['prioritized']) keys.append(self.keys['pc']) + keys.append(self.keys['limiter']) keys.append(self.keys['marker']) keys.append(self.keys['stalled']) push_cmd = "RPUSH" if lifo else "LPUSH" + pttl = opts.get("pttl", 0) args = [self.keys[''], round(time.time() * 1000), push_cmd, - job_id, token, "1" if opts.get("skipAttempt") else "0"] + job_id, token, "1" if opts.get("preserveOrder") else "0", + pttl if pttl > 0 else 0] return (keys, args) diff --git a/src/classes/job.ts b/src/classes/job.ts index 06035eefd9..a8b6ad2a5e 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -682,7 +682,7 @@ export class Job< if (delay === -1) { moveToFailed = true; - } else if (delay) { + } else if (delay && !opts.preserveOrder) { const args = this.scripts.moveToDelayedArgs( this.id, Date.now(), @@ -694,7 +694,10 @@ export class Job< } else { // Retry immediately (multi).retryJob( - this.scripts.retryJobArgs(this.id, this.opts.lifo, token), + this.scripts.retryJobArgs(this.id, this.opts.lifo, token, { + preserveOrder: opts.preserveOrder, + pttl: delay, + }), ); command = 'retryJob'; } diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index df3f722c28..ead7ff7e71 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -23,6 +23,7 @@ import { WorkerOptions, KeepJobs, MoveToDelayedOpts, + RetryOpts, RepeatableOptions, } from '../interfaces'; import { @@ -1029,6 +1030,7 @@ export class Scripts { jobId: string, lifo: boolean, token: string, + opts: RetryOpts, ): (string | number)[] { const keys: (string | number)[] = [ this.queue.keys.active, @@ -1042,9 +1044,10 @@ export class Scripts { this.queue.keys.pc, this.queue.keys.marker, this.queue.keys.stalled, + this.queue.keys.limiter, ]; - const pushCmd = (lifo ? 'R' : 'L') + 'PUSH'; + const pushCmd = (lifo || opts.preserveOrder ? 'R' : 'L') + 'PUSH'; return keys.concat([ this.queue.toKey(''), @@ -1052,6 +1055,8 @@ export class Scripts { pushCmd, jobId, token, + opts.preserveOrder ? '1' : '0', + opts.pttl && opts.pttl > 0 ? opts.pttl : 0, ]); } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 8bf4567dc7..7f7cd1a742 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -218,6 +218,7 @@ export class Worker< stalledInterval: 30000, autorun: true, runRetryDelay: 15000, + preserveOrder: false, ...this.opts, }; @@ -381,6 +382,11 @@ export class Worker< ) { throw new Error('concurrency must be a finite number greater than 0'); } + + if (this.opts.preserveOrder && concurrency > 1) { + throw new Error('concurrency must be 1 when preserveOrder is enabled'); + } + this.opts.concurrency = concurrency; } diff --git a/src/commands/retryJob-11.lua b/src/commands/retryJob-12.lua similarity index 88% rename from src/commands/retryJob-11.lua rename to src/commands/retryJob-12.lua index 33d1f7a85a..7abb3d8fae 100644 --- a/src/commands/retryJob-11.lua +++ b/src/commands/retryJob-12.lua @@ -13,12 +13,15 @@ KEYS[9] 'pc' priority counter KEYS[10] 'marker' KEYS[11] 'stalled' + KEYS[12] limiter key ARGV[1] key prefix ARGV[2] timestamp - ARGV[3] pushCmd + ARGV[3] pushCmd - lifo -> RPUSH - fifo -> LPUSH ARGV[4] jobId ARGV[5] token + ARGV[6] preserve order + ARGV[7] rate limit pttl Events: 'waiting' @@ -70,6 +73,13 @@ if rcall("EXISTS", KEYS[4]) == 1 then rcall("HINCRBY", KEYS[4], "atm", 1) + if ARGV[6] == "1" then + local pttl = tonumber(ARGV[7]) + if pttl > 0 then + rcall("SET", KEYS[12], 999999, "PX", pttl) + end + end + local maxEvents = getOrSetMaxEvents(KEYS[5]) -- Emit waiting event diff --git a/src/interfaces/minimal-job.ts b/src/interfaces/minimal-job.ts index 60d4eb494a..9bc0b3587e 100644 --- a/src/interfaces/minimal-job.ts +++ b/src/interfaces/minimal-job.ts @@ -8,6 +8,11 @@ export interface MoveToDelayedOpts { skipAttempt?: boolean; } +export interface RetryOpts { + preserveOrder?: boolean; + pttl?: number; +} + export interface MoveToWaitingChildrenOpts { child?: { id: string; diff --git a/src/interfaces/worker-options.ts b/src/interfaces/worker-options.ts index 77a204a23f..0d523de1b1 100644 --- a/src/interfaces/worker-options.ts +++ b/src/interfaces/worker-options.ts @@ -58,6 +58,12 @@ export interface WorkerOptions extends QueueBaseOptions { */ maxStalledCount?: number; + /** + * If true, it will rate limit the queue when moving this job into delayed. + * Will stop rate limiting the queue until this job is moved to completed or failed. + */ + preserveOrder?: boolean; + /** * Number of milliseconds between stallness checks. * diff --git a/tests/test_delay.ts b/tests/test_delay.ts index fe670e7690..198ea370d1 100644 --- a/tests/test_delay.ts +++ b/tests/test_delay.ts @@ -336,6 +336,59 @@ describe('Delayed jobs', function () { await worker.close(); }); + describe('when preserveOrder is provided', function () { + it('should process delayed jobs waiting to be finished in correct order ', async function () { + this.timeout(4000); + const numJobs = 12; + + const worker = new Worker(queueName, async (job: Job) => {}, { + autorun: false, + preserveOrder: true, + connection, + prefix, + }); + + worker.on('failed', function (job, err) {}); + + const orderList: number[] = []; + let count = 0; + const completed = new Promise((resolve, reject) => { + worker.on('completed', async function (job) { + try { + count++; + orderList.push(job.data.order as number); + if (count == numJobs) { + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(index => ({ + name: 'test', + data: { order: numJobs - index }, + opts: { + delay: (numJobs - index) * 150, + attempts: 1, + backoff: { type: 'fixed', delay: 200 }, + }, + })); + const expectedOrder = Array.from(Array(numJobs).keys()).map( + index => index + 1, + ); + + await queue.addBulk(jobs); + worker.run(); + await completed; + + expect(orderList).to.eql(expectedOrder); + + await worker.close(); + }); + }); + it('should process delayed jobs with several workers respecting delay', async function () { this.timeout(30000); let count = 0; diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 34cd191905..55eb94a361 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -2092,6 +2092,21 @@ describe('workers', function () { ).to.throw('drainDelay must be greater than 0'); }); + describe('when preserveOrder is enabled', () => { + it('concurrency cannot be greater than 1', function () { + this.timeout(4000); + expect( + () => + new Worker(queueName, async () => {}, { + connection, + concurrency: 2, + prefix, + preserveOrder: true, + }), + ).to.throw('concurrency must be 1 when preserveOrder is enabled'); + }); + }); + it('lock extender continues to run until all active jobs are completed when closing a worker', async function () { this.timeout(4000); let worker;