From e8b12af7d2ea3048fcde1891654a4302637ea42c Mon Sep 17 00:00:00 2001 From: Jack Ellis Date: Wed, 24 Jan 2024 11:54:51 +0000 Subject: [PATCH] fix: allow signal and message queues to be closed --- .../queue/src/connection/getConnection.ts | 24 +++++- packages/queue/src/disconnect.ts | 15 ++++ packages/queue/src/index.ts | 3 +- packages/queue/src/listeners/createWorker.ts | 29 ++++++-- packages/queue/src/queues/getMessageQueue.ts | 74 ++++++++++++------- packages/queue/src/queues/getQueue.ts | 31 ++++++-- 6 files changed, 134 insertions(+), 42 deletions(-) create mode 100644 packages/queue/src/disconnect.ts diff --git a/packages/queue/src/connection/getConnection.ts b/packages/queue/src/connection/getConnection.ts index f3a657ea..286f62e0 100644 --- a/packages/queue/src/connection/getConnection.ts +++ b/packages/queue/src/connection/getConnection.ts @@ -1,8 +1,28 @@ import IORedis from 'ioredis'; import { BULLMQ_REDIS_URI } from '../constants'; -const getConnection = () => { - return new IORedis(BULLMQ_REDIS_URI); +export class Connection extends IORedis { + constructor(public name: string, connectionUri: string) { + super(connectionUri); + Connection.connections[name] = this; + } + + close() { + delete Connection.connections[this.name]; + super.disconnect(); + } + + static connections: Record = {}; + + static closeAll() { + Object.values(Connection.connections).forEach((connection) => { + connection.close(); + }); + } +} + +const getConnection = (name: string) => { + return Connection.connections[name] || new Connection(name, BULLMQ_REDIS_URI); }; export default getConnection; diff --git a/packages/queue/src/disconnect.ts b/packages/queue/src/disconnect.ts new file mode 100644 index 00000000..0a3dc230 --- /dev/null +++ b/packages/queue/src/disconnect.ts @@ -0,0 +1,15 @@ +import { Connection } from './connection/getConnection'; +import { NftxQueue } from './queues/getQueue'; +import { MessageQueue } from './queues/getMessageQueue'; + +// Tear down all connections and queues +const disconnect = () => { + // Kill the message queue + MessageQueue.close(); + // Kill all signal queues + NftxQueue.closeAll(); + // Kill all remaining connections + Connection.closeAll(); +}; + +export default disconnect; diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index 262b16e6..40efaff2 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -1,5 +1,6 @@ export * from './actions'; export * from './connection'; export * from './listeners'; -// export * from './queues'; +export * from './queues'; export * from './types'; +export { default as disconnect } from './disconnect'; diff --git a/packages/queue/src/listeners/createWorker.ts b/packages/queue/src/listeners/createWorker.ts index af73e136..30efffc2 100644 --- a/packages/queue/src/listeners/createWorker.ts +++ b/packages/queue/src/listeners/createWorker.ts @@ -1,13 +1,32 @@ import { Job, Worker } from 'bullmq'; import { getConnection } from '../connection'; +export class NftxWorker extends Worker { + constructor(queueName: string, onJob: (job: Job) => any) { + super(queueName, onJob, { + concurrency: 8, + connection: getConnection(queueName), + }); + NftxWorker.workers[queueName] = this; + } + + close() { + delete NftxWorker.workers[this.name]; + return super.close(); + } + + static workers: Record = {}; + + static closeAll() { + Object.values(NftxWorker.workers).forEach((worker) => { + worker.close(); + }); + } +} + // Create a bullmq worker for the given queue name (internal use) const createWorker = (queueName: string, onJob: (job: Job) => any) => { - const connection = getConnection(); - const worker = new Worker(queueName, onJob, { - concurrency: 8, - connection, - }); + const worker = new NftxWorker(queueName, onJob); return worker; }; diff --git a/packages/queue/src/queues/getMessageQueue.ts b/packages/queue/src/queues/getMessageQueue.ts index 526579f6..fd2ffd79 100644 --- a/packages/queue/src/queues/getMessageQueue.ts +++ b/packages/queue/src/queues/getMessageQueue.ts @@ -1,40 +1,62 @@ import { getConnection } from '../connection'; +import { Connection } from '../connection/getConnection'; import { MESSAGE_QUEUE_NAME } from '../constants'; -let queue: { +interface IMessageQueue { add: (type: string, payload: Record) => void; - subscribe: (cb: (type: string, payload: any) => void) => void; -}; + subscribe: (cb: (type: string, payload: any) => void) => () => void; + close: () => void; +} -/** Returns a queue object that allows you to emit or subscribe to messages */ -const getMessageQueue = () => { - if (!queue) { - const connection = getConnection(); - const channel = MESSAGE_QUEUE_NAME; - - const add = (type: string, payload: Record) => { - console.debug(`Sending message: ${type}`); - const message = JSON.stringify({ type, payload }); - connection.publish(channel, message); - }; - const subscribe = ( - callback: (type: string, payload: Record) => any - ) => { - connection.subscribe(MESSAGE_QUEUE_NAME, (err) => { +export class MessageQueue implements IMessageQueue { + channel = MESSAGE_QUEUE_NAME; + + constructor(public connection: Connection) { + MessageQueue.queue = this; + } + + add(type: string, payload: Record) { + console.debug(`Sending message: ${type}`); + const message = JSON.stringify({ type, payload }); + this.connection.publish(this.channel, message); + } + subscribe(cb: (type: string, payload: any) => void) { + this.connection.subscribe(MESSAGE_QUEUE_NAME, (err) => { + if (err) { + console.error(`Failed to subscribe: ${err.message}`); + } + }); + + this.connection.on('message', (channel, message) => { + const { type, payload } = JSON.parse(message); + cb(type, payload); + }); + + return () => { + this.connection.unsubscribe(MESSAGE_QUEUE_NAME, (err) => { if (err) { - console.error(`Failed to subscribe: ${err.message}`); + console.error(`Failed to unsubscribe: ${err.message}`); } }); - - connection.on('message', (channel, message) => { - const { type, payload } = JSON.parse(message); - callback(type, payload); - }); }; + } + close() { + MessageQueue.queue = undefined; + this.connection.close(); + } - queue = { add, subscribe }; + static queue: IMessageQueue | undefined; + + static close() { + MessageQueue.queue?.close(); } - return queue; +} + +/** Returns a queue object that allows you to emit or subscribe to messages */ +const getMessageQueue = () => { + return ( + MessageQueue.queue || new MessageQueue(getConnection(MESSAGE_QUEUE_NAME)) + ); }; export default getMessageQueue; diff --git a/packages/queue/src/queues/getQueue.ts b/packages/queue/src/queues/getQueue.ts index 12a4fedd..17bcfd06 100644 --- a/packages/queue/src/queues/getQueue.ts +++ b/packages/queue/src/queues/getQueue.ts @@ -1,15 +1,30 @@ -import { Queue } from 'bullmq'; +import { Queue, QueueOptions } from 'bullmq'; import { getConnection } from '../connection'; -const queues: Record = {}; +export class NftxQueue extends Queue { + constructor(name: string, opts: QueueOptions) { + super(name, opts); + NftxQueue.queues[name] = this; + } + close() { + delete NftxQueue.queues[this.name]; + return super.close(); + } -const getQueue = (queueName: string) => { - let queue = queues[queueName]; - if (!queue) { - const connection = getConnection(); - queue = queues[queueName] = new Queue(queueName, { connection }); + static queues: Record = {}; + + static closeAll() { + Object.values(NftxQueue.queues).forEach((queue) => { + queue.close(); + }); } - return queue; +} + +const getQueue = (queueName: string) => { + return ( + NftxQueue.queues[queueName] || + new NftxQueue(queueName, { connection: getConnection(queueName) }) + ); }; export default getQueue;