Skip to content

Commit

Permalink
fix: subscriber update
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran committed May 5, 2023
1 parent cd58654 commit dc1589e
Showing 1 changed file with 20 additions and 18 deletions.
38 changes: 20 additions & 18 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,26 +130,11 @@ export class Subscriber<T = unknown> extends TopicHandler {
private parseData(message: Message): T {
let data: string
if (this.avroType) {
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
data = this.avroType.fromBuffer(message.data)
} catch (e) {
if (this.avroType) {
throw e
}
//reload the topic to try reprocessing with schema if it was updated
this.topic = this.client.topic(this.topicName)
this.getTopicType()
.then(type => {this.avroType = type})
.catch(error => {this.logger?.error('Can not load topic schema type', error)})
throw e
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
data = this.avroType.fromBuffer(message.data)
} else {
data = message.data.toString()
}
if (!data) {
throw new Error()
}
const dataParser = new DataParser()
const dataParsed = dataParser.parse(data) as T
this.logMessage(message, dataParsed)
Expand All @@ -158,7 +143,24 @@ export class Subscriber<T = unknown> extends TopicHandler {

private processMsg(asyncCallback: (msg: IParsedMessage<T>) => Promise<void>) {
return (message: Message) => {
const dataParsed = this.parseData(message)
let dataParsed
try {
dataParsed = this.parseData(message)
} catch (e) {
this.logger?.error(`Couldn't parse message: ${JSON.stringify(message)}`)
//reload the topic to try reprocessing with schema if it was updated
if (!this.avroType) {
this.topic = this.client.topic(this.topicName)
this.getTopicType()
.then(type => {
this.avroType = type
})
.catch(error => {
this.logger?.error('Can not load topic schema type', error)
})
}
throw e
}
const messageParsed = Object.assign(message, { dataParsed })
asyncCallback(messageParsed).catch(e => {
message.nack()
Expand Down

0 comments on commit dc1589e

Please sign in to comment.