Skip to content

Commit

Permalink
feat: JOIN-33235 add pubsub avro schema creation during the deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran committed Jul 5, 2023
1 parent b4120f4 commit 1cd9454
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 0 deletions.
115 changes: 115 additions & 0 deletions packages/pubsub/src/SchemaDeployer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { PubSub } from '@google-cloud/pubsub';
import { SchemaServiceClient } from '@google-cloud/pubsub/build/src/v1'
import { ILogger } from './ILogger'

interface ISchemasForDeployment {
forCreate: Map<string, string>,
forNewRevision: Map<string, string>
}
interface ISchemaWithEvent {
Event: string
}

export const SCHEMA_NAME_SUFFIX = '-generated-avro'
export type ReaderAvroSchema = {
reader: object
}
export class SchemaDeployer {

constructor(private readonly logger: ILogger,
private readonly pubSubClient: PubSub = new PubSub(),
private readonly schemaClient: SchemaServiceClient = new SchemaServiceClient()) {
}
public deployAvroSchemas = async (topicsSchemaConfig: Record<string, boolean>, topicReaderSchemas: Record<string, ReaderAvroSchema>): Promise<void> => {
const topicSchemasToDeploy = this.getEnabledTopicSchemas(topicsSchemaConfig, topicReaderSchemas);
if (topicSchemasToDeploy.size === 0) {
this.logger.info('Finished deployAvroSchemas, no schemas to deploy')
return
}
this.logger.info(`Found ${topicSchemasToDeploy.size} schemas enabled for deployment`)

const { forCreate, forNewRevision } = await this.aggregateTopicSchemas(topicSchemasToDeploy);
if (forCreate.size === 0 && forNewRevision.size === 0) {
this.logger.info('Finished deployAvroSchemas, all schemas are already deployed')
return
}
this.logger.info(`Found ${forCreate.size} not deployed schemas, and ${forNewRevision.size} new revisions, starting deployment`)

if (forCreate.size > 0) {
await this.createSchemas(forCreate)
}
if (forNewRevision.size > 0) {
await this.createRevisions(forNewRevision)
}

this.logger.info('Schemas deployment is finished')

}

private async createRevisions(forNewRevision: Map<string, string>) {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
}
for (const [topicSchema, definition] of forNewRevision) {
const schemaName = topicSchema + SCHEMA_NAME_SUFFIX
const schemaPath = `projects/${projectName}/schemas/${schemaName}`
await this.schemaClient.commitSchema({
name: schemaPath, schema: {
name: schemaPath, type: 'AVRO', definition,
},
})
this.logger.info(`Schema ${schemaName} is updated`)
}
}

private async createSchemas(forCreate: Map<string, string>) {
for (const [topicSchema, definition] of forCreate) {
const schemaName = topicSchema + SCHEMA_NAME_SUFFIX
await this.pubSubClient.createSchema(schemaName, 'AVRO', definition)
this.logger.info(`Schema ${schemaName} is created`)
}
}

private getEnabledTopicSchemas(schemasConfig: Record<string, boolean>, readerSchemas: Record<string, ReaderAvroSchema>)
: Map<string, string> {
const enableTopicSchemas = new Map<string, string>()
for (const topicName in schemasConfig) {
const readerSchema = readerSchemas[topicName]
if (schemasConfig[topicName] && readerSchema) {
enableTopicSchemas.set(topicName, JSON.stringify(readerSchema.reader))
}
}
return enableTopicSchemas;
}

private async aggregateTopicSchemas(topicSchemasToDeploy: Map<string, string>)
: Promise<ISchemasForDeployment> {
const forCreate = new Map<string, string>(topicSchemasToDeploy)
const forNewRevision = new Map<string, string>()
for await (const schema of this.pubSubClient.listSchemas('FULL')) {
if (schema.type != 'AVRO') {
continue
}
const definition = schema.definition
if (!definition) {
if (schema.name) {
this.logger.warn(`Schema without definition: ${schema.name}`)
} else {
this.logger.warn('Found schema without name and definition')
}
continue
}
const parsedDefinition = JSON.parse(definition) as ISchemaWithEvent
const eventName = parsedDefinition.Event
if (topicSchemasToDeploy.has(eventName)) {
forCreate.delete(eventName)
const newDefinition = topicSchemasToDeploy.get(eventName);
if (newDefinition && definition !== newDefinition) {
forNewRevision.set(eventName, newDefinition)
}
}
}
return { forCreate, forNewRevision }
}
}
114 changes: 114 additions & 0 deletions packages/pubsub/src/__tests__/SchemaDeployer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { PubSub } from '@google-cloud/pubsub';
import { SchemaServiceClient } from '@google-cloud/pubsub/build/src/v1';
import { SCHEMA_NAME_SUFFIX, SchemaDeployer } from '../SchemaDeployer'

const processApplicationStateStringSchema = '{"type":"record","name":"ProcessApplicationState","fields":[{"name":"applicationId","type":["null","int"],"default":null}],"Event":"data-cmd-process-application-state","SchemaType":"READER","AvdlSchemaVersion":"4adb1df1c9243e24b937ddd165abf7572d7e2491","AvdlSchemaGitRemoteOriginUrl":"[email protected]:join-com/data.git","AvdlSchemaPathInGitRepo":"schemas/avro/commands/commands.avdl","GeneratorVersion":"387a0b3f2c890dc67f99085b7c94ff4bdc9cc967","GeneratorGitRemoteOriginUrl":"https://github.com/join-com/avro-join"}'
const processApplicationStateReaderSchema = {'reader':{'type':'record','name':'ProcessApplicationState','fields':[{'name':'applicationId','type':['null','int'],'default':null}],'Event':'data-cmd-process-application-state','SchemaType':'READER','AvdlSchemaVersion':'4adb1df1c9243e24b937ddd165abf7572d7e2491','AvdlSchemaGitRemoteOriginUrl':'[email protected]:join-com/data.git','AvdlSchemaPathInGitRepo':'schemas/avro/commands/commands.avdl','GeneratorVersion':'387a0b3f2c890dc67f99085b7c94ff4bdc9cc967','GeneratorGitRemoteOriginUrl':'https://github.com/join-com/avro-join'}}
const processApplicationStateReaderSchemaUpdated = {'reader':{'type':'record','name':'ProcessApplicationState','fields':[{'name':'applicationId','type':['null','int'],'default':null},{'name':'userId','type':['null','int'],'default':null}],'Event':'data-cmd-process-application-state','SchemaType':'READER','AvdlSchemaVersion':'4adb1df1c9243e24b937ddd165abf7572d7e2491','AvdlSchemaGitRemoteOriginUrl':'[email protected]:join-com/data.git','AvdlSchemaPathInGitRepo':'schemas/avro/commands/commands.avdl','GeneratorVersion':'387a0b3f2c890dc67f99085b7c94ff4bdc9cc967','GeneratorGitRemoteOriginUrl':'https://github.com/join-com/avro-join'}}

const processApplicationStateGCloudSchema = {
type: 'AVRO',
name: 'data-company-affiliate-referral-created',
definition: processApplicationStateStringSchema
}

const getLoggerMock = () => ({
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
})

type ListSchemaAsyncIteratorMock = { [Symbol.asyncIterator](): AsyncIterableIterator<{ name: string; definition: string }> }
const getPubsubMock = (asyncIterable: ListSchemaAsyncIteratorMock
= undefined as unknown as ListSchemaAsyncIteratorMock) => ({
listSchemas: jest.fn().mockReturnValue(asyncIterable),
createSchema: jest.fn()
})
const getSchemaServiceClientMock = () => ({
getSchema: jest.fn(),
commitSchema: jest.fn()
})

describe('deployAvroSchemas', () => {
it('does nothing and logs when no enabled avro topics', async () => {
const schemasToDeploy = {
'data-user-created': false,
'data-user-deleted': false,
}
const logger = getLoggerMock();
const schemaDeployer = new SchemaDeployer(logger, getPubsubMock() as unknown as PubSub,
getSchemaServiceClientMock() as unknown as SchemaServiceClient)

await schemaDeployer.deployAvroSchemas(schemasToDeploy, {})

expect(logger.info).toHaveBeenCalledWith('Finished deployAvroSchemas, no schemas to deploy')
})

it('does nothing and logs when schema exist', async () => {
const logger = getLoggerMock();
const asyncIterable = {
// eslint-disable-next-line @typescript-eslint/require-await
async *[Symbol.asyncIterator]() {
yield processApplicationStateGCloudSchema
}
}
const pubsubMock = getPubsubMock(asyncIterable);
const schemaDeployer = new SchemaDeployer(logger, pubsubMock as unknown as PubSub,
getSchemaServiceClientMock() as unknown as SchemaServiceClient)

const schemasToDeploy = { 'data-cmd-process-application-state': true }
const readerSchemas = {'data-cmd-process-application-state': processApplicationStateReaderSchema};

await schemaDeployer.deployAvroSchemas(schemasToDeploy, readerSchemas)

expect(logger.info).toHaveBeenLastCalledWith('Finished deployAvroSchemas, all schemas are already deployed')
})

it('creates schema when it doesn\'t exist', async () => {
const asyncIterable = {
// eslint-disable-next-line @typescript-eslint/no-empty-function
async *[Symbol.asyncIterator]() {
}
}
const pubsubMock = getPubsubMock(asyncIterable);
const schemaDeployer = new SchemaDeployer(getLoggerMock(), pubsubMock as unknown as PubSub,
getSchemaServiceClientMock() as unknown as SchemaServiceClient)
const schemasToDeploy = {
'data-cmd-process-application-state': true,
}
const readerSchemas = {'data-cmd-process-application-state': processApplicationStateReaderSchema};

await schemaDeployer.deployAvroSchemas(schemasToDeploy, readerSchemas)

expect(pubsubMock.createSchema).toHaveBeenCalledWith('data-cmd-process-application-state-generated-avro', 'AVRO', JSON.stringify(readerSchemas['data-cmd-process-application-state'].reader))
})

it('creates schema revision when schemas don\'t match', async () => {
const projectName = 'project'
process.env['GCLOUD_PROJECT'] = projectName
const asyncIterable = {
// eslint-disable-next-line @typescript-eslint/require-await
async *[Symbol.asyncIterator]() {
yield processApplicationStateGCloudSchema
}
}
const pubsubMock = getPubsubMock(asyncIterable);
const schemaServiceClientMock = getSchemaServiceClientMock() as unknown as SchemaServiceClient
const schemaDeployer = new SchemaDeployer(getLoggerMock(), pubsubMock as unknown as PubSub,
schemaServiceClientMock)
const schemasToDeploy = {
'data-cmd-process-application-state': true,
}
const readerSchemas = {'data-cmd-process-application-state': processApplicationStateReaderSchemaUpdated};

await schemaDeployer.deployAvroSchemas(schemasToDeploy, readerSchemas)

const schemaName = 'data-cmd-process-application-state' + SCHEMA_NAME_SUFFIX
const schemaPath = `projects/${projectName}/schemas/${schemaName}`
expect(schemaServiceClientMock.commitSchema).toHaveBeenCalledWith({
name: schemaPath, schema: {
name: schemaPath, type: 'AVRO', definition: JSON.stringify(processApplicationStateReaderSchemaUpdated.reader),
},
})
})
})

0 comments on commit 1cd9454

Please sign in to comment.