Skip to content

Commit

Permalink
fix: kafka commit offset and flatten timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
christianmat committed Dec 3, 2024
1 parent 8ebe66f commit 82b2e8d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 26 deletions.
2 changes: 2 additions & 0 deletions apps/trench/src/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export function flatten(data: any): Record<string, any> {
function recurse(cur: any, prop: string) {
if (Object(cur) !== cur) {
result[prop] = cur
} else if (cur instanceof Date) {
result[prop] = cur.toISOString()
} else if (Array.isArray(cur)) {
for (let i = 0; i < cur.length; i++) {
recurse(cur[i], prop + '_' + i)
Expand Down
36 changes: 16 additions & 20 deletions apps/trench/src/services/data/kafka/kafka.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,22 @@ export class KafkaService {
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: false })

if (enableBatching) {
await consumer.run({
eachBatch: async ({ batch }) => {
await eachBatch(batch.messages.map((message) => JSON.parse(message.value.toString())))
await consumer.commitOffsets(
batch.messages.map((message) => ({
topic: batch.topic,
partition: batch.partition,
offset: message.offset,
}))
)
},
})
} else {
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await eachBatch([JSON.parse(message.value.toString())])
await consumer.commitOffsets([{ topic, partition, offset: message.offset }])
},
})
try {
if (enableBatching) {
await consumer.run({
eachBatch: async ({ batch }) => {
await eachBatch(batch.messages.map((message) => JSON.parse(message.value.toString())))
},
})
} else {
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await eachBatch([JSON.parse(message.value.toString())])
},
})
}
} catch (e) {
console.log(`Error initiating consumer for groupId ${groupId}.`, e)
}
}

Expand Down
14 changes: 8 additions & 6 deletions apps/trench/src/webhooks/webhooks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ export class WebhooksService implements OnModuleInit {
const webhooks = await this.webhooksDao.getWebhooks(workspace)
for (const webhook of webhooks) {
console.log('Initiating consumer for webhook:', webhook.uuid, webhook.url)
// if (process.env.NODE_ENV === 'production') {
await this.initiateConsumer(webhook, workspace)
// }
this.initiateConsumer(webhook, workspace)
.then(() => {
console.log(`Consumer for webhook ${webhook.uuid} has been initiated.`)
})
.catch((e) => {
console.error(`Error initiating consumer for webhook ${webhook.uuid}.`, e)
})
}
}
// This call takes a while, so we don't block in development!
console.log('Kafka consumer(s) successfully started!')
}

private getGroupId(webhookUUID: string) {
Expand All @@ -59,7 +61,7 @@ export class WebhooksService implements OnModuleInit {
return
}

const maxRetries = 4
const maxRetries = 8
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
const numberOfEventsToFind = payloads.length
let retries = 0
Expand Down

0 comments on commit 82b2e8d

Please sign in to comment.