Skip to content

Commit

Permalink
refactor(/class/eventManagement/dispatcher): handle promises rejectio… (
Browse files Browse the repository at this point in the history
#258)

* refactor(/class/eventManagement/dispatcher): handle promises rejection on eventsHandler events

* fix(class/eventManagement/incomer): fix publish on unpublished events & document json schema

* test(UT/class/eventManagement/handle-inactive): fix ut

* test(ut/class/eventManagement/incomer): fix ut
  • Loading branch information
Rossb0b authored Jul 25, 2024
1 parent b38d834 commit 203540e
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 97 deletions.
4 changes: 3 additions & 1 deletion docs/class/incomer.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const incomer = new Incomer({
eventsSubscribe: [...Object.values(AVAILABLE_EVENTS)],
eventCallback: (event) => {
console.log(event);

return OK({ status: "RESOLVED" });
}
});

Expand Down Expand Up @@ -73,7 +75,7 @@ type IncomerOptions<T extends GenericEvent = GenericEvent> = {
standardLog?: StandardLog<T>;
eventsCast: EventCast[];
eventsSubscribe: EventSubscribe[];
eventCallback: (message: CallBackEventMessage<T>) => void;
eventCallback: (message: CallBackEventMessage<T>) => Promise<EventCallbackResponse>;
prefix?: Prefix;
abortPublishTime?: number;
externalsInitialized?: boolean;
Expand Down
96 changes: 39 additions & 57 deletions src/class/eventManagement/dispatcher.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,63 +218,53 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
});

this.eventsHandler
.on("APPROVEMENT", async(registrationEvent: IncomerRegistrationMessage) => {
try {
await this.approveIncomer(registrationEvent);
}
catch (error) {
this.logger.error({
channel: "dispatcher",
message: registrationEvent,
error: error.stack
.on("APPROVEMENT", (registrationEvent: IncomerRegistrationMessage) => {
this.approveIncomer(registrationEvent)
.catch((error) => {
this.logger.error({
channel: "dispatcher",
message: registrationEvent,
error: error.stack
});
});
}
})
.on("CLOSE", async(channel: string, closeEvent: CloseMessage) => {
try {
const { redisMetadata } = closeEvent;

const relatedIncomer = await this.incomerStore.getIncomer(redisMetadata.origin);
.on("CLOSE", (channel: string, closeEvent: CloseMessage) => {
const { redisMetadata } = closeEvent;

if (!relatedIncomer) {
this.logger.warn({ channel }, "Unable to find the Incomer closing the connection");
this.incomerStore.getIncomer(redisMetadata.origin)
.then((relatedIncomer) => {
if (!relatedIncomer) {
this.logger.warn({ channel }, "Unable to find the Incomer closing the connection");

return;
}
return;
}

await this.removeNonActives([relatedIncomer]);
}
catch (error) {
this.logger.error({
channel,
message: closeEvent,
error: error.stack
this.removeNonActives([relatedIncomer]);
}).catch((error) => {
this.logger.error({
channel: "dispatcher",
message: closeEvent,
error: error.stack
});
});
}
})
.on("RETRY", async(channel: string, retryEvent: RetryMessage) => {
try {
await this.handleRetryEvent(retryEvent);
}
catch (error) {
this.logger.error({
channel,
message: retryEvent,
error: error.stack
.on("RETRY", (channel: string, retryEvent: RetryMessage) => {
this.handleRetryEvent(retryEvent)
.catch((error) => {
this.logger.error({
channel,
message: retryEvent,
error: error.stack
});
});
}
})
.on("CUSTOM_EVENT", async(channel: string, customEvent: EventMessage<T>) => {
try {
await this.handleCustomEvents(channel, customEvent);
}
catch (error) {
this.logger.error({
channel,
.on("CUSTOM_EVENT", (channel: string, customEvent: EventMessage<T>) => {
this.handleCustomEvents(channel, customEvent)
.catch((error) => this.logger.error({
channel: "dispatcher",
message: customEvent,
error: error.stack
});
}
}));
});

this.resolveTransactionsInterval = setInterval(() => {
Expand Down Expand Up @@ -568,7 +558,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit

if (recentPingTransactionKeys.length > 0) {
toResolve.push(Promise.all([
this.updateIncomerState(inactive.providedUUID),
this.incomerStore.updateIncomerState(inactive.providedUUID),
transactionStore.deleteTransactions(recentPingTransactionKeys)
]));

Expand All @@ -595,7 +585,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
const { providedUUID: uuid } = incomer;

if (incomer.baseUUID === this.selfProvidedUUID) {
await this.updateIncomerState(uuid);
await this.incomerStore.updateIncomerState(uuid);

continue;
}
Expand Down Expand Up @@ -656,14 +646,6 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
}
}

private async updateIncomerState(origin: string) {
try {
await this.incomerStore.updateIncomerState(origin);
}
catch (error) {
this.logger.error({ uuid: origin, error: error.stack }, "Failed to update incomer state");
}
}

private async setAsActiveDispatcher() {
const incomers = await this.incomerStore.getIncomers();
Expand Down Expand Up @@ -873,7 +855,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
}));
}

await this.updateIncomerState(redisMetadata.origin);
await this.incomerStore.updateIncomerState(redisMetadata.origin);
await Promise.all([
...toResolve,
senderTransactionStore.updateTransaction(transactionId, {
Expand Down
37 changes: 11 additions & 26 deletions src/class/eventManagement/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ function isIncomerChannelMessage<T extends GenericEvent = GenericEvent>(value:
return value.name !== "APPROVEMENT";
}

type Resolved = "RESOLVED";
type Unresolved = "UNRESOLVED";
export type Resolved = "RESOLVED";
export type Unresolved = "UNRESOLVED";

export type EventCallbackResponse<T extends Resolved | Unresolved = Resolved | Unresolved> = Result<
T extends Resolved ? {
Expand Down Expand Up @@ -222,7 +222,7 @@ export class Incomer <
if (
transaction.redisMetadata.mainTransaction &&
!transaction.redisMetadata.published &&
transaction.aliveSince + this.maxPingInterval < Date.now()
Number(transaction.aliveSince) + Number(this.maxPingInterval) < Date.now()
) {
return this.incomerChannel.publish({
...transaction,
Expand Down Expand Up @@ -324,10 +324,6 @@ export class Incomer <
return;
}

if (this.providedUUID) {
this.subscriber.unsubscribe(`${this.prefix ? `${this.prefix}-` : ""}${this.providedUUID}`);
}

const event = {
name: "REGISTER" as const,
data: {
Expand Down Expand Up @@ -588,24 +584,13 @@ export class Incomer <
): Promise<void> {
const { name } = message;

match<IncomerChannelEvents<T>>({ name, message } as IncomerChannelEvents<T>)
.with({
name: "PING"
},
async(res: { name: "PING", message: DispatcherPingMessage }) => this.handlePing(channel, res.message))
.with(P._,
async(res: { name: string, message: DistributedEventMessage<T> }) => this.customEvent({
...res, channel
})
)
.exhaustive()
.catch((error) => {
this.logger.error({
channel: "incomer",
error: error.stack,
message
});
});
if (name === "PING") {
await this.handlePing(channel, message as DispatcherPingMessage);

return;
}

await this.customEvent({ name, channel, message: message as DistributedEventMessage<T> });
}

private async handlePing(channel: string, message: DispatcherPingMessage) {
Expand Down Expand Up @@ -671,7 +656,7 @@ export class Incomer <

const callbackResult = await this.eventCallback({ ...event, eventTransactionId } as unknown as CallBackEventMessage<T>);

if (callbackResult.ok) {
if (callbackResult && callbackResult.ok) {
if (Symbol.for(callbackResult.val.status) === RESOLVED) {
await store.updateTransaction(formattedTransaction.redisMetadata.transactionId, {
...formattedTransaction,
Expand Down
5 changes: 4 additions & 1 deletion src/schema/events/document.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
},
"kind": {
"enum": ["AF", "PF", "DB", "ED"]
},
"name": {
"type": "string"
}
},
"required": ["id", "kind"],
"required": ["id", "kind", "name"],
"additionalProperties": false
},
"delete": {
Expand Down
2 changes: 1 addition & 1 deletion test/UT/class/eventManagement/dispatcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ describe("Dispatcher", () => {
});

test("It should have update the update the incomer last activity", async () => {
await timers.setTimeout(10_000);
await timers.setTimeout(15_000);

const pongTransactionToRetrieve = await incomerTransactionStore.getTransactionById(pongTransaction.redisMetadata.transactionId!);
const pingTransaction = await dispatcherTransactionStore.getTransactionById(pingTransactionId);
Expand Down
3 changes: 2 additions & 1 deletion test/UT/class/eventManagement/events.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import {
Expand All @@ -26,7 +27,7 @@ import { validate } from "../../../../src/index";
const incomerLogger = Logger.pino({
level: "debug"
});
const mockedEventComeBackHandler = jest.fn();
const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));

async function updateRegisterTransactionState(
publisherOldTransacStore: TransactionStore<"incomer">,
Expand Down
3 changes: 2 additions & 1 deletion test/UT/class/eventManagement/externals.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import { Incomer } from "../../../../src/index";
Expand Down Expand Up @@ -32,7 +33,7 @@ afterAll(async() => {
});

describe("Init Incomer without Dispatcher alive & prefix as \"test\"", () => {
const eventComeBackHandler = () => void 0;
const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));;

describe("With externalsInitialized at true", () => {
const incomer: Incomer = new Incomer({
Expand Down
10 changes: 8 additions & 2 deletions test/UT/class/eventManagement/handle-inactive-no-backup.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import {
Expand All @@ -26,7 +27,7 @@ import { TransactionStore } from "../../../../src/class/store/transaction.class"
const dispatcherLogger = Logger.pino({
level: "debug"
});
const mockedEventComeBackHandler = jest.fn();
const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));

describe("Publishing/exploiting a custom event & inactive incomer", () => {
let dispatcher: Dispatcher<EventOptions<keyof Events>>;
Expand Down Expand Up @@ -200,7 +201,12 @@ describe("Publishing/exploiting a custom event & inactive incomer", () => {
dispatcher["subscriber"]!.on("message", (channel, message) => dispatcher["handleMessages"](channel, message));
secondConcernedIncomer["subscriber"]!.on("message", (channel, message) => secondConcernedIncomer["handleMessages"](channel, message));

await timers.setTimeout(5_000);
await timers.setTimeout(1_000);

const incomer = [...(await dispatcher["incomerStore"].getIncomers()).values()].find((incomer) => incomer.baseUUID === concernedIncomer.baseUUID);
await dispatcher["removeNonActives"]([incomer!]);

await timers.setTimeout(1_000);

const mockCalls = mockedSetTransaction.mock.calls.flat();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import {
Expand All @@ -26,7 +27,7 @@ import { TransactionStore } from "../../../../src/class/store/transaction.class"
const dispatcherLogger = Logger.pino({
level: "debug"
});
const mockedEventComeBackHandler = jest.fn();
const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));

describe("Publishing/exploiting a custom event & inactive incomer", () => {
let dispatcher: Dispatcher<EventOptions<keyof Events>>;
Expand Down
15 changes: 11 additions & 4 deletions test/UT/class/eventManagement/incomer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
closeAllRedis,
getRedis
} from "@myunisoft/redis";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import { Dispatcher, Incomer } from "../../../../src/index";
Expand All @@ -18,7 +19,7 @@ const mockedDispatcherRemoveNonActives = jest.spyOn(Dispatcher.prototype as any,
const kIdleTime = 4_000;

describe("Init Incomer without Dispatcher alive", () => {
const eventComeBackHandler = () => void 0;
const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));;

const pingInterval = 2_000;

Expand Down Expand Up @@ -93,9 +94,16 @@ describe("Init Incomer without Dispatcher alive", () => {
});

test("Incomer calling close, it should remove the given Incomer", async() => {
await incomer.close();
await incomer["incomerChannel"].publish({
name: "CLOSE",
redisMetadata: {
origin: incomer["providedUUID"],
incomerName: incomer["name"],
prefix: incomer["prefix"]
}
});

await timers.setTimeout(500);
await timers.setTimeout(1_000);

expect(mockedDispatcherRemoveNonActives).toHaveBeenCalled();

Expand All @@ -106,7 +114,6 @@ describe("Init Incomer without Dispatcher alive", () => {

afterAll(async() => {
await dispatcherIncomer.close();
await incomer.close();;
await closeAllRedis();
});
});
3 changes: 2 additions & 1 deletion test/UT/class/eventManagement/ping.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import { Dispatcher, Incomer } from "../../../../src/index";
Expand All @@ -23,7 +24,7 @@ const incomerLogger = Logger.pino({
const mockedIncomerLoggerDebug = jest.spyOn(incomerLogger, "debug");

describe("Ping", () => {
const eventComeBackHandler = () => void 0;
const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));;

let dispatcher: Dispatcher;
let incomer: Incomer;
Expand Down
Loading

0 comments on commit 203540e

Please sign in to comment.