Skip to content

Commit

Permalink
Merge pull request #67 from join-com/alpha
Browse files Browse the repository at this point in the history
feat: JOIN-29819 // add avro format support
  • Loading branch information
eugene-taran authored Jun 14, 2023
2 parents 0da3841 + 54756b7 commit 4b77d35
Show file tree
Hide file tree
Showing 23 changed files with 4,470 additions and 7,865 deletions.
28 changes: 19 additions & 9 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,26 @@ defaults: &defaults
- image: cimg/node:16.14.1
<<: *docker-auth

references:
auth_with_registry: &auth_with_registry
run:
name: Authenticate with registry
command: echo "//registry.npmjs.org/:_authToken=$npm_TOKEN" > .npmrc
commands:
authenticate_npm:
description: Authenticate client with registry
parameters:
path:
type: string
default: '.'
steps:
- run:
name: Authenticate with GAR (.npmrc)
command: |
echo $GCLOUD_SERVICE_KEY_CI_DEV | base64 -d > ${HOME}/gcloud-service-key.json
GOOGLE_APPLICATION_CREDENTIALS=${HOME}/gcloud-service-key.json yarn artifactregistry-login --repo-config=<< parameters.path >>/.npmrc
references:
remove_auth_with_registry: &remove_auth_with_registry
run:
name: Remove authentication with registry
command: rm .npmrc
command: rm ~/.npmrc

yarn_install: &yarn_install
run:
Expand All @@ -48,7 +58,7 @@ jobs:
steps:
- checkout
- *restore_yarn_cache
- *auth_with_registry
- authenticate_npm
- *yarn_install
- run:
name: lint
Expand All @@ -61,7 +71,7 @@ jobs:
steps:
- checkout
- *restore_yarn_cache
- *auth_with_registry
- authenticate_npm
- *yarn_install
- run:
name: test
Expand All @@ -74,7 +84,7 @@ jobs:
steps:
- checkout
- *restore_yarn_cache
- *auth_with_registry
- authenticate_npm
- *yarn_install
- run:
name: release
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dist/**
.idea
packages/*/lib
sandbox
packages/*/dist
2 changes: 2 additions & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
@join-private:registry=https://europe-west1-npm.pkg.dev/join-dev-330413/join-private/
//europe-west1-npm.pkg.dev/join-dev-330413/join-private/:always-auth=true
1 change: 0 additions & 1 deletion .yarnrc

This file was deleted.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
"lint": "yarn workspaces run lint",
"test": "yarn workspaces run test",
"release": "yarn workspaces run semantic-release -e semantic-release-monorepo",
"artifactregistry-login": "npm_config_registry=https://registry.npmjs.org/ npx google-artifactregistry-auth",
"preinstall": "[ \"$CI\" != \"true\" ] && yarn artifactregistry-login || true",
"postinstall": "if [ -d .git ]; then git config core.hooksPath .hooks; fi"
},
"engines": {
Expand Down
1 change: 1 addition & 0 deletions packages/pubsub-legacy-factories/.npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ jest.config.js
tsconfig.prod.tsbuildinfo
tsconfig.prod.json
tsconfig.json
.npmrc
2 changes: 2 additions & 0 deletions packages/pubsub-legacy-factories/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
@join-private:registry=https://europe-west1-npm.pkg.dev/join-dev-330413/join-private/
//europe-west1-npm.pkg.dev/join-dev-330413/join-private/:always-auth=true
3,797 changes: 0 additions & 3,797 deletions packages/pubsub-legacy-factories/yarn.lock

This file was deleted.

1 change: 1 addition & 0 deletions packages/pubsub/.npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ jest.config.js
tsconfig.prod.tsbuildinfo
tsconfig.prod.json
tsconfig.json
.npmrc
2 changes: 2 additions & 0 deletions packages/pubsub/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
@join-private:registry=https://europe-west1-npm.pkg.dev/join-dev-330413/join-private/
//europe-west1-npm.pkg.dev/join-dev-330413/join-private/:always-auth=true
6 changes: 4 additions & 2 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
"test": "jest",
"test:watch": "jest --watch",
"test:coverage": "jest --collect-coverage",
"test:debug": "export NODE_OPTIONS=\"$NODE_OPTIONS --inspect-brk\" && jest --forceExit",
"lint": "yarn lint:tsc && yarn lint:eslint",
"lint:eslint": "eslint . --ext .ts --max-warnings 0",
"lint:tsc": "tsc --noEmit",
"prepublishOnly": "yarn lint && yarn build"
},
"dependencies": {
"@google-cloud/pubsub": "^3.5.1"
"@google-cloud/pubsub": "^3.5.1",
"avsc": "^5.7.7"
},
"devDependencies": {
"@join-private/eslint-config-backend": "^1.1.3",
"@join-private/eslint-config-backend": "^1.3.0",
"@types/jest": "^29.1.1",
"@types/node": "^18.8.2",
"eslint": "^8.24.0",
Expand Down
3 changes: 3 additions & 0 deletions packages/pubsub/src/DataParser.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* @deprecated remove the class as only avro should be used later
*/
export class DataParser {
public parse(data: Buffer): unknown {
const dateTimeRegex = /^(\d{4}-\d\d-\d\d([tT][\d:.]*)?)([zZ]|([+-])(\d\d):?(\d\d))?$/
Expand Down
181 changes: 178 additions & 3 deletions packages/pubsub/src/Publisher.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,95 @@
import { readFileSync } from 'fs'
import { PubSub, Topic } from '@google-cloud/pubsub'
import { google } from '@google-cloud/pubsub/build/protos/protos'
import { MessageOptions } from '@google-cloud/pubsub/build/src/topic'
import { Schema, Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
import { ILogger } from './ILogger'
import { DateType } from './logical-types/DateType'
import { logWarnWhenUndefinedInNullPreserveFields } from './util'
import Encoding = google.pubsub.v1.Encoding

interface IMessageMetadata {
Event: string,
GeneratorVersion: string,
GeneratorGitRemoteOriginUrl: string,
SchemaType: string
AvdlSchemaPathInGitRepo: string,
AvdlSchemaGitRemoteOriginUrl: string,
AvdlSchemaVersion: string,
PreserveNull?: string
}

type SchemaWithMetadata = Schema & IMessageMetadata

export class Publisher<T = unknown> {
private readonly topic: Topic
private readonly topicSchemaName: string

private readonly writerAvroType?: Type
private readonly readerAvroType?: Type

private readonly avroMessageMetadata?: Record<string, string>
//TODO: remove flags below, when only avro will be used
private topicHasAssignedSchema = false
private avroSchemasProvided = false

constructor(readonly topicName: string, client: PubSub, private readonly logger?: ILogger) {
constructor(readonly topicName: string, readonly client: PubSub, private readonly logger?: ILogger,
avroSchemas?: { writer: object, reader: object }) {
//TODO: avroSchemas parameter should be mandatory when only avro is used
if (avroSchemas) {
this.avroSchemasProvided = true
const writerAvroSchema: SchemaWithMetadata = avroSchemas.writer as SchemaWithMetadata
this.writerAvroType = Type.forSchema(writerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })

const readerAvroSchema: SchemaWithMetadata = avroSchemas.reader as SchemaWithMetadata
this.readerAvroType = Type.forSchema(readerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })

this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema)
}
this.topic = client.topic(topicName)
this.topicSchemaName = `${this.topicName}-generated-avro`
}

public async initialize() {
try {
await this.initializeTopic()
await this.initializeTopicSchema()
} catch (e) {
this.logger?.error('PubSub: Failed to initialize publisher', e)
process.abort()
}
}

public async publishMsg(data: T): Promise<void> {
const messageId = await this.topic.publishMessage({ json: data })
this.logger?.info(`PubSub: Message sent for topic: ${this.topicName}:`, { data, messageId })
if (!this.avroSchemasProvided) {
// old flow, just send message if no avro schemas provided
await this.sendJsonMessage({ json: data })
} else if (!this.topicHasAssignedSchema) {
try {
await this.sendJsonMessage({ json: data })
this.logWarnIfMessageViolatesSchema(data)
} catch (e) {
//it's a corner case when application started without schema on topic, and then schema was added to the topic
//in this case we are trying to resend message with avro format if schema appeared
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()
if (!this.topicHasAssignedSchema) {
throw e
}
await this.sendAvroMessage(data)
}
} else {
// TODO: remove everything except this call after services will be ready to use only avro
await this.sendAvroMessage(data)
}
}

private logWarnIfMessageViolatesSchema(data: T): void {
if (this.writerAvroType) {
if (!this.writerAvroType.isValid(data)) {
this.logger?.warn(`[schema-violation] [${this.topicName}] Message violates writer avro schema`, { payload: data, metadata: this.avroMessageMetadata })
}
}
}

public async flush(): Promise<void> {
Expand All @@ -37,4 +106,110 @@ export class Publisher<T = unknown> {
this.logger?.info(`PubSub: Topic ${this.topicName} is created`)
}
}

private async initializeTopicSchema(): Promise<void> {
if (this.avroSchemasProvided) {
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()

if (!this.topicHasAssignedSchema && await this.doesRegistryHaveTopicSchema()) {
// TODO: this.setSchemaToTheTopic() should be replace with
// ```await this.topic.setMetadata({ schemaSettings: { schema: this.topicSchemaName, encoding: Encoding.JSON }})
// this.topicHasAssignedSchema = true```
// once https://github.com/googleapis/nodejs-pubsub/issues/1587 is fixed
this.setSchemaToTheTopic()
}
}
}

private setSchemaToTheTopic() {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
}

this.topic.request(
{
client: 'PublisherClient',
method: 'updateTopic',
reqOpts: {
topic: {
name: `projects/${projectName}/topics/${this.topicName}`,
schemaSettings: {
schema: `projects/${projectName}/schemas/${this.topicSchemaName}`,
encoding: Encoding.JSON,
},
},
updateMask: {
paths: ['schema_settings'],
},
},
gaxOpts: {},
},
(err, _) => {
if (!err) {
this.topicHasAssignedSchema = true
this.logger?.info(`Schema '${this.topicSchemaName}' set to the topic '${this.topicName}'`)
} else {
this.logger?.error(`Couldn't set schema '${this.topicSchemaName}' to the topic '${this.topicName}'`)
}
},
)
}

private async sendAvroMessage(data: T): Promise<void> {
// TODO: remove non-null assertion and eslint-disable when avroType will be mandatory on every topic
// for now we are checking that avro is enabled before calling sendAvroMessage
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (!this.writerAvroType!.isValid(data)) {
this.logger?.error(`[${this.topicName}] Invalid payload for the specified writer schema, please check that the schema is correct ' +
'and payload can be encoded with it`, {payload: data, schemaMetadata: this.avroMessageMetadata})
throw new Error(`[${this.topicName}] Can't encode the avro message for the topic`)
}
if (this.avroMessageMetadata && this.avroMessageMetadata['join_preserve_null']) {
logWarnWhenUndefinedInNullPreserveFields(data, this.avroMessageMetadata['join_preserve_null'], this.logger)
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const buffer = Buffer.from(this.readerAvroType!.toString(data))
const messageId = await this.topic.publishMessage({ data: buffer, attributes: this.avroMessageMetadata })
this.logger?.info(`PubSub: Avro message sent for topic: ${this.topicName}:`, { data, messageId })
}

private async sendJsonMessage(message: MessageOptions) {
const messageId = await this.topic.publishMessage(message)
this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, { message, messageId })
}

private prepareAvroMessageMetadata(schema: SchemaWithMetadata): Record<string, string> {
const metadata: Record<string, string> = {}

metadata['join_event'] = schema.Event
metadata['join_generator_version'] = schema.GeneratorVersion
metadata['join_generator_git_remote_origin_url'] = schema.GeneratorGitRemoteOriginUrl
metadata['join_schema_type'] = schema.SchemaType
metadata['join_avdl_schema_path_in_git_repo'] = schema.AvdlSchemaPathInGitRepo
metadata['join_avdl_schema_git_remote_origin_url'] = schema.AvdlSchemaGitRemoteOriginUrl
metadata['join_avdl_schema_version'] = schema.AvdlSchemaVersion
metadata['join_pubsub_lib_version'] = this.getLibraryVersion()
if (schema.PreserveNull) {
metadata['join_preserve_null'] = schema.PreserveNull
}

return metadata
}

private getLibraryVersion(): string {
const libPackageJsonPath = `${__dirname}/../package.json`
const packageJson = JSON.parse(readFileSync(libPackageJsonPath, 'utf8')) as { version: string}
return packageJson.version
}

private async doesTopicHaveSchemaAssigned(): Promise<boolean> {
const [metadata] = await this.topic.getMetadata()
const schemaName = metadata?.schemaSettings?.schema
return !!schemaName
}

public async doesRegistryHaveTopicSchema(): Promise<boolean> {
return !!(await this.client.schema(this.topicSchemaName).get());
}
}
8 changes: 4 additions & 4 deletions packages/pubsub/src/PublisherFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ export interface IPublisher<T> {
flush: () => Promise<void>
}

export class PublisherFactory<T> {
export class PublisherFactory<TypeMap> {
private readonly client: PubSub

constructor(private readonly logger: ILogger) {
constructor(private readonly logger: ILogger, private readonly avroSchemas?: Record<keyof TypeMap, { writer: object, reader: object }>) {
this.client = new PubSub()
}

public getPublisher<K extends keyof T>(topic: K): IPublisher<T[K]> {
return new Publisher(topic.toString(), this.client, this.logger)
public getPublisher<Topic extends keyof TypeMap> (topic: Topic): IPublisher<TypeMap[Topic]> {
return new Publisher(topic.toString(), this.client, this.logger, this.avroSchemas?.[topic])
}
}
Loading

0 comments on commit 4b77d35

Please sign in to comment.