Skip to content

Commit

Permalink
Merge pull request #74 from join-com/alpha
Browse files Browse the repository at this point in the history
feat: JOIN-33235 add pubsub avro schema creation during the deployment
  • Loading branch information
eugene-taran authored Jul 6, 2023
2 parents b4120f4 + 1bbd684 commit 9cf72a9
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 0 deletions.
122 changes: 122 additions & 0 deletions packages/pubsub/src/SchemaDeployer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { PubSub } from '@google-cloud/pubsub';
import { SchemaServiceClient } from '@google-cloud/pubsub/build/src/v1'
import { ILogger } from './ILogger'

interface ISchemasForDeployment {
forCreate: Map<string, string>,
forNewRevision: Map<string, string>
}
interface ISchemaWithEvent {
Event: string
}
interface IDeploymentResult {
schemasCreated: number,
revisionsCreated: number
}
const AVRO = 'AVRO'

export const SCHEMA_NAME_SUFFIX = '-generated-avro'
export type ReaderAvroSchema = {
reader: object
}
export class SchemaDeployer {

constructor(private readonly logger: ILogger,
private readonly pubSubClient: PubSub = new PubSub(),
private readonly schemaClient: SchemaServiceClient = new SchemaServiceClient()) {
}
public deployAvroSchemas = async (topicsSchemaConfig: Record<string, boolean>,
topicReaderSchemas: Record<string, ReaderAvroSchema>): Promise<IDeploymentResult> => {
if (!process.env['GCLOUD_PROJECT']) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
}
const topicSchemasToDeploy = this.getEnabledTopicSchemas(topicsSchemaConfig, topicReaderSchemas);
if (topicSchemasToDeploy.size === 0) {
this.logger.info('Finished deployAvroSchemas, no schemas to deploy')
return {schemasCreated: 0, revisionsCreated: 0}
}
this.logger.info(`Found ${topicSchemasToDeploy.size} schemas enabled for deployment`)

const { forCreate, forNewRevision } = await this.aggregateTopicSchemas(topicSchemasToDeploy, topicsSchemaConfig);
if (forCreate.size === 0 && forNewRevision.size === 0) {
this.logger.info('Finished deployAvroSchemas, all schemas are already deployed')
return {schemasCreated: 0, revisionsCreated: 0}

}
this.logger.info(`Found ${forCreate.size} not deployed schemas, and ${forNewRevision.size} new revisions, starting deployment`)

await this.createSchemas(forCreate)
await this.createRevisions(forNewRevision)

this.logger.info(`Schemas deployment is finished, ${forCreate.size} schemas and ${forNewRevision.size} revisions are created`)

return {schemasCreated: forCreate.size, revisionsCreated: forNewRevision.size}

}

private async createRevisions(forNewRevision: Map<string, string>) {
const projectName = process.env['GCLOUD_PROJECT'] as string
for (const [topicSchema, definition] of forNewRevision) {
const schemaName = topicSchema + SCHEMA_NAME_SUFFIX
const schemaPath = `projects/${projectName}/schemas/${schemaName}`
await this.schemaClient.commitSchema({
name: schemaPath, schema: {
name: schemaPath, type: AVRO, definition,
},
})
this.logger.info(`Schema ${schemaName} is updated`)
}
}

private async createSchemas(forCreate: Map<string, string>) {
for (const [topicSchema, definition] of forCreate) {
const schemaName = topicSchema + SCHEMA_NAME_SUFFIX
await this.pubSubClient.createSchema(schemaName, AVRO, definition)
this.logger.info(`Schema ${schemaName} is created`)
}
}

private getEnabledTopicSchemas(schemasConfig: Record<string, boolean>, readerSchemas: Record<string, ReaderAvroSchema>)
: Map<string, string> {
const enabledTopicsSchemas = new Map<string, string>()
for (const topicName in schemasConfig) {
const readerSchema = readerSchemas[topicName]
if (schemasConfig[topicName] && readerSchema) {
enabledTopicsSchemas.set(topicName, JSON.stringify(readerSchema.reader))
}
}
return enabledTopicsSchemas;
}

private async aggregateTopicSchemas(topicSchemasToDeploy: Map<string, string>, topicsSchemaConfig: Record<string, boolean>)
: Promise<ISchemasForDeployment> {
const forCreate = new Map<string, string>(topicSchemasToDeploy)
const forNewRevision = new Map<string, string>()
for await (const schema of this.pubSubClient.listSchemas('FULL')) {
if (schema.type != AVRO) {
if (schema.name && topicsSchemaConfig[schema.name]) {
throw new Error(`Non avro schema exists for avro topic '${schema.name}', please remove it before starting the service`)
}
}
const definition = schema.definition
if (!definition) {
if (schema.name) {
this.logger.warn(`Schema without definition: ${schema.name}`)
} else {
this.logger.warn('Found schema without name and definition')
}
continue
}
const parsedDefinition = JSON.parse(definition) as ISchemaWithEvent
const eventName = parsedDefinition.Event
if (topicSchemasToDeploy.has(eventName)) {
forCreate.delete(eventName)
const newDefinition = topicSchemasToDeploy.get(eventName);
if (newDefinition && definition !== newDefinition) {
forNewRevision.set(eventName, newDefinition)
}
}
}
return { forCreate, forNewRevision }
}
}
149 changes: 149 additions & 0 deletions packages/pubsub/src/__tests__/SchemaDeployer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import { PubSub } from '@google-cloud/pubsub';
import { SchemaServiceClient } from '@google-cloud/pubsub/build/src/v1';
import { SCHEMA_NAME_SUFFIX, SchemaDeployer } from '../SchemaDeployer'

const processApplicationStateStringSchema = '{"type":"record","name":"ProcessApplicationState","fields":[{"name":"applicationId","type":["null","int"],"default":null}],"Event":"data-cmd-process-application-state","SchemaType":"READER","AvdlSchemaVersion":"4adb1df1c9243e24b937ddd165abf7572d7e2491","AvdlSchemaGitRemoteOriginUrl":"[email protected]:join-com/data.git","AvdlSchemaPathInGitRepo":"schemas/avro/commands/commands.avdl","GeneratorVersion":"387a0b3f2c890dc67f99085b7c94ff4bdc9cc967","GeneratorGitRemoteOriginUrl":"https://github.com/join-com/avro-join"}'
const processApplicationStateReaderSchema = {'reader':{'type':'record','name':'ProcessApplicationState','fields':[{'name':'applicationId','type':['null','int'],'default':null}],'Event':'data-cmd-process-application-state','SchemaType':'READER','AvdlSchemaVersion':'4adb1df1c9243e24b937ddd165abf7572d7e2491','AvdlSchemaGitRemoteOriginUrl':'[email protected]:join-com/data.git','AvdlSchemaPathInGitRepo':'schemas/avro/commands/commands.avdl','GeneratorVersion':'387a0b3f2c890dc67f99085b7c94ff4bdc9cc967','GeneratorGitRemoteOriginUrl':'https://github.com/join-com/avro-join'}}
const processApplicationStateReaderSchemaUpdated = {'reader':{'type':'record','name':'ProcessApplicationState','fields':[{'name':'applicationId','type':['null','int'],'default':null},{'name':'userId','type':['null','int'],'default':null}],'Event':'data-cmd-process-application-state','SchemaType':'READER','AvdlSchemaVersion':'4adb1df1c9243e24b937ddd165abf7572d7e2491','AvdlSchemaGitRemoteOriginUrl':'[email protected]:join-com/data.git','AvdlSchemaPathInGitRepo':'schemas/avro/commands/commands.avdl','GeneratorVersion':'387a0b3f2c890dc67f99085b7c94ff4bdc9cc967','GeneratorGitRemoteOriginUrl':'https://github.com/join-com/avro-join'}}

const processApplicationStateGCloudSchema = {
type: 'AVRO',
name: 'data-company-affiliate-referral-created',
definition: processApplicationStateStringSchema
}

const getLoggerMock = () => ({
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
})

type ListSchemaAsyncIteratorMock = { [Symbol.asyncIterator](): AsyncIterableIterator<{ name: string; definition: string }> }
const getPubsubMock = (asyncIterable: ListSchemaAsyncIteratorMock
= undefined as unknown as ListSchemaAsyncIteratorMock) => ({
listSchemas: jest.fn().mockReturnValue(asyncIterable),
createSchema: jest.fn()
})
const getSchemaServiceClientMock = () => ({
getSchema: jest.fn(),
commitSchema: jest.fn()
})

describe('deployAvroSchemas', () => {
beforeEach(() => {
process.env['GCLOUD_PROJECT'] = 'project'
})

it('does nothing when no enabled avro topics', async () => {
const schemasToDeploy = {
'data-user-created': false,
'data-user-deleted': false,
}
const schemaDeployer = new SchemaDeployer(getLoggerMock(), getPubsubMock() as unknown as PubSub,
getSchemaServiceClientMock() as unknown as SchemaServiceClient)

const { schemasCreated, revisionsCreated } = await schemaDeployer.deployAvroSchemas(schemasToDeploy, {})

expect(schemasCreated).toBe(0)
expect(revisionsCreated).toBe(0)
})

it('throws error when non-avro schema exists with avro name', async () => {
const nonAvroSchemaWithAvroName = {
type: 'PROTOCOL_BUFFER',
name: 'data-company-affiliate-referral-created',
definition: 'some'
}
const asyncIterable = {
// eslint-disable-next-line @typescript-eslint/require-await
async *[Symbol.asyncIterator]() {
yield nonAvroSchemaWithAvroName
}
}
const pubsubMock = getPubsubMock(asyncIterable);
const schemaDeployer = new SchemaDeployer(getLoggerMock(), pubsubMock as unknown as PubSub,
getSchemaServiceClientMock() as unknown as SchemaServiceClient)

const schemasToDeploy = { 'data-company-affiliate-referral-created': true }
const readerSchemas = {'data-company-affiliate-referral-created': processApplicationStateReaderSchema};

await expect(schemaDeployer.deployAvroSchemas(schemasToDeploy, readerSchemas)).rejects
.toThrow('Non avro schema exists for avro topic \'data-company-affiliate-referral-created\', please remove it before starting the service')

})

it('does nothing and logs when schema revisions match', async () => {
const asyncIterable = {
// eslint-disable-next-line @typescript-eslint/require-await
async *[Symbol.asyncIterator]() {
yield processApplicationStateGCloudSchema
}
}
const pubsubMock = getPubsubMock(asyncIterable);
const schemaDeployer = new SchemaDeployer(getLoggerMock(), pubsubMock as unknown as PubSub,
getSchemaServiceClientMock() as unknown as SchemaServiceClient)

const schemasToDeploy = { 'data-cmd-process-application-state': true }
const readerSchemas = {'data-cmd-process-application-state': processApplicationStateReaderSchema};

const { schemasCreated, revisionsCreated } = await schemaDeployer.deployAvroSchemas(schemasToDeploy, readerSchemas)

expect(schemasCreated).toBe(0)
expect(revisionsCreated).toBe(0)
})

it('creates schema when it doesn\'t exist', async () => {
const asyncIterable = {
// eslint-disable-next-line @typescript-eslint/no-empty-function
async *[Symbol.asyncIterator]() {
}
}
const pubsubMock = getPubsubMock(asyncIterable);
const schemaDeployer = new SchemaDeployer(getLoggerMock(), pubsubMock as unknown as PubSub,
getSchemaServiceClientMock() as unknown as SchemaServiceClient)
const schemasToDeploy = {
'data-cmd-process-application-state': true,
}
const readerSchemas = {'data-cmd-process-application-state': processApplicationStateReaderSchema};

const { schemasCreated, revisionsCreated } = await schemaDeployer.deployAvroSchemas(schemasToDeploy, readerSchemas)

expect(schemasCreated).toBe(1)
expect(revisionsCreated).toBe(0)
expect(pubsubMock.createSchema).toHaveBeenCalledWith('data-cmd-process-application-state-generated-avro', 'AVRO', JSON.stringify(readerSchemas['data-cmd-process-application-state'].reader))
})

it('creates schema revision when schemas don\'t match', async () => {
const processApplicationStateGCloudSchema = {
type: 'PROTOBUF',
name: 'data-company-affiliate-referral-created',
definition: processApplicationStateStringSchema
}
const asyncIterable = {
// eslint-disable-next-line @typescript-eslint/require-await
async *[Symbol.asyncIterator]() {
yield processApplicationStateGCloudSchema
}
}
const pubsubMock = getPubsubMock(asyncIterable);
const schemaServiceClientMock = getSchemaServiceClientMock() as unknown as SchemaServiceClient
const schemaDeployer = new SchemaDeployer(getLoggerMock(), pubsubMock as unknown as PubSub,
schemaServiceClientMock)
const schemasToDeploy = {
'data-cmd-process-application-state': true,
}
const readerSchemas = {'data-cmd-process-application-state': processApplicationStateReaderSchemaUpdated};

const { schemasCreated, revisionsCreated } = await schemaDeployer.deployAvroSchemas(schemasToDeploy, readerSchemas)

expect(schemasCreated).toBe(0)
expect(revisionsCreated).toBe(1)
const schemaName = 'data-cmd-process-application-state' + SCHEMA_NAME_SUFFIX
const schemaPath = `projects/${process.env['GCLOUD_PROJECT'] as string}/schemas/${schemaName}`
expect(schemaServiceClientMock.commitSchema).toHaveBeenCalledWith({
name: schemaPath, schema: {
name: schemaPath, type: 'AVRO', definition: JSON.stringify(processApplicationStateReaderSchemaUpdated.reader),
},
})
})
})
1 change: 1 addition & 0 deletions packages/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export * from './Subscriber'
export * from './SubscriberFactory'
export * from './MessageHandler'
export * from './ILogger'
export * from './SchemaDeployer'

0 comments on commit 9cf72a9

Please sign in to comment.