Skip to content

Commit

Permalink
fix: edge case where webhooks are not filtered correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
christianmat committed Dec 4, 2024
1 parent 9964a3f commit 39c39f5
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 27 deletions.
2 changes: 1 addition & 1 deletion apps/trench/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async function bootstrap(nodeNumber: number) {
await app.listen(port, '0.0.0.0')
}

if (process.env.NODE_ENV !== 'production') {
if (process.env.NODE_ENV !== 'production' && process.env.FORCE_CLUSTER_MODE !== 'true') {
console.log('Running in single instance dev mode')
bootstrap(1)
} else {
Expand Down
21 changes: 7 additions & 14 deletions apps/trench/src/services/data/kafka/kafka.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common'
import { Kafka, Producer } from 'kafkajs'
import { Consumer, Kafka, Producer } from 'kafkajs'
import { KafkaEventWithUUID } from './kafka.interface'
import { DEFAULT_KAFKA_CLIENT_ID, DEFAULT_KAFKA_PARTITIONS } from '../../../common/constants'

Expand Down Expand Up @@ -71,7 +71,7 @@ export class KafkaService {
async initiateConsumer(
topic: string,
groupId: string,
eachBatch: (payloads: any[]) => Promise<void>,
eachBatch: (payloads: any[], consumer: Consumer) => Promise<void>,
enableBatching: boolean = false
) {
const consumer = this.kafka.consumer({ groupId })
Expand All @@ -82,28 +82,21 @@ export class KafkaService {
if (enableBatching) {
await consumer.run({
eachBatch: async ({ batch }) => {
await eachBatch(batch.messages.map((message) => JSON.parse(message.value.toString())))
await eachBatch(
batch.messages.map((message) => JSON.parse(message.value.toString())),
consumer
)
},
})
} else {
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await eachBatch([JSON.parse(message.value.toString())])
await eachBatch([JSON.parse(message.value.toString())], consumer)
},
})
}
} catch (e) {
console.log(`Error initiating consumer for groupId ${groupId}.`, e)
}
}

async removeConsumer(groupId: string) {
try {
const consumer = this.kafka.consumer({ groupId })
await consumer.disconnect()
console.log(`Consumer with groupId ${groupId} has been removed.`)
} catch (e) {
console.log(`Consumer with groupId ${groupId} not found.`, e)
}
}
}
19 changes: 18 additions & 1 deletion apps/trench/src/webhooks/webhooks.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Controller, Get, Post, Delete, Body, Param, UseGuards, Request } from '@nestjs/common'
import { Controller, Get, Post, Delete, Put, Body, Param, UseGuards, Request } from '@nestjs/common'
import { WebhooksService } from './webhooks.service'
import { PaginatedWebhookResponse, Webhook, WebhookDTO } from './webhooks.interface'
import { PrivateApiGuard } from '../middlewares/private-api.guard'
Expand Down Expand Up @@ -44,6 +44,23 @@ export class WebhooksController {
return this.webhooksService.createWebhook(workspace, webhookDTO)
}

@Put(':uuid')
@ApiOperation({ summary: 'Update a webhook' })
@ApiResponse({
status: 200,
description:
'The webhook has been successfully updated. Requires private API key in Bearer token.',
type: Webhook,
})
async updateWebhook(
@Request() request: Request,
@Param('uuid') uuid: string,
@Body() webhookDTO: WebhookDTO
) {
const workspace = getWorkspace(request)
return this.webhooksService.updateWebhook(workspace, uuid, webhookDTO)
}

@Delete(':uuid')
@ApiOperation({ summary: 'Delete a webhook' })
@ApiResponse({
Expand Down
10 changes: 7 additions & 3 deletions apps/trench/src/webhooks/webhooks.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@ export class WebhooksDao {
eventNames: row.event_names,
flatten: row.flatten,
}))
await this.cacheManager.set(cacheKey, resultData)
await this.cacheManager.set(cacheKey, resultData, 60000) // Cache for 1 minute
return resultData
}

async createWebhook(workspace: Workspace, webhookDTO: WebhookDTO): Promise<Webhook> {
async createWebhook(
workspace: Workspace,
webhookDTO: WebhookDTO,
existingUuid?: string
): Promise<Webhook> {
if (!webhookDTO.url) {
throw new BadRequestException('URL is required to create a webhook')
}

const uuid = uuidv4()
const uuid = existingUuid ?? uuidv4()
await this.clickhouse.insert(
'webhooks',
[
Expand Down
36 changes: 29 additions & 7 deletions apps/trench/src/webhooks/webhooks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import { flatten } from '../common/utils'
import { Workspace } from '../workspaces/workspaces.interface'
import { WorkspacesService } from '../workspaces/workspaces.service'
import { getKafkaTopicFromWorkspace } from '../services/data/kafka/kafka.util'
import { shouldProcessEvent } from './webhooks.util'
import { Consumer } from 'kafkajs'
@Injectable()
export class WebhooksService implements OnModuleInit {
constructor(
Expand Down Expand Up @@ -46,18 +48,33 @@ export class WebhooksService implements OnModuleInit {
await this.kafkaService.initiateConsumer(
getKafkaTopicFromWorkspace(workspace),
this.getGroupId(webhook.uuid),
(payloads) => this.processMessages(payloads, webhook.uuid, workspace),
(payloads, consumer) => this.processMessages(payloads, webhook.uuid, workspace, consumer),
webhook.enableBatching
)
}

async processMessages(payloads: KafkaEvent[], webhookUUID: string, workspace: Workspace) {
async processMessages(
payloads: KafkaEvent[],
webhookUUID: string,
workspace: Workspace,
consumer: Consumer
) {
const webhooks = await this.webhooksDao.getWebhooks(workspace)
const thisWebhook = webhooks.find((webhook) => webhook.uuid === webhookUUID)

if (!thisWebhook) {
await this.kafkaService.removeConsumer(this.getGroupId(webhookUUID))
console.error(`Webhook not found. Removing consumer for ${webhookUUID}.`)
console.error(
`Webhook not found. Skipping processing for ${webhookUUID} and disconnecting consumer.`
)
await consumer.stop()
await consumer.disconnect()
return
}

payloads = payloads.filter((payload) => shouldProcessEvent(payload, thisWebhook))

if (payloads.length === 0) {
console.log(`No events to process for webhook ${webhookUUID}.`)
return
}

Expand Down Expand Up @@ -114,13 +131,18 @@ export class WebhooksService implements OnModuleInit {
return await this.webhooksDao.getWebhooks(workspace)
}

async createWebhook(workspace: Workspace, webhookDTO: WebhookDTO) {
const newWebhook = await this.webhooksDao.createWebhook(workspace, webhookDTO)
async createWebhook(workspace: Workspace, webhookDTO: WebhookDTO, uuid?: string) {
const newWebhook = await this.webhooksDao.createWebhook(workspace, webhookDTO, uuid)
await this.initiateConsumer(newWebhook, workspace)
return
return newWebhook
}

async deleteWebhook(workspace: Workspace, uuid: string) {
await this.webhooksDao.deleteWebhook(workspace, uuid)
}

async updateWebhook(workspace: Workspace, uuid: string, webhookDTO: WebhookDTO) {
await this.deleteWebhook(workspace, uuid)
return await this.createWebhook(workspace, webhookDTO, uuid)
}
}
16 changes: 16 additions & 0 deletions apps/trench/src/webhooks/webhooks.util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { KafkaEvent } from '../services/data/kafka/kafka.interface'
import { Webhook } from './webhooks.interface'

export function shouldProcessEvent(event: KafkaEvent, webhook: Webhook): boolean {
const typeMatches = webhook.eventTypes.includes('*') || webhook.eventTypes.includes(event.type)
if (!typeMatches) {
return false
}

const nameMatches = webhook.eventNames.includes('*') || webhook.eventNames.includes(event.event)
if (!nameMatches) {
return false
}

return true
}
Loading

0 comments on commit 39c39f5

Please sign in to comment.