Skip to content

Commit

Permalink
feat(queue): create job concept
Browse files Browse the repository at this point in the history
  • Loading branch information
jlenon7 committed May 11, 2024
1 parent 426822f commit 842bf92
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 56 deletions.
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@
"#src/validators/login.validator",
"#src/validators/register.validator",
"#src/validators/update.validator"
],
"jobs": [
"#src/jobs/userconfirm.job",
"#src/jobs/useremail.job",
"#src/jobs/useremailpassword.job",
"#src/jobs/userpassword.job"
]
}
}
20 changes: 20 additions & 0 deletions src/jobs/userconfirm.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Mail } from '@athenna/mail'
import type { User } from '#src/models/user'

type Item = {
user: User
}

export class UserConfirmJob {
public static queue() {
return 'user:confirm'
}

public async handle({ user }: Item) {
await Mail.from('[email protected]')
.to(user.email)
.subject('Athenna Account Confirmation')
.view('mail/confirm', { user })
.send()
}
}
22 changes: 22 additions & 0 deletions src/jobs/useremail.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Mail } from '@athenna/mail'
import type { User } from '#src/models/user'

type Item = {
user: User
email: string
token: string
}

export class UserEmailJob {
public static queue() {
return 'user:email'
}

public async handle({ user, email, token }: Item) {
await Mail.from('[email protected]')
.to(user.email)
.subject('Athenna Email Change')
.view('mail/change-email', { user, email, token })
.send()
}
}
23 changes: 23 additions & 0 deletions src/jobs/useremailpassword.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Mail } from '@athenna/mail'
import type { User } from '#src/models/user'

type Item = {
user: User
email: string
password: string
token: string
}

export class UserEmailPasswordJob {
public static queue() {
return 'user:email:password'
}

public async handle({ user, email, password, token }: Item) {
await Mail.from('[email protected]')
.to(user.email)
.subject('Athenna Email & Password Change')
.view('mail/change-email-password', { user, email, password, token })
.send()
}
}
22 changes: 22 additions & 0 deletions src/jobs/userpassword.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Mail } from '@athenna/mail'
import type { User } from '#src/models/user'

type Item = {
user: User
password: string
token: string
}

export class UserPasswordJob {
public static queue() {
return 'user:password'
}

public async handle({ user, password, token }: Item) {
await Mail.from('[email protected]')
.to(user.email)
.subject('Athenna Password Change')
.view('mail/change-password', { user, password, token })
.send()
}
}
82 changes: 26 additions & 56 deletions src/providers/queueworker.provider.ts
Original file line number Diff line number Diff line change
@@ -1,76 +1,46 @@
import { Mail } from '@athenna/mail'
import { Log } from '@athenna/logger'
import { ServiceProvider } from '@athenna/ioc'
import { Exec, Module } from '@athenna/common'
import { Queue } from '#src/providers/facades/queue'

export default class QueueWorkerProvider extends ServiceProvider {
public intervals = []

public async boot() {
this.processByQueue('user:confirm', async user => {
return Mail.from('[email protected]')
.to(user.email)
.subject('Athenna Account Confirmation')
.view('mail/confirm', { user })
.send()
})

this.processByQueue('user:email', async ({ user, token, email }) => {
return Mail.from('[email protected]')
.to(user.email)
.subject('Athenna Email Change')
.view('mail/change-email', { user, email, token })
.send()
})
const jobs = Config.get<string[]>('rc.jobs', [])

this.processByQueue('user:password', async ({ user, token, password }) => {
return Mail.from('[email protected]')
.to(user.email)
.subject('Athenna Password Change')
.view('mail/change-password', { user, password, token })
.send()
})
await Exec.concurrently(jobs, async jobPath => {
const Job = await Module.resolve(jobPath, import.meta.url)
const alias = `App/Jobs/${Job.name}`

this.processByQueue(
'user:email:password',
async ({ user, token, email, password }) => {
return Mail.from('[email protected]')
.to(user.email)
.subject('Athenna Email & Password Change')
.view('mail/change-email-password', { user, email, password, token })
.send()
}
)
}
const queueName = Job.queue()
const job = this.container.transient(Job, alias).use(alias)

public async shutdown() {
this.intervals.forEach(interval => clearInterval(interval))
}
const interval = setInterval(async () => {
const queue = Queue.queue(queueName)

public processByQueue(
queueName: string,
processor: (data: any) => any | Promise<any>
) {
const interval = setInterval(async () => {
const queue = Queue.queue(queueName)
if (queue.isEmpty()) {
return
}

if (queue.isEmpty()) {
return
}
Log.info(`Processing jobs of ({yellow} "${queueName}") queue`)

Log.info(`Processing jobs of ({yellow} "${queueName}") queue`)
await queue.process(job.handle.bind(job))

await queue.process(processor)
const jobsLength = queue.length()

const jobsLength = queue.length()
if (jobsLength) {
Log.info(
`Still has ({yellow} ${jobsLength}) jobs to process on ({yellow} "${queueName}")`
)
}
}, 5000)

if (jobsLength) {
Log.info(
`Still has ({yellow} ${jobsLength}) jobs to process on ({yellow} "${queueName}")`
)
}
}, 5000)
this.intervals.push(interval)
})
}

this.intervals.push(interval)
public async shutdown() {
this.intervals.forEach(interval => clearInterval(interval))
}
}

0 comments on commit 842bf92

Please sign in to comment.