This class is designed as a client for events.
It ensures that events are sent to the Dispatcher or saved in Redis,
and executes the provided eventCallback when an event concerns this client.
// Set up Redis before the Incomer is created: once with the default arguments,
// then once more for the instance named "subscriber".
// NOTE(review): presumably the second call opens a dedicated pub/sub connection — confirm against initRedis.
await initRedis();
await initRedis({}, "subscriber");
// Frozen lookup table: one entry per key of `eventsValidationFn`, each mapping the
// event name to a subscription descriptor with no delay and no horizontal scaling set.
const AVAILABLE_EVENTS = Object.freeze<Record<keyof Events, EventSubscribe>>(
  Object.fromEntries(
    Array.from(eventsValidationFn.keys(), (eventName) => [
      eventName,
      { name: eventName, delay: undefined, horizontalScale: undefined }
    ])
  ) as Record<keyof Events, EventSubscribe>
);
// Example client named "foo": it casts and subscribes to every entry of
// AVAILABLE_EVENTS and reports the outcome of each received event through
// the Result returned by `eventCallback`.
const incomer = new Incomer({
  name: "foo",
  eventsCast: Object.keys(AVAILABLE_EVENTS),
  eventsSubscribe: Object.values(AVAILABLE_EVENTS),
  eventCallback: async(event) => {
    try {
      // Do some async work
    }
    catch (error: unknown) {
      // A thrown value is `unknown` (anything can be thrown, including null or a
      // string), so narrow it before reading `reason` or `stack`.
      const failure = typeof error === "object" && error !== null ?
        error as { reason?: unknown; stack?: unknown } :
        {};
      // `reason` and `Err` both need a string; fall back to the stringified
      // thrown value when no stack trace is available.
      const details = typeof failure.stack === "string" ? failure.stack : String(error);

      // Recoverable failure: report the event as unresolved and allow one retry.
      if (failure.reason === "ANY_RETRY_ERROR") {
        return OK({
          status: "UNRESOLVED",
          reason: details,
          retryStrategy: {
            maxIteration: 1
          }
        });
      }

      // Any other failure is reported as an error carrying the details.
      return Err(details);
    }

    return OK({
      status: "RESOLVED"
    });
  }
});
// Initialize the incomer, then close it once it is no longer needed.
await incomer.initialize();
await incomer.close();
/** Values accepted by the optional `prefix` option of `IncomerOptions`. */
type Prefix = "test" | "development" | "staging" | "production";
/**
 * Minimal shape of an event: a `name`, a `data` payload,
 * and any number of additional free-form properties.
 */
type GenericEvent = {
name: string;
data: Record<string, any>;
/* Index signature: any extra property is allowed, with an unchecked value */
[key: string]: any;
};
/**
 * Subscription descriptor for a single event.
 * `T` is the type of the event name: any string by default, or a key of `Events`.
 */
type EventSubscribe<T extends string | keyof Events = string> = {
/* Name of the event */
name: T;
/* NOTE(review): unit and exact effect of `delay` are not visible here — confirm */
delay?: number;
/* NOTE(review): presumably relates to running several instances of the same service — confirm */
horizontalScale?: boolean;
};
/**
 * Message handed to `eventCallback`: the event `T` itself,
 * extended with its `eventTransactionId`.
 */
type CallBackEventMessage<
T extends GenericEvent = GenericEvent
> = T & {
eventTransactionId: string;
};
/** Status literal used in the success value of a resolved callback response. */
export type Resolved = "RESOLVED";
/** Status literal used in the success value of an unresolved callback response. */
export type Unresolved = "UNRESOLVED";
/**
 * Value returned by `eventCallback`: a `Result` whose error side is a string.
 * The success side depends on the status `T`:
 * - `Resolved`: only the `status`;
 * - `Unresolved`: the `status`, a mandatory `reason` and an optional `retryStrategy`.
 *
 * With the default type argument (`Resolved | Unresolved`) the conditional type
 * distributes over the union, so either shape is accepted.
 */
type EventCallbackResponse<T extends Resolved | Unresolved = Resolved | Unresolved> = Result<
T extends Resolved ? {
status: T;
} : {
status: T;
retryStrategy?: {
/* NOTE(review): presumably the maximum number of retry attempts — confirm */
maxIteration: number;
};
reason: string;
}, string>;
/** Map of validation functions indexed by a string key; usable as a nested value of `eventsValidationFn`. */
type NestedValidationFunctions = Map<string, ValidateFunction<Record<string, any>>>;
/**
 * Custom validation callback: receives the event and returns nothing.
 * NOTE(review): presumably expected to throw when the event is invalid — confirm.
 */
type customValidationCbFn<T extends GenericEvent> = (event: T) => void;
/**
 * Map from a string key to either a validation function for `T`
 * or a nested map of validation functions.
 * NOTE(review): the key is presumably the event name — confirm.
 */
type eventsValidationFn<T extends GenericEvent> = Map<string, ValidateFunction<T> | NestedValidationFunctions>;
/**
 * Options object passed to `new Incomer(...)`.
 * `T` is the event type handled by the client (defaults to `GenericEvent`).
 */
type IncomerOptions<T extends GenericEvent = GenericEvent> = {
/* Service name */
name: string;
/* Optional prefix, restricted to the values of `Prefix` */
prefix?: Prefix;
/* Optional custom logger: `info` and `warn` are required, every other Logger member is optional */
logger?: Partial<Logger> & Pick<Logger, "info" | "warn">;
/* Optional callback used to format logs */
standardLog?: StandardLog<T>;
/* Event names listed as cast by this client */
eventsCast: string[];
/* Subscription descriptors of the events this client subscribes to */
eventsSubscribe: EventSubscribe[];
/* Optional validation hooks: a map of validation functions and/or a custom callback */
eventsValidation?: {
eventsValidationFn?: eventsValidationFn<T>;
customValidationCbFn?: customValidationCbFn<T>;
};
/* Callback receiving the event message; resolves with the outcome as an EventCallbackResponse */
eventCallback: (message: CallBackEventMessage<T>) => Promise<EventCallbackResponse>;
dispatcherInactivityOptions?: {
/* max interval between received ping before considering dispatcher off */
maxPingInterval?: number;
/* max interval between a new event (based on ping interval) */
publishInterval?: number;
};
/* NOTE(review): presumably marks the incomer embedded in a dispatcher — confirm */
isDispatcherInstance?: boolean;
/* NOTE(review): exact effect is not visible in the type itself — confirm against the implementation */
externalsInitialized?: boolean;
};
logger
The default logger is a pino logger.
⚠️ You can inject your own, but you must ensure that the provided logger has these methods: `info` | `error` | `warn` | `debug`.
standardLog
Callback function used to format logs related to custom events casting.
/**
 * Argument received by a `standardLog` callback: the event `T`
 * extended with the `redisMetadata` attached to it.
 */
type StandardLogOpts<T extends GenericEvent = GenericEvent> = T & {
redisMetadata: {
transactionId: string;
/* NOTE(review): presumably identifies the sender of the event — confirm */
origin?: string;
/* NOTE(review): presumably identifies the recipient of the event — confirm */
to?: string;
eventTransactionId?: string;
}
};
Default callback function used:
/**
 * Returns `value` unchanged, or the placeholder "none" when it is `null` or `undefined`.
 * Only nullish values trigger the fallback, so an empty string is kept as-is.
 *
 * @param value - the value to log; may be missing (e.g. an optional metadata field).
 * @returns the value itself, or "none" when it is nullish.
 */
function logValueFallback(value: string | null | undefined): string {
  return value ?? "none";
}
/**
 * Default `standardLog` callback: builds a message formatter bound to one event.
 *
 * @param data - the event, extended with its `redisMetadata`.
 * @returns a function that prefixes a message with `(<mapped fields>)(<event meta>) `,
 * where the event meta holds the event name, operation, origin and recipient
 * (each replaced by "none" when missing).
 */
function standardLog<T extends GenericEvent = EventOptions<keyof Events>>
(data: StandardLogOpts<T>): (message: string) => string {
  // Values yielded by `mapped` for this event, joined with "|".
  const logs = Array.from(mapped<T>(data)).join("|");
  // eslint-disable-next-line max-len
  const eventMeta = `name:${logValueFallback(data.name)}|ope:${logValueFallback(data.operation)}|from:${logValueFallback(data.redisMetadata.origin)}|to:${logValueFallback(data.redisMetadata.to)}`;

  function log(message: string) {
    return `(${logs})(${eventMeta}) ${message}`;
  }

  return log;
}
externalsInitialized
Used to initialize the `externals` class. When set to `false` and with a `prefix` whose value is `test` or `development`, it will init a `dispatcher` and an `incomer` in order to run tests without any other accessible APIs.
Publish the given event on Redis pubsub.
If there is no dispatcher alive, the event isn't published but saved in Redis, awaiting an incoming publish.