From 486aa0796f3ef9b19b17d7639cd3ecb099202f72 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 1 Apr 2024 21:40:43 -0500 Subject: [PATCH] refactor: add setGlobalConcurrency method --- src/classes/queue.ts | 15 ++++++++++++--- src/interfaces/queue-options.ts | 8 -------- tests/test_concurrency.ts | 23 +++++++++++++---------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 520a97befd..124e0d1ced 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -167,9 +167,6 @@ export class Queue< get metaValues(): Record { return { 'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000, - ...(this.opts?.concurrency - ? { concurrency: this.opts?.concurrency } - : {}), }; } @@ -186,6 +183,18 @@ export class Queue< }); } + /** + * Enable and set global concurrency value. + * @param concurrency - Maximum number of simultaneous jobs that the workers can handle. + * For instance, setting this value to 1 ensures that no more than one job + * is processed at any given time. If this limit is not defined, there will be no + * restriction on the number of concurrent jobs. + */ + async setGlobalConcurrency(concurrency: number) { + const client = await this.client; + return client.hset(this.keys.meta, 'concurrency', concurrency); + } + /** * Adds a new job to the queue. * diff --git a/src/interfaces/queue-options.ts b/src/interfaces/queue-options.ts index 631c24ae31..c8f00bd78c 100644 --- a/src/interfaces/queue-options.ts +++ b/src/interfaces/queue-options.ts @@ -54,14 +54,6 @@ export interface QueueOptions extends QueueBaseOptions { }; }; - /** - * Maximum number of simultaneous jobs that the workers can handle. - * For instance, setting this value to 1 ensures that no more than one job - * is processed at any given time. If this limit is not defined, there will be no - * restriction on the number of concurrent jobs. - */ - concurrency?: number; - settings?: AdvancedRepeatOptions; } diff --git a/tests/test_concurrency.ts b/tests/test_concurrency.ts index 3ae4f0a641..1b002f6e11 100644 --- a/tests/test_concurrency.ts +++ b/tests/test_concurrency.ts @@ -31,7 +31,7 @@ describe('Concurrency', () => { }); it('should run max concurrency for jobs added', async () => { - const queue = new Queue(queueName, { connection, concurrency: 1, prefix }); + const queue = new Queue(queueName, { connection, prefix }); const numJobs = 15; const jobsData: { name: string; data: any }[] = []; for (let j = 0; j < numJobs; j++) { @@ -42,7 +42,7 @@ describe('Concurrency', () => { } await queue.addBulk(jobsData); - + await queue.setGlobalConcurrency(1); const bar = new ProgressBar(':bar', { total: numJobs }); let count = 0; @@ -93,7 +93,7 @@ describe('Concurrency', () => { }).timeout(16000); it('emits drained global event only once when worker is idle', async function () { - const queue = new Queue(queueName, { connection, concurrency: 1, prefix }); + const queue = new Queue(queueName, { connection, prefix }); const worker = new Worker( queueName, async () => { @@ -119,6 +119,7 @@ describe('Concurrency', () => { { name: 'test', data: { foo: 'bar' } }, { name: 'test', data: { foo: 'baz' } }, ]); + await queue.setGlobalConcurrency(1); await delay(4000); @@ -138,11 +139,11 @@ describe('Concurrency', () => { const queue = new Queue(queueName, { connection, - concurrency: 1, prefix, }); const queueEvents = new QueueEvents(queueName, { connection, prefix }); await queueEvents.waitUntilReady(); + await queue.setGlobalConcurrency(1); const worker = new Worker( queueName, @@ -222,11 +223,11 @@ describe('Concurrency', () => { const queue = new Queue(queueName, { connection, - concurrency: 1, prefix, }); const queueEvents = new QueueEvents(queueName, { connection, prefix }); await queueEvents.waitUntilReady(); + await queue.setGlobalConcurrency(1); const worker = new Worker( queueName, @@ -304,7 +305,6 @@ describe('Concurrency', () => { const flow = new FlowProducer({ connection, prefix }); const queue = new Queue(queueName, { connection, - concurrency: 1, prefix, }); @@ -317,6 +317,8 @@ describe('Concurrency', () => { } await queue.addBulk(jobsData); + await queue.setGlobalConcurrency(1); + const name = 'child-job'; await flow.add({ @@ -394,13 +396,13 @@ describe('Concurrency', () => { const globalConcurrency = 2; const queue = new Queue(queueName, { connection, - concurrency: globalConcurrency, prefix, }); for (let j = 0; j < numJobs; j++) { await queue.add('test-stalled', { foo: j % 2 }); } + await queue.setGlobalConcurrency(globalConcurrency); const concurrency = 4; @@ -486,7 +488,6 @@ describe('Concurrency', () => { const globalConcurrency = 1; const queue = new Queue(queueName, { connection, - concurrency: globalConcurrency, prefix, }); @@ -497,6 +498,7 @@ describe('Concurrency', () => { { attempts: 2, backoff: 100 }, ); } + await queue.setGlobalConcurrency(globalConcurrency); const concurrency = 10; @@ -538,7 +540,6 @@ describe('Concurrency', () => { const globalConcurrency = 1; const queue = new Queue(queueName, { connection, - concurrency: globalConcurrency, prefix, }); @@ -549,6 +550,7 @@ describe('Concurrency', () => { { attempts: 2, backoff: 0 }, ); } + await queue.setGlobalConcurrency(globalConcurrency); const concurrency = 4; @@ -591,9 +593,10 @@ describe('Concurrency', () => { const globalConcurrency = 1; const queue = new Queue(queueName, { connection, - concurrency: globalConcurrency, prefix, }); + await queue.waitUntilReady(); + await queue.setGlobalConcurrency(globalConcurrency); const worker = new Worker(queueName, null, { connection, lockRenewTime: 200,