Skip to content

Commit

Permalink
add comment to use setMetadata when it is fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran committed Jun 13, 2023
1 parent 807488b commit 6967ee1
Showing 1 changed file with 39 additions and 27 deletions.
66 changes: 39 additions & 27 deletions packages/pubsub/src/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,38 +110,50 @@ export class Publisher<T = unknown> {
private async initializeTopicSchema(): Promise<void> {
if (this.avroSchemasProvided) {
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()

if (!this.topicHasAssignedSchema && await this.doesRegistryHaveTopicSchema()) {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
}
// TODO: this.setSchemaToTheTopic() should be replace with
// ```await this.topic.setMetadata({ schemaSettings: { schema: this.topicSchemaName, encoding: Encoding.JSON }})
// this.topicHasAssignedSchema = true```
// once https://github.com/googleapis/nodejs-pubsub/issues/1587 is fixed
this.setSchemaToTheTopic()
}
}
}

this.topic.request(
{
client: 'PublisherClient',
method: 'updateTopic',
reqOpts: {
topic: {
name: `projects/${projectName}/topics/${this.topicName}`,
schemaSettings: { schema: `projects/${projectName}/schemas/${this.topicSchemaName}`, encoding: Encoding.JSON }
},
updateMask: {
paths: ['schema_settings'],
},
private setSchemaToTheTopic() {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
}

this.topic.request(
{
client: 'PublisherClient',
method: 'updateTopic',
reqOpts: {
topic: {
name: `projects/${projectName}/topics/${this.topicName}`,
schemaSettings: {
schema: `projects/${projectName}/schemas/${this.topicSchemaName}`,
encoding: Encoding.JSON,
},
gaxOpts: {},
},
(err, _) => {
if (!err) {
this.topicHasAssignedSchema = true
this.logger?.info(`Schema '${this.topicSchemaName}' set to the topic '${this.topicName}'`)
} else {
this.logger?.error(`Couldn't set schema '${this.topicSchemaName}' to the topic '${this.topicName}'`)
}
updateMask: {
paths: ['schema_settings'],
},
);
}
}
},
gaxOpts: {},
},
(err, _) => {
if (!err) {
this.topicHasAssignedSchema = true
this.logger?.info(`Schema '${this.topicSchemaName}' set to the topic '${this.topicName}'`)
} else {
this.logger?.error(`Couldn't set schema '${this.topicSchemaName}' to the topic '${this.topicName}'`)
}
},
)
}

private async sendAvroMessage(data: T): Promise<void> {
Expand Down

0 comments on commit 6967ee1

Please sign in to comment.