From 5eb03cd7f27027191eb4bc4ed7386755fd9be1fb Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Tue, 5 Nov 2024 18:38:38 -0600 Subject: [PATCH] feat(queue-events): add QueueEventsProducer for publishing custom events (#2844) --- docs/gitbook/SUMMARY.md | 3 +- .../guide/{events.md => events/README.md} | 2 +- .../guide/events/create-custom-events.md | 44 ++++++++++++++ src/classes/index.ts | 1 + src/classes/queue-events-producer.ts | 58 +++++++++++++++++++ src/classes/queue-events.ts | 48 ++++++++------- tests/test_events.ts | 54 ++++++++++++++++- 7 files changed, 187 insertions(+), 23 deletions(-) rename docs/gitbook/guide/{events.md => events/README.md} (96%) create mode 100644 docs/gitbook/guide/events/create-custom-events.md create mode 100644 src/classes/queue-events-producer.ts diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 4dd39d5106..58dbb9c43d 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -53,7 +53,8 @@ * [Parallelism and Concurrency](guide/parallelism-and-concurrency.md) * [Retrying failing jobs](guide/retrying-failing-jobs.md) * [Returning job data](guide/returning-job-data.md) -* [Events](guide/events.md) +* [Events](guide/events/README.md) + * [Create Custom Events](guide/events/create-custom-events.md) * [Telemetry](guide/telemetry/README.md) * [Getting started](guide/telemetry/getting-started.md) * [Running Jaeger](guide/telemetry/running-jaeger.md) diff --git a/docs/gitbook/guide/events.md b/docs/gitbook/guide/events/README.md similarity index 96% rename from docs/gitbook/guide/events.md rename to docs/gitbook/guide/events/README.md index 144b92dd0a..f9a0e6e33a 100644 --- a/docs/gitbook/guide/events.md +++ b/docs/gitbook/guide/events/README.md @@ -32,7 +32,7 @@ myWorker.on('failed', (job: Job) => { }); ``` -The events above are local for the workers that actually completed the jobs. However, in many situations you want to listen to all the events emitted by all the workers in one single place. For this you can use the [`QueueEvents`](../api/bullmq.queueevents.md) class: +The events above are local for the workers that actually completed the jobs. However, in many situations you want to listen to all the events emitted by all the workers in one single place. For this you can use the [`QueueEvents`](https://api.docs.bullmq.io/classes/v5.QueueEvents.html) class: ```typescript import { QueueEvents } from 'bullmq'; diff --git a/docs/gitbook/guide/events/create-custom-events.md b/docs/gitbook/guide/events/create-custom-events.md new file mode 100644 index 0000000000..9698fbe49c --- /dev/null +++ b/docs/gitbook/guide/events/create-custom-events.md @@ -0,0 +1,44 @@ +# Create Custom Events + +In BullMQ, creating a generic distributed realtime event emitter is possible by using our **QueueEventsProducer** class. + +Consumers must use **QueueEvents** class to subscribe to those events that they are interested in. + +```typescript +const queueName = 'customQueue'; +const queueEventsProducer = new QueueEventsProducer(queueName, { + connection, +}); +const queueEvents = new QueueEvents(queueName, { + connection, +}); + +interface CustomListener extends QueueEventsListener { + example: (args: { custom: string }, id: string) => void; +} +queueEvents.on('example', async ({ custom }) => { + // custom logic +}); + +interface CustomEventPayload { + eventName: string; + custom: string; +} + +await queueEventsProducer.publishEvent({ + eventName: 'example', + custom: 'value', +}); +``` + +Only eventName attribute is required. + +{% hint style="warning" %} +Some event names are reserved from [Queue Listener API Reference](https://api.docs.bullmq.io/interfaces/v5.QueueListener.html). +{% endhint %} + +## Read more: + +- 💡 [Queue Events API Reference](https://api.docs.bullmq.io/classes/v5.QueueEvents.html) +- 💡 [Queue Events Listener API Reference](https://api.docs.bullmq.io/interfaces/v5.QueueEventsListener.html) +- 💡 [Queue Events Producer API Reference](https://api.docs.bullmq.io/interfaces/v5.QueueEventsProducer.html) diff --git a/src/classes/index.ts b/src/classes/index.ts index 314bb8f0b8..ad2158913c 100644 --- a/src/classes/index.ts +++ b/src/classes/index.ts @@ -9,6 +9,7 @@ export * from './job'; // export * from './main-worker'; this file must not be exported export * from './queue-base'; export * from './queue-events'; +export * from './queue-events-producer'; export * from './queue-getters'; export * from './queue-keys'; export * from './queue'; diff --git a/src/classes/queue-events-producer.ts b/src/classes/queue-events-producer.ts new file mode 100644 index 0000000000..87d7487d16 --- /dev/null +++ b/src/classes/queue-events-producer.ts @@ -0,0 +1,58 @@ +import { QueueBaseOptions } from '../interfaces'; +import { QueueBase } from './queue-base'; +import { RedisConnection } from './redis-connection'; + +/** + * The QueueEventsProducer class is used for publishing custom events. + */ +export class QueueEventsProducer extends QueueBase { + constructor( + name: string, + opts: QueueBaseOptions = { + connection: {}, + }, + Connection?: typeof RedisConnection, + ) { + super( + name, + { + blockingConnection: false, + ...opts, + }, + Connection, + ); + + this.opts = opts; + } + + /** + * Publish custom event to be processed in QueueEvents. + * @param argsObj - Event payload + * @param maxEvents - Max quantity of events to be saved + */ + async publishEvent( + argsObj: T, + maxEvents = 1000, + ): Promise { + const client = await this.client; + const key = this.keys.events; + const { eventName, ...restArgs } = argsObj; + const args: any[] = ['MAXLEN', '~', maxEvents, '*', 'event', eventName]; + + for (const [key, value] of Object.entries(restArgs)) { + args.push(key, value); + } + + await client.xadd(key, ...args); + } + + /** + * Closes the connection and returns a promise that resolves when the connection is closed. + */ + async close(): Promise { + if (!this.closing) { + this.closing = this.connection.close(); + } + await this.closing; + } +} diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 76e977fa21..f3b9407fcb 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -172,6 +172,12 @@ export interface QueueEventsListener extends IoredisListener { 'waiting-children': (args: { jobId: string }, id: string) => void; } +type CustomParameters = T extends (...args: infer Args) => void + ? Args + : never; + +type KeyOf = Extract; + /** * The QueueEvents class is used for listening to the global events * emitted by a given queue. @@ -213,34 +219,34 @@ export class QueueEvents extends QueueBase { } } - emit( - event: U, - ...args: Parameters - ): boolean { + emit< + QEL extends QueueEventsListener = QueueEventsListener, + U extends KeyOf = KeyOf, + >(event: U, ...args: CustomParameters): boolean { return super.emit(event, ...args); } - off( - eventName: U, - listener: QueueEventsListener[U], - ): this { - super.off(eventName, listener); + off< + QEL extends QueueEventsListener = QueueEventsListener, + U extends KeyOf = KeyOf, + >(eventName: U, listener: QEL[U]): this { + super.off(eventName, listener as (...args: any[]) => void); return this; } - on( - event: U, - listener: QueueEventsListener[U], - ): this { - super.on(event, listener); + on< + QEL extends QueueEventsListener = QueueEventsListener, + U extends KeyOf = KeyOf, + >(event: U, listener: QEL[U]): this { + super.on(event, listener as (...args: any[]) => void); return this; } - once( - event: U, - listener: QueueEventsListener[U], - ): this { - super.once(event, listener); + once< + QEL extends QueueEventsListener = QueueEventsListener, + U extends KeyOf = KeyOf, + >(event: U, listener: QEL[U]): this { + super.once(event, listener as (...args: any[]) => void); return this; } @@ -310,7 +316,9 @@ export class QueueEvents extends QueueBase { this.emit(event, id); } else { this.emit(event as any, restArgs, id); - this.emit(`${event}:${restArgs.jobId}` as any, restArgs, id); + if (restArgs.jobId) { + this.emit(`${event}:${restArgs.jobId}` as any, restArgs, id); + } } } } diff --git a/tests/test_events.ts b/tests/test_events.ts index a77ca2f985..a1fd791d99 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -3,7 +3,14 @@ import { v4 } from 'uuid'; import { expect } from 'chai'; import { after } from 'lodash'; import { beforeEach, describe, it, before, after as afterAll } from 'mocha'; -import { FlowProducer, Queue, QueueEvents, Worker } from '../src/classes'; +import { + FlowProducer, + Queue, + QueueEvents, + QueueEventsListener, + QueueEventsProducer, + Worker, +} from '../src/classes'; import { delay, removeAllQueueData } from '../src/utils'; describe('events', function () { @@ -1253,4 +1260,49 @@ describe('events', function () { await trimmedQueue.close(); await removeAllQueueData(new IORedis(redisHost), queueName); }); + + describe('when publishing custom events', function () { + it('emits waiting when a job has been added', async () => { + const queueName2 = `test-${v4()}`; + const queueEventsProducer = new QueueEventsProducer(queueName2, { + connection, + prefix, + }); + const queueEvents2 = new QueueEvents(queueName2, { + autorun: false, + connection, + prefix, + lastEventId: '0-0', + }); + await queueEvents2.waitUntilReady(); + + interface CustomListener extends QueueEventsListener { + example: (args: { custom: string }, id: string) => void; + } + const customEvent = new Promise(resolve => { + queueEvents2.on('example', async ({ custom }) => { + await delay(250); + await expect(custom).to.be.equal('value'); + resolve(); + }); + }); + + interface CustomEventPayload { + eventName: string; + custom: string; + } + + await queueEventsProducer.publishEvent({ + eventName: 'example', + custom: 'value', + }); + + queueEvents2.run(); + await customEvent; + + await queueEventsProducer.close(); + await queueEvents2.close(); + await removeAllQueueData(new IORedis(redisHost), queueName2); + }); + }); });