diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index d3c18232d0..bcfc36cc71 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -31,10 +31,10 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection self.redisConnection = redisConnection self.redisClient = redisConnection.conn self.commands = { - "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")), - "addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")), + "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-9.lua")), + "addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-7.lua")), "addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")), - "addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")), + "addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-9.lua")), "changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")), "cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")), "extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")), @@ -119,7 +119,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None): Add a standard job to the queue """ keys = self.getKeys(['wait', 'paused', 'meta', 'id', - 'completed', 'active', 'events', 'marker']) + 'completed', 'active', 'events', 'marker', 'waiting-children']) args = self.addJobArgs(job, None) args.append(timestamp) @@ -130,7 +130,7 @@ def addDelayedJob(self, job: Job, timestamp: int, pipe = None): Add a delayed job to the queue """ keys = self.getKeys(['marker', 'meta', 'id', - 'delayed', 'completed', 'events']) + 'delayed', 'completed', 'events', 'waiting-children']) args = self.addJobArgs(job, None) args.append(timestamp) @@ -141,7 +141,7 @@ def addPrioritizedJob(self, job: Job, timestamp: int, pipe = None): Add a prioritized job to the queue """ keys = self.getKeys(['marker', 'meta', 'id', - 'prioritized', 'completed', 'active', 'events', 'pc']) + 'prioritized', 'completed', 'active', 'events', 'pc', 'waiting-children']) args = self.addJobArgs(job, None) args.append(timestamp) diff --git a/src/classes/job.ts b/src/classes/job.ts index bffd9c5eb5..b6b732a73b 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -30,6 +30,7 @@ import { removeUndefinedFields, optsAsJSON, optsFromJSON, + getChainKey, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -149,9 +150,9 @@ export class Job< deduplicationId?: string; /** - * Chain identifier. + * Chain key. */ - chainId?: string; + chainKey?: string; /** * Base repeat job key. @@ -222,7 +223,9 @@ export class Job< ? opts.deduplication.id : this.debounceId; - this.chainId = opts.chainId; + this.chainKey = opts.chain + ? getChainKey(opts.chain, queue.qualifiedName) + : undefined; this.toKey = queue.toKey.bind(queue); this.setScripts(); @@ -347,8 +350,8 @@ export class Job< job.repeatJobKey = json.rjk; } - if (json.cid) { - job.chainId = json.cid; + if (json.chkey) { + job.chainKey = json.chkey; } if (json.deid) { @@ -458,7 +461,7 @@ export class Job< timestamp: this.timestamp, failedReason: JSON.stringify(this.failedReason), stacktrace: JSON.stringify(this.stacktrace), - chainId: this.chainId, + chainKey: this.chainKey, debounceId: this.debounceId, deduplicationId: this.deduplicationId, repeatJobKey: this.repeatJobKey, diff --git a/src/classes/queue-keys.ts b/src/classes/queue-keys.ts index 8140b594f1..b2b09d8ceb 100644 --- a/src/classes/queue-keys.ts +++ b/src/classes/queue-keys.ts @@ -25,12 +25,11 @@ export class QueueKeys { 'pc', // priority counter key 'marker', // marker key 'de', // deduplication key + 'ch', // chains key ].forEach(key => { keys[key] = this.toKey(name, key); }); - keys['chains'] = `${this.prefix}:chains`; - return keys; } diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 5a444012f5..2603a838ee 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -663,7 +663,7 @@ export class Queue< async removeChainKey(id: string): Promise { const client = await this.client; - return client.del(`${this.keys.chains}:${id}`); + return client.del(`${this.keys.ch}:${id}`); } /** diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 0951ae82ed..397d9277e1 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -194,7 +194,7 @@ export class Scripts { parent, job.repeatJobKey, job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null, - job.chainId ? `${queueKeys.chains}:${job.chainId}` : null, + job.chainKey ? job.chainKey : null, ]; let encodedOpts; @@ -564,7 +564,6 @@ export class Scripts { keys[11] = this.queue.toKey(job.id ?? ''); keys[12] = metricsKey; keys[13] = this.queue.keys.marker; - keys[14] = this.queue.keys.chains; const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs); diff --git a/src/commands/addDelayedJob-7.lua b/src/commands/addDelayedJob-7.lua index 0b279c8b07..ce866e7be6 100644 --- a/src/commands/addDelayedJob-7.lua +++ b/src/commands/addDelayedJob-7.lua @@ -104,7 +104,7 @@ end -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, - repeatJobKey) + repeatJobKey, chainKey) local lastJobKeyInChain = upsertChainKeyIfNeeded(chainKey, jobIdKey) diff --git a/src/commands/addPrioritizedJob-9.lua b/src/commands/addPrioritizedJob-9.lua index dabe7d3e46..76d31ee2dd 100644 --- a/src/commands/addPrioritizedJob-9.lua +++ b/src/commands/addPrioritizedJob-9.lua @@ -104,7 +104,7 @@ end -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, - repeatJobKey) + repeatJobKey, chainKey) local lastJobKeyInChain = upsertChainKeyIfNeeded(chainKey, jobIdKey) diff --git a/src/commands/addStandardJob-9.lua b/src/commands/addStandardJob-9.lua index b99d98362e..1d6a87d054 100644 --- a/src/commands/addStandardJob-9.lua +++ b/src/commands/addStandardJob-9.lua @@ -74,9 +74,9 @@ local parentData --- @include "includes/upsertChainKeyIfNeeded" if parentKey ~= nil then - if rcall("EXISTS", parentKey) ~= 1 then return -5 end + if rcall("EXISTS", parentKey) ~= 1 then return -5 end - parentData = cjson.encode(parent) + parentData = cjson.encode(parent) end local jobCounter = rcall("INCR", KEYS[4]) @@ -107,7 +107,7 @@ end -- Store the job. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, - parentKey, parentData, repeatJobKey) + parentKey, parentData, repeatJobKey, chainKey) local lastJobKeyInChain = upsertChainKeyIfNeeded(chainKey, jobIdKey) diff --git a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua index ee1447d236..34bba955a4 100644 --- a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua +++ b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua @@ -21,7 +21,7 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, local jobAttributes = rcall("HMGET", parentKey, "parent", "deid", "opts", "cid") removeDeduplicationKeyIfNeeded(parentQueuePrefix, jobAttributes[2]) - removeChainKeyIfNeeded(parentQueuePrefix, jobAttributes[4], parentKey) + removeChainKeyIfNeeded(jobAttributes[4], parentKey) if jobAttributes[1] then local parentData = cjson.decode(jobAttributes[1]) diff --git a/src/commands/includes/removeChainKeyIfNeeded.lua b/src/commands/includes/removeChainKeyIfNeeded.lua index cdda9ee484..8b73c10dbb 100644 --- a/src/commands/includes/removeChainKeyIfNeeded.lua +++ b/src/commands/includes/removeChainKeyIfNeeded.lua @@ -2,13 +2,12 @@ Function to remove chain key if needed. ]] -local function removeChainKeyIfNeeded(chainsKey, chainId, jobKey) - if chainId then - local chainKey = chainsKey .. ":" .. chainId +local function removeChainKeyIfNeeded(chainKey, jobKey) + if chainKey then local lastJobKeyInChain = rcall("GET", chainKey) if lastJobKeyInChain == jobKey then rcall("DEL", chainKey) end end -end \ No newline at end of file +end diff --git a/src/commands/includes/removeJob.lua b/src/commands/includes/removeJob.lua index 2f2d4dba5f..5ad5755760 100644 --- a/src/commands/includes/removeJob.lua +++ b/src/commands/includes/removeJob.lua @@ -14,6 +14,6 @@ local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey) if shouldRemoveDeduplicationKey then removeDeduplicationKey(baseKey, jobKey) end - removeChainKeyIfNeeded(baseKey, rcall("HGET", jobKey, "cid"), jobKey) + removeChainKeyIfNeeded(rcall("HGET", jobKey, "cid"), jobKey) removeJobKeys(jobKey) end diff --git a/src/commands/includes/storeJob.lua b/src/commands/includes/storeJob.lua index b0b8db6ca0..e9404010fa 100644 --- a/src/commands/includes/storeJob.lua +++ b/src/commands/includes/storeJob.lua @@ -2,7 +2,7 @@ Function to store a job ]] local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, - parentKey, parentData, repeatJobKey) + parentKey, parentData, repeatJobKey, chainKey) local jsonOpts = cjson.encode(opts) local delay = opts['delay'] or 0 local priority = opts['priority'] or 0 @@ -26,9 +26,9 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, table.insert(optionalValues, debounceId) end - if opts['cid'] then - table.insert(optionalValues, "cid") - table.insert(optionalValues, opts['cid']) + if chainKey then + table.insert(optionalValues, "chkey") + table.insert(optionalValues, chainKey) end rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts, diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 0ba50f64ba..a7bbefcb3c 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -87,7 +87,7 @@ if (#stalling > 0) then local opts = cjson.decode(rawOpts) rcall("ZADD", failedKey, timestamp, jobId) removeDeduplicationKeyIfNeeded(queueKeyPrefix, jobAttributes[3]) - removeChainKeyIfNeeded(queueKeyPrefix, jobAttributes[4], jobKey) + removeChainKeyIfNeeded(jobAttributes[4], jobKey) local failedReason = "job stalled more than allowable limit" diff --git a/src/commands/moveToFinished-15.lua b/src/commands/moveToFinished-14.lua similarity index 98% rename from src/commands/moveToFinished-15.lua rename to src/commands/moveToFinished-14.lua index e20d462fc2..dc2986d234 100644 --- a/src/commands/moveToFinished-15.lua +++ b/src/commands/moveToFinished-14.lua @@ -23,7 +23,6 @@ KEYS[12] jobId key KEYS[13] metrics key KEYS[14] marker key - KEYS[15] chains key ARGV[1] jobId ARGV[2] timestamp @@ -96,7 +95,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists return -4 end - local jobAttributes = rcall("HMGET", jobIdKey, "parentKey", "parent", "deid", "cid") + local jobAttributes = rcall("HMGET", jobIdKey, "parentKey", "parent", "deid", "chkey") local parentKey = jobAttributes[1] or "" local parentId = "" local parentQueueKey = "" @@ -122,7 +121,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists local prefix = ARGV[7] removeDeduplicationKeyIfNeeded(prefix, jobAttributes[3]) - removeChainKeyIfNeeded(KEYS[15], jobAttributes[4], jobIdKey) + removeChainKeyIfNeeded(jobAttributes[4], jobIdKey) -- If job has a parent we need to -- 1) remove this job id from parents dependencies diff --git a/src/commands/removeJob-2.lua b/src/commands/removeJob-2.lua index 2a2a159b3d..cf06d226d6 100644 --- a/src/commands/removeJob-2.lua +++ b/src/commands/removeJob-2.lua @@ -69,7 +69,7 @@ local function removeJob(prefix, jobId, parentKey, removeChildren) local prev = removeJobFromAnyState(prefix, jobId) removeDeduplicationKey(prefix, jobKey) - removeChainnKeyIfNeeded(prefix, rcall("HGET", jobKey, "cid"), jobKey) + removeChainKeyIfNeeded(rcall("HGET", jobKey, "cid"), jobKey) if removeJobKeys(jobKey) > 0 then local maxEvents = getOrSetMaxEvents(KEYS[2]) rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", "jobId", jobId, "prev", diff --git a/src/interfaces/chain-options.ts b/src/interfaces/chain-options.ts new file mode 100644 index 0000000000..1a3ad4fca0 --- /dev/null +++ b/src/interfaces/chain-options.ts @@ -0,0 +1,9 @@ +export interface ChainOptions { + id: string; + + /** + * It includes the prefix, the namespace separator :, and queue name. + * @see https://www.gnu.org/software/gawk/manual/html_node/Qualified-Names.html + */ + queueKey?: string; +} diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 44a4945006..0ae751c3ec 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -1,6 +1,7 @@ export * from './advanced-options'; export * from './backoff-options'; export * from './base-job-options'; +export * from './chain-options'; export * from './child-message'; export * from './connection'; export * from './debounce-options'; diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 926b510a7c..5edcc1cdab 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -20,7 +20,7 @@ export interface JobJson { repeatJobKey?: string; debounceId?: string; deduplicationId?: string; - chainId?: string; + chainKey?: string; processedBy?: string; } @@ -40,7 +40,7 @@ export interface JobJsonRaw { returnvalue: string; parentKey?: string; parent?: string; - cid?: string; + chkey?: string; deid?: string; rjk?: string; atm?: string; diff --git a/src/types/job-options.ts b/src/types/job-options.ts index a8f10a18c1..ec49b51c99 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -1,10 +1,10 @@ -import { BaseJobOptions, DebounceOptions } from '../interfaces'; +import { BaseJobOptions, ChainOptions, DebounceOptions } from '../interfaces'; export type JobsOptions = BaseJobOptions & { /** - * Chain identifier to add a new dependency on a flow chain identified by this option. + * Chain options to add a new dependency on a flow chain. */ - chainId?: string; + chain?: ChainOptions; /** * Debounce options. @@ -37,6 +37,11 @@ export type JobsOptions = BaseJobOptions & { * These fields are the ones stored in Redis with smaller keys for compactness. */ export type RedisJobOptions = BaseJobOptions & { + /** + * Debounce identifier. + */ + ch?: ChainOptions; + /** * Debounce identifier. */ diff --git a/src/utils.ts b/src/utils.ts index 1cdc09fa58..b5fad834dc 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -7,6 +7,7 @@ import { AbortController } from 'node-abort-controller'; // @ts-ignore import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils'; import { + ChainOptions, ChildMessage, ContextManager, RedisClient, @@ -155,6 +156,17 @@ export function getParentKey(opts: { } } +export function getChainKey( + opts: ChainOptions, + queueQualifiedName: string, +): string | undefined { + if (opts) { + return `${opts.queueKey ? opts.queueKey : queueQualifiedName}:ch:${ + opts.id + }`; + } +} + export const clientCommandMessageReg = /ERR unknown command ['`]\s*client\s*['`]/; @@ -272,7 +284,7 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; const optsDecodeMap = { - cid: 'chainId', + ch: 'chain', de: 'deduplication', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', diff --git a/tests/test_flow.ts b/tests/test_flow.ts index dbb0853298..7bf30f6580 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -32,7 +32,7 @@ describe('flows', () => { afterEach(async function () { await queue.close(); - await removeAllQueueData(new IORedis(redisHost), '*'); + await removeAllQueueData(new IORedis(redisHost), queueName); }); afterAll(async function () { @@ -2669,11 +2669,11 @@ describe('flows', () => { }); }); - describe('when chainId is provided', async () => { + describe('when chain option is provided', async () => { describe('when job is the first one in the chain', async () => { describe('when job is standard', async () => { it('should add job in waiting state', async () => { - const job = await queue.add('test', {}, { chainId: '1' }); + const job = await queue.add('test', {}, { chain: { id: '1' } }); const state = await job.getState(); @@ -2683,7 +2683,11 @@ describe('flows', () => { describe('when job is delayed', async () => { it('should add job in delayed state', async () => { - const job = await queue.add('test', {}, { chainId: '1', delay: 500 }); + const job = await queue.add( + 'test', + {}, + { chain: { id: '1' }, delay: 500 }, + ); const state = await job.getState(); @@ -2696,7 +2700,7 @@ describe('flows', () => { const job = await queue.add( 'test', {}, - { chainId: '1', priority: 5 }, + { chain: { id: '1' }, priority: 5 }, ); const state = await job.getState(); @@ -2710,8 +2714,8 @@ describe('flows', () => { describe('when last job in the chain did not finish', async () => { describe('when last added job is standard', async () => { it('should add last job in waiting-children state', async () => { - await queue.add('test1', {}, { chainId: '1' }); - const job = await queue.add('test2', {}, { chainId: '1' }); + await queue.add('test1', {}, { chain: { id: '1' } }); + const job = await queue.add('test2', {}, { chain: { id: '1' } }); const state = await job.getState(); @@ -2721,11 +2725,11 @@ describe('flows', () => { describe('when last added job is delayed', async () => { it('should add last job in waiting-children state', async () => { - await queue.add('test1', {}, { chainId: '1' }); + await queue.add('test1', {}, { chain: { id: '1' } }); const job = await queue.add( 'test2', {}, - { chainId: '1', delay: 500 }, + { chain: { id: '1' }, delay: 500 }, ); const state = await job.getState(); @@ -2736,11 +2740,11 @@ describe('flows', () => { describe('when last added job is prioritized', async () => { it('should add last job in waiting-children state', async () => { - await queue.add('test1', {}, { chainId: '1' }); + await queue.add('test1', {}, { chain: { id: '1' } }); const job = await queue.add( 'test2', {}, - { chainId: '1', priority: 5 }, + { chain: { id: '1' }, priority: 5 }, ); const state = await job.getState(); @@ -2752,7 +2756,7 @@ describe('flows', () => { describe('when last job in the chain finished', async () => { describe('when last added job is standard', async () => { - it('should add last job in waiting-children state', async () => { + it('should add last job in waiting state', async () => { const testQueueName = `test-queue-${v4()}`; const testQueue = new Queue(testQueueName, { connection, prefix }); @@ -2770,16 +2774,20 @@ describe('flows', () => { }); }); - await testQueue.add('test1', {}, { chainId: '1' }); + await testQueue.add( + 'test1', + {}, + { chain: { id: '1', queueKey: `${prefix}:${queueName}` } }, + ); worker.run(); await completed; - const job = await queue.add('test2', {}, { chainId: '1' }); + const job = await queue.add('test2', {}, { chain: { id: '1' } }); const state = await job.getState(); - expect(state).to.be.equal('waiting-children'); + expect(state).to.be.equal('waiting'); await testQueue.close(); await worker.close(); @@ -2788,32 +2796,82 @@ describe('flows', () => { }); describe('when last added job is delayed', async () => { - it('should add last job in waiting-children state', async () => { - await queue.add('test1', {}, { chainId: '1' }); + it('should add last job in delayed state', async () => { + const testQueueName = `test-queue-${v4()}`; + const testQueue = new Queue(testQueueName, { connection, prefix }); + + const worker = new Worker(testQueueName, async () => {}, { + autorun: false, + connection, + prefix, + }); + + await worker.waitUntilReady(); + + const completed = new Promise((resolve, reject) => { + worker.on('completed', async function () { + resolve(); + }); + }); + + await testQueue.add( + 'test1', + {}, + { chain: { id: '1', queueKey: `${prefix}:${queueName}` } }, + ); + + worker.run(); + await completed; + const job = await queue.add( 'test2', {}, - { chainId: '1', delay: 500 }, + { chain: { id: '1' }, delay: 500 }, ); const state = await job.getState(); - expect(state).to.be.equal('waiting-children'); + expect(state).to.be.equal('delayed'); }); }); describe('when last added job is prioritized', async () => { - it('should add last job in waiting-children state', async () => { - await queue.add('test1', {}, { chainId: '1' }); + it('should add last job in prioritized state', async () => { + const testQueueName = `test-queue-${v4()}`; + const testQueue = new Queue(testQueueName, { connection, prefix }); + + const worker = new Worker(testQueueName, async () => {}, { + autorun: false, + connection, + prefix, + }); + + await worker.waitUntilReady(); + + const completed = new Promise((resolve, reject) => { + worker.on('completed', async function () { + resolve(); + }); + }); + + await testQueue.add( + 'test1', + {}, + { chain: { id: '1', queueKey: `${prefix}:${queueName}` } }, + ); + + worker.run(); + await completed; + const job = await queue.add( 'test2', {}, - { chainId: '1', priority: 5 }, + { chain: { id: '1' }, priority: 5 }, ); const state = await job.getState(); - expect(state).to.be.equal('waiting-children'); + expect(state).to.be.equal('prioritized'); }); }); });