From 2c6907c20c0d1b636ded044a79abe7c6ebe4c657 Mon Sep 17 00:00:00 2001 From: Eugene Taran Date: Thu, 16 Jan 2025 10:59:48 +0100 Subject: [PATCH] fix: TECH review fixes --- .../pubsub/src/MessageHandlerIdempotent.ts | 20 +++++++++++ .../pubsub/src/redis-types/IRedisClient.ts | 35 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 packages/pubsub/src/redis-types/IRedisClient.ts diff --git a/packages/pubsub/src/MessageHandlerIdempotent.ts b/packages/pubsub/src/MessageHandlerIdempotent.ts index 5a73a4a..4acadcb 100644 --- a/packages/pubsub/src/MessageHandlerIdempotent.ts +++ b/packages/pubsub/src/MessageHandlerIdempotent.ts @@ -1,15 +1,35 @@ import { JOIN_IDEMPOTENCY_KEY } from './Publisher' +import { IRedisClient } from './redis-types/IRedisClient' import { IMessageInfo, IParsedMessage } from './Subscriber' import { ISubscriber } from './SubscriberFactory' type GetIdempotencyKeyFunction = (msg: T, info: IMessageInfo) => string | undefined export abstract class IdempotencyStorage { + public abstract exists(key: string): Promise public abstract save(key: string): Promise } +export class RedisIdempotencyStorage implements IdempotencyStorage { + constructor(private readonly redisClient: IRedisClient, + private readonly redisDefaultTtl: number) {} + + public async exists(key: string): Promise { + const value = await this.redisClient.get(key) + if (value.success) { + return !!value.get() + } else { + return false + } + } + + public async save(key: string): Promise { + await this.redisClient.setex(key, this.redisDefaultTtl, Buffer.from(new Date())) + } +} + /** * Idempotent message handler, requires idempotency storage to be provided. * Will check if message was already processed by checking idempotency key in the storage, and will skip it if it was. diff --git a/packages/pubsub/src/redis-types/IRedisClient.ts b/packages/pubsub/src/redis-types/IRedisClient.ts new file mode 100644 index 0000000..8cedeb7 --- /dev/null +++ b/packages/pubsub/src/redis-types/IRedisClient.ts @@ -0,0 +1,35 @@ +/** + * This file contains interfaces from https://github.com/join-com/caching , + * for a less duplicated code when implementing idempotent handler on services. + */ + +export interface IFailure { + success: false + failure: true + + get(): T +} + +export interface ISuccess { + success: true + failure: false + + get(): T +} + +export type RedisResult = ISuccess | IFailure + +export type OptionalRedisResult = ISuccess | ISuccess | IFailure + +export interface IRedisError { + + error: Error +} + + +export interface IRedisClient { + + get(key: string): Promise> + + setex(key: string, secondsTtl: number, value: Buffer): Promise> +}