Skip to content

Commit

Permalink
fix: subscriber update
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran committed May 5, 2023
1 parent 4e2e04f commit cd58654
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dist/**
.idea
packages/*/lib
sandbox
packages/*/dist
2 changes: 1 addition & 1 deletion packages/pubsub/src/DataParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ export class DataParser {
}
return value
}
return JSON.parse(data.toString(), dateTimeReviver)
return JSON.parse(data, dateTimeReviver)
}
}
23 changes: 19 additions & 4 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IAM, Message, Subscription, Topic, PubSub, SubscriptionOptions } from '@google-cloud/pubsub'
import { IAM, Message, PubSub, Subscription, SubscriptionOptions, Topic } from '@google-cloud/pubsub'
import { Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
import { DataParser } from './DataParser'
Expand Down Expand Up @@ -47,7 +47,7 @@ interface ISubscriptionInitializationOptions {
retryPolicy: ISubscriptionRetryPolicy
}

export class Subscriber<T = unknown> extends TopicHandler{
export class Subscriber<T = unknown> extends TopicHandler {
readonly topicName: string
readonly subscriptionName: string

Expand Down Expand Up @@ -130,11 +130,26 @@ export class Subscriber<T = unknown> extends TopicHandler{
private parseData(message: Message): T {
let data: string
if (this.avroType) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
data = this.avroType.fromBuffer(message.data)
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
data = this.avroType.fromBuffer(message.data)
} catch (e) {
if (this.avroType) {
throw e
}
//reload the topic to try reprocessing with schema if it was updated
this.topic = this.client.topic(this.topicName)
this.getTopicType()
.then(type => {this.avroType = type})
.catch(error => {this.logger?.error('Can not load topic schema type', error)})
throw e
}
} else {
data = message.data.toString()
}
if (!data) {
throw new Error()
}
const dataParser = new DataParser()
const dataParsed = dataParser.parse(data) as T
this.logMessage(message, dataParsed)
Expand Down

0 comments on commit cd58654

Please sign in to comment.