From 426822f3932046f386c5793f58a62da7b1fbf900 Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Sat, 11 May 2024 20:36:21 +0100 Subject: [PATCH] feat(queue): implement database driver --- src/helpers/queue.ts | 195 ++++++++++++++++++-------- src/providers/queueworker.provider.ts | 6 +- src/services/auth.service.ts | 2 +- src/services/user.service.ts | 21 +-- storage/queues.json | 4 - tests/e2e/auth.controller.test.ts | 2 +- tests/e2e/user.controller.test.ts | 6 +- tests/unit/auth.service.test.ts | 2 +- 8 files changed, 161 insertions(+), 77 deletions(-) delete mode 100644 storage/queues.json diff --git a/src/helpers/queue.ts b/src/helpers/queue.ts index fd39d46..97d5134 100644 --- a/src/helpers/queue.ts +++ b/src/helpers/queue.ts @@ -1,112 +1,197 @@ import { Log } from '@athenna/logger' import { Service } from '@athenna/ioc' -import { File, Path } from '@athenna/common' +import { Database, type DatabaseImpl } from '@athenna/database' -class VanillaQueue { +class VanillaQueue { private queueName = 'default' + private queues: Record = { + default: [], + deadletter: [] + } + + public async truncate() { + Object.keys(this.queues).forEach(key => (this.queues[key] = [])) + } - private getFile() { - const path = Path.storage('queues.json') + public queue(name: string) { + this.queueName = name + + if (!this.queues[name]) { + this.queues[name] = [] + } - return new File(path, JSON.stringify({ default: [], deadletter: [] })) + return this } - public async truncate() { - const path = Path.storage('queues.json') + public async add(item: unknown) { + this.queues[this.queueName].push(item) - return new File(path, '').setContent( - JSON.stringify({ default: [], deadletter: [] }, null, 2) - ) + return this } - public async queue(name: string) { - const file = this.getFile() - const queues = file.getContentAsJsonSync() + public async pop() { + if (!this.queues[this.queueName].length) { + return null + } - this.queueName = name + return this.queues[this.queueName].shift() + } - if (!queues[name]) { - queues[name] = [] + public async peek() { + if (!this.queues[this.queueName].length) { + return null } - file.setContentSync(JSON.stringify(queues)) + return this.queues[this.queueName][0] + } - return this + public async length() { + return this.queues[this.queueName].length } - public async add(item: T) { - const file = this.getFile() - const queues = file.getContentAsJsonSync() + public async isEmpty() { + return !this.queues[this.queueName].length + } - queues[this.queueName].push(item) + public async process(processor: (item: unknown) => any | Promise) { + const data = await this.pop() - file.setContentSync(JSON.stringify(queues)) + try { + await processor(data) + } catch (err) { + Log.error( + `Adding data of ({yellow} "${this.queueName}") to deadletter queue due to:`, + err + ) + + this.queues.deadletter.push({ queue: this.queueName, data }) + } + } +} + +class DatabaseQueue { + private DB: DatabaseImpl + private dbConnection: string + + private table: string + private queueName: string + private connection: string + private deadLetterQueueName: string + + public constructor(connection: string) { + const { + table, + queue, + deadletter, + connection: dbConnection + } = Config.get(`database.connections.${connection}`) + + this.table = table + this.queueName = queue + this.connection = connection + this.dbConnection = dbConnection + this.deadLetterQueueName = deadletter + + this.DB = Database.connection(this.dbConnection) + } + + public async truncate() { + await this.DB.truncate(this.table) + } + + public queue(name: string) { + this.queueName = name + + return this + } + + public async add(item: unknown) { + await this.DB.table(this.table).create({ + queue: this.queueName, + item + }) return this } public async pop() { - const file = this.getFile() - const queues = file.getContentAsJsonSync() + const data = await this.DB.table(this.table) + .where('queue', this.queueName) + .orderBy('id', 'DESC') + .find() - if (!queues[this.queueName].length) { - return null + if (!data) { + return } - const item = queues[this.queueName].shift() + await this.DB.table(this.table) + .where('id', data.id) + .where('queue', this.queueName) + .delete() - file.setContentSync(JSON.stringify(queues)) - - return item + return data.item } public async peek() { - const file = this.getFile() - const queues = file.getContentAsJsonSync() + const data = await this.DB.table(this.table) + .where('queue', this.queueName) + .orderBy('id', 'DESC') + .find() - if (!queues[this.queueName].length) { + if (!data) { return null } - return queues[this.queueName][0] + return data.item } - public async length() { - const file = this.getFile() - const queues = file.getContentAsJsonSync() - - return queues[this.queueName].length + public length() { + return this.DB.table(this.table).where('queue', this.queueName).count() } - public async process(processor: (item: T) => any | Promise) { + public async process(processor: (item: unknown) => any | Promise) { const data = await this.pop() try { await processor(data) } catch (err) { - console.log(err) Log.error( `Adding data of ({yellow} "${this.queueName}") to deadletter queue due to:`, err ) - const queue = await new QueueImpl().queue('deadletter') - - await queue.add({ queue: this.queueName, data }) + await this.DB.table(this.table).create({ + queue: this.deadLetterQueueName, + formerQueue: this.queueName, + item: data + }) } } public async isEmpty() { - const file = this.getFile() - const queues = file.getContentAsJsonSync() + const count = await this.DB.table(this.table) + .where('queue', this.queueName) + .count() - return !queues[this.queueName].length + return count === '0' } } -@Service({ alias: 'App/Helpers/Queue' }) -export class QueueImpl { - public driver = new VanillaQueue() +@Service({ alias: 'App/Helpers/Queue', type: 'singleton' }) +export class QueueImpl { + public driver: any = new VanillaQueue() + + public connection(name: string) { + if (name === 'vanilla') { + this.driver = new VanillaQueue() + } + + if (name === 'database') { + this.driver = new DatabaseQueue('queue') + } + + return this + } public async truncate() { await this.driver.truncate() @@ -114,13 +199,13 @@ export class QueueImpl { return this } - public async queue(name: string) { - await this.driver.queue(name) + public queue(name: string) { + this.driver.queue(name) return this } - public async add(item: T) { + public async add(item: unknown) { await this.driver.add(item) } @@ -136,7 +221,7 @@ export class QueueImpl { return this.driver.length() } - public async process(cb: (item: T) => any | Promise) { + public async process(cb: (item: unknown) => any | Promise) { return this.driver.process(cb) } diff --git a/src/providers/queueworker.provider.ts b/src/providers/queueworker.provider.ts index 378fef0..8e225fa 100644 --- a/src/providers/queueworker.provider.ts +++ b/src/providers/queueworker.provider.ts @@ -52,9 +52,9 @@ export default class QueueWorkerProvider extends ServiceProvider { processor: (data: any) => any | Promise ) { const interval = setInterval(async () => { - const queue = await Queue.queue(queueName) + const queue = Queue.queue(queueName) - if (await queue.isEmpty()) { + if (queue.isEmpty()) { return } @@ -62,7 +62,7 @@ export default class QueueWorkerProvider extends ServiceProvider { await queue.process(processor) - const jobsLength = await queue.length() + const jobsLength = queue.length() if (jobsLength) { Log.info( diff --git a/src/services/auth.service.ts b/src/services/auth.service.ts index 3e96961..d801f4b 100644 --- a/src/services/auth.service.ts +++ b/src/services/auth.service.ts @@ -47,7 +47,7 @@ export class AuthService { const user = await this.userService.create(data) - await Queue.queue('user:confirm').then(q => q.add(user)) + await Queue.queue('user:confirm').add(user) return user } diff --git a/src/services/user.service.ts b/src/services/user.service.ts index 148c852..66d257d 100644 --- a/src/services/user.service.ts +++ b/src/services/user.service.ts @@ -62,23 +62,26 @@ export class UserService { switch (`${isEmailEqual}:${isPasswordEqual}`) { case 'false:true': - await Queue.queue('user:email').then(q => - q.add({ user, token, email: data.email }) - ) + await Queue.queue('user:email').add({ user, token, email: data.email }) break case 'true:false': data.password = await bcrypt.hash(data.password, 10) - await Queue.queue('user:password').then(q => - q.add({ user, token, password: data.password }) - ) + await Queue.queue('user:password').add({ + user, + token, + password: data.password + }) break case 'false:false': data.password = await bcrypt.hash(data.password, 10) - await Queue.queue('user:email:password').then(q => - q.add({ user, token, email: data.email, password: data.password }) - ) + await Queue.queue('user:email:password').add({ + user, + token, + email: data.email, + password: data.password + }) } data = Json.omit(data, ['email', 'password']) diff --git a/storage/queues.json b/storage/queues.json deleted file mode 100644 index 6701659..0000000 --- a/storage/queues.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "default":[], - "deadletter":[] -} diff --git a/tests/e2e/auth.controller.test.ts b/tests/e2e/auth.controller.test.ts index 2f6e44f..9f039fe 100644 --- a/tests/e2e/auth.controller.test.ts +++ b/tests/e2e/auth.controller.test.ts @@ -127,7 +127,7 @@ export default class AuthControllerTest extends BaseE2ETest { } }) - const queue = await Queue.queue('user:confirm') + const queue = Queue.queue('user:confirm') assert.deepEqual(await queue.length(), 1) assert.isTrue(await User.exists({ email: 'test@athenna.io' })) diff --git a/tests/e2e/user.controller.test.ts b/tests/e2e/user.controller.test.ts index acadbba..fbfabdc 100644 --- a/tests/e2e/user.controller.test.ts +++ b/tests/e2e/user.controller.test.ts @@ -155,7 +155,7 @@ export default class UserControllerTest extends BaseE2ETest { await user.refresh() - const queue = await Queue.queue('user:email') + const queue = Queue.queue('user:email') assert.deepEqual(await queue.length(), 1) assert.deepEqual(user.name, 'Customer Updated') @@ -177,7 +177,7 @@ export default class UserControllerTest extends BaseE2ETest { await user.refresh() - const queue = await Queue.queue('user:password') + const queue = Queue.queue('user:password') assert.deepEqual(await queue.length(), 1) assert.deepEqual(user.name, 'Customer Updated') @@ -204,7 +204,7 @@ export default class UserControllerTest extends BaseE2ETest { await user.refresh() - const queue = await Queue.queue('user:email:password') + const queue = Queue.queue('user:email:password') assert.deepEqual(await queue.length(), 1) assert.deepEqual(user.name, 'Customer Updated') diff --git a/tests/unit/auth.service.test.ts b/tests/unit/auth.service.test.ts index 3247060..c6f2581 100644 --- a/tests/unit/auth.service.test.ts +++ b/tests/unit/auth.service.test.ts @@ -86,7 +86,7 @@ export default class AuthServiceTest { } Mail.when('send').resolve(undefined) - Queue.when('queue').resolve({ add: () => {} }) + Queue.when('queue').return({ add: () => {} }) Mock.when(this.userService, 'create').resolve(userToRegister) const authService = new AuthService(this.userService)