diff --git a/packages/pubsub/src/__tests__/Publisher.test.ts b/packages/pubsub/src/__tests__/Publisher.test.ts index 0228fb5..b591c66 100644 --- a/packages/pubsub/src/__tests__/Publisher.test.ts +++ b/packages/pubsub/src/__tests__/Publisher.test.ts @@ -6,7 +6,7 @@ import { Publisher } from '../Publisher' import { ConsoleLogger, getClientMock, - getTopicMock, + getTopicMock, IMessageType, SCHEMA_DEFINITION_EXAMPLE, SCHEMA_EXAMPLE, schemaMock, @@ -20,6 +20,8 @@ const type = Type.forSchema(SCHEMA_DEFINITION_EXAMPLE as Schema, {logicalTypes: const processAbortSpy = jest.spyOn(process, 'abort') const schemas = {writer: SCHEMA_DEFINITION_EXAMPLE, reader: SCHEMA_DEFINITION_EXAMPLE} +const DATE_WITH_UNSAFE_NUMBER_TIMESTAMP_IN_MICROS = new Date('3000-01-01T00:00:00.000Z') +const MAX_DATE_WITH_SAFE_NUMBER_TIMESTAMP_IN_MICROS = new Date('2255-06-05T23:47:34.740Z') describe('Publisher', () => { let publisher: Publisher @@ -97,7 +99,7 @@ describe('Publisher', () => { it('publishes avro json encoded object', async () => { publisher = new Publisher(topic, clientMock as unknown as PubSub, new ConsoleLogger(), schemas) topicMock.exists.mockResolvedValue([true]) - topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}]) + topicMock.getMetadata.mockResolvedValue([{ 'schemaSettings': { 'schema': 'mock-schema' } }]) schemaMock.get.mockResolvedValue(SCHEMA_EXAMPLE) await publisher.initialize() @@ -128,6 +130,22 @@ describe('Publisher', () => { invalidPaths: ['second', 'createdAt', 'fourth.flag'], })) }) + + it('publishes avro json with max allowed date value when date in micros overflows MAX_SAFE_INTEGER', async () => { + publisher = new Publisher(topic, clientMock as unknown as PubSub, new ConsoleLogger(), schemas) + topicMock.exists.mockResolvedValue([true]) + topicMock.getMetadata.mockResolvedValue([{ 'schemaSettings': { 'schema': 'mock-schema' } }]) + schemaMock.get.mockResolvedValue(SCHEMA_EXAMPLE) + await publisher.initialize() + + const message = { first: 'one', createdAt: DATE_WITH_UNSAFE_NUMBER_TIMESTAMP_IN_MICROS } + await publisher.publishMsg(message) + + const avroMessage = Buffer.from(type.toString(message)) + expect(topicMock.publishMessage).toHaveBeenCalledWith({ data: avroMessage, attributes: metadata }) + const decodedMessage = type.fromString(avroMessage.toString()) as IMessageType + expect(decodedMessage.createdAt).toEqual(MAX_DATE_WITH_SAFE_NUMBER_TIMESTAMP_IN_MICROS) + }) }) describe('flush', () => { diff --git a/packages/pubsub/src/__tests__/support/pubsubMock.ts b/packages/pubsub/src/__tests__/support/pubsubMock.ts index b5750bc..43f8964 100644 --- a/packages/pubsub/src/__tests__/support/pubsubMock.ts +++ b/packages/pubsub/src/__tests__/support/pubsubMock.ts @@ -5,6 +5,14 @@ import { ILogger } from '../../ILogger' type EventHandler = (attrs: unknown) => Promise type EventHandlerMap = { [key: string]: EventHandler } +export interface IMessageType { + first?: string, + second?: string, + createdAt?: Date, + third?: string, + fourth?: {flag: boolean} +} + export const SCHEMA_DEFINITION_EXAMPLE = { 'type': 'record', 'name': 'Avro', diff --git a/packages/pubsub/src/logical-types/DateType.ts b/packages/pubsub/src/logical-types/DateType.ts index 14ee8bb..f68f71a 100644 --- a/packages/pubsub/src/logical-types/DateType.ts +++ b/packages/pubsub/src/logical-types/DateType.ts @@ -13,7 +13,22 @@ export class DateType extends types.LogicalType { return new Date(val / 1000) } _toValue(date: any) { - return date instanceof Date ? date.getTime() * 1000 : undefined + if (!(date instanceof Date)) { + return undefined + } + let dateInMillis = date.getTime() * 1000 + // If number is not a safe integer, it will lose precision during conversion: + // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER + // Avsc will throw errors trying to convert number larger than Number.MAX_SAFE_INTEGER - 1 + // Only possibility to fix is to use custom long types like BigInt, but still it will not work for json conversion, + // because of that limiting date to max possible date in micros, if received value larger than that + // https://github.com/mtth/avsc/wiki/Advanced-usage#custom-long-types + if (Number.isSafeInteger(dateInMillis)) { + return dateInMillis + } else { + // max safe date in micros is "2255-06-05T23:47:34.740Z" + return Number.MAX_SAFE_INTEGER - 1 + } } _resolve(type: Type) { if (Type.isType(type, 'logical:timestamp-micros')) {