diff --git a/packages/pubsub/src/Publisher.ts b/packages/pubsub/src/Publisher.ts index 14c398d..41d200a 100644 --- a/packages/pubsub/src/Publisher.ts +++ b/packages/pubsub/src/Publisher.ts @@ -87,7 +87,7 @@ export class Publisher { private logWarnIfMessageViolatesSchema(data: T): void { if (this.writerAvroType) { if (!this.writerAvroType.isValid(data)) { - this.logger?.warn('Message violates writer avro schema', { payload: data, metadata: this.avroMessageMetadata }) + this.logger?.warn(`[schema-violation] [${this.topicName}] Message violates writer avro schema`, { payload: data, metadata: this.avroMessageMetadata }) } } } @@ -161,9 +161,9 @@ export class Publisher { // for now we are checking that avro is enabled before calling sendAvroMessage // eslint-disable-next-line @typescript-eslint/no-non-null-assertion if (!this.writerAvroType!.isValid(data)) { - this.logger?.error('Invalid payload for the specified writer schema, please check that the schema is correct ' + - 'and payload can be encoded with it', {payload: data, schemaMetadata: this.avroMessageMetadata}) - throw new Error(`Can't encode the avro message for the topic ${this.topicName}`) + this.logger?.error(`[${this.topicName}] Invalid payload for the specified writer schema, please check that the schema is correct ' + + 'and payload can be encoded with it`, {payload: data, schemaMetadata: this.avroMessageMetadata}) + throw new Error(`[${this.topicName}] Can't encode the avro message for the topic`) } if (this.avroMessageMetadata && this.avroMessageMetadata['join_preserve_null']) { logWarnWhenUndefinedInNullPreserveFields(data, this.avroMessageMetadata['join_preserve_null'], this.logger) diff --git a/packages/pubsub/src/__tests__/Subscriber.test.ts b/packages/pubsub/src/__tests__/Subscriber.test.ts index e3ea332..e3166d8 100644 --- a/packages/pubsub/src/__tests__/Subscriber.test.ts +++ b/packages/pubsub/src/__tests__/Subscriber.test.ts @@ -327,7 +327,7 @@ describe('Subscriber', () => { }) it('receives avro parsed data', async () => { - topicMock.exists.mockResolvedValue([false]) + topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}]) schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_EXAMPLE)}])