Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(job-scheduler): validate if next delayed job exists #3015

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
28 changes: 15 additions & 13 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,24 @@ export class JobScheduler extends QueueBase {
producerId,
);

const job = new this.Job<T, R, N>(
this,
jobName,
jobData,
mergedOpts,
jobId,
);
if (jobId) {
const job = new this.Job<T, R, N>(
this,
jobName,
jobData,
mergedOpts,
jobId,
);

job.id = jobId;
job.id = jobId;

span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});
span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});

return job;
return job;
}
} else {
const jobId = await this.scripts.updateJobSchedulerNextMillis(
jobSchedulerId,
Expand Down
77 changes: 20 additions & 57 deletions src/commands/addJobScheduler-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,45 +40,7 @@ local prefixKey = ARGV[8]
--- @include "includes/addDelayedJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/removeJob"

local function storeRepeatableJob(schedulerId, schedulerKey, repeatKey, nextMillis, opts, templateData, templateOpts)
rcall("ZADD", repeatKey, nextMillis, schedulerId)

local optionalValues = {}
if opts['tz'] then
table.insert(optionalValues, "tz")
table.insert(optionalValues, opts['tz'])
end

if opts['pattern'] then
table.insert(optionalValues, "pattern")
table.insert(optionalValues, opts['pattern'])
end

if opts['endDate'] then
table.insert(optionalValues, "endDate")
table.insert(optionalValues, opts['endDate'])
end

if opts['every'] then
table.insert(optionalValues, "every")
table.insert(optionalValues, opts['every'])
end

local jsonTemplateOpts = cjson.encode(templateOpts)
if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then
table.insert(optionalValues, "opts")
table.insert(optionalValues, jsonTemplateOpts)
end

if templateData and templateData ~= '{}' then
table.insert(optionalValues, "data")
table.insert(optionalValues, templateData)
end

rcall("HMSET", schedulerKey, "name", opts['name'], "ic", 1,
unpack(optionalValues))
end
--- @include "includes/storeJobScheduler"

local schedulerKey = repeatKey .. ":" .. jobSchedulerId
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
Expand All @@ -88,33 +50,34 @@ local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis
-- the next iteration.
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis ~= false then
local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
local prevDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false
and (rcall("EXISTS", nextDelayedJobKey) ~= 1
or delayedJobId == nextDelayedJobId) then
removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]])
rcall("ZREM", delayedKey, delayedJobId)
if rcall("ZSCORE", delayedKey, prevDelayedJobId) ~= false then
removeJob(prevDelayedJobId, true, prefixKey, true --[[remove debounce key]])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of removing the job here and then adding a new one, couldn't we just set a flag here and if true not add the job again?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem is that if the delayed is not regenerated with new repeat opts, next delayed job will be scheduled with old repeat options. This is happening in that way because we use opts from current delayed job to schedule the next one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, can you write a comment then to explain why it is needed so we not need to remember it next time? :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I'll add a description about it

rcall("ZREM", delayedKey, prevDelayedJobId)
end
end

local schedulerOpts = cmsgpack.unpack(ARGV[2])

storeRepeatableJob(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts)
storeJobScheduler(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts)

local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)
if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)

rcall("INCR", KEYS[3])
rcall("INCR", KEYS[3])
rcall("HINCRBY", schedulerKey, "ic", 1)

local delayedOpts = cmsgpack.unpack(ARGV[6])
local delayedOpts = cmsgpack.unpack(ARGV[6])

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts,
timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)
addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts,
timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)

if ARGV[9] ~= "" then
rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId)
end
if ARGV[9] ~= "" then
rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId)
end

return nextDelayedJobId .. "" -- convert to string
return nextDelayedJobId .. "" -- convert to string
end
41 changes: 41 additions & 0 deletions src/commands/includes/storeJobScheduler.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
--[[
Function to store a job scheduler
]]
local function storeJobScheduler(schedulerId, schedulerKey, repeatKey, nextMillis, opts, templateData, templateOpts)
rcall("ZADD", repeatKey, nextMillis, schedulerId)

local optionalValues = {}
if opts['tz'] then
table.insert(optionalValues, "tz")
table.insert(optionalValues, opts['tz'])
end

if opts['pattern'] then
table.insert(optionalValues, "pattern")
table.insert(optionalValues, opts['pattern'])
end

if opts['endDate'] then
table.insert(optionalValues, "endDate")
table.insert(optionalValues, opts['endDate'])
end

if opts['every'] then
table.insert(optionalValues, "every")
table.insert(optionalValues, opts['every'])
end

local jsonTemplateOpts = cjson.encode(templateOpts)
if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then
table.insert(optionalValues, "opts")
table.insert(optionalValues, jsonTemplateOpts)
end

if templateData and templateData ~= '{}' then
table.insert(optionalValues, "data")
table.insert(optionalValues, templateData)
end

rcall("HMSET", schedulerKey, "name", opts['name'], "ic", 0,
unpack(optionalValues))
end
40 changes: 21 additions & 19 deletions src/commands/updateJobScheduler-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,33 @@ if prevMillis ~= false then
local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data")

rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
rcall("HINCRBY", schedulerKey, "ic", 1)

local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)
if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)

rcall("INCR", KEYS[3])
rcall("INCR", KEYS[3])
rcall("HINCRBY", schedulerKey, "ic", 1)

local delayedOpts = cmsgpack.unpack(ARGV[4])
local delayedOpts = cmsgpack.unpack(ARGV[4])

-- TODO: remove this workaround in next breaking change,
-- all job-schedulers must save job data
local templateData = schedulerAttributes[2] or ARGV[3]
-- TODO: remove this workaround in next breaking change,
-- all job-schedulers must save job data
local templateData = schedulerAttributes[2] or ARGV[3]

if templateData and templateData ~= '{}' then
rcall("HSET", schedulerKey, "data", templateData)
end
if templateData and templateData ~= '{}' then
rcall("HSET", schedulerKey, "data", templateData)
end

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1],
templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)

if KEYS[7] ~= "" then
rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId)
end
addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1],
templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)
if KEYS[7] ~= "" then
rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId)
end

return nextDelayedJobId .. "" -- convert to string
return nextDelayedJobId .. "" -- convert to string
end
end
end
4 changes: 2 additions & 2 deletions tests/test_bulk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ describe('bulk jobs', () => {
const worker = new Worker(
queueName,
async () => {
await delay(900);
await delay(800);
},
{ connection, prefix },
);
const worker2 = new Worker(
queueName,
async () => {
await delay(900);
await delay(800);
},
{ connection, prefix },
);
Expand Down
59 changes: 55 additions & 4 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ describe('Job Scheduler', function () {
const repeatableJobs = await queue.getJobSchedulers();
expect(repeatableJobs.length).to.be.eql(1);
await this.clock.tickAsync(ONE_MINUTE);
const delayed = await queue.getDelayed();
expect(delayed).to.have.length(1);
const counts = await queue.getJobCounts();
expect(counts.delayed + counts.completed + counts.waiting).to.be.eql(1);

await worker.close();
});
Expand Down Expand Up @@ -1539,6 +1539,8 @@ describe('Job Scheduler', function () {

describe('when repeatable job is promoted', function () {
it('keeps one repeatable and one delayed after being processed', async function () {
this.clock.restore();

const repeatOpts = {
pattern: '0 * 1 * *',
};
Expand All @@ -1564,16 +1566,65 @@ describe('Job Scheduler', function () {
const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const configs = await repeat.getRepeatableJobs(0, -1, true);
const schedulersCount = await queue.getJobSchedulersCount();

expect(delayedCount).to.be.equal(1);

const count = await queue.count();

expect(count).to.be.equal(1);
expect(configs).to.have.length(1);
expect(schedulersCount).to.be.equal(1);
await worker.close();
});

describe('when scheduler is removed and re-added', function () {
it('should not add next delayed job if already existed in different state than delayed', async function () {
this.clock.restore();

const repeatOpts = {
pattern: '0 * 1 * *',
};

const worker = new Worker(
queueName,
async () => {
await queue.removeJobScheduler('test');
await queue.upsertJobScheduler('test', repeatOpts);
},
{
connection,
prefix,
},
);

const completing = new Promise<void>(resolve => {
worker.on('completed', () => {
resolve();
});
});

const repeatableJob = await queue.upsertJobScheduler(
'test',
repeatOpts,
);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();
await completing;

const delayedCountAfter = await queue.getDelayedCount();
expect(delayedCountAfter).to.be.equal(0);

const schedulersCount = await queue.getJobSchedulersCount();

const counts = await queue.getJobCounts();

expect(counts.completed).to.be.equal(1);
expect(schedulersCount).to.be.equal(1);
await worker.close();
});
});
});

it('should allow removing a named repeatable job', async function () {
Expand Down
Loading