From 4ea35dd9e16ff0197f204210696f41c0c5bd0e30 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sun, 12 Jan 2025 11:35:28 -0500 Subject: [PATCH] fix(job-scheduler): add next delayed job only when prevMillis matches with producerId (#3001) --- src/classes/scripts.ts | 3 +- ...heduler-6.lua => updateJobScheduler-7.lua} | 58 +++++++++------- tests/test_job_scheduler.ts | 67 ++++++++++++++++++- 3 files changed, 99 insertions(+), 29 deletions(-) rename src/commands/{updateJobScheduler-6.lua => updateJobScheduler-7.lua} (63%) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index e5e832eda5..93241de272 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -369,6 +369,7 @@ export class Scripts { queueKeys.delayed, queueKeys.events, queueKeys.repeat, + producerId ? this.queue.toKey(producerId) : '', ]; const args = [ @@ -377,7 +378,7 @@ export class Scripts { pack(delayedJobOpts), Date.now(), queueKeys[''], - producerId ? this.queue.toKey(producerId) : '', + producerId, ]; return this.execCommand(client, 'updateJobScheduler', keys.concat(args)); diff --git a/src/commands/updateJobScheduler-6.lua b/src/commands/updateJobScheduler-7.lua similarity index 63% rename from src/commands/updateJobScheduler-6.lua rename to src/commands/updateJobScheduler-7.lua index 8e9040de80..4a6b052312 100644 --- a/src/commands/updateJobScheduler-6.lua +++ b/src/commands/updateJobScheduler-7.lua @@ -1,23 +1,24 @@ --[[ Updates a job scheduler and adds next delayed job - Input: - KEYS[1] 'marker', - KEYS[2] 'meta' - KEYS[3] 'id' - KEYS[4] 'delayed' - KEYS[5] events stream key - KEYS[6] 'repeat' key - - ARGV[1] next milliseconds - ARGV[2] jobs scheduler id - ARGV[3] msgpacked delayed opts - ARGV[4] timestamp - ARGV[5] prefix key - ARGV[6] producer key - - Output: - next delayed job id - OK + Input: + KEYS[1] 'marker', + KEYS[2] 'meta' + KEYS[3] 'id' + KEYS[4] 'delayed' + KEYS[5] events stream key + KEYS[6] 'repeat' key + KEYS[7] producer key + + ARGV[1] next milliseconds + ARGV[2] jobs scheduler id + ARGV[3] msgpacked delayed opts + ARGV[4] timestamp + ARGV[5] prefix key + ARGV[6] producer id + + Output: + next delayed job id - OK ]] local rcall = redis.call local repeatKey = KEYS[6] @@ -26,6 +27,7 @@ local timestamp = ARGV[4] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[2] local prefixKey = ARGV[5] +local producerId = ARGV[6] -- Includes --- @include "includes/addDelayedJob" @@ -38,24 +40,28 @@ local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis -- Validate that scheduler exists. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) if prevMillis ~= false then + local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis + + if producerId == currentDelayedJobId then local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data") rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) - + local eventsKey = KEYS[5] local metaKey = KEYS[2] local maxEvents = getOrSetMaxEvents(metaKey) - + rcall("INCR", KEYS[3]) - + local delayedOpts = cmsgpack.unpack(ARGV[3]) - + addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) - - if ARGV[6] ~= "" then - rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId) + + if KEYS[7] ~= "" then + rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId) end - - return nextDelayedJobId .. "" -- convert to string + + return nextDelayedJobId .. "" -- convert to string + end end diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index f053f9cfd7..beca75885f 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -13,8 +13,7 @@ import { getNextMillis, Worker, } from '../src/classes'; -import { JobsOptions } from '../src/types'; -import { removeAllQueueData } from '../src/utils'; +import { delay, removeAllQueueData } from '../src/utils'; const moment = require('moment'); @@ -2047,6 +2046,70 @@ describe('Job Scheduler', function () { await worker.close(); }); + describe('when overriding a scheduler', function () { + it('should not continue adding new delayed jobs from previous delayed record', async function () { + this.clock.restore(); + + const repeatOpts = { pattern: '*/2 * * * * *' }; + + let count = 0; + const worker = new Worker( + queueName, + async () => { + if (count === 0) { + await delay(2000); + await queue.pause(); // keep job in waiting list + } + }, + { connection, prefix }, + ); + + const completing = new Promise(async resolve => { + worker.on('completed', async () => { + count++; + if (count === 1) { + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.eql(1); + + await queue.upsertJobScheduler( + 'test', + { pattern: '*/15 * * * * *' }, + { + data: { foo: 'baz' }, + }, + ); + + const waitingCount2 = await queue.getWaitingCount(); + expect(waitingCount2).to.eql(1); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.eql(1); + + await queue.resume(); + } else { + resolve(); + } + }); + }); + + await queue.upsertJobScheduler('test', repeatOpts, { + data: { foo: 'bar' }, + }); + + await completing; + + const schedulerCount = await queue.getJobSchedulersCount(); + expect(schedulerCount).to.eql(1); + + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.eql(1); + + const totalJobs = await queue.getJobCountByTypes(); + expect(totalJobs).to.eql(3); // 2 completed, 1 delayed + + await worker.close(); + }); + }); + it('should allow adding a repeatable job after removing it', async function () { const repeat = { pattern: '*/5 * * * *',