From cc758035f09ff41f6a86f25a04425175c6f52043 Mon Sep 17 00:00:00 2001 From: Nicolas Hallaert <39910164+Rossb0b@users.noreply.github.com> Date: Tue, 25 Jul 2023 14:21:23 +0200 Subject: [PATCH] docs(): Dispatcher & Incomer (#64) refacto(): externalsInitialized options default value doesn't make externals to be init. test(): fix up, close incomer instances refacto(): Incomer options abortRegistrationTime as abortPublishTime refacto(): Dispatcher options default instance --- README.md | 16 +- docs/class/dispatcher.md | 166 ++++++++++++++++++ docs/class/incomer.md | 144 +++++++++++++++ example/fastify/feature/webhook.ts | 25 +-- src/class/eventManagement/dispatcher.class.ts | 4 +- src/class/eventManagement/incomer.class.ts | 18 +- .../class/eventManagement/dispatcher.spec.ts | 8 +- test/UT/class/eventManagement/events.spec.ts | 22 ++- .../class/eventManagement/externals.spec.ts | 8 +- test/UT/class/eventManagement/incomer.spec.ts | 18 +- test/UT/class/eventManagement/ping.spec.ts | 7 +- .../eventManagement/registration.spec.ts | 13 +- 12 files changed, 392 insertions(+), 57 deletions(-) create mode 100644 docs/class/dispatcher.md create mode 100644 docs/class/incomer.md diff --git a/README.md b/README.md index 60987c15..7822394d 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ ## 🚧 Requirements - [Node.js](https://nodejs.org/en/) version 16 or higher +- Docker (for running tests). ## 🚀 Getting Started @@ -130,13 +131,26 @@ if (isDeleteOperation(event.operation)) { ### API +#### Dispatcher & Incomer class + +> There is the documentation of [**Dispatcher**](./docs/class/dispatcher.md), and [**Incomer**](./docs/class/incomer.md) classes. + +--- + #### validate< T extends keyof Events >(options: EventOptions): void -Throw an error if a given event is not internaly known. + +> Throw an error if a given event is not internaly known. + +--- #### isCreateOperation< T extends keyof Events >(operation: EventOptions["operation"]): operation is Operation["create"] +--- + #### isUpdateOperation< T extends keyof Events >(operation: EventOptions["operation"]): operation is Operation["update"] +--- + #### isDeleteOperation< T extends keyof Events >(operation: EventOptions["operation"]): operation is Operation["delete"] ### Types diff --git a/docs/class/dispatcher.md b/docs/class/dispatcher.md new file mode 100644 index 00000000..0b6b9a66 --- /dev/null +++ b/docs/class/dispatcher.md @@ -0,0 +1,166 @@ +

+ Dispatcher +

+ +

+ This class is design as a gateway for events.
+ Firstly, it ensure that the given events are correctly formatted at run-time (using JSON-Schema).
+ Secondly, it ensure that events are spread & dealed. +

+ +## 📚 Usage + +```ts +await initRedis(); +await initRedis({}, "subscriber"); + +const dispatcher = new Dispatcher(); + +await dispatcher.initialize(); + +dispatcher.close(); +``` + +## Types + +```ts +type Prefix = "test" | "development" | "staging" | "production"; + +type GenericEvent = Record & { data: Record }; + +type DispatcherOptions = { + /* Prefix for the channel name, commonly used to distinguish envs */ + prefix?: Prefix; + logger?: Partial & Pick; + standardLog?: StandardLog; + eventsValidation?: { + eventsValidationFn?: Map> | CustomEventsValidationFunctions>; + validationCbFn?: (event: T) => void; + }, + pingInterval?: number; + checkLastActivityInterval?: number; + checkTransactionInterval?: number; + idleTime?: number; +}; +``` + +## Options + +
+logger +
+ +> Default logger is a pino logger.
+> ⚠️ You can inject your own but you must ensure that the provided logger has those methods `info` | `error` | `warn`. + +
+ +--- + +
+standardLog +
+ +> Callback function use to formate logs related to custom events casting. + +```ts +function standardLog> +(event: T & { redisMetadata: { transactionId: string } }) { + const logs = `foo: ${event.foo}`; + + function log(message: string) { + return `(${logs}) ${message}`; + } + + return log; +} +``` + +
+ +--- + +
+eventsValidation + +### eventsValidationFn + +> Map of Ajv validation functions related to events. + +```ts +const eventsValidationFn: MappedEventsValidationFn = new Map(); + +for (const [name, validationSchemas] of Object.entries(eventsValidationSchemas)) { + const operationsValidationFunctions: Map> = new Map(); + + for (const [operation, validationSchema] of Object.entries(validationSchemas)) { + operationsValidationFunctions.set(operation, ajv.compile(validationSchema)); + } + + eventsValidationFn.set(name, operationsValidationFunctions); +} +``` + +### validationCbFn + +> Callback validation function used to validate events according to the given eventsValidationFn. + +```ts +function validate(options: EventOptions) { + const { name, operation, data, scope, metadata } = options; + + if (!eventsValidationFn.has(name)) { + throw new Error(`Unknown "event": ${name}`); + } + + const event = eventsValidationFn.get(name); + if (!event.has(operation.toLocaleLowerCase())) { + throw new Error(`Unknown "operation": ${operation} for the "event": ${name}`); + } + + const operationValidationFunction = event.get(operation.toLocaleLowerCase()); + if (!operationValidationFunction(data)) { + throw new Error(`"event": ${name} | "operation": ${operation}: ${[...operationValidationFunction.errors] + .map((error) => error.message)}`); + } + + if (!metadataValidationFunction(metadata)) { + throw new Error(`metadata: ${[...metadataValidationFunction.errors].map((error) => error.message)}`); + } + + if (!scopeValidationFunction(scope)) { + throw new Error(`scope: ${[...scopeValidationFunction.errors].map((error) => error.message)}`); + } +} +``` + +
+ +--- + +
+Intervals + +### pingInterval + +> The interval use to ping known instances of `incomer`.
+> ⚠️ Must strictly be smaller than the idleTime options. + +### checkLastActivityInterval + +> The interval use to check on known instances of `incomer` state.
+> If those have no recent lastActivity, they are evicted. + +### checkTransactionInterval + +> The interval use to check on `transactions` state.
+> When a transaction related to an event is resolved, his state is update. According to this state, we can define if an event has been dealed through all related instances of `incomer`. + +### idleTime + +> The interval use to determine how many time an instance of an `incomer` can be inactive.
+> ⚠️ Must strictly be greater than the pingInterval options. + +
+ + diff --git a/docs/class/incomer.md b/docs/class/incomer.md new file mode 100644 index 00000000..3d09cb1e --- /dev/null +++ b/docs/class/incomer.md @@ -0,0 +1,144 @@ +

+ Incomer +

+ +

+ This class is design as a client for events.
+ It ensure that events are sended to the Dispatcher or save in the Redis,
+ and execute the provided eventCallback when a event concern this client. +

+ +## 📚 Usage + +```ts +await initRedis(); +await initRedis({}, "subscriber"); + +const AVAILABLE_EVENTS = Object.freeze>( + ([...eventsValidationFn.keys()].map((name) => { + return { + name, + delay: undefined, + horizontalScale: undefined + }; + })).reduce((prev, curr) => Object.assign(prev, { [curr.name]: curr }), {}) as Record +); + +const incomer = new Incomer({ + name: "foo", + eventsCast: [...Object.keys(AVAILABLE_EVENTS)], + eventsSubscribe: [...Object.values(AVAILABLE_EVENTS)], + eventCallback: (event) => { + console.log(event); + } +}); + +await incomer.initialize(); + +await incomer.close(); +``` + +## Types + +```ts +type Prefix = "test" | "development" | "staging" | "production"; + +type GenericEvent = Record & { data: Record }; + +type EventCast = T; + +type EventSubscribe = { + name: T; + delay?: number; + horizontalScale?: boolean; +}; + +type CallBackEventMessage< + T extends GenericEvent = GenericEvent +> = T & { + name: string; +}; + +type EventMessage< + T extends GenericEvent = GenericEvent +> = T & { + name: string; + redisMetadata: IncomerTransactionMetadata; +}; + +type IncomerOptions = { + /* Service name */ + name: string; + logger?: Partial & Pick; + standardLog?: StandardLog; + eventsCast: EventCast[]; + eventsSubscribe: EventSubscribe[]; + eventCallback: (message: CallBackEventMessage) => void; + prefix?: Prefix; + abortPublishTime?: number; + externalsInitialized?: boolean; +}; +``` + + +## Options + +
+logger +
+ +> Default logger is a pino logger.
+> ⚠️ You can inject your own but you must ensure that the provided logger has those methods `info` | `error` | `warn`. + +
+ +--- + +
+standardLog +
+ +> Callback function use to formate logs related to custom events casting. + +```ts +function standardLog> +(event: T & { redisMetadata: { transactionId: string } }) { + const logs = `foo: ${event.foo}`; + + function log(message: string) { + return `(${logs}) ${message}`; + } + + return log; +} +``` + +
+ +--- + +
+abortPublishTime +
+ +> Interval of time during which the `incomer` instance is going to wait to for a response from the `dispatcher` next to the registration demand or any event publishing.
+> If there is no recent activity from the `dispatcher`, those events are not publish and saved in Redis awaiting for the next iteration. + +
+ +--- + +
+externalsInitialized +
+ +> Use to initialize `externals` class. As `false` and with a `prefix` with the value `test` or `development`, it will init a `dispatcher` and an `incomer` in order to run tests without any other accessible APIs. + +
+ +## API + +### publish< K extends GenericEvent | null = null >(event: K extends null ? Omit< EventMessage< T >, "redisMetadata" >): Promise + +> Publish the given event on Redis pubsub.
+> If there is no dispatcher alive, the event isn't publish but saved in Redis awaiting for an incoming publish. diff --git a/example/fastify/feature/webhook.ts b/example/fastify/feature/webhook.ts index 05fcbee0..06734297 100644 --- a/example/fastify/feature/webhook.ts +++ b/example/fastify/feature/webhook.ts @@ -1,6 +1,3 @@ -// Import Node.js Dependencies -import { createHmac } from "crypto"; - // Import Internal Dependencies import * as MyEvents from "../../../src/index"; @@ -11,11 +8,9 @@ import { FastifyInstance } from "fastify"; -const kMyUnisoftToken = process.env.THIRD_PARTY_SECRET!; - export async function webhooksAPI(server: FastifyInstance) { - server.post("/anyEvents", { preHandler: signPayload }, getAnyWebhooks); - server.post("/connector", { preHandler: signPayload }, getConnectorWebhooks); + server.post("/anyEvents", getAnyWebhooks); + server.post("/connector", getConnectorWebhooks); } type GetAnyWebhooksRequest = FastifyRequest<{ @@ -41,19 +36,3 @@ type GetConnectorWebhooksRequest = FastifyRequest<{ async function getConnectorWebhooks(req: GetConnectorWebhooksRequest, reply: FastifyReply) { // Do some code } - -function signPayload(req: GetAnyWebhooksRequest, reply: FastifyReply, done) { - const webhooks = req.body; - const { date, signature } = req.headers; - - const signed = createHmac("sha256", kMyUnisoftToken) - .update(JSON.stringify({ webhooks, date })) - .digest("hex"); - - if (signed !== signature) { - reply.status(401).send(); - } - - done(); -} - diff --git a/src/class/eventManagement/dispatcher.class.ts b/src/class/eventManagement/dispatcher.class.ts index 60775bde..732e4fbd 100644 --- a/src/class/eventManagement/dispatcher.class.ts +++ b/src/class/eventManagement/dispatcher.class.ts @@ -72,7 +72,7 @@ export type DispatcherOptions = { checkLastActivityInterval?: number; checkTransactionInterval?: number; idleTime?: number; -} +}; type DispatcherChannelEvents = { name: "register" }; @@ -123,7 +123,7 @@ export class Dispatcher { private validationCbFn: (event: T) => void = null; private standardLogFn: StandardLog; - constructor(options: DispatcherOptions) { + constructor(options: DispatcherOptions = {}) { this.prefix = options.prefix ?? ""; this.formattedPrefix = options.prefix ? `${options.prefix}-` : ""; this.treeName = this.formattedPrefix + kIncomerStoreName; diff --git a/src/class/eventManagement/incomer.class.ts b/src/class/eventManagement/incomer.class.ts index d4e59472..7a58ef09 100644 --- a/src/class/eventManagement/incomer.class.ts +++ b/src/class/eventManagement/incomer.class.ts @@ -69,7 +69,7 @@ export type IncomerOptions = { eventsSubscribe: EventSubscribe[]; eventCallback: (message: CallBackEventMessage) => void; prefix?: Prefix; - abortRegistrationTime?: number; + abortPublishTime?: number; externalsInitialized?: boolean; }; @@ -92,7 +92,7 @@ export class Incomer < private incomerChannelName: string; private incomerTransactionStore: TransactionStore<"incomer">; private incomerChannel: Channel["IncomerMessages"]>; - private abortRegistrationTime = 60_000; + private abortPublishTime = 60_000; private standardLogFn: StandardLog; private checkRegistrationInterval: NodeJS.Timer; private checkTransactionsStateInterval: NodeJS.Timer; @@ -128,7 +128,7 @@ export class Incomer < }); if ( - (this.prefix === "test" || this.prefix === "development") && !options.externalsInitialized + (this.prefix === "test" || this.prefix === "development") && options.externalsInitialized === false ) { this.externals = new Externals(options); } @@ -137,7 +137,7 @@ export class Incomer < if (!this.dispatcherIsAlive) { return; } - else if (this.lastPingDate + (PING_INTERVAL + this.abortRegistrationTime) < Date.now()) { + else if (this.lastPingDate + (PING_INTERVAL + this.abortPublishTime) < Date.now()) { this.dispatcherIsAlive = false; return; @@ -151,7 +151,7 @@ export class Incomer < catch (error) { this.logger.error(error); } - }, this.abortRegistrationTime).unref(); + }, this.abortPublishTime).unref(); } get subscriber() { @@ -200,7 +200,7 @@ export class Incomer < this.logger.info("Registering as a new incomer on dispatcher"); try { - await once(this, "registered", { signal: AbortSignal.timeout(this.abortRegistrationTime) }); + await once(this, "registered", { signal: AbortSignal.timeout(this.abortPublishTime) }); } catch { this.checkRegistrationInterval = setInterval(async() => { @@ -217,7 +217,7 @@ export class Incomer < } }); - await once(this, "registered", { signal: AbortSignal.timeout(this.abortRegistrationTime) }); + await once(this, "registered", { signal: AbortSignal.timeout(this.abortPublishTime) }); } catch (error) { this.logger.error(error); @@ -231,7 +231,7 @@ export class Incomer < clearInterval(this.checkRegistrationInterval); this.checkRegistrationInterval = undefined; - }, this.abortRegistrationTime * 2).unref(); + }, this.abortPublishTime * 2).unref(); return; } @@ -295,7 +295,7 @@ export class Incomer < private async updateTransactionsStateTimeout() { try { - await setTimeout(this.abortRegistrationTime, undefined, { signal: kCancelTimeout.signal }); + await setTimeout(this.abortPublishTime, undefined, { signal: kCancelTimeout.signal }); kCancelTask.abort(); } catch { diff --git a/test/UT/class/eventManagement/dispatcher.spec.ts b/test/UT/class/eventManagement/dispatcher.spec.ts index 1f3963cd..9ca13cda 100644 --- a/test/UT/class/eventManagement/dispatcher.spec.ts +++ b/test/UT/class/eventManagement/dispatcher.spec.ts @@ -77,7 +77,7 @@ describe("Dispatcher", () => { afterAll(async() => { await clearAllKeys(); - await dispatcher.close(); + dispatcher.close(); await closeRedis("subscriber"); }); @@ -368,7 +368,7 @@ describe("Dispatcher", () => { }); afterAll(async() => { - await dispatcher.close(); + dispatcher.close(); await closeRedis("subscriber"); }); @@ -604,7 +604,7 @@ describe("Dispatcher", () => { }); afterAll(async() => { - await dispatcher.close(); + dispatcher.close(); await closeRedis("subscriber"); }); @@ -1029,7 +1029,7 @@ describe("Dispatcher", () => { }); afterAll(async() => { - await dispatcher.close(); + dispatcher.close(); await closeRedis("subscriber"); }); diff --git a/test/UT/class/eventManagement/events.spec.ts b/test/UT/class/eventManagement/events.spec.ts index 45f82214..6450e5ca 100644 --- a/test/UT/class/eventManagement/events.spec.ts +++ b/test/UT/class/eventManagement/events.spec.ts @@ -30,7 +30,6 @@ const mockedEventComeBackHandler = jest.fn(); describe("Publishing/exploiting a custom event", () => { let dispatcher: Dispatcher>; - let subscriber; beforeAll(async() => { await initRedis({ @@ -38,7 +37,7 @@ describe("Publishing/exploiting a custom event", () => { host: process.env.REDIS_HOST } as any); - subscriber = await initRedis({ + await initRedis({ port: process.env.REDIS_PORT, host: process.env.REDIS_HOST } as any, "subscriber"); @@ -58,7 +57,7 @@ describe("Publishing/exploiting a custom event", () => { }); afterAll(async() => { - await dispatcher.close(); + dispatcher.close(); await closeAllRedis(); }); @@ -91,6 +90,11 @@ describe("Publishing/exploiting a custom event", () => { } }; + afterAll(async() => { + await publisher.close(); + await unConcernedIncomer.close(); + }) + beforeAll(async() => { publisher = new Incomer({ name: randomUUID(), @@ -213,6 +217,12 @@ describe("Publishing/exploiting a custom event", () => { } }; + afterAll(async() => { + await publisher.close(); + await concernedIncomer.close(); + await secondConcernedIncomer.close(); + }); + beforeAll(async() => { publisher = new Incomer({ name: randomUUID(), @@ -411,6 +421,12 @@ describe("Publishing/exploiting a custom event", () => { } } + afterAll(async() => { + await publisher.close(); + await concernedIncomer.close(); + await secondConcernedIncomer.close(); + }); + beforeAll(async() => { publisher = new Incomer({ name: randomUUID(), diff --git a/test/UT/class/eventManagement/externals.spec.ts b/test/UT/class/eventManagement/externals.spec.ts index 41a317f4..ea8248e1 100644 --- a/test/UT/class/eventManagement/externals.spec.ts +++ b/test/UT/class/eventManagement/externals.spec.ts @@ -44,7 +44,7 @@ describe("Init Incomer without Dispatcher alive & prefix as \"development\" | \" eventsCast: ["accountingFolder"], eventsSubscribe: [], eventCallback: eventComeBackHandler, - abortRegistrationTime: 5_000, + abortPublishTime: 5_000, externalsInitialized: true }); @@ -67,7 +67,8 @@ describe("Init Incomer without Dispatcher alive & prefix as \"development\" | \" eventsCast: ["accountingFolder"], eventsSubscribe: [], eventCallback: eventComeBackHandler, - abortRegistrationTime: 5_000 + abortPublishTime: 5_000, + externalsInitialized: false }); test("it should init", async() => { @@ -89,7 +90,8 @@ describe("Init Incomer without Dispatcher alive & prefix as \"development\" | \" eventsCast: ["accountingFolder"], eventsSubscribe: [], eventCallback: eventComeBackHandler, - abortRegistrationTime: 5_000 + abortPublishTime: 5_000, + externalsInitialized: false }); test("it should init", async() => { diff --git a/test/UT/class/eventManagement/incomer.spec.ts b/test/UT/class/eventManagement/incomer.spec.ts index 32b04ffb..53b1c5ba 100644 --- a/test/UT/class/eventManagement/incomer.spec.ts +++ b/test/UT/class/eventManagement/incomer.spec.ts @@ -12,6 +12,9 @@ import * as Logger from "pino"; // Import Internal Dependencies import { Dispatcher, Incomer } from "../../../../src/index"; +// Internal Dependencies Mocks +const mockedIncomerHandleDispatcherMessage = jest.spyOn(Incomer.prototype as any, "handleDispatcherMessages"); + const incomerLogger = Logger.pino({ level: "debug" }); @@ -22,6 +25,7 @@ describe("Init Incomer without Dispatcher alive", () => { } let incomer: Incomer; + let dispatcher: Dispatcher; beforeAll(async() => { await initRedis({ @@ -40,24 +44,28 @@ describe("Init Incomer without Dispatcher alive", () => { eventsCast: [], eventsSubscribe: [], eventCallback: eventComeBackHandler, - abortRegistrationTime: 2_000, + abortPublishTime: 2_000, externalsInitialized: true }); + + dispatcher = new Dispatcher(); }); - test("Incomer should not init", async() => { + test("Incomer should init with or without a Dispatcher", async() => { await incomer.initialize(); - await timers.setTimeout(10_000); - - const dispatcher = new Dispatcher({}); + await timers.setTimeout(5_000); await dispatcher.initialize(); await timers.setTimeout(5_000); + + expect(mockedIncomerHandleDispatcherMessage).toHaveBeenCalled(); }) afterAll(async() => { + dispatcher.close(); + await incomer.close(); await closeAllRedis(); }); }); diff --git a/test/UT/class/eventManagement/ping.spec.ts b/test/UT/class/eventManagement/ping.spec.ts index ba7e79dc..9deb1dc7 100644 --- a/test/UT/class/eventManagement/ping.spec.ts +++ b/test/UT/class/eventManagement/ping.spec.ts @@ -20,7 +20,6 @@ const incomerLogger = Logger.pino({ level: "debug" }); const mockedIncomerLoggerInfo = jest.spyOn(incomerLogger, "info"); -const mockedDispatcherLoggerInfo = jest.spyOn(dispatcherLogger, "info"); describe("Ping", () => { const eventComeBackHandler = async(message) => { @@ -29,7 +28,6 @@ describe("Ping", () => { let dispatcher: Dispatcher; let incomer: Incomer; - let subscriber; beforeAll(async() => { await initRedis({ @@ -37,7 +35,7 @@ describe("Ping", () => { host: process.env.REDIS_HOST } as any); - subscriber = await initRedis({ + await initRedis({ port: process.env.REDIS_PORT, host: process.env.REDIS_HOST } as any, "subscriber"); @@ -68,7 +66,8 @@ describe("Ping", () => { }); afterAll(async() => { - await dispatcher.close(); + dispatcher.close(); + await incomer.close(); await closeAllRedis(); }); diff --git a/test/UT/class/eventManagement/registration.spec.ts b/test/UT/class/eventManagement/registration.spec.ts index 0be8ee65..cca43f7c 100644 --- a/test/UT/class/eventManagement/registration.spec.ts +++ b/test/UT/class/eventManagement/registration.spec.ts @@ -27,7 +27,6 @@ const mockedIncomerLoggerInfo = jest.spyOn(incomerLogger, "info"); describe("Registration", () => { let dispatcher: Dispatcher; let incomer: Incomer; - let subscriber; beforeAll(async() => { await initRedis({ @@ -35,7 +34,7 @@ describe("Registration", () => { host: process.env.REDIS_HOST } as any); - subscriber = await initRedis({ + await initRedis({ port: process.env.REDIS_PORT, host: process.env.REDIS_HOST } as any, "subscriber"); @@ -52,7 +51,7 @@ describe("Registration", () => { }); afterAll(async() => { - await dispatcher.close(); + dispatcher.close(); await closeAllRedis(); }); @@ -65,6 +64,10 @@ describe("Registration", () => { console.log(message); } + afterAll(async() => { + await incomer.close(); + }); + beforeAll(async() => { incomer = new Incomer({ name: "foo", @@ -101,6 +104,10 @@ describe("Registration", () => { console.log(message); } + afterAll(async() => { + await incomer.close(); + }) + beforeAll(async() => { incomer = new Incomer({ name: randomUUID(),