Skip to content

Commit

Permalink
fix(job-scheduler): use delayed job data when template data is not pr…
Browse files Browse the repository at this point in the history
…esent (#3010) fixes #3009
  • Loading branch information
roggervalf authored Jan 14, 2025
1 parent 3c15d4b commit 95edb40
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ export class JobScheduler extends QueueBase {
const jobId = await this.scripts.updateJobSchedulerNextMillis(
jobSchedulerId,
nextMillis,
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
Job.optsAsJSON(mergedOpts),
producerId,
);
Expand Down
2 changes: 2 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ export class Scripts {
async updateJobSchedulerNextMillis(
jobSchedulerId: string,
nextMillis: number,
templateData: string,
delayedJobOpts: JobsOptions,
// The job id of the job that produced this next iteration
producerId?: string,
Expand All @@ -375,6 +376,7 @@ export class Scripts {
const args = [
nextMillis,
jobSchedulerId,
templateData,
pack(delayedJobOpts),
Date.now(),
queueKeys[''],
Expand Down
27 changes: 18 additions & 9 deletions src/commands/updateJobScheduler-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,23 @@
ARGV[1] next milliseconds
ARGV[2] jobs scheduler id
ARGV[3] msgpacked delayed opts
ARGV[4] timestamp
ARGV[5] prefix key
ARGV[6] producer id
ARGV[3] Json stringified delayed data
ARGV[4] msgpacked delayed opts
ARGV[5] timestamp
ARGV[6] prefix key
ARGV[7] producer id
Output:
next delayed job id - OK
]]
local rcall = redis.call
local repeatKey = KEYS[6]
local delayedKey = KEYS[4]
local timestamp = ARGV[4]
local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[2]
local prefixKey = ARGV[5]
local producerId = ARGV[6]
local timestamp = ARGV[5]
local prefixKey = ARGV[6]
local producerId = ARGV[7]

-- Includes
--- @include "includes/addDelayedJob"
Expand All @@ -53,10 +54,18 @@ if prevMillis ~= false then

rcall("INCR", KEYS[3])

local delayedOpts = cmsgpack.unpack(ARGV[3])
local delayedOpts = cmsgpack.unpack(ARGV[4])

-- TODO: remove this workaround in next breaking change,
-- all job-schedulers must save job data
local templateData = schedulerAttributes[2] or ARGV[3]

if templateData and templateData ~= '{}' then
rcall("HSET", schedulerKey, "data", templateData)
end

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1],
schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)
templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)

if KEYS[7] ~= "" then
rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId)
Expand Down
67 changes: 67 additions & 0 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,7 @@ describe('Job Scheduler', function () {
}
} catch (error) {
console.log(error);
reject(error);
}
});
});
Expand All @@ -1328,6 +1329,72 @@ describe('Job Scheduler', function () {
delayStub.restore();
});

describe('when template data is only present in delayed job', function () {
it('should continue saving data in next delayed jobs', async function () {
const client = await queue.client;

const date = new Date('2017-05-05 13:12:00');
this.clock.setSystemTime(date);

const nextTick = ONE_DAY + 10 * ONE_SECOND;
const delay = 5 * ONE_SECOND + 500;

const worker = new Worker(
queueName,
async () => {
await client.hdel(`${prefix}:${queueName}:repeat:repeat`, 'data');
this.clock.tick(nextTick);
},
{
autorun: false,
connection,
prefix,
skipStalledCheck: true,
skipLockRenewal: true,
},
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {
console.log('delay');
});
const templateData = { foo: 'bar' };

let prev: Job;
let counter = 0;
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async job => {
try {
expect(job.data).to.deep.equal(templateData);

counter++;
if (counter == 5) {
resolve();
}
} catch (error) {
console.log(error);
reject(error);
}
});
});

await queue.upsertJobScheduler(
'repeat',
{
pattern: '0 1 * * *',
endDate: new Date('2017-05-10 01:00:00'),
},
{ data: { foo: 'bar' } },
);

this.clock.tick(nextTick + delay);

worker.run();

await completing;
await worker.close();
delayStub.restore();
});
});

describe('when utc option is provided', function () {
it('repeats once a day for 5 days', async function () {
this.timeout(8000);
Expand Down

0 comments on commit 95edb40

Please sign in to comment.