diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 2e41591e45..743af75b72 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -33,13 +33,13 @@ export class JobScheduler extends QueueBase { async upsertJobScheduler( jobSchedulerId: string, - repeatOpts: Omit, + repeatOpts: Omit, jobName: N, jobData: T, opts: JobSchedulerTemplateOptions, - { override }: { override: boolean }, + { override, producerId }: { override: boolean; producerId?: string }, ): Promise | undefined> { - const { every, pattern } = repeatOpts; + const { every, pattern, offset } = repeatOpts; if (pattern && every) { throw new Error( @@ -59,6 +59,12 @@ export class JobScheduler extends QueueBase { ); } + if (repeatOpts.immediately && repeatOpts.every) { + console.warn( + "Using option immediately with every does not affect the job's schedule. Job will run immediately anyway.", + ); + } + // Check if we reached the limit of the repeatable job's iterations const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1; if ( @@ -75,8 +81,6 @@ export class JobScheduler extends QueueBase { return; } - const prevMillis = opts.prevMillis || 0; - // Check if we have a start date for the repeatable job const { startDate, immediately, ...filteredRepeatOpts } = repeatOpts; if (startDate) { @@ -84,15 +88,28 @@ export class JobScheduler extends QueueBase { now = startMillis > now ? startMillis : now; } + const prevMillis = opts.prevMillis || 0; + now = prevMillis < now ? now : prevMillis; + let nextMillis: number; + let newOffset = offset; + if (every) { - nextMillis = prevMillis + every; + const nextSlot = Math.floor(now / every) * every + every; + if (prevMillis || offset) { + nextMillis = nextSlot + (offset || 0); + } else { + nextMillis = now; + newOffset = every - (nextSlot - now); + + // newOffset should always be positive, but as an extra safety check + newOffset = newOffset < 0 ? 0 : newOffset; + } if (nextMillis < now) { nextMillis = now; } } else if (pattern) { - now = prevMillis < now ? now : prevMillis; nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); } @@ -149,11 +166,12 @@ export class JobScheduler extends QueueBase { jobSchedulerId, { ...opts, - repeat: filteredRepeatOpts, + repeat: { ...filteredRepeatOpts, offset: newOffset }, telemetry, }, jobData, iterationCount, + producerId, ); const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][] @@ -189,6 +207,8 @@ export class JobScheduler extends QueueBase { opts: JobsOptions, data: T, currentCount: number, + // The job id of the job that produced this next iteration + producerId?: string, ) { // // Generate unique job id for this iteration. @@ -215,6 +235,11 @@ export class JobScheduler extends QueueBase { const job = new this.Job(this, name, data, mergedOpts, jobId); job.addJob(client); + if (producerId) { + const producerJobKey = this.toKey(producerId); + client.hset(producerJobKey, 'nrjid', job.id); + } + return job; } diff --git a/src/classes/job.ts b/src/classes/job.ts index 531839e62c..19d96d3921 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -168,6 +168,12 @@ export class Job< */ repeatJobKey?: string; + /** + * Produced next repetable job Id. + * + */ + nextRepeatableJobId?: string; + /** * The token used for locking this job. */ @@ -384,6 +390,10 @@ export class Job< job.processedBy = json.pb; } + if (json.nrjid) { + job.nextRepeatableJobId = json.nrjid; + } + return job; } @@ -493,6 +503,7 @@ export class Job< deduplicationId: this.deduplicationId, repeatJobKey: this.repeatJobKey, returnvalue: JSON.stringify(this.returnvalue), + nrjid: this.nextRepeatableJobId, }); } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index f68811c589..b79ddf4648 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -788,7 +788,7 @@ will never work with more accuracy than 1ms. */ job.token = token; // Add next scheduled job if necessary. - if (job.opts.repeat) { + if (job.opts.repeat && !job.nextRepeatableJobId) { // Use new job scheduler if possible if (job.repeatJobKey) { const jobScheduler = await this.jobScheduler; @@ -798,7 +798,7 @@ will never work with more accuracy than 1ms. */ job.name, job.data, job.opts, - { override: false }, + { override: false, producerId: job.id }, ); } else { const repeat = await this.repeat; @@ -835,6 +835,8 @@ will never work with more accuracy than 1ms. */ }); const handleCompleted = async (result: ResultType) => { + jobsInProgress.delete(inProgressItem); + if (!this.connection.closing) { const completed = await job.moveToCompleted( result, @@ -855,6 +857,8 @@ will never work with more accuracy than 1ms. */ }; const handleFailed = async (err: Error) => { + jobsInProgress.delete(inProgressItem); + if (!this.connection.closing) { try { // Check if the job was manually rate-limited @@ -911,8 +915,6 @@ will never work with more accuracy than 1ms. */ [TelemetryAttributes.JobFinishedTimestamp]: Date.now(), [TelemetryAttributes.JobProcessedTimestamp]: processedOn, }); - - jobsInProgress.delete(inProgressItem); } }, srcPropagationMedatada, diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 25ad335145..e27c302432 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -18,6 +18,7 @@ export interface JobJson { parent?: ParentKeys; parentKey?: string; repeatJobKey?: string; + nextRepeatableJobKey?: string; debounceId?: string; deduplicationId?: string; processedBy?: string; @@ -41,6 +42,7 @@ export interface JobJsonRaw { parent?: string; deid?: string; rjk?: string; + nrjid?: string; atm?: string; ats?: string; pb?: string; // Worker name diff --git a/src/interfaces/repeat-options.ts b/src/interfaces/repeat-options.ts index fe71d27d12..65c5da3d4d 100644 --- a/src/interfaces/repeat-options.ts +++ b/src/interfaces/repeat-options.ts @@ -32,7 +32,7 @@ export interface RepeatOptions extends Omit { /** * Repeated job should start right now - * ( work only with every settings) + * ( work only with cron settings) */ immediately?: boolean; @@ -42,16 +42,15 @@ export interface RepeatOptions extends Omit { count?: number; /** - * Internal property to store the previous time the job was executed. - */ - prevMillis?: number; + * Offset in milliseconds to affect the next iteration time + * + * */ + offset?: number; /** - * Internal property to store the offset to apply to the next iteration. - * - * @deprecated + * Internal property to store the previous time the job was executed. */ - offset?: number; + prevMillis?: number; /** * Internal property to store de job id diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 427cb99cee..c2da779a83 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -191,6 +191,9 @@ describe('Job Scheduler', function () { describe('when job schedulers have same id and different every pattern', function () { it('should create only one job scheduler', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + await Promise.all([ queue.upsertJobScheduler('test-scheduler1', { every: 1000 }), queue.upsertJobScheduler('test-scheduler1', { every: 2000 }), @@ -244,6 +247,9 @@ describe('Job Scheduler', function () { }); it('should create job schedulers with different cron patterns', async function () { + const date = new Date('2017-02-07T15:24:00.000Z'); + this.clock.setSystemTime(date); + const crons = [ '10 * * * * *', '2 10 * * * *', @@ -254,11 +260,11 @@ describe('Job Scheduler', function () { await Promise.all([ queue.upsertJobScheduler('first', { pattern: crons[0], - endDate: 12345, + endDate: Date.now() + 12345, }), queue.upsertJobScheduler('second', { pattern: crons[1], - endDate: 610000, + endDate: Date.now() + 6100000, }), queue.upsertJobScheduler('third', { pattern: crons[2], @@ -273,9 +279,13 @@ describe('Job Scheduler', function () { tz: 'Europa/Copenhaguen', }), ]); + const count = await repeat.getRepeatableCount(); expect(count).to.be.eql(5); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.eql(5); + const jobs = await repeat.getRepeatableJobs(0, -1, true); expect(jobs) @@ -288,25 +298,25 @@ describe('Job Scheduler', function () { tz: 'Europa/Copenhaguen', pattern: null, every: '5000', - next: 5000, + next: 1486481040000, }) .and.to.deep.include({ key: 'first', name: 'first', - endDate: 12345, + endDate: Date.now() + 12345, tz: null, pattern: '10 * * * * *', every: null, - next: 10000, + next: 1486481050000, }) .and.to.deep.include({ key: 'second', name: 'second', - endDate: 610000, + endDate: Date.now() + 6100000, tz: null, pattern: '2 10 * * * *', every: null, - next: 602000, + next: 1486483802000, }) .and.to.deep.include({ key: 'fourth', @@ -315,7 +325,7 @@ describe('Job Scheduler', function () { tz: 'Africa/Accra', pattern: '2 * * 4 * *', every: null, - next: 259202000, + next: 1488585602000, }) .and.to.deep.include({ key: 'third', @@ -324,7 +334,7 @@ describe('Job Scheduler', function () { tz: 'Africa/Abidjan', pattern: '1 * * 5 * *', every: null, - next: 345601000, + next: 1488672001000, }); }); @@ -773,54 +783,175 @@ describe('Job Scheduler', function () { }); }); - it('should repeat every 2 seconds and start immediately', async function () { - const date = new Date('2017-02-07 9:24:00'); - this.clock.setSystemTime(date); - const nextTick = 2 * ONE_SECOND; + describe("when using 'every' option is on same millis as iteration time", function () { + it('should repeat every 2 seconds and start immediately', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND; - const worker = new Worker( - queueName, - async () => { - this.clock.tick(nextTick); - }, - { connection, prefix }, - ); + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { connection, prefix }, + ); - let prev: Job; - let counter = 0; + let prev: Job; + let counter = 0; - const completing = new Promise((resolve, reject) => { - worker.on('completed', async job => { - try { - if (prev && counter === 1) { - expect(prev.timestamp).to.be.lte(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.lte(1); - } else if (prev) { - expect(prev.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.gte(2000); + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev && counter === 1) { + expect(prev.timestamp).to.be.lte(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.lte(1); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.eq(2000); + } + prev = job; + counter++; + if (counter === 5) { + resolve(); + } + } catch (err) { + reject(err); } - prev = job; - counter++; - if (counter === 5) { - resolve(); + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + }, + { data: { foo: 'bar' } }, + ); + + const delayedCountBefore = await queue.getDelayedCount(); + expect(delayedCountBefore).to.be.eq(1); + + await completing; + + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.be.eq(0); + + const delayedCountAfter = await queue.getDelayedCount(); + expect(delayedCountAfter).to.be.eq(1); + + await worker.close(); + }); + }); + + describe("when using 'every' and time is one millisecond before iteration time", function () { + it('should repeat every 2 seconds and start immediately', async function () { + const startTimeMillis = new Date('2017-02-07 9:24:00').getTime(); + + const date = new Date(startTimeMillis - 1); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { connection, prefix }, + ); + + let prev: Job; + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev && counter === 1) { + expect(prev.timestamp).to.be.lte(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.lte(1); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.eq(2000); + } + + prev = job; + counter++; + if (counter === 5) { + resolve(); + } + } catch (err) { + reject(err); } - } catch (err) { - reject(err); - } + }); }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + }, + { data: { foo: 'bar' } }, + ); + + await completing; + + await worker.close(); }); + }); - await queue.upsertJobScheduler( - 'repeat', - { - every: 2000, - }, - { data: { foo: 'bar' } }, - ); + describe("when using 'every' and time is one millisecond after iteration time", function () { + it('should repeat every 2 seconds and start immediately', async function () { + const startTimeMillis = new Date('2017-02-07 9:24:00').getTime() + 1; - await completing; + const date = new Date(startTimeMillis); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND; - await worker.close(); + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { connection, prefix }, + ); + + let prev: Job; + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev && counter === 1) { + expect(prev.timestamp).to.be.lte(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.lte(1); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.eq(2000); + } + + prev = job; + counter++; + if (counter === 5) { + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + }, + { data: { foo: 'bar' } }, + ); + + await completing; + + await worker.close(); + }); }); it('should start immediately even after removing the job scheduler and adding it again', async function () { @@ -850,7 +981,6 @@ describe('Job Scheduler', function () { 'repeat', { every: 2000, - immediately: true, }, { data: { foo: 'bar' } }, ); @@ -884,7 +1014,6 @@ describe('Job Scheduler', function () { 'repeat', { every: 2000, - immediately: true, }, { data: { foo: 'bar' } }, ); @@ -1196,12 +1325,15 @@ describe('Job Scheduler', function () { }); }); - await queue.upsertJobScheduler('repeat', { + const job = await queue.upsertJobScheduler('repeat', { pattern: '0 1 * * *', endDate: new Date('2017-05-10 13:13:00'), tz: 'Europe/Athens', utc: true, }); + + expect(job).to.be.ok; + this.clock.tick(nextTick + delay); worker.run(); @@ -1495,26 +1627,33 @@ describe('Job Scheduler', function () { }); it('should not create a new delayed job if the failed job is retried with retryJobs', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const repeatOpts = { every: 579, }; let isFirstRun = true; - const worker = new Worker( - queueName, - async () => { - this.clock.tick(177); - if (isFirstRun) { - isFirstRun = false; - throw new Error('failed'); - } - }, - { - connection, - prefix, - }, - ); + let worker; + const processingAfterFailing = new Promise(resolve => { + worker = new Worker( + queueName, + async () => { + this.clock.tick(177); + if (isFirstRun) { + isFirstRun = false; + throw new Error('failed'); + } + resolve(); + }, + { + connection, + prefix, + }, + ); + }); const failing = new Promise(resolve => { worker.on('failed', async () => { @@ -1523,26 +1662,43 @@ describe('Job Scheduler', function () { }); const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); - const delayedCount = await queue.getDelayedCount(); - expect(delayedCount).to.be.equal(1); await repeatableJob!.promote(); + + const delayedCountBeforeFailing = await queue.getDelayedCount(); + expect(delayedCountBeforeFailing).to.be.equal(0); + await failing; const failedCount = await queue.getFailedCount(); expect(failedCount).to.be.equal(1); + const delayedCountAfterFailing = await queue.getDelayedCount(); + expect(delayedCountAfterFailing).to.be.equal(1); + // Retry the failed job this.clock.tick(1143); await queue.retryJobs({ state: 'failed' }); const failedCountAfterRetry = await queue.getFailedCount(); expect(failedCountAfterRetry).to.be.equal(0); + await processingAfterFailing; + + await worker.close(); + const delayedCount2 = await queue.getDelayedCount(); expect(delayedCount2).to.be.equal(1); + + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.be.equal(0); }); it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () { + let expectError; + + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const repeatOpts = { every: 477, }; @@ -1554,6 +1710,13 @@ describe('Job Scheduler', function () { async () => { this.clock.tick(177); + try { + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + } catch (error) { + expectError = error; + } + if (isFirstRun) { isFirstRun = false; throw new Error('failed'); @@ -1572,11 +1735,12 @@ describe('Job Scheduler', function () { }); const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); - const delayedCount = await queue.getDelayedCount(); - expect(delayedCount).to.be.equal(1); await repeatableJob!.promote(); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(0); + this.clock.tick(177); await failing; @@ -1592,6 +1756,14 @@ describe('Job Scheduler', function () { const failedCountAfterRetry = await queue.getFailedCount(); expect(failedCountAfterRetry).to.be.equal(0); + this.clock.tick(177); + + await worker.close(); + + if (expectError) { + throw expectError; + } + const delayedCount2 = await queue.getDelayedCount(); expect(delayedCount2).to.be.equal(1); }); @@ -1884,6 +2056,9 @@ describe('Job Scheduler', function () { }).timeout(8000); it('should not allow to remove a delayed job if it belongs to a repeatable job', async function () { + const date = new Date('2019-07-13 1:58:23'); + this.clock.setSystemTime(date); + const repeat = { every: 1000, }; @@ -1902,6 +2077,9 @@ describe('Job Scheduler', function () { }); it('should not remove delayed jobs if they belong to a repeatable job when using drain', async function () { + const date = new Date('2014-09-03 5:32:12'); + this.clock.setSystemTime(date); + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); await queue.add('test', { foo: 'bar' }, { delay: 1000 }); @@ -1919,6 +2097,9 @@ describe('Job Scheduler', function () { }); it('should not remove delayed jobs if they belong to a repeatable job when using clean', async function () { + const date = new Date('2012-08-05 2:32:12'); + this.clock.setSystemTime(date); + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); await queue.add('test', { foo: 'bar' }, { delay: 1000 }); @@ -1936,6 +2117,9 @@ describe('Job Scheduler', function () { }); it("should keep one delayed job if updating a repeatable job's every option", async function () { + const date = new Date('2022-01-08 7:22:21'); + this.clock.setSystemTime(date); + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); await queue.upsertJobScheduler('myTestJob', { every: 4000 }); await queue.upsertJobScheduler('myTestJob', { every: 5000 }); @@ -2175,6 +2359,9 @@ describe('Job Scheduler', function () { }); it("should return a valid job with the job's options and data passed as the job template", async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const repeatOpts = { every: 1000, }; @@ -2224,6 +2411,9 @@ describe('Job Scheduler', function () { }); it('should have the right count value', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + await queue.upsertJobScheduler('test', { every: 1000 }); this.clock.tick(ONE_SECOND + 100);