diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 05936cd5d5..523fbb9bf1 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -166,22 +166,24 @@ export class JobScheduler extends QueueBase { producerId, ); - const job = new this.Job( - this, - jobName, - jobData, - mergedOpts, - jobId, - ); + if (jobId) { + const job = new this.Job( + this, + jobName, + jobData, + mergedOpts, + jobId, + ); - job.id = jobId; + job.id = jobId; - span?.setAttributes({ - [TelemetryAttributes.JobSchedulerId]: jobSchedulerId, - [TelemetryAttributes.JobId]: job.id, - }); + span?.setAttributes({ + [TelemetryAttributes.JobSchedulerId]: jobSchedulerId, + [TelemetryAttributes.JobId]: job.id, + }); - return job; + return job; + } } else { const jobId = await this.scripts.updateJobSchedulerNextMillis( jobSchedulerId, diff --git a/src/commands/addJobScheduler-6.lua b/src/commands/addJobScheduler-6.lua index 87055e1177..99c8acb950 100644 --- a/src/commands/addJobScheduler-6.lua +++ b/src/commands/addJobScheduler-6.lua @@ -40,50 +40,7 @@ local prefixKey = ARGV[8] --- @include "includes/addDelayedJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/removeJob" - -local function storeRepeatableJob(schedulerId, schedulerKey, repeatKey, nextMillis, opts, templateData, templateOpts) - rcall("ZADD", repeatKey, nextMillis, schedulerId) - - local optionalValues = {} - if opts['tz'] then - table.insert(optionalValues, "tz") - table.insert(optionalValues, opts['tz']) - end - - if opts['limit'] then - table.insert(optionalValues, "limit") - table.insert(optionalValues, opts['limit']) - end - - if opts['pattern'] then - table.insert(optionalValues, "pattern") - table.insert(optionalValues, opts['pattern']) - end - - if opts['endDate'] then - table.insert(optionalValues, "endDate") - table.insert(optionalValues, opts['endDate']) - end - - if opts['every'] then - table.insert(optionalValues, "every") - table.insert(optionalValues, opts['every']) - end - - local jsonTemplateOpts = cjson.encode(templateOpts) - if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then - table.insert(optionalValues, "opts") - table.insert(optionalValues, jsonTemplateOpts) - end - - if templateData and templateData ~= '{}' then - table.insert(optionalValues, "data") - table.insert(optionalValues, templateData) - end - - rcall("HMSET", schedulerKey, "name", opts['name'], "ic", 1, - unpack(optionalValues)) -end +--- @include "includes/storeJobScheduler" local schedulerKey = repeatKey .. ":" .. jobSchedulerId local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis @@ -93,33 +50,38 @@ local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis -- the next iteration. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) if prevMillis ~= false then - local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis - - if rcall("ZSCORE", delayedKey, delayedJobId) ~= false - and (rcall("EXISTS", nextDelayedJobKey) ~= 1 - or delayedJobId == nextDelayedJobId) then - removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]]) - rcall("ZREM", delayedKey, delayedJobId) + local prevDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis + + if rcall("ZSCORE", delayedKey, prevDelayedJobId) ~= false then + -- Delayed job is not regenerated with new scheduler opts, + -- next delayed job will be scheduled with old repeat options. + -- This is why we need to remove the current delayed job. + -- This is happening in that way because we use opts from current delayed job to schedule the next one. + removeJob(prevDelayedJobId, true, prefixKey, true --[[remove debounce key]]) + rcall("ZREM", delayedKey, prevDelayedJobId) end end local schedulerOpts = cmsgpack.unpack(ARGV[2]) -storeRepeatableJob(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts) +storeJobScheduler(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts) -local eventsKey = KEYS[5] -local metaKey = KEYS[2] -local maxEvents = getOrSetMaxEvents(metaKey) +if rcall("EXISTS", nextDelayedJobKey) ~= 1 then + local eventsKey = KEYS[5] + local metaKey = KEYS[2] + local maxEvents = getOrSetMaxEvents(metaKey) -rcall("INCR", KEYS[3]) + rcall("INCR", KEYS[3]) + rcall("HINCRBY", schedulerKey, "ic", 1) -local delayedOpts = cmsgpack.unpack(ARGV[6]) + local delayedOpts = cmsgpack.unpack(ARGV[6]) -addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts, - timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) + addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts, + timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) -if ARGV[9] ~= "" then - rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId) -end + if ARGV[9] ~= "" then + rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId) + end -return nextDelayedJobId .. "" -- convert to string + return nextDelayedJobId .. "" -- convert to string +end diff --git a/src/commands/includes/storeJobScheduler.lua b/src/commands/includes/storeJobScheduler.lua new file mode 100644 index 0000000000..d11cf41223 --- /dev/null +++ b/src/commands/includes/storeJobScheduler.lua @@ -0,0 +1,46 @@ +--[[ + Function to store a job scheduler +]] +local function storeJobScheduler(schedulerId, schedulerKey, repeatKey, nextMillis, opts, templateData, templateOpts) + rcall("ZADD", repeatKey, nextMillis, schedulerId) + + local optionalValues = {} + if opts['tz'] then + table.insert(optionalValues, "tz") + table.insert(optionalValues, opts['tz']) + end + + if opts['limit'] then + table.insert(optionalValues, "limit") + table.insert(optionalValues, opts['limit']) + end + + if opts['pattern'] then + table.insert(optionalValues, "pattern") + table.insert(optionalValues, opts['pattern']) + end + + if opts['endDate'] then + table.insert(optionalValues, "endDate") + table.insert(optionalValues, opts['endDate']) + end + + if opts['every'] then + table.insert(optionalValues, "every") + table.insert(optionalValues, opts['every']) + end + + local jsonTemplateOpts = cjson.encode(templateOpts) + if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then + table.insert(optionalValues, "opts") + table.insert(optionalValues, jsonTemplateOpts) + end + + if templateData and templateData ~= '{}' then + table.insert(optionalValues, "data") + table.insert(optionalValues, templateData) + end + + rcall("HMSET", schedulerKey, "name", opts['name'], "ic", 0, + unpack(optionalValues)) +end diff --git a/src/commands/updateJobScheduler-7.lua b/src/commands/updateJobScheduler-7.lua index 864de3be8e..c1ba1ec71e 100644 --- a/src/commands/updateJobScheduler-7.lua +++ b/src/commands/updateJobScheduler-7.lua @@ -47,31 +47,33 @@ if prevMillis ~= false then local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data") rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) - rcall("HINCRBY", schedulerKey, "ic", 1) - local eventsKey = KEYS[5] - local metaKey = KEYS[2] - local maxEvents = getOrSetMaxEvents(metaKey) + if rcall("EXISTS", nextDelayedJobKey) ~= 1 then + local eventsKey = KEYS[5] + local metaKey = KEYS[2] + local maxEvents = getOrSetMaxEvents(metaKey) - rcall("INCR", KEYS[3]) + rcall("INCR", KEYS[3]) + rcall("HINCRBY", schedulerKey, "ic", 1) - local delayedOpts = cmsgpack.unpack(ARGV[4]) + local delayedOpts = cmsgpack.unpack(ARGV[4]) - -- TODO: remove this workaround in next breaking change, - -- all job-schedulers must save job data - local templateData = schedulerAttributes[2] or ARGV[3] + -- TODO: remove this workaround in next breaking change, + -- all job-schedulers must save job data + local templateData = schedulerAttributes[2] or ARGV[3] - if templateData and templateData ~= '{}' then - rcall("HSET", schedulerKey, "data", templateData) - end + if templateData and templateData ~= '{}' then + rcall("HSET", schedulerKey, "data", templateData) + end - addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], - templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) - - if KEYS[7] ~= "" then - rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId) - end + addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], + templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) + + if KEYS[7] ~= "" then + rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId) + end - return nextDelayedJobId .. "" -- convert to string + return nextDelayedJobId .. "" -- convert to string + end end end diff --git a/tests/test_bulk.ts b/tests/test_bulk.ts index cdaa97ea76..71dd4e0e2e 100644 --- a/tests/test_bulk.ts +++ b/tests/test_bulk.ts @@ -127,14 +127,14 @@ describe('bulk jobs', () => { const worker = new Worker( queueName, async () => { - await delay(900); + await delay(800); }, { connection, prefix }, ); const worker2 = new Worker( queueName, async () => { - await delay(900); + await delay(800); }, { connection, prefix }, ); diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 180d6b4c20..a9317d6fd3 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -241,8 +241,8 @@ describe('Job Scheduler', function () { const repeatableJobs = await queue.getJobSchedulers(); expect(repeatableJobs.length).to.be.eql(1); await this.clock.tickAsync(ONE_MINUTE); - const delayed = await queue.getDelayed(); - expect(delayed).to.have.length(1); + const counts = await queue.getJobCounts(); + expect(counts.delayed + counts.completed + counts.waiting).to.be.eql(1); await worker.close(); }); @@ -1533,6 +1533,8 @@ describe('Job Scheduler', function () { describe('when repeatable job is promoted', function () { it('keeps one repeatable and one delayed after being processed', async function () { + this.clock.restore(); + const repeatOpts = { pattern: '0 * 1 * *', }; @@ -1558,16 +1560,65 @@ describe('Job Scheduler', function () { const delayedCount2 = await queue.getDelayedCount(); expect(delayedCount2).to.be.equal(1); - const configs = await repeat.getRepeatableJobs(0, -1, true); + const schedulersCount = await queue.getJobSchedulersCount(); expect(delayedCount).to.be.equal(1); const count = await queue.count(); expect(count).to.be.equal(1); - expect(configs).to.have.length(1); + expect(schedulersCount).to.be.equal(1); await worker.close(); }); + + describe('when scheduler is removed and re-added', function () { + it('should not add next delayed job if already existed in different state than delayed', async function () { + this.clock.restore(); + + const repeatOpts = { + pattern: '0 * 1 * *', + }; + + const worker = new Worker( + queueName, + async () => { + await queue.removeJobScheduler('test'); + await queue.upsertJobScheduler('test', repeatOpts); + }, + { + connection, + prefix, + }, + ); + + const completing = new Promise(resolve => { + worker.on('completed', () => { + resolve(); + }); + }); + + const repeatableJob = await queue.upsertJobScheduler( + 'test', + repeatOpts, + ); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + await completing; + + const delayedCountAfter = await queue.getDelayedCount(); + expect(delayedCountAfter).to.be.equal(0); + + const schedulersCount = await queue.getJobSchedulersCount(); + + const counts = await queue.getJobCounts(); + + expect(counts.completed).to.be.equal(1); + expect(schedulersCount).to.be.equal(1); + await worker.close(); + }); + }); }); it('should allow removing a named repeatable job', async function () {