Skip to content

Commit

Permalink
Merge pull request #79 from join-com/alpha
Browse files Browse the repository at this point in the history
fix: JOIN-36367 // add SchemaCache as separate class to pubsub lib to use in CFs
  • Loading branch information
eugene-taran authored Oct 2, 2023
2 parents efe8aeb + 03985d0 commit c4687cc
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 25 deletions.
35 changes: 35 additions & 0 deletions packages/pubsub/src/SchemaCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { SchemaServiceClient } from '@google-cloud/pubsub/build/src/v1'
import { Schema, Type } from 'avsc'
import { DateType } from './logical-types/DateType'

export class SchemaCache {

private readonly topicTypeRevisionsCache: Record<string, Type> = {}

constructor(
private readonly schemaServiceClient: SchemaServiceClient,
private readonly topicSchemaName: string
) { }

public async getTypeFromCacheOrRemote(schemaRevisionId: string): Promise<Type> {
const typeFromCache = this.topicTypeRevisionsCache[schemaRevisionId]
if (typeFromCache) {
return typeFromCache
}
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
}
const revisionPath = `projects/${projectName}/schemas/${this.topicSchemaName}@${schemaRevisionId}`
const [remoteSchema] = await this.schemaServiceClient.getSchema({ name: revisionPath })

if (!remoteSchema.definition) {
throw new Error(`Can't process schema ${schemaRevisionId} without definition`)
}
const schema = JSON.parse(remoteSchema.definition) as Schema
const type = Type.forSchema(schema, { logicalTypes: { 'timestamp-micros': DateType } })
this.topicTypeRevisionsCache[schemaRevisionId] = type

return type
}
}
14 changes: 14 additions & 0 deletions packages/pubsub/src/SchemaCacheFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { SchemaServiceClient } from '@google-cloud/pubsub/build/src/v1'
import { SchemaCache } from './SchemaCache'

export class SchemaCacheFactory {
private readonly schemaServiceClient: SchemaServiceClient

constructor() {
this.schemaServiceClient = new SchemaServiceClient()
}

public getSchemaCache(topicSchemaName: string): SchemaCache {
return new SchemaCache(this.schemaServiceClient, topicSchemaName)
}
}
30 changes: 5 additions & 25 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { IAM, Message, PubSub, Subscription, SubscriptionOptions, Topic } from '@google-cloud/pubsub'
import { SchemaServiceClient } from '@google-cloud/pubsub/build/src/v1'
import { Schema, Type } from 'avsc'
import { Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
import { DataParser } from './DataParser'
import { ILogger } from './ILogger'
import { DateType } from './logical-types/DateType'
import { SchemaCache } from './SchemaCache'
import { replaceNullsWithUndefined } from './util'

export interface IParsedMessage<T = unknown> {
Expand Down Expand Up @@ -68,7 +68,7 @@ export class Subscriber<T = unknown> {
private readonly deadLetterTopic?: Topic
private readonly deadLetterSubscription?: Subscription

private readonly topicTypeRevisionsCache: Record<string, Type> = {}
private readonly schemaCache: SchemaCache

constructor(
private readonly subscriberOptions: ISubscriberOptions,
Expand All @@ -84,6 +84,7 @@ export class Subscriber<T = unknown> {

this.topic = pubSubClient.topic(topicName)
this.subscription = this.topic.subscription(subscriptionName, this.getStartupOptions(subscriptionOptions))
this.schemaCache = new SchemaCache(this.schemaServiceClient, this.topicSchemaName)

if (this.isDeadLetterPolicyEnabled()) {
this.deadLetterTopicName = `${subscriptionName}-unack`
Expand Down Expand Up @@ -152,31 +153,10 @@ export class Subscriber<T = unknown> {
}

private async parseAvroMessage(message: Message, schemaRevisionId: string): Promise<T> {
const type: Type = await this.getTypeFromCacheOrRemote(schemaRevisionId)
const type: Type = await this.schemaCache.getTypeFromCacheOrRemote(schemaRevisionId)
return type.fromString(message.data.toString()) as T
}

private async getTypeFromCacheOrRemote(schemaRevisionId: string): Promise<Type> {
const typeFromCache = this.topicTypeRevisionsCache[schemaRevisionId]
if (typeFromCache) {
return typeFromCache
}
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
}
const revisionPath = `projects/${projectName}/schemas/${this.topicSchemaName}@${schemaRevisionId}`
const [remoteSchema] = await this.schemaServiceClient.getSchema({ name: revisionPath })

if (!remoteSchema.definition) {
throw new Error(`Can't process schema ${schemaRevisionId} without definition`)
}
const schema = JSON.parse(remoteSchema.definition) as Schema
const type = Type.forSchema(schema, { logicalTypes: { 'timestamp-micros': DateType } })
this.topicTypeRevisionsCache[schemaRevisionId] = type

return type
}

private processMsg(asyncCallback: (msg: IParsedMessage<T> , info: IMessageInfo) => Promise<void>): (message: Message) => void {
const asyncMessageProcessor = async (message: Message) => {
Expand Down

0 comments on commit c4687cc

Please sign in to comment.