From b58ba9a2e48f9cb673de20b5a139ee12295b1139 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sat, 28 Dec 2024 02:30:37 -0500 Subject: [PATCH 1/2] fix(events-producer): stringify values in args --- src/classes/queue-events-producer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/classes/queue-events-producer.ts b/src/classes/queue-events-producer.ts index 87d7487d16..3dce3c555d 100644 --- a/src/classes/queue-events-producer.ts +++ b/src/classes/queue-events-producer.ts @@ -40,7 +40,7 @@ export class QueueEventsProducer extends QueueBase { const args: any[] = ['MAXLEN', '~', maxEvents, '*', 'event', eventName]; for (const [key, value] of Object.entries(restArgs)) { - args.push(key, value); + args.push(key, JSON.stringify(value)); } await client.xadd(key, ...args); From a2d6d5c8eb917183bbeb58a20ad010b2b6d3b0c5 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 28 Dec 2024 23:48:07 -0500 Subject: [PATCH 2/2] refactor: deserialize custom event object --- src/classes/queue-events.ts | 31 ++++++++++++++++++++---- tests/test_events.ts | 47 ++++++++++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index ea875b8706..4c9529cd27 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -8,6 +8,7 @@ import { array2obj, clientCommandMessageReg, isRedisInstance, + parseObjectValues, QUEUE_EVENT_SUFFIX, } from '../utils'; import { QueueBase } from './queue-base'; @@ -298,20 +299,40 @@ export class QueueEvents extends QueueBase { id = events[i][0]; const args = array2obj(events[i][1]); + let { event, ...restArgs } = args; + // // TODO: we may need to have a separate xtream for progress data // to avoid this hack. - switch (args.event) { + switch (event) { + case 'active': + case 'added': + case 'cleaned': + case 'debounced': // TODO: to be removed in next breaking change + case 'deduplicated': + case 'delayed': + case 'duplicated': + case 'error': + case 'failed': + case 'paused': + case 'removed': + case 'resumed': + case 'retries-exhausted': + case 'stalled': + case 'waiting': + case 'waiting-children': + break; case 'progress': - args.data = JSON.parse(args.data); + restArgs.data = JSON.parse(restArgs.data); break; case 'completed': - args.returnvalue = JSON.parse(args.returnvalue); + restArgs.returnvalue = JSON.parse(restArgs.returnvalue); + break; + default: + restArgs = parseObjectValues(restArgs); break; } - const { event, ...restArgs } = args; - if (event === 'drained') { this.emit(event, id); } else { diff --git a/tests/test_events.ts b/tests/test_events.ts index 3ec45cc9ce..75e636b948 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -1269,7 +1269,7 @@ describe('events', function () { }); describe('when publishing custom events', function () { - it('emits waiting when a job has been added', async () => { + it('emits custom event', async () => { const queueName2 = `test-${v4()}`; const queueEventsProducer = new QueueEventsProducer(queueName2, { connection, @@ -1311,5 +1311,50 @@ describe('events', function () { await queueEvents2.close(); await removeAllQueueData(new IORedis(redisHost), queueName2); }); + + describe('when published event is an object', function () { + it('deserialize event', async () => { + const queueName2 = `test-${v4()}`; + const queueEventsProducer = new QueueEventsProducer(queueName2, { + connection, + prefix, + }); + const queueEvents2 = new QueueEvents(queueName2, { + autorun: false, + connection, + prefix, + lastEventId: '0-0', + }); + await queueEvents2.waitUntilReady(); + + interface CustomListener extends QueueEventsListener { + example: (args: { custom: { foo: string } }, id: string) => void; + } + const customEvent = new Promise(resolve => { + queueEvents2.on('example', async ({ custom }) => { + await delay(250); + await expect(custom.foo).to.be.equal('value'); + resolve(); + }); + }); + + interface CustomEventPayload { + eventName: string; + custom: { foo: string }; + } + + await queueEventsProducer.publishEvent({ + eventName: 'example', + custom: { foo: 'value' }, + }); + + queueEvents2.run(); + await customEvent; + + await queueEventsProducer.close(); + await queueEvents2.close(); + await removeAllQueueData(new IORedis(redisHost), queueName2); + }); + }); }); });