Skip to content

Commit

Permalink
feat: TECH skip message processing when the message parsing fails
Browse files Browse the repository at this point in the history
  • Loading branch information
kirpichenko committed Dec 20, 2024
1 parent fcb946b commit d55cb3e
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 119 deletions.
50 changes: 36 additions & 14 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export interface ISubscriptionOptions {
name: string
id: number
}
labels?: ({ [k: string]: string } | null)
labels?: { [k: string]: string } | null
filter?: string
}

Expand All @@ -61,7 +61,7 @@ interface ISubscriptionDeadLetterPolicy {
interface ISubscriptionInitializationOptions {
deadLetterPolicy: ISubscriptionDeadLetterPolicy | null
retryPolicy: ISubscriptionRetryPolicy
labels?: ({ [k: string]: string } | null);
labels?: { [k: string]: string } | null
filter?: string
}

Expand Down Expand Up @@ -152,7 +152,9 @@ export class Subscriber<T = unknown> {
private async parseData(message: Message): Promise<T> {
let schemaId = message.attributes['googclient_schemarevisionid']
if (!schemaId) {
this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Message does not have schema revision id`, { message })
this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Message does not have schema revision id`, {
message,
})
schemaId = await this.schemaCache.getLatestSchemaRevisionId()
}

Expand All @@ -178,7 +180,17 @@ export class Subscriber<T = unknown> {
asyncCallback: (msg: IParsedMessage<T>, info: IMessageInfo) => Promise<void>,
): (message: Message) => void {
const asyncMessageProcessor = async (message: Message) => {
const dataParsed = await this.parseData(message)
const dataParsed = await this.parseData(message).catch(e => {
this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Failed to parse message:`, e)
return undefined
})

// If message parsing failed, nack the message and skip processing
if (!dataParsed) {
message.nack()
return
}

const messageParsed = Object.assign(message, { dataParsed })
const info: IMessageInfo = {
id: message.id,
Expand Down Expand Up @@ -229,8 +241,10 @@ export class Subscriber<T = unknown> {
} else if (options) {
const [existingSubscription] = await subscription.getMetadata()
if ((options.filter || existingSubscription.filter) && options.filter != existingSubscription.filter) {
throw new Error(`PubSub: Subscriptions filters are immutable, they can't be changed, subscription: ${subscriptionName},` +
` currentFilter: ${existingSubscription.filter as string}, newFilter: ${options.filter as string}`)
throw new Error(
`PubSub: Subscriptions filters are immutable, they can't be changed, subscription: ${subscriptionName},` +
` currentFilter: ${existingSubscription.filter as string}, newFilter: ${options.filter as string}`,
)
}
if (this.isMetadataChanged(existingSubscription, options)) {
await subscription.setMetadata(options)
Expand Down Expand Up @@ -335,20 +349,28 @@ export class Subscriber<T = unknown> {
}

private isMetadataChanged(existingSubscription: ISubscription, options: ISubscriptionInitializationOptions): boolean {
if (options.retryPolicy.minimumBackoff?.seconds &&
String(options.retryPolicy.minimumBackoff.seconds) !== existingSubscription.retryPolicy?.minimumBackoff?.seconds) {
if (
options.retryPolicy.minimumBackoff?.seconds &&
String(options.retryPolicy.minimumBackoff.seconds) !== existingSubscription.retryPolicy?.minimumBackoff?.seconds
) {
return true
}
if (options.retryPolicy.maximumBackoff?.seconds &&
String(options.retryPolicy.maximumBackoff.seconds) !== existingSubscription.retryPolicy?.maximumBackoff?.seconds) {
if (
options.retryPolicy.maximumBackoff?.seconds &&
String(options.retryPolicy.maximumBackoff.seconds) !== existingSubscription.retryPolicy?.maximumBackoff?.seconds
) {
return true
}
if (!!options.deadLetterPolicy?.maxDeliveryAttempts &&
options.deadLetterPolicy.maxDeliveryAttempts !== existingSubscription.deadLetterPolicy?.maxDeliveryAttempts) {
if (
!!options.deadLetterPolicy?.maxDeliveryAttempts &&
options.deadLetterPolicy.maxDeliveryAttempts !== existingSubscription.deadLetterPolicy?.maxDeliveryAttempts
) {
return true
}
if (!!options.labels && JSON.stringify(existingSubscription.labels) !== JSON.stringify(options.labels)
|| options.labels == null && !!existingSubscription.labels && Object.keys(existingSubscription.labels).length !== 0) {
if (
(!!options.labels && JSON.stringify(existingSubscription.labels) !== JSON.stringify(options.labels)) ||
(options.labels == null && !!existingSubscription.labels && Object.keys(existingSubscription.labels).length !== 0)
) {
return true
}

Expand Down
Loading

0 comments on commit d55cb3e

Please sign in to comment.