From 82b2e8dad77efb9d2bfd3e09d02814963776196b Mon Sep 17 00:00:00 2001 From: christianmat Date: Tue, 3 Dec 2024 15:20:09 -0800 Subject: [PATCH] fix: kafka commit offset and flatten timestamps --- apps/trench/src/common/utils.ts | 2 ++ .../src/services/data/kafka/kafka.service.ts | 36 +++++++++---------- apps/trench/src/webhooks/webhooks.service.ts | 14 ++++---- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/apps/trench/src/common/utils.ts b/apps/trench/src/common/utils.ts index 549b91e..b94dfe2 100644 --- a/apps/trench/src/common/utils.ts +++ b/apps/trench/src/common/utils.ts @@ -4,6 +4,8 @@ export function flatten(data: any): Record { function recurse(cur: any, prop: string) { if (Object(cur) !== cur) { result[prop] = cur + } else if (cur instanceof Date) { + result[prop] = cur.toISOString() } else if (Array.isArray(cur)) { for (let i = 0; i < cur.length; i++) { recurse(cur[i], prop + '_' + i) diff --git a/apps/trench/src/services/data/kafka/kafka.service.ts b/apps/trench/src/services/data/kafka/kafka.service.ts index 5ba7709..ab0e0cf 100644 --- a/apps/trench/src/services/data/kafka/kafka.service.ts +++ b/apps/trench/src/services/data/kafka/kafka.service.ts @@ -78,26 +78,22 @@ export class KafkaService { await consumer.connect() await consumer.subscribe({ topic, fromBeginning: false }) - if (enableBatching) { - await consumer.run({ - eachBatch: async ({ batch }) => { - await eachBatch(batch.messages.map((message) => JSON.parse(message.value.toString()))) - await consumer.commitOffsets( - batch.messages.map((message) => ({ - topic: batch.topic, - partition: batch.partition, - offset: message.offset, - })) - ) - }, - }) - } else { - await consumer.run({ - eachMessage: async ({ topic, partition, message }) => { - await eachBatch([JSON.parse(message.value.toString())]) - await consumer.commitOffsets([{ topic, partition, offset: message.offset }]) - }, - }) + try { + if (enableBatching) { + await consumer.run({ + eachBatch: async ({ batch }) => { + await eachBatch(batch.messages.map((message) => JSON.parse(message.value.toString()))) + }, + }) + } else { + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + await eachBatch([JSON.parse(message.value.toString())]) + }, + }) + } + } catch (e) { + console.log(`Error initiating consumer for groupId ${groupId}.`, e) } } diff --git a/apps/trench/src/webhooks/webhooks.service.ts b/apps/trench/src/webhooks/webhooks.service.ts index d3d324a..22094c1 100644 --- a/apps/trench/src/webhooks/webhooks.service.ts +++ b/apps/trench/src/webhooks/webhooks.service.ts @@ -27,13 +27,15 @@ export class WebhooksService implements OnModuleInit { const webhooks = await this.webhooksDao.getWebhooks(workspace) for (const webhook of webhooks) { console.log('Initiating consumer for webhook:', webhook.uuid, webhook.url) - // if (process.env.NODE_ENV === 'production') { - await this.initiateConsumer(webhook, workspace) - // } + this.initiateConsumer(webhook, workspace) + .then(() => { + console.log(`Consumer for webhook ${webhook.uuid} has been initiated.`) + }) + .catch((e) => { + console.error(`Error initiating consumer for webhook ${webhook.uuid}.`, e) + }) } } - // This call takes a while, so we don't block in development! - console.log('Kafka consumer(s) successfully started!') } private getGroupId(webhookUUID: string) { @@ -59,7 +61,7 @@ export class WebhooksService implements OnModuleInit { return } - const maxRetries = 4 + const maxRetries = 8 const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) const numberOfEventsToFind = payloads.length let retries = 0