Skip to content

Commit

Permalink
fix: TECH review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran committed Jan 16, 2025
1 parent 982a265 commit 2c6907c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
20 changes: 20 additions & 0 deletions packages/pubsub/src/MessageHandlerIdempotent.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
import { JOIN_IDEMPOTENCY_KEY } from './Publisher'
import { IRedisClient } from './redis-types/IRedisClient'
import { IMessageInfo, IParsedMessage } from './Subscriber'
import { ISubscriber } from './SubscriberFactory'

type GetIdempotencyKeyFunction<T> = (msg: T, info: IMessageInfo) => string | undefined

export abstract class IdempotencyStorage {

public abstract exists(key: string): Promise<boolean>

public abstract save(key: string): Promise<void>
}

export class RedisIdempotencyStorage implements IdempotencyStorage {
constructor(private readonly redisClient: IRedisClient,
private readonly redisDefaultTtl: number) {}

public async exists(key: string): Promise<boolean> {
const value = await this.redisClient.get(key)
if (value.success) {
return !!value.get()
} else {
return false
}
}

public async save(key: string): Promise<void> {
await this.redisClient.setex(key, this.redisDefaultTtl, Buffer.from(new Date()))
}
}

/**
* Idempotent message handler, requires idempotency storage to be provided.
* Will check if message was already processed by checking idempotency key in the storage, and will skip it if it was.
Expand Down
35 changes: 35 additions & 0 deletions packages/pubsub/src/redis-types/IRedisClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* This file contains interfaces from https://github.com/join-com/caching ,
* for a less duplicated code when implementing idempotent handler on services.
*/

export interface IFailure<T> {
success: false
failure: true

get(): T
}

export interface ISuccess<T> {
success: true
failure: false

get(): T
}

export type RedisResult<T, R> = ISuccess<T> | IFailure<R>

export type OptionalRedisResult<T, R> = ISuccess<T> | ISuccess<undefined> | IFailure<R>

export interface IRedisError {

error: Error
}


export interface IRedisClient {

get(key: string): Promise<OptionalRedisResult<Buffer, IRedisError>>

setex(key: string, secondsTtl: number, value: Buffer): Promise<RedisResult<void, IRedisError>>
}

0 comments on commit 2c6907c

Please sign in to comment.