diff --git a/README.md b/README.md
index 60987c15..7822394d 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,7 @@
## 🚧 Requirements
- [Node.js](https://nodejs.org/en/) version 16 or higher
+- Docker (for running tests).
## 🚀 Getting Started
@@ -130,13 +131,26 @@ if (isDeleteOperation(event.operation)) {
### API
+#### Dispatcher & Incomer class
+
+> There is the documentation of [**Dispatcher**](./docs/class/dispatcher.md), and [**Incomer**](./docs/class/incomer.md) classes.
+
+---
+
#### validate< T extends keyof Events >(options: EventOptions): void
-Throw an error if a given event is not internaly known.
+
+> Throw an error if a given event is not internaly known.
+
+---
#### isCreateOperation< T extends keyof Events >(operation: EventOptions["operation"]): operation is Operation["create"]
+---
+
#### isUpdateOperation< T extends keyof Events >(operation: EventOptions["operation"]): operation is Operation["update"]
+---
+
#### isDeleteOperation< T extends keyof Events >(operation: EventOptions["operation"]): operation is Operation["delete"]
### Types
diff --git a/docs/class/dispatcher.md b/docs/class/dispatcher.md
new file mode 100644
index 00000000..0b6b9a66
--- /dev/null
+++ b/docs/class/dispatcher.md
@@ -0,0 +1,166 @@
+
+ Dispatcher
+
+
+
+ This class is design as a gateway for events.
+ Firstly, it ensure that the given events are correctly formatted at run-time (using JSON-Schema).
+ Secondly, it ensure that events are spread & dealed.
+
+
+## 📚 Usage
+
+```ts
+await initRedis();
+await initRedis({}, "subscriber");
+
+const dispatcher = new Dispatcher();
+
+await dispatcher.initialize();
+
+dispatcher.close();
+```
+
+## Types
+
+```ts
+type Prefix = "test" | "development" | "staging" | "production";
+
+type GenericEvent = Record & { data: Record };
+
+type DispatcherOptions = {
+ /* Prefix for the channel name, commonly used to distinguish envs */
+ prefix?: Prefix;
+ logger?: Partial & Pick;
+ standardLog?: StandardLog;
+ eventsValidation?: {
+ eventsValidationFn?: Map> | CustomEventsValidationFunctions>;
+ validationCbFn?: (event: T) => void;
+ },
+ pingInterval?: number;
+ checkLastActivityInterval?: number;
+ checkTransactionInterval?: number;
+ idleTime?: number;
+};
+```
+
+## Options
+
+
+logger
+
+
+> Default logger is a pino logger.
+> ⚠️ You can inject your own but you must ensure that the provided logger has those methods `info` | `error` | `warn`.
+
+
+
+---
+
+
+standardLog
+
+
+> Callback function use to formate logs related to custom events casting.
+
+```ts
+function standardLog>
+(event: T & { redisMetadata: { transactionId: string } }) {
+ const logs = `foo: ${event.foo}`;
+
+ function log(message: string) {
+ return `(${logs}) ${message}`;
+ }
+
+ return log;
+}
+```
+
+
+
+---
+
+
+eventsValidation
+
+### eventsValidationFn
+
+> Map of Ajv validation functions related to events.
+
+```ts
+const eventsValidationFn: MappedEventsValidationFn = new Map();
+
+for (const [name, validationSchemas] of Object.entries(eventsValidationSchemas)) {
+ const operationsValidationFunctions: Map> = new Map();
+
+ for (const [operation, validationSchema] of Object.entries(validationSchemas)) {
+ operationsValidationFunctions.set(operation, ajv.compile(validationSchema));
+ }
+
+ eventsValidationFn.set(name, operationsValidationFunctions);
+}
+```
+
+### validationCbFn
+
+> Callback validation function used to validate events according to the given eventsValidationFn.
+
+```ts
+function validate(options: EventOptions) {
+ const { name, operation, data, scope, metadata } = options;
+
+ if (!eventsValidationFn.has(name)) {
+ throw new Error(`Unknown "event": ${name}`);
+ }
+
+ const event = eventsValidationFn.get(name);
+ if (!event.has(operation.toLocaleLowerCase())) {
+ throw new Error(`Unknown "operation": ${operation} for the "event": ${name}`);
+ }
+
+ const operationValidationFunction = event.get(operation.toLocaleLowerCase());
+ if (!operationValidationFunction(data)) {
+ throw new Error(`"event": ${name} | "operation": ${operation}: ${[...operationValidationFunction.errors]
+ .map((error) => error.message)}`);
+ }
+
+ if (!metadataValidationFunction(metadata)) {
+ throw new Error(`metadata: ${[...metadataValidationFunction.errors].map((error) => error.message)}`);
+ }
+
+ if (!scopeValidationFunction(scope)) {
+ throw new Error(`scope: ${[...scopeValidationFunction.errors].map((error) => error.message)}`);
+ }
+}
+```
+
+
+
+---
+
+
+Intervals
+
+### pingInterval
+
+> The interval use to ping known instances of `incomer`.
+> ⚠️ Must strictly be smaller than the idleTime options.
+
+### checkLastActivityInterval
+
+> The interval use to check on known instances of `incomer` state.
+> If those have no recent lastActivity, they are evicted.
+
+### checkTransactionInterval
+
+> The interval use to check on `transactions` state.
+> When a transaction related to an event is resolved, his state is update. According to this state, we can define if an event has been dealed through all related instances of `incomer`.
+
+### idleTime
+
+> The interval use to determine how many time an instance of an `incomer` can be inactive.
+> ⚠️ Must strictly be greater than the pingInterval options.
+
+
+
+
diff --git a/docs/class/incomer.md b/docs/class/incomer.md
new file mode 100644
index 00000000..3d09cb1e
--- /dev/null
+++ b/docs/class/incomer.md
@@ -0,0 +1,144 @@
+
+ Incomer
+
+
+
+ This class is design as a client for events.
+ It ensure that events are sended to the Dispatcher or save in the Redis,
+ and execute the provided eventCallback when a event concern this client.
+
+
+## 📚 Usage
+
+```ts
+await initRedis();
+await initRedis({}, "subscriber");
+
+const AVAILABLE_EVENTS = Object.freeze>(
+ ([...eventsValidationFn.keys()].map((name) => {
+ return {
+ name,
+ delay: undefined,
+ horizontalScale: undefined
+ };
+ })).reduce((prev, curr) => Object.assign(prev, { [curr.name]: curr }), {}) as Record
+);
+
+const incomer = new Incomer({
+ name: "foo",
+ eventsCast: [...Object.keys(AVAILABLE_EVENTS)],
+ eventsSubscribe: [...Object.values(AVAILABLE_EVENTS)],
+ eventCallback: (event) => {
+ console.log(event);
+ }
+});
+
+await incomer.initialize();
+
+await incomer.close();
+```
+
+## Types
+
+```ts
+type Prefix = "test" | "development" | "staging" | "production";
+
+type GenericEvent = Record & { data: Record };
+
+type EventCast = T;
+
+type EventSubscribe = {
+ name: T;
+ delay?: number;
+ horizontalScale?: boolean;
+};
+
+type CallBackEventMessage<
+ T extends GenericEvent = GenericEvent
+> = T & {
+ name: string;
+};
+
+type EventMessage<
+ T extends GenericEvent = GenericEvent
+> = T & {
+ name: string;
+ redisMetadata: IncomerTransactionMetadata;
+};
+
+type IncomerOptions = {
+ /* Service name */
+ name: string;
+ logger?: Partial & Pick;
+ standardLog?: StandardLog;
+ eventsCast: EventCast[];
+ eventsSubscribe: EventSubscribe[];
+ eventCallback: (message: CallBackEventMessage) => void;
+ prefix?: Prefix;
+ abortPublishTime?: number;
+ externalsInitialized?: boolean;
+};
+```
+
+
+## Options
+
+
+logger
+
+
+> Default logger is a pino logger.
+> ⚠️ You can inject your own but you must ensure that the provided logger has those methods `info` | `error` | `warn`.
+
+
+
+---
+
+
+standardLog
+
+
+> Callback function use to formate logs related to custom events casting.
+
+```ts
+function standardLog>
+(event: T & { redisMetadata: { transactionId: string } }) {
+ const logs = `foo: ${event.foo}`;
+
+ function log(message: string) {
+ return `(${logs}) ${message}`;
+ }
+
+ return log;
+}
+```
+
+
+
+---
+
+
+abortPublishTime
+
+
+> Interval of time during which the `incomer` instance is going to wait to for a response from the `dispatcher` next to the registration demand or any event publishing.
+> If there is no recent activity from the `dispatcher`, those events are not publish and saved in Redis awaiting for the next iteration.
+
+
+
+---
+
+
+externalsInitialized
+
+
+> Use to initialize `externals` class. As `false` and with a `prefix` with the value `test` or `development`, it will init a `dispatcher` and an `incomer` in order to run tests without any other accessible APIs.
+
+
+
+## API
+
+### publish< K extends GenericEvent | null = null >(event: K extends null ? Omit< EventMessage< T >, "redisMetadata" >): Promise
+
+> Publish the given event on Redis pubsub.
+> If there is no dispatcher alive, the event isn't publish but saved in Redis awaiting for an incoming publish.
diff --git a/example/fastify/feature/webhook.ts b/example/fastify/feature/webhook.ts
index 05fcbee0..06734297 100644
--- a/example/fastify/feature/webhook.ts
+++ b/example/fastify/feature/webhook.ts
@@ -1,6 +1,3 @@
-// Import Node.js Dependencies
-import { createHmac } from "crypto";
-
// Import Internal Dependencies
import * as MyEvents from "../../../src/index";
@@ -11,11 +8,9 @@ import {
FastifyInstance
} from "fastify";
-const kMyUnisoftToken = process.env.THIRD_PARTY_SECRET!;
-
export async function webhooksAPI(server: FastifyInstance) {
- server.post("/anyEvents", { preHandler: signPayload }, getAnyWebhooks);
- server.post("/connector", { preHandler: signPayload }, getConnectorWebhooks);
+ server.post("/anyEvents", getAnyWebhooks);
+ server.post("/connector", getConnectorWebhooks);
}
type GetAnyWebhooksRequest = FastifyRequest<{
@@ -41,19 +36,3 @@ type GetConnectorWebhooksRequest = FastifyRequest<{
async function getConnectorWebhooks(req: GetConnectorWebhooksRequest, reply: FastifyReply) {
// Do some code
}
-
-function signPayload(req: GetAnyWebhooksRequest, reply: FastifyReply, done) {
- const webhooks = req.body;
- const { date, signature } = req.headers;
-
- const signed = createHmac("sha256", kMyUnisoftToken)
- .update(JSON.stringify({ webhooks, date }))
- .digest("hex");
-
- if (signed !== signature) {
- reply.status(401).send();
- }
-
- done();
-}
-
diff --git a/src/class/eventManagement/dispatcher.class.ts b/src/class/eventManagement/dispatcher.class.ts
index 60775bde..732e4fbd 100644
--- a/src/class/eventManagement/dispatcher.class.ts
+++ b/src/class/eventManagement/dispatcher.class.ts
@@ -72,7 +72,7 @@ export type DispatcherOptions = {
checkLastActivityInterval?: number;
checkTransactionInterval?: number;
idleTime?: number;
-}
+};
type DispatcherChannelEvents = { name: "register" };
@@ -123,7 +123,7 @@ export class Dispatcher {
private validationCbFn: (event: T) => void = null;
private standardLogFn: StandardLog;
- constructor(options: DispatcherOptions) {
+ constructor(options: DispatcherOptions = {}) {
this.prefix = options.prefix ?? "";
this.formattedPrefix = options.prefix ? `${options.prefix}-` : "";
this.treeName = this.formattedPrefix + kIncomerStoreName;
diff --git a/src/class/eventManagement/incomer.class.ts b/src/class/eventManagement/incomer.class.ts
index d4e59472..7a58ef09 100644
--- a/src/class/eventManagement/incomer.class.ts
+++ b/src/class/eventManagement/incomer.class.ts
@@ -69,7 +69,7 @@ export type IncomerOptions = {
eventsSubscribe: EventSubscribe[];
eventCallback: (message: CallBackEventMessage) => void;
prefix?: Prefix;
- abortRegistrationTime?: number;
+ abortPublishTime?: number;
externalsInitialized?: boolean;
};
@@ -92,7 +92,7 @@ export class Incomer <
private incomerChannelName: string;
private incomerTransactionStore: TransactionStore<"incomer">;
private incomerChannel: Channel["IncomerMessages"]>;
- private abortRegistrationTime = 60_000;
+ private abortPublishTime = 60_000;
private standardLogFn: StandardLog;
private checkRegistrationInterval: NodeJS.Timer;
private checkTransactionsStateInterval: NodeJS.Timer;
@@ -128,7 +128,7 @@ export class Incomer <
});
if (
- (this.prefix === "test" || this.prefix === "development") && !options.externalsInitialized
+ (this.prefix === "test" || this.prefix === "development") && options.externalsInitialized === false
) {
this.externals = new Externals(options);
}
@@ -137,7 +137,7 @@ export class Incomer <
if (!this.dispatcherIsAlive) {
return;
}
- else if (this.lastPingDate + (PING_INTERVAL + this.abortRegistrationTime) < Date.now()) {
+ else if (this.lastPingDate + (PING_INTERVAL + this.abortPublishTime) < Date.now()) {
this.dispatcherIsAlive = false;
return;
@@ -151,7 +151,7 @@ export class Incomer <
catch (error) {
this.logger.error(error);
}
- }, this.abortRegistrationTime).unref();
+ }, this.abortPublishTime).unref();
}
get subscriber() {
@@ -200,7 +200,7 @@ export class Incomer <
this.logger.info("Registering as a new incomer on dispatcher");
try {
- await once(this, "registered", { signal: AbortSignal.timeout(this.abortRegistrationTime) });
+ await once(this, "registered", { signal: AbortSignal.timeout(this.abortPublishTime) });
}
catch {
this.checkRegistrationInterval = setInterval(async() => {
@@ -217,7 +217,7 @@ export class Incomer <
}
});
- await once(this, "registered", { signal: AbortSignal.timeout(this.abortRegistrationTime) });
+ await once(this, "registered", { signal: AbortSignal.timeout(this.abortPublishTime) });
}
catch (error) {
this.logger.error(error);
@@ -231,7 +231,7 @@ export class Incomer <
clearInterval(this.checkRegistrationInterval);
this.checkRegistrationInterval = undefined;
- }, this.abortRegistrationTime * 2).unref();
+ }, this.abortPublishTime * 2).unref();
return;
}
@@ -295,7 +295,7 @@ export class Incomer <
private async updateTransactionsStateTimeout() {
try {
- await setTimeout(this.abortRegistrationTime, undefined, { signal: kCancelTimeout.signal });
+ await setTimeout(this.abortPublishTime, undefined, { signal: kCancelTimeout.signal });
kCancelTask.abort();
}
catch {
diff --git a/test/UT/class/eventManagement/dispatcher.spec.ts b/test/UT/class/eventManagement/dispatcher.spec.ts
index 1f3963cd..9ca13cda 100644
--- a/test/UT/class/eventManagement/dispatcher.spec.ts
+++ b/test/UT/class/eventManagement/dispatcher.spec.ts
@@ -77,7 +77,7 @@ describe("Dispatcher", () => {
afterAll(async() => {
await clearAllKeys();
- await dispatcher.close();
+ dispatcher.close();
await closeRedis("subscriber");
});
@@ -368,7 +368,7 @@ describe("Dispatcher", () => {
});
afterAll(async() => {
- await dispatcher.close();
+ dispatcher.close();
await closeRedis("subscriber");
});
@@ -604,7 +604,7 @@ describe("Dispatcher", () => {
});
afterAll(async() => {
- await dispatcher.close();
+ dispatcher.close();
await closeRedis("subscriber");
});
@@ -1029,7 +1029,7 @@ describe("Dispatcher", () => {
});
afterAll(async() => {
- await dispatcher.close();
+ dispatcher.close();
await closeRedis("subscriber");
});
diff --git a/test/UT/class/eventManagement/events.spec.ts b/test/UT/class/eventManagement/events.spec.ts
index 45f82214..6450e5ca 100644
--- a/test/UT/class/eventManagement/events.spec.ts
+++ b/test/UT/class/eventManagement/events.spec.ts
@@ -30,7 +30,6 @@ const mockedEventComeBackHandler = jest.fn();
describe("Publishing/exploiting a custom event", () => {
let dispatcher: Dispatcher>;
- let subscriber;
beforeAll(async() => {
await initRedis({
@@ -38,7 +37,7 @@ describe("Publishing/exploiting a custom event", () => {
host: process.env.REDIS_HOST
} as any);
- subscriber = await initRedis({
+ await initRedis({
port: process.env.REDIS_PORT,
host: process.env.REDIS_HOST
} as any, "subscriber");
@@ -58,7 +57,7 @@ describe("Publishing/exploiting a custom event", () => {
});
afterAll(async() => {
- await dispatcher.close();
+ dispatcher.close();
await closeAllRedis();
});
@@ -91,6 +90,11 @@ describe("Publishing/exploiting a custom event", () => {
}
};
+ afterAll(async() => {
+ await publisher.close();
+ await unConcernedIncomer.close();
+ })
+
beforeAll(async() => {
publisher = new Incomer({
name: randomUUID(),
@@ -213,6 +217,12 @@ describe("Publishing/exploiting a custom event", () => {
}
};
+ afterAll(async() => {
+ await publisher.close();
+ await concernedIncomer.close();
+ await secondConcernedIncomer.close();
+ });
+
beforeAll(async() => {
publisher = new Incomer({
name: randomUUID(),
@@ -411,6 +421,12 @@ describe("Publishing/exploiting a custom event", () => {
}
}
+ afterAll(async() => {
+ await publisher.close();
+ await concernedIncomer.close();
+ await secondConcernedIncomer.close();
+ });
+
beforeAll(async() => {
publisher = new Incomer({
name: randomUUID(),
diff --git a/test/UT/class/eventManagement/externals.spec.ts b/test/UT/class/eventManagement/externals.spec.ts
index 41a317f4..ea8248e1 100644
--- a/test/UT/class/eventManagement/externals.spec.ts
+++ b/test/UT/class/eventManagement/externals.spec.ts
@@ -44,7 +44,7 @@ describe("Init Incomer without Dispatcher alive & prefix as \"development\" | \"
eventsCast: ["accountingFolder"],
eventsSubscribe: [],
eventCallback: eventComeBackHandler,
- abortRegistrationTime: 5_000,
+ abortPublishTime: 5_000,
externalsInitialized: true
});
@@ -67,7 +67,8 @@ describe("Init Incomer without Dispatcher alive & prefix as \"development\" | \"
eventsCast: ["accountingFolder"],
eventsSubscribe: [],
eventCallback: eventComeBackHandler,
- abortRegistrationTime: 5_000
+ abortPublishTime: 5_000,
+ externalsInitialized: false
});
test("it should init", async() => {
@@ -89,7 +90,8 @@ describe("Init Incomer without Dispatcher alive & prefix as \"development\" | \"
eventsCast: ["accountingFolder"],
eventsSubscribe: [],
eventCallback: eventComeBackHandler,
- abortRegistrationTime: 5_000
+ abortPublishTime: 5_000,
+ externalsInitialized: false
});
test("it should init", async() => {
diff --git a/test/UT/class/eventManagement/incomer.spec.ts b/test/UT/class/eventManagement/incomer.spec.ts
index 32b04ffb..53b1c5ba 100644
--- a/test/UT/class/eventManagement/incomer.spec.ts
+++ b/test/UT/class/eventManagement/incomer.spec.ts
@@ -12,6 +12,9 @@ import * as Logger from "pino";
// Import Internal Dependencies
import { Dispatcher, Incomer } from "../../../../src/index";
+// Internal Dependencies Mocks
+const mockedIncomerHandleDispatcherMessage = jest.spyOn(Incomer.prototype as any, "handleDispatcherMessages");
+
const incomerLogger = Logger.pino({
level: "debug"
});
@@ -22,6 +25,7 @@ describe("Init Incomer without Dispatcher alive", () => {
}
let incomer: Incomer;
+ let dispatcher: Dispatcher;
beforeAll(async() => {
await initRedis({
@@ -40,24 +44,28 @@ describe("Init Incomer without Dispatcher alive", () => {
eventsCast: [],
eventsSubscribe: [],
eventCallback: eventComeBackHandler,
- abortRegistrationTime: 2_000,
+ abortPublishTime: 2_000,
externalsInitialized: true
});
+
+ dispatcher = new Dispatcher();
});
- test("Incomer should not init", async() => {
+ test("Incomer should init with or without a Dispatcher", async() => {
await incomer.initialize();
- await timers.setTimeout(10_000);
-
- const dispatcher = new Dispatcher({});
+ await timers.setTimeout(5_000);
await dispatcher.initialize();
await timers.setTimeout(5_000);
+
+ expect(mockedIncomerHandleDispatcherMessage).toHaveBeenCalled();
})
afterAll(async() => {
+ dispatcher.close();
+ await incomer.close();
await closeAllRedis();
});
});
diff --git a/test/UT/class/eventManagement/ping.spec.ts b/test/UT/class/eventManagement/ping.spec.ts
index ba7e79dc..9deb1dc7 100644
--- a/test/UT/class/eventManagement/ping.spec.ts
+++ b/test/UT/class/eventManagement/ping.spec.ts
@@ -20,7 +20,6 @@ const incomerLogger = Logger.pino({
level: "debug"
});
const mockedIncomerLoggerInfo = jest.spyOn(incomerLogger, "info");
-const mockedDispatcherLoggerInfo = jest.spyOn(dispatcherLogger, "info");
describe("Ping", () => {
const eventComeBackHandler = async(message) => {
@@ -29,7 +28,6 @@ describe("Ping", () => {
let dispatcher: Dispatcher;
let incomer: Incomer;
- let subscriber;
beforeAll(async() => {
await initRedis({
@@ -37,7 +35,7 @@ describe("Ping", () => {
host: process.env.REDIS_HOST
} as any);
- subscriber = await initRedis({
+ await initRedis({
port: process.env.REDIS_PORT,
host: process.env.REDIS_HOST
} as any, "subscriber");
@@ -68,7 +66,8 @@ describe("Ping", () => {
});
afterAll(async() => {
- await dispatcher.close();
+ dispatcher.close();
+ await incomer.close();
await closeAllRedis();
});
diff --git a/test/UT/class/eventManagement/registration.spec.ts b/test/UT/class/eventManagement/registration.spec.ts
index 0be8ee65..cca43f7c 100644
--- a/test/UT/class/eventManagement/registration.spec.ts
+++ b/test/UT/class/eventManagement/registration.spec.ts
@@ -27,7 +27,6 @@ const mockedIncomerLoggerInfo = jest.spyOn(incomerLogger, "info");
describe("Registration", () => {
let dispatcher: Dispatcher;
let incomer: Incomer;
- let subscriber;
beforeAll(async() => {
await initRedis({
@@ -35,7 +34,7 @@ describe("Registration", () => {
host: process.env.REDIS_HOST
} as any);
- subscriber = await initRedis({
+ await initRedis({
port: process.env.REDIS_PORT,
host: process.env.REDIS_HOST
} as any, "subscriber");
@@ -52,7 +51,7 @@ describe("Registration", () => {
});
afterAll(async() => {
- await dispatcher.close();
+ dispatcher.close();
await closeAllRedis();
});
@@ -65,6 +64,10 @@ describe("Registration", () => {
console.log(message);
}
+ afterAll(async() => {
+ await incomer.close();
+ });
+
beforeAll(async() => {
incomer = new Incomer({
name: "foo",
@@ -101,6 +104,10 @@ describe("Registration", () => {
console.log(message);
}
+ afterAll(async() => {
+ await incomer.close();
+ })
+
beforeAll(async() => {
incomer = new Incomer({
name: randomUUID(),