Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: guarantee every repeatable jobs are slotted #2966

Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ export class JobScheduler extends QueueBase {

async upsertJobScheduler<T = any, R = any, N extends string = string>(
jobSchedulerId: string,
repeatOpts: Omit<RepeatOptions, 'key' | 'prevMillis' | 'offset'>,
repeatOpts: Omit<RepeatOptions, 'key' | 'prevMillis'>,
jobName: N,
jobData: T,
opts: JobSchedulerTemplateOptions,
{ override }: { override: boolean },
{ override, producerId }: { override: boolean; producerId?: string },
): Promise<Job<T, R, N> | undefined> {
const { every, pattern } = repeatOpts;
const { every, pattern, offset } = repeatOpts;

if (pattern && every) {
throw new Error(
Expand All @@ -59,6 +59,12 @@ export class JobScheduler extends QueueBase {
);
}

if (repeatOpts.immediately && repeatOpts.every) {
console.warn(
"Using option immediately with every does not affect the job's schedule. Job will run immediately anyway.",
);
}

// Check if we reached the limit of the repeatable job's iterations
const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
if (
Expand All @@ -75,24 +81,35 @@ export class JobScheduler extends QueueBase {
return;
}

const prevMillis = opts.prevMillis || 0;

// Check if we have a start date for the repeatable job
const { startDate, immediately, ...filteredRepeatOpts } = repeatOpts;
if (startDate) {
const startMillis = new Date(startDate).getTime();
now = startMillis > now ? startMillis : now;
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

let nextMillis: number;
let newOffset = offset;

if (every) {
nextMillis = prevMillis + every;
const nextSlot = Math.floor(now / every) * every + every;
if (prevMillis || offset) {
nextMillis = nextSlot + (offset || 0);
} else {
nextMillis = now;
newOffset = every - (nextSlot - now);

// newOffset should always be positive, but as an extra safety check
newOffset = newOffset < 0 ? 0 : newOffset;
}

if (nextMillis < now) {
nextMillis = now;
}
} else if (pattern) {
now = prevMillis < now ? now : prevMillis;
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
}

Expand Down Expand Up @@ -149,11 +166,12 @@ export class JobScheduler extends QueueBase {
jobSchedulerId,
{
...opts,
repeat: filteredRepeatOpts,
repeat: { ...filteredRepeatOpts, offset: newOffset },
telemetry,
},
jobData,
iterationCount,
producerId,
);

const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]
Expand Down Expand Up @@ -189,6 +207,8 @@ export class JobScheduler extends QueueBase {
opts: JobsOptions,
data: T,
currentCount: number,
// The job id of the job that produced this next iteration
producerId?: string,
) {
//
// Generate unique job id for this iteration.
Expand All @@ -215,6 +235,11 @@ export class JobScheduler extends QueueBase {
const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}

return job;
}

Expand Down
11 changes: 11 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ export class Job<
*/
repeatJobKey?: string;

/**
* Produced next repetable job Id.
*
*/
nextRepeatableJobId?: string;

/**
* The token used for locking this job.
*/
Expand Down Expand Up @@ -384,6 +390,10 @@ export class Job<
job.processedBy = json.pb;
}

if (json.nrjid) {
job.nextRepeatableJobId = json.nrjid;
}

return job;
}

Expand Down Expand Up @@ -493,6 +503,7 @@ export class Job<
deduplicationId: this.deduplicationId,
repeatJobKey: this.repeatJobKey,
returnvalue: JSON.stringify(this.returnvalue),
nrjid: this.nextRepeatableJobId,
});
}

Expand Down
10 changes: 6 additions & 4 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ will never work with more accuracy than 1ms. */
job.token = token;

// Add next scheduled job if necessary.
if (job.opts.repeat) {
if (job.opts.repeat && !job.nextRepeatableJobId) {
// Use new job scheduler if possible
if (job.repeatJobKey) {
const jobScheduler = await this.jobScheduler;
Expand All @@ -798,7 +798,7 @@ will never work with more accuracy than 1ms. */
job.name,
job.data,
job.opts,
{ override: false },
{ override: false, producerId: job.id },
);
} else {
const repeat = await this.repeat;
Expand Down Expand Up @@ -835,6 +835,8 @@ will never work with more accuracy than 1ms. */
});

const handleCompleted = async (result: ResultType) => {
jobsInProgress.delete(inProgressItem);

if (!this.connection.closing) {
const completed = await job.moveToCompleted(
result,
Expand All @@ -855,6 +857,8 @@ will never work with more accuracy than 1ms. */
};

const handleFailed = async (err: Error) => {
jobsInProgress.delete(inProgressItem);

if (!this.connection.closing) {
try {
// Check if the job was manually rate-limited
Expand Down Expand Up @@ -911,8 +915,6 @@ will never work with more accuracy than 1ms. */
[TelemetryAttributes.JobFinishedTimestamp]: Date.now(),
[TelemetryAttributes.JobProcessedTimestamp]: processedOn,
});

jobsInProgress.delete(inProgressItem);
}
},
srcPropagationMedatada,
Expand Down
2 changes: 2 additions & 0 deletions src/interfaces/job-json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface JobJson {
parent?: ParentKeys;
parentKey?: string;
repeatJobKey?: string;
nextRepeatableJobKey?: string;
debounceId?: string;
deduplicationId?: string;
processedBy?: string;
Expand All @@ -41,6 +42,7 @@ export interface JobJsonRaw {
parent?: string;
deid?: string;
rjk?: string;
nrjid?: string;
atm?: string;
ats?: string;
pb?: string; // Worker name
Expand Down
15 changes: 7 additions & 8 deletions src/interfaces/repeat-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface RepeatOptions extends Omit<ParserOptions, 'iterator'> {

/**
* Repeated job should start right now
* ( work only with every settings)
* ( work only with cron settings)
*/
immediately?: boolean;

Expand All @@ -42,16 +42,15 @@ export interface RepeatOptions extends Omit<ParserOptions, 'iterator'> {
count?: number;

/**
* Internal property to store the previous time the job was executed.
*/
prevMillis?: number;
* Offset in milliseconds to affect the next iteration time
*
* */
offset?: number;

/**
* Internal property to store the offset to apply to the next iteration.
*
* @deprecated
* Internal property to store the previous time the job was executed.
*/
offset?: number;
prevMillis?: number;

/**
* Internal property to store de job id
Expand Down
Loading
Loading