From 72ee95c2a572aab130ec158aa6111c05720cf50d Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 3 Jan 2025 00:32:35 -0500 Subject: [PATCH 1/8] refactor(delayed): addDelayedJob include --- src/commands/addDelayedJob-6.lua | 20 +++----------------- src/commands/addJobScheduler-2.lua | 11 ++++++++--- src/commands/includes/addDelayedJob.lua | 25 +++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 20 deletions(-) create mode 100644 src/commands/includes/addDelayedJob.lua diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index de2c0f764e..998f578837 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -56,12 +56,10 @@ local deduplicationKey = args[10] local parentData -- Includes ---- @include "includes/addDelayMarkerIfNeeded" +--- @include "includes/addDelayedJob" --- @include "includes/deduplicateJob" ---- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" ---- @include "includes/storeJob" if parentKey ~= nil then if rcall("EXISTS", parentKey) ~= 1 then return -5 end @@ -96,20 +94,8 @@ if deduplicationJobId then return deduplicationJobId end --- Store the job. -local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], - opts, timestamp, parentKey, parentData, - repeatJobKey) - -local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay)) - -rcall("ZADD", delayedKey, score, jobId) -rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed", - "jobId", jobId, "delay", delayedTimestamp) - --- mark that a delayed job is available -local markerKey = KEYS[1] -addDelayMarkerIfNeeded(markerKey, delayedKey) +addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, args[3], ARGV[2], opts, timestamp, repeatJobKey, + maxEvents, KEYS[1], parentKey, parentData) -- Check if this job is a child of another job, if so add it to the parents dependencies if parentDependenciesKey ~= nil then diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-2.lua index c5a84a641c..bf4bd0b0a4 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-2.lua @@ -30,6 +30,7 @@ local templateOpts = cmsgpack.unpack(ARGV[5]) local prefixKey = ARGV[6] -- Includes +--- @include "includes/addDelayedJob" --- @include "includes/removeJob" local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts) @@ -72,13 +73,14 @@ local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, t unpack(optionalValues)) end +local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis +local nextDelayedJobKey = repeatKey .. ":" .. jobSchedulerId .. ":" .. nextMillis + -- If we are overriding a repeatable job we must delete the delayed job for -- the next iteration. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) if prevMillis ~= false then local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis - local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis - local nextDelayedJobKey = repeatKey .. ":" .. jobSchedulerId .. ":" .. nextMillis if rcall("ZSCORE", delayedKey, delayedJobId) ~= false and (rcall("EXISTS", nextDelayedJobKey) ~= 1 @@ -88,4 +90,7 @@ if prevMillis ~= false then end end -return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts) +storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts) + +addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, args[3], ARGV[2], opts, timestamp, repeatJobKey, + maxEvents, KEYS[1], parentKey, parentData) diff --git a/src/commands/includes/addDelayedJob.lua b/src/commands/includes/addDelayedJob.lua new file mode 100644 index 0000000000..cfc5391216 --- /dev/null +++ b/src/commands/includes/addDelayedJob.lua @@ -0,0 +1,25 @@ +--[[ + Add marker if needed when a job is available. +]] + +-- Includes +--- @include "addDelayMarkerIfNeeded" +--- @include "getDelayedScore" +--- @include "storeJob" + +local function addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, name, data, opts, timestamp, repeatJobKey, + maxEvents, markerKey, parentKey, parentData) + -- Store the job. + local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data, + opts, timestamp, parentKey, parentData, repeatJobKey) + + local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay)) + + rcall("ZADD", delayedKey, score, jobId) + rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed", + "jobId", jobId, "delay", delayedTimestamp) + + -- mark that a delayed job is available + addDelayMarkerIfNeeded(markerKey, delayedKey) +end + \ No newline at end of file From 398078b98cb3f652e4a96bc9989f60a6b7849ea6 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 4 Jan 2025 15:00:12 -0500 Subject: [PATCH 2/8] refactor: add delayed job when adding job scheduler --- src/classes/job-scheduler.ts | 126 +++++++++++++++--- src/classes/job.ts | 7 +- src/classes/scripts.ts | 17 ++- src/classes/worker.ts | 2 +- src/commands/addDelayedJob-6.lua | 2 +- ...bScheduler-2.lua => addJobScheduler-6.lua} | 56 +++++--- tests/test_job_scheduler.ts | 5 +- 7 files changed, 171 insertions(+), 44 deletions(-) rename src/commands/{addJobScheduler-2.lua => addJobScheduler-6.lua} (61%) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 743af75b72..a136c0279d 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -116,18 +116,70 @@ export class JobScheduler extends QueueBase { const multi = (await this.client).multi(); if (nextMillis) { if (override) { - this.scripts.addJobScheduler( - (multi) as RedisClient, - jobSchedulerId, - nextMillis, - JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), - Job.optsAsJSON(opts), - { - name: jobName, - endDate: endDate ? new Date(endDate).getTime() : undefined, - tz: repeatOpts.tz, - pattern, - every, + return this.trace>( + SpanKind.PRODUCER, + 'add', + `${this.name}.${jobName}`, + async (span, srcPropagationMedatada) => { + let telemetry = opts.telemetry; + + if (srcPropagationMedatada) { + const omitContext = opts.telemetry?.omitContext; + const telemetryMetadata = + opts.telemetry?.metadata || + (!omitContext && srcPropagationMedatada); + + if (telemetryMetadata || omitContext) { + telemetry = { + metadata: telemetryMetadata, + omitContext, + }; + } + } + + const mergedOpts = this.getNextJobOpts( + nextMillis, + jobSchedulerId, + { + ...opts, + repeat: { ...filteredRepeatOpts, offset: newOffset }, + telemetry, + }, + iterationCount, + ); + + const jobId = await this.scripts.addJobScheduler( + jobSchedulerId, + nextMillis, + JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), + Job.optsAsJSON(opts), + { + name: jobName, + endDate: endDate ? new Date(endDate).getTime() : undefined, + tz: repeatOpts.tz, + pattern, + every, + }, + Job.optsAsJSON(mergedOpts), + producerId, + ); + + const job = new this.Job( + this, + jobName, + jobData, + mergedOpts, + jobId, + ); + + job.id = jobId; + + span?.setAttributes({ + [TelemetryAttributes.JobSchedulerId]: jobSchedulerId, + [TelemetryAttributes.JobId]: job.id, + }); + + return job; }, ); } else { @@ -218,6 +270,38 @@ export class JobScheduler extends QueueBase { nextMillis, }); + const mergedOpts = this.getNextJobOpts( + nextMillis, + jobSchedulerId, + opts, + currentCount, + ); + + const job = new this.Job(this, name, data, mergedOpts, jobId); + job.addJob(client); + + if (producerId) { + const producerJobKey = this.toKey(producerId); + client.hset(producerJobKey, 'nrjid', job.id); + } + + return job; + } + + private getNextJobOpts( + nextMillis: number, + jobSchedulerId: string, + opts: JobsOptions, + currentCount: number, + ): JobsOptions { + // + // Generate unique job id for this iteration. + // + const jobId = this.getSchedulerNextJobId({ + jobSchedulerId, + nextMillis, + }); + const now = Date.now(); const delay = nextMillis - now; @@ -230,17 +314,15 @@ export class JobScheduler extends QueueBase { repeatJobKey: jobSchedulerId, }; - mergedOpts.repeat = { ...opts.repeat, count: currentCount }; - - const job = new this.Job(this, name, data, mergedOpts, jobId); - job.addJob(client); - - if (producerId) { - const producerJobKey = this.toKey(producerId); - client.hset(producerJobKey, 'nrjid', job.id); - } + mergedOpts.repeat = { + ...opts.repeat, + count: currentCount, + endDate: opts.repeat?.endDate + ? new Date(opts.repeat.endDate).getTime() + : undefined, + }; - return job; + return mergedOpts; } async removeJobScheduler(jobSchedulerId: string): Promise { diff --git a/src/classes/job.ts b/src/classes/job.ts index 623330f91a..84112c7bcb 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -1,4 +1,3 @@ -import { ChainableCommander } from 'ioredis'; import { debuglog } from 'util'; import { BackoffOptions, @@ -866,6 +865,12 @@ export class Job< return this.queue.name; } + get jobSchedulerId(): string { + if (this.repeatJobKey) { + return this.repeatJobKey.slice(this.toKey('repeat').length + 1); + } + } + /** * @returns the prefix that is used. */ diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index a36359a946..e309e8cd78 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -32,6 +32,7 @@ import { FinishedPropValAttribute, MinimalQueue, RedisJobOptions, + JobsOptions, } from '../types'; import { ErrorCode } from '../enums'; import { @@ -313,18 +314,26 @@ export class Scripts { } async addJobScheduler( - client: RedisClient, jobSchedulerId: string, nextMillis: number, templateData: string, templateOpts: RedisJobOptions, opts: RepeatableOptions, + delayedJobOpts: JobsOptions, + // The job id of the job that produced this next iteration + producerId?: string, ): Promise { + const client = await this.queue.client; + const queueKeys = this.queue.keys; const keys: (string | number | Buffer)[] = [ - queueKeys.repeat, + queueKeys.marker, + queueKeys.meta, + queueKeys.id, queueKeys.delayed, + queueKeys.events, + queueKeys.repeat, ]; const args = [ @@ -333,8 +342,12 @@ export class Scripts { jobSchedulerId, templateData, pack(templateOpts), + pack(delayedJobOpts), + Date.now(), queueKeys[''], + producerId ? this.queue.toKey(producerId) : '', ]; + return this.execCommand(client, 'addJobScheduler', keys.concat(args)); } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index b79ddf4648..3a7d233f54 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -793,7 +793,7 @@ will never work with more accuracy than 1ms. */ if (job.repeatJobKey) { const jobScheduler = await this.jobScheduler; await jobScheduler.upsertJobScheduler( - job.repeatJobKey, + job.jobSchedulerId, job.opts.repeat, job.name, job.data, diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index 998f578837..f615fe20f6 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -26,7 +26,7 @@ [8] parent? {id, queueKey} [9] repeat job key [10] deduplication key - + ARGV[2] Json stringified job data ARGV[3] msgpacked options diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-6.lua similarity index 61% rename from src/commands/addJobScheduler-2.lua rename to src/commands/addJobScheduler-6.lua index bf4bd0b0a4..493d6c9f81 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-6.lua @@ -2,8 +2,12 @@ Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options). Input: - KEYS[1] 'repeat' key - KEYS[2] 'delayed' key + KEYS[1] 'marker', + KEYS[2] 'meta' + KEYS[3] 'id' + KEYS[4] 'delayed' + KEYS[5] events stream key + KEYS[6] 'repeat' key ARGV[1] next milliseconds ARGV[2] msgpacked options @@ -14,28 +18,31 @@ [5] every? ARGV[3] jobs scheduler id ARGV[4] Json stringified template data - ARGV[5] mspacked template opts - ARGV[6] prefix key + ARGV[5] msgpacked template opts + ARGV[6] msgpacked delayed opts + ARGV[7] timestamp + ARGV[8] prefix key + ARGV[9] producer key Output: - repeatableKey - OK + next delayed job id - OK ]] local rcall = redis.call -local repeatKey = KEYS[1] -local delayedKey = KEYS[2] - +local repeatKey = KEYS[6] +local delayedKey = KEYS[4] +local timestamp = ARGV[7] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[3] local templateOpts = cmsgpack.unpack(ARGV[5]) -local prefixKey = ARGV[6] +local prefixKey = ARGV[8] -- Includes --- @include "includes/addDelayedJob" +--- @include "includes/getOrSetMaxEvents" --- @include "includes/removeJob" -local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts) +local function storeRepeatableJob(schedulerId, schedulerKey, repeatKey, nextMillis, opts, templateData, templateOpts) rcall("ZADD", repeatKey, nextMillis, schedulerId) - local opts = cmsgpack.unpack(rawOpts) local optionalValues = {} if opts['tz'] then @@ -69,12 +76,13 @@ local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, t table.insert(optionalValues, templateData) end - rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'], + rcall("HMSET", schedulerKey, "name", opts['name'], unpack(optionalValues)) end +local schedulerKey = repeatKey .. ":" .. jobSchedulerId local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis -local nextDelayedJobKey = repeatKey .. ":" .. jobSchedulerId .. ":" .. nextMillis +local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis -- If we are overriding a repeatable job we must delete the delayed job for -- the next iteration. @@ -90,7 +98,23 @@ if prevMillis ~= false then end end -storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts) +local schedulerOpts = cmsgpack.unpack(ARGV[2]) + +storeRepeatableJob(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts) + +local eventsKey = KEYS[5] +local metaKey = KEYS[2] +local maxEvents = getOrSetMaxEvents(metaKey) + +rcall("INCR", KEYS[3]) + +local delayedOpts = cmsgpack.unpack(ARGV[6]) + +addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts, + timestamp, schedulerKey, maxEvents, KEYS[1], nil, nil) + +if ARGV[9] ~= "" then + rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId) +end -addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, args[3], ARGV[2], opts, timestamp, repeatJobKey, - maxEvents, KEYS[1], parentKey, parentData) +return nextDelayedJobId .. "" -- convert to string diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index c2da779a83..ffcb549604 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1649,6 +1649,7 @@ describe('Job Scheduler', function () { resolve(); }, { + autorun: false, connection, prefix, }, @@ -1668,6 +1669,8 @@ describe('Job Scheduler', function () { const delayedCountBeforeFailing = await queue.getDelayedCount(); expect(delayedCountBeforeFailing).to.be.equal(0); + worker.run(); + await failing; const failedCount = await queue.getFailedCount(); @@ -1954,7 +1957,7 @@ describe('Job Scheduler', function () { }); // This test is flaky and too complex we need something simpler that tests the same thing - it.skip('should not re-add a repeatable job after it has been removed', async function () { + it('should not re-add a repeatable job after it has been removed', async function () { const repeat = await queue.repeat; let worker: Worker; From b5ed939cf32326eacd879504a071fa756ea096a1 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 4 Jan 2025 21:24:09 -0500 Subject: [PATCH 3/8] test: fix test cases --- tests/test_job_scheduler.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index ffcb549604..4d7cd6f0cd 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -724,8 +724,8 @@ describe('Job Scheduler', function () { worker.on('completed', async job => { try { if (prev) { - expect(prev.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.gte(2000); + expect(prev.timestamp).to.be.lt(job.processedOn!); + expect(job.processedOn! - prev.timestamp).to.be.gte(2000); } prev = job; counter++; @@ -747,8 +747,8 @@ describe('Job Scheduler', function () { worker.on('completed', async job => { try { if (prev2) { - expect(prev2.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev2.timestamp).to.be.gte(2000); + expect(prev2.timestamp).to.be.lt(job.processedOn!); + expect(job.processedOn! - prev2.timestamp).to.be.gte(2000); } prev2 = job; counter2++; @@ -1726,6 +1726,7 @@ describe('Job Scheduler', function () { } }, { + autorun: false, connection, prefix, }, @@ -1746,6 +1747,8 @@ describe('Job Scheduler', function () { this.clock.tick(177); + worker.run(); + await failing; this.clock.tick(177); @@ -1957,7 +1960,7 @@ describe('Job Scheduler', function () { }); // This test is flaky and too complex we need something simpler that tests the same thing - it('should not re-add a repeatable job after it has been removed', async function () { + it.skip('should not re-add a repeatable job after it has been removed', async function () { const repeat = await queue.repeat; let worker: Worker; From 8c89bcf57287429c8eb1ec8bba4e430ebe406f11 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 4 Jan 2025 21:32:41 -0500 Subject: [PATCH 4/8] chore: add mergedOpts type --- src/classes/job-scheduler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index a136c0279d..57da12bcac 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -305,7 +305,7 @@ export class JobScheduler extends QueueBase { const now = Date.now(); const delay = nextMillis - now; - const mergedOpts = { + const mergedOpts: JobsOptions = { ...opts, jobId, delay: delay < 0 ? 0 : delay, From 3fb043d08b5dd801b351b52f2a9f53360acf3b72 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 4 Jan 2025 21:43:12 -0500 Subject: [PATCH 5/8] chore: validate offset --- src/classes/job-scheduler.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index cf2521eba7..016c840c4c 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -144,7 +144,10 @@ export class JobScheduler extends QueueBase { jobSchedulerId, { ...opts, - repeat: { ...filteredRepeatOpts, offset: newOffset }, + repeat: { + ...filteredRepeatOpts, + offset: newOffset ? newOffset : undefined, + }, telemetry, }, iterationCount, From bd1c86e9580832f6ae3024a871808624ad7e656a Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 4 Jan 2025 21:47:54 -0500 Subject: [PATCH 6/8] chore: fix offset reference --- src/classes/job-scheduler.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 016c840c4c..3362adedf3 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -144,13 +144,11 @@ export class JobScheduler extends QueueBase { jobSchedulerId, { ...opts, - repeat: { - ...filteredRepeatOpts, - offset: newOffset ? newOffset : undefined, - }, + repeat: filteredRepeatOpts, telemetry, }, iterationCount, + newOffset, ); const jobId = await this.scripts.addJobScheduler( @@ -282,6 +280,7 @@ export class JobScheduler extends QueueBase { jobSchedulerId, opts, currentCount, + offset, ); const job = new this.Job(this, name, data, mergedOpts, jobId); @@ -300,6 +299,7 @@ export class JobScheduler extends QueueBase { jobSchedulerId: string, opts: JobsOptions, currentCount: number, + offset?: number, ): JobsOptions { // // Generate unique job id for this iteration. @@ -324,6 +324,7 @@ export class JobScheduler extends QueueBase { mergedOpts.repeat = { ...opts.repeat, count: currentCount, + offset, endDate: opts.repeat?.endDate ? new Date(opts.repeat.endDate).getTime() : undefined, From 6c62219c2ff2aed719dddf1858820b3f236d6d45 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 4 Jan 2025 22:25:25 -0500 Subject: [PATCH 7/8] chore: restore reference of repeatJobKey --- src/classes/job.ts | 6 ------ src/classes/repeat.ts | 1 + src/classes/worker.ts | 2 +- src/commands/addJobScheduler-6.lua | 2 +- tests/test_job_scheduler.ts | 4 +++- tests/test_repeat.ts | 2 +- 6 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 381660883e..3293b619e1 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -868,12 +868,6 @@ export class Job< return this.queue.name; } - get jobSchedulerId(): string { - if (this.repeatJobKey) { - return this.repeatJobKey.slice(this.toKey('repeat').length + 1); - } - } - /** * @returns the prefix that is used. */ diff --git a/src/classes/repeat.ts b/src/classes/repeat.ts index 4d14ff265e..adaec846a4 100644 --- a/src/classes/repeat.ts +++ b/src/classes/repeat.ts @@ -186,6 +186,7 @@ export class Repeat extends QueueBase { key: repeat.key, }); + console.log(legacyRepeatJobId, repeatConcatOptions, repeatJobKey); return this.scripts.removeRepeatable( legacyRepeatJobId, repeatConcatOptions, diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 3a7d233f54..b79ddf4648 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -793,7 +793,7 @@ will never work with more accuracy than 1ms. */ if (job.repeatJobKey) { const jobScheduler = await this.jobScheduler; await jobScheduler.upsertJobScheduler( - job.jobSchedulerId, + job.repeatJobKey, job.opts.repeat, job.name, job.data, diff --git a/src/commands/addJobScheduler-6.lua b/src/commands/addJobScheduler-6.lua index 493d6c9f81..f13e09514d 100644 --- a/src/commands/addJobScheduler-6.lua +++ b/src/commands/addJobScheduler-6.lua @@ -111,7 +111,7 @@ rcall("INCR", KEYS[3]) local delayedOpts = cmsgpack.unpack(ARGV[6]) addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts, - timestamp, schedulerKey, maxEvents, KEYS[1], nil, nil) + timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) if ARGV[9] ~= "" then rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId) diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index a6c02843de..2d63215fb3 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -838,7 +838,7 @@ describe('Job Scheduler', function () { async () => { this.clock.tick(nextTick); }, - { connection, prefix }, + { autorun: false, connection, prefix }, ); let prev: Job; @@ -876,6 +876,8 @@ describe('Job Scheduler', function () { const delayedCountBefore = await queue.getDelayedCount(); expect(delayedCountBefore).to.be.eq(1); + worker.run(); + await completing; const waitingCount = await queue.getWaitingCount(); diff --git a/tests/test_repeat.ts b/tests/test_repeat.ts index 4685c08803..e691c850f0 100644 --- a/tests/test_repeat.ts +++ b/tests/test_repeat.ts @@ -59,7 +59,7 @@ describe('repeat', function () { await queue.close(); await repeat.close(); await queueEvents.close(); - await removeAllQueueData(new IORedis(redisHost), queueName); + //await removeAllQueueData(new IORedis(redisHost), queueName); }); afterAll(async function () { From 42e9e18e1b82af85abc9231e5b8ceece419b6d25 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 4 Jan 2025 22:28:34 -0500 Subject: [PATCH 8/8] chore: remove console log --- src/classes/repeat.ts | 1 - tests/test_repeat.ts | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/classes/repeat.ts b/src/classes/repeat.ts index adaec846a4..4d14ff265e 100644 --- a/src/classes/repeat.ts +++ b/src/classes/repeat.ts @@ -186,7 +186,6 @@ export class Repeat extends QueueBase { key: repeat.key, }); - console.log(legacyRepeatJobId, repeatConcatOptions, repeatJobKey); return this.scripts.removeRepeatable( legacyRepeatJobId, repeatConcatOptions, diff --git a/tests/test_repeat.ts b/tests/test_repeat.ts index e691c850f0..4685c08803 100644 --- a/tests/test_repeat.ts +++ b/tests/test_repeat.ts @@ -59,7 +59,7 @@ describe('repeat', function () { await queue.close(); await repeat.close(); await queueEvents.close(); - //await removeAllQueueData(new IORedis(redisHost), queueName); + await removeAllQueueData(new IORedis(redisHost), queueName); }); afterAll(async function () {