diff --git a/packages/pubsub/src/MessageHandlerIdempotent.ts b/packages/pubsub/src/IdempotentMessageHandler.ts similarity index 85% rename from packages/pubsub/src/MessageHandlerIdempotent.ts rename to packages/pubsub/src/IdempotentMessageHandler.ts index 36b2d5e..114f9b6 100644 --- a/packages/pubsub/src/MessageHandlerIdempotent.ts +++ b/packages/pubsub/src/IdempotentMessageHandler.ts @@ -5,13 +5,13 @@ import { ISubscriber } from './SubscriberFactory' type GetIdempotencyKeyFunction = (msg: T, info: IMessageInfo) => string | undefined -export abstract class IdempotencyStorage { - public abstract exists(key: string): Promise +export interface IIdempotencyStorage { + exists(key: string): Promise - public abstract save(key: string): Promise + save(key: string): Promise } -export class RedisIdempotencyStorage implements IdempotencyStorage { +export class RedisIdempotencyStorage implements IIdempotencyStorage { constructor(private readonly redisClient: IRedisClient, private readonly redisDefaultTtl: number) {} @@ -25,7 +25,8 @@ export class RedisIdempotencyStorage implements IdempotencyStorage { } public async save(key: string): Promise { - await this.redisClient.setex(key, this.redisDefaultTtl, Buffer.from(new Date())) + const value = Buffer.from(new Date().toISOString()) + await this.redisClient.setex(key, this.redisDefaultTtl, value) } } @@ -34,7 +35,7 @@ export class RedisIdempotencyStorage implements IdempotencyStorage { * Will check if message was already processed by checking idempotency key in the storage, and will skip it if it was. * After the message is processed, stores message in the idempotency storage */ -export abstract class MessageHandlerIdempotent { +export abstract class IdempotentMessageHandler { /** * * @param subscriber subscriber to listen to @@ -44,7 +45,7 @@ export abstract class MessageHandlerIdempotent { */ protected constructor( private readonly subscriber: ISubscriber, - private readonly idempotencyStorage: IdempotencyStorage, + private readonly idempotencyStorage: IIdempotencyStorage, private readonly getIdempotencyKey: GetIdempotencyKeyFunction = (_: T, info: IMessageInfo) => { return info.attributes[JOIN_IDEMPOTENCY_KEY] diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index a59777a..f4e07a9 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -3,7 +3,7 @@ export * from './PublisherFactory' export * from './Subscriber' export * from './SubscriberFactory' export * from './MessageHandler' -export * from './MessageHandlerIdempotent' +export * from './IdempotentMessageHandler' export * from './ILogger' export * from './SchemaDeployer' export * from './PublisherInitializer'