Skip to content

Commit

Permalink
fix: allow signal and message queues to be closed
Browse files Browse the repository at this point in the history
  • Loading branch information
jackmellis committed Jan 24, 2024
1 parent 6ad58f5 commit e8b12af
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 42 deletions.
24 changes: 22 additions & 2 deletions packages/queue/src/connection/getConnection.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
import IORedis from 'ioredis';
import { BULLMQ_REDIS_URI } from '../constants';

const getConnection = () => {
return new IORedis(BULLMQ_REDIS_URI);
export class Connection extends IORedis {
constructor(public name: string, connectionUri: string) {
super(connectionUri);
Connection.connections[name] = this;
}

close() {
delete Connection.connections[this.name];
super.disconnect();
}

static connections: Record<string, Connection> = {};

static closeAll() {
Object.values(Connection.connections).forEach((connection) => {
connection.close();
});
}
}

const getConnection = (name: string) => {
return Connection.connections[name] || new Connection(name, BULLMQ_REDIS_URI);
};

export default getConnection;
15 changes: 15 additions & 0 deletions packages/queue/src/disconnect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Connection } from './connection/getConnection';
import { NftxQueue } from './queues/getQueue';
import { MessageQueue } from './queues/getMessageQueue';

// Tear down all connections and queues
const disconnect = () => {
// Kill the message queue
MessageQueue.close();
// Kill all signal queues
NftxQueue.closeAll();
// Kill all remaining connections
Connection.closeAll();
};

export default disconnect;
3 changes: 2 additions & 1 deletion packages/queue/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './actions';
export * from './connection';
export * from './listeners';
// export * from './queues';
export * from './queues';
export * from './types';
export { default as disconnect } from './disconnect';
29 changes: 24 additions & 5 deletions packages/queue/src/listeners/createWorker.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
import { Job, Worker } from 'bullmq';
import { getConnection } from '../connection';

export class NftxWorker extends Worker {
constructor(queueName: string, onJob: (job: Job) => any) {
super(queueName, onJob, {
concurrency: 8,
connection: getConnection(queueName),
});
NftxWorker.workers[queueName] = this;
}

close() {
delete NftxWorker.workers[this.name];
return super.close();
}

static workers: Record<string, NftxWorker> = {};

static closeAll() {
Object.values(NftxWorker.workers).forEach((worker) => {
worker.close();
});
}
}

// Create a bullmq worker for the given queue name (internal use)
const createWorker = (queueName: string, onJob: (job: Job) => any) => {
const connection = getConnection();
const worker = new Worker(queueName, onJob, {
concurrency: 8,
connection,
});
const worker = new NftxWorker(queueName, onJob);
return worker;
};

Expand Down
74 changes: 48 additions & 26 deletions packages/queue/src/queues/getMessageQueue.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,62 @@
import { getConnection } from '../connection';
import { Connection } from '../connection/getConnection';
import { MESSAGE_QUEUE_NAME } from '../constants';

let queue: {
interface IMessageQueue {
add: (type: string, payload: Record<string, any>) => void;
subscribe: (cb: (type: string, payload: any) => void) => void;
};
subscribe: (cb: (type: string, payload: any) => void) => () => void;
close: () => void;
}

/** Returns a queue object that allows you to emit or subscribe to messages */
const getMessageQueue = () => {
if (!queue) {
const connection = getConnection();
const channel = MESSAGE_QUEUE_NAME;

const add = (type: string, payload: Record<string, any>) => {
console.debug(`Sending message: ${type}`);
const message = JSON.stringify({ type, payload });
connection.publish(channel, message);
};
const subscribe = (
callback: (type: string, payload: Record<string, any>) => any
) => {
connection.subscribe(MESSAGE_QUEUE_NAME, (err) => {
export class MessageQueue implements IMessageQueue {
channel = MESSAGE_QUEUE_NAME;

constructor(public connection: Connection) {
MessageQueue.queue = this;
}

add(type: string, payload: Record<string, any>) {
console.debug(`Sending message: ${type}`);
const message = JSON.stringify({ type, payload });
this.connection.publish(this.channel, message);
}
subscribe(cb: (type: string, payload: any) => void) {
this.connection.subscribe(MESSAGE_QUEUE_NAME, (err) => {
if (err) {
console.error(`Failed to subscribe: ${err.message}`);
}
});

this.connection.on('message', (channel, message) => {
const { type, payload } = JSON.parse(message);
cb(type, payload);
});

return () => {
this.connection.unsubscribe(MESSAGE_QUEUE_NAME, (err) => {
if (err) {
console.error(`Failed to subscribe: ${err.message}`);
console.error(`Failed to unsubscribe: ${err.message}`);
}
});

connection.on('message', (channel, message) => {
const { type, payload } = JSON.parse(message);
callback(type, payload);
});
};
}
close() {
MessageQueue.queue = undefined;
this.connection.close();
}

queue = { add, subscribe };
static queue: IMessageQueue | undefined;

static close() {
MessageQueue.queue?.close();
}
return queue;
}

/** Returns a queue object that allows you to emit or subscribe to messages */
const getMessageQueue = () => {
return (
MessageQueue.queue || new MessageQueue(getConnection(MESSAGE_QUEUE_NAME))
);
};

export default getMessageQueue;
31 changes: 23 additions & 8 deletions packages/queue/src/queues/getQueue.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
import { Queue } from 'bullmq';
import { Queue, QueueOptions } from 'bullmq';
import { getConnection } from '../connection';

const queues: Record<string, Queue> = {};
export class NftxQueue extends Queue {
constructor(name: string, opts: QueueOptions) {
super(name, opts);
NftxQueue.queues[name] = this;
}
close() {
delete NftxQueue.queues[this.name];
return super.close();
}

const getQueue = (queueName: string) => {
let queue = queues[queueName];
if (!queue) {
const connection = getConnection();
queue = queues[queueName] = new Queue(queueName, { connection });
static queues: Record<string, Queue> = {};

static closeAll() {
Object.values(NftxQueue.queues).forEach((queue) => {
queue.close();
});
}
return queue;
}

const getQueue = (queueName: string) => {
return (
NftxQueue.queues[queueName] ||
new NftxQueue(queueName, { connection: getConnection(queueName) })
);
};

export default getQueue;

0 comments on commit e8b12af

Please sign in to comment.