Skip to content

Commit

Permalink
docs(): Dispatcher & Incomer (#64)
Browse files Browse the repository at this point in the history
refacto(): externalsInitialized options default value doesn't make externals to be init.
test(): fix up, close incomer instances
refacto(): Incomer options abortRegistrationTime as abortPublishTime
refacto(): Dispatcher options default instance
  • Loading branch information
Rossb0b authored Jul 25, 2023
1 parent 636f570 commit cc75803
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 57 deletions.
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
## 🚧 Requirements

- [Node.js](https://nodejs.org/en/) version 16 or higher
- Docker (for running tests).

## 🚀 Getting Started

Expand Down Expand Up @@ -130,13 +131,26 @@ if (isDeleteOperation(event.operation)) {

### API

#### Dispatcher & Incomer class

> There is the documentation of [**Dispatcher**](./docs/class/dispatcher.md), and [**Incomer**](./docs/class/incomer.md) classes.
---

#### validate< T extends keyof Events >(options: EventOptions<T>): void
Throw an error if a given event is not internaly known.

> Throw an error if a given event is not internaly known.
---

#### isCreateOperation< T extends keyof Events >(operation: EventOptions<T>["operation"]): operation is Operation["create"]

---

#### isUpdateOperation< T extends keyof Events >(operation: EventOptions<T>["operation"]): operation is Operation["update"]

---

#### isDeleteOperation< T extends keyof Events >(operation: EventOptions<T>["operation"]): operation is Operation["delete"]

### Types
Expand Down
166 changes: 166 additions & 0 deletions docs/class/dispatcher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
<p align="center"><h1 align="center">
Dispatcher
</h1></p>

<p align="center">
This class is design as a gateway for events. <br/>
Firstly, it ensure that the given events are correctly formatted at run-time (using JSON-Schema). <br/>
Secondly, it ensure that events are spread & dealed.
</p>

## 📚 Usage

```ts
await initRedis();
await initRedis({}, "subscriber");

const dispatcher = new Dispatcher();

await dispatcher.initialize();

dispatcher.close();
```

## Types

```ts
type Prefix = "test" | "development" | "staging" | "production";

type GenericEvent = Record<string, any> & { data: Record<string, any> };

type DispatcherOptions<T extends GenericEvent = GenericEvent> = {
/* Prefix for the channel name, commonly used to distinguish envs */
prefix?: Prefix;
logger?: Partial<Logger> & Pick<Logger, "info" | "warn">;
standardLog?: StandardLog<T>;
eventsValidation?: {
eventsValidationFn?: Map<string, ValidateFunction<Record<string, any>> | CustomEventsValidationFunctions>;
validationCbFn?: (event: T) => void;
},
pingInterval?: number;
checkLastActivityInterval?: number;
checkTransactionInterval?: number;
idleTime?: number;
};
```

## Options

<details>
<summary><b>logger</b></summary>
<br/>

> Default logger is a pino logger. <br/>
> ⚠️ You can inject your own but you must ensure that the provided logger has those methods `info` | `error` | `warn`.
</details>

---

<details>
<summary><b>standardLog</b></summary>
<br/>

> Callback function use to formate logs related to custom events casting.
```ts
function standardLog<T extends GenericEvent = EventOptions<keyof Events>>
(event: T & { redisMetadata: { transactionId: string } }) {
const logs = `foo: ${event.foo}`;

function log(message: string) {
return `(${logs}) ${message}`;
}

return log;
}
```

</details>

---

<details>
<summary><b>eventsValidation</b></summary>

### eventsValidationFn

> Map of Ajv validation functions related to events.
```ts
const eventsValidationFn: MappedEventsValidationFn = new Map<string, CustomEventsValidationFunctions>();

for (const [name, validationSchemas] of Object.entries(eventsValidationSchemas)) {
const operationsValidationFunctions: Map<string, ValidateFunction<OperationFunctions>> = new Map();

for (const [operation, validationSchema] of Object.entries(validationSchemas)) {
operationsValidationFunctions.set(operation, ajv.compile(validationSchema));
}

eventsValidationFn.set(name, operationsValidationFunctions);
}
```

### validationCbFn

> Callback validation function used to validate events according to the given eventsValidationFn.
```ts
function validate<T extends keyof Events = keyof Events>(options: EventOptions<T>) {
const { name, operation, data, scope, metadata } = options;

if (!eventsValidationFn.has(name)) {
throw new Error(`Unknown "event": ${name}`);
}

const event = eventsValidationFn.get(name);
if (!event.has(operation.toLocaleLowerCase())) {
throw new Error(`Unknown "operation": ${operation} for the "event": ${name}`);
}

const operationValidationFunction = event.get(operation.toLocaleLowerCase());
if (!operationValidationFunction(data)) {
throw new Error(`"event": ${name} | "operation": ${operation}: ${[...operationValidationFunction.errors]
.map((error) => error.message)}`);
}

if (!metadataValidationFunction(metadata)) {
throw new Error(`metadata: ${[...metadataValidationFunction.errors].map((error) => error.message)}`);
}

if (!scopeValidationFunction(scope)) {
throw new Error(`scope: ${[...scopeValidationFunction.errors].map((error) => error.message)}`);
}
}
```

</details>

---

<details>
<summary><b>Intervals</b></summary>

### pingInterval

> The interval use to ping known instances of `incomer`. <br/>
> ⚠️ Must strictly be smaller than the idleTime options.
### checkLastActivityInterval

> The interval use to check on known instances of `incomer` state. <br/>
> If those have no recent lastActivity, they are evicted.
### checkTransactionInterval

> The interval use to check on `transactions` state. <br/>
> When a transaction related to an event is resolved, his state is update. According to this state, we can define if an event has been dealed through all related instances of `incomer`.
### idleTime

> The interval use to determine how many time an instance of an `incomer` can be inactive. <br/>
> ⚠️ Must strictly be greater than the pingInterval options.
</details>


144 changes: 144 additions & 0 deletions docs/class/incomer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
<p align="center"><h1 align="center">
Incomer
</h1></p>

<p align="center">
This class is design as a client for events. <br/>
It ensure that events are sended to the <b>Dispatcher</b> or save in the Redis, <br/>
and execute the provided <b>eventCallback</b> when a event concern this client.
</p>

## 📚 Usage

```ts
await initRedis();
await initRedis({}, "subscriber");

const AVAILABLE_EVENTS = Object.freeze<Record<keyof Events, EventSubscribe>>(
([...eventsValidationFn.keys()].map((name) => {
return {
name,
delay: undefined,
horizontalScale: undefined
};
})).reduce((prev, curr) => Object.assign(prev, { [curr.name]: curr }), {}) as Record<keyof Events, EventSubscribe>
);

const incomer = new Incomer({
name: "foo",
eventsCast: [...Object.keys(AVAILABLE_EVENTS)],
eventsSubscribe: [...Object.values(AVAILABLE_EVENTS)],
eventCallback: (event) => {
console.log(event);
}
});

await incomer.initialize();

await incomer.close();
```

## Types

```ts
type Prefix = "test" | "development" | "staging" | "production";

type GenericEvent = Record<string, any> & { data: Record<string, any> };

type EventCast<T extends string | keyof Events = string> = T;

type EventSubscribe<T extends string | keyof Events = string> = {
name: T;
delay?: number;
horizontalScale?: boolean;
};

type CallBackEventMessage<
T extends GenericEvent = GenericEvent
> = T & {
name: string;
};

type EventMessage<
T extends GenericEvent = GenericEvent
> = T & {
name: string;
redisMetadata: IncomerTransactionMetadata;
};

type IncomerOptions<T extends GenericEvent = GenericEvent> = {
/* Service name */
name: string;
logger?: Partial<Logger> & Pick<Logger, "info" | "warn">;
standardLog?: StandardLog<T>;
eventsCast: EventCast[];
eventsSubscribe: EventSubscribe[];
eventCallback: (message: CallBackEventMessage<T>) => void;
prefix?: Prefix;
abortPublishTime?: number;
externalsInitialized?: boolean;
};
```


## Options

<details>
<summary><b>logger</b></summary>
<br/>

> Default logger is a pino logger. <br/>
> ⚠️ You can inject your own but you must ensure that the provided logger has those methods `info` | `error` | `warn`.
</details>

---

<details>
<summary><b>standardLog</b></summary>
<br/>

> Callback function use to formate logs related to custom events casting.
```ts
function standardLog<T extends GenericEvent = EventOptions<keyof Events>>
(event: T & { redisMetadata: { transactionId: string } }) {
const logs = `foo: ${event.foo}`;

function log(message: string) {
return `(${logs}) ${message}`;
}

return log;
}
```

</details>

---

<details>
<summary><b>abortPublishTime</b></summary>
<br/>

> Interval of time during which the `incomer` instance is going to wait to for a response from the `dispatcher` next to the registration demand or any event publishing. <br/>
> If there is no recent activity from the `dispatcher`, those events are not publish and saved in Redis awaiting for the next iteration.
</details>

---

<details>
<summary><b>externalsInitialized</b></summary>
<br/>

> Use to initialize `externals` class. As `false` and with a `prefix` with the value `test` or `development`, it will init a `dispatcher` and an `incomer` in order to run tests without any other accessible APIs.
</details>

## API

### publish< K extends GenericEvent | null = null >(event: K extends null ? Omit< EventMessage< T >, "redisMetadata" >): Promise<void>

> Publish the given event on Redis pubsub. <br/>
> If there is no dispatcher alive, the event isn't publish but saved in Redis awaiting for an incoming publish.
25 changes: 2 additions & 23 deletions example/fastify/feature/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// Import Node.js Dependencies
import { createHmac } from "crypto";

// Import Internal Dependencies
import * as MyEvents from "../../../src/index";

Expand All @@ -11,11 +8,9 @@ import {
FastifyInstance
} from "fastify";

const kMyUnisoftToken = process.env.THIRD_PARTY_SECRET!;

export async function webhooksAPI(server: FastifyInstance) {
server.post("/anyEvents", { preHandler: signPayload }, getAnyWebhooks);
server.post("/connector", { preHandler: signPayload }, getConnectorWebhooks);
server.post("/anyEvents", getAnyWebhooks);
server.post("/connector", getConnectorWebhooks);
}

type GetAnyWebhooksRequest = FastifyRequest<{
Expand All @@ -41,19 +36,3 @@ type GetConnectorWebhooksRequest = FastifyRequest<{
async function getConnectorWebhooks(req: GetConnectorWebhooksRequest, reply: FastifyReply) {
// Do some code
}

function signPayload(req: GetAnyWebhooksRequest, reply: FastifyReply, done) {
const webhooks = req.body;
const { date, signature } = req.headers;

const signed = createHmac("sha256", kMyUnisoftToken)
.update(JSON.stringify({ webhooks, date }))
.digest("hex");

if (signed !== signature) {
reply.status(401).send();
}

done();
}

Loading

0 comments on commit cc75803

Please sign in to comment.