Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): add moveToWait method for manual processing #2978

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/gitbook/patterns/manually-fetching-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,24 @@ while (1) {
}
}
```

## Rate Limiting

If you want to move a job back to wait because your queue is rate limited.

```typescript
const worker = new Worker('my-queue', null, { connection, prefix });
const token = 'my-token';
await Job.create(queue, 'test', { foo: 'bar' });
const job = (await worker.getNextJob(token)) as Job;

await queue.rateLimit(60000);
await job.moveToWait(token);
```

## Read more:

- 💡 [Get Next Job API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#getNextJob)
- 💡 [Move To Completed API Reference](https://api.docs.bullmq.io/classes/v5.Job.html#moveToCompleted)
- 💡 [Move To Failed API Reference](https://api.docs.bullmq.io/classes/v5.Job.html#moveToFailed)
- 💡 [Move To Wait API Reference](https://api.docs.bullmq.io/classes/v5.Job.html#moveToWait)
10 changes: 10 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,16 @@ export class Job<
return result;
}

/**
* Moves a job to the wait or prioritized state.
*
* @param token - Worker token used to acquire completed job.
* @returns Returns pttl.
*/
moveToWait(token: string): Promise<number> {
return this.scripts.moveJobFromActiveToWait(this.id, token);
}

private async shouldRetryJob(err: Error): Promise<[boolean, number]> {
if (
this.attemptsMade + 1 < this.opts.attempts &&
Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,6 @@ will never work with more accuracy than 1ms. */
job: Job<DataType, ResultType, NameType>,
token: string,
) {
return this.scripts.moveJobFromActiveToWait(job.id, token);
return job.moveToWait(token);
}
}
15 changes: 15 additions & 0 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,21 @@ describe('Job', function () {
});
});

describe('.moveToWait', () => {
it('moves job to wait from active', async function () {
const worker = new Worker(queueName, null, { connection, prefix });
const token = 'my-token';
await Job.create(queue, 'test', { foo: 'bar' });
const job = (await worker.getNextJob(token)) as Job;
const isWaiting = await job.isWaiting();
expect(isWaiting).to.be.equal(false);
await job.moveToWait(token);
const isisWaiting2 = await job.isWaiting();
expect(isisWaiting2).to.be.equal(true);
await worker.close();
});
});

describe('.changeDelay', () => {
it('can change delay of a delayed job', async function () {
this.timeout(8000);
Expand Down
Loading