From 6460c17c791b22e22367e70b625fc7d3459d8ed4 Mon Sep 17 00:00:00 2001 From: Nicolas Hallaert <39910164+Rossb0b@users.noreply.github.com> Date: Wed, 4 Oct 2023 01:48:59 +0200 Subject: [PATCH] fix(): have to lazy-fetch as is lap on every key (#114) --- src/class/eventManagement/dispatcher.class.ts | 4 +- src/class/eventManagement/incomer.class.ts | 8 ++-- src/class/store/incomer.class.ts | 38 +++++++++++++++++-- src/class/store/transaction.class.ts | 25 ++---------- 4 files changed, 43 insertions(+), 32 deletions(-) diff --git a/src/class/eventManagement/dispatcher.class.ts b/src/class/eventManagement/dispatcher.class.ts index a2615414..ba16132f 100644 --- a/src/class/eventManagement/dispatcher.class.ts +++ b/src/class/eventManagement/dispatcher.class.ts @@ -473,8 +473,8 @@ export class Dispatcher extends EventEmit instance: "incomer" }); - const transactions = await transactionStore.getAllTransactions(); - const recentPingTransactionKeys = Object.keys(await transactionStore.getAllTransactions()) + const transactions = await transactionStore.getTransactions(); + const recentPingTransactionKeys = Object.keys(await transactionStore.getTransactions()) .filter((transactionKey) => { const transaction = transactions.get(transactionKey); diff --git a/src/class/eventManagement/incomer.class.ts b/src/class/eventManagement/incomer.class.ts index 584d1427..6965f070 100644 --- a/src/class/eventManagement/incomer.class.ts +++ b/src/class/eventManagement/incomer.class.ts @@ -150,7 +150,7 @@ export class Incomer < private logAbortError() { // eslint-disable-next-line no-invalid-this this.logger.warn({ error: kCancelTask.signal.reason }); - kCancelTask.signal.removeEventListener("abort", this.logAbortError); + kCancelTask.signal.removeEventListener("abort", () => this.logAbortError()); } private async checkDispatcherState() { @@ -162,7 +162,7 @@ export class Incomer < this.dispatcherIsAlive = true; - kCancelTask.signal.addEventListener("abort", this.logAbortError, { once: true }); + kCancelTask.signal.addEventListener("abort", () => this.logAbortError(), { once: true }); await Promise.race([this.updateTransactionsStateTimeout(), this.updateTransactionsState()]); } @@ -336,7 +336,7 @@ export class Incomer < try { await timers.setTimeout(this.publishInterval, undefined, { signal: kCancelTimeout.signal }); kCancelTask.abort("Dispatcher state check before publishing more.."); - kCancelTask.signal.removeEventListener("abort", this.logAbortError); + kCancelTask.signal.removeEventListener("abort", () => this.logAbortError()); } catch { // Ignore @@ -364,7 +364,7 @@ export class Incomer < } finally { kCancelTimeout.abort(); - kCancelTask.signal.removeEventListener("abort", this.logAbortError); + kCancelTask.signal.removeEventListener("abort", () => this.logAbortError()); } } diff --git a/src/class/store/incomer.class.ts b/src/class/store/incomer.class.ts index 6ed6ea5c..6af4548c 100644 --- a/src/class/store/incomer.class.ts +++ b/src/class/store/incomer.class.ts @@ -46,12 +46,42 @@ export class IncomerStore extends KVPeer { return providedUUID; } + async* incomerLazyFetch() { + const count = 1000; + let cursor = 0; + let lastResult = count; + + do { + const [lastCursor, incomerKeys] = await this.redis.scan(cursor, "MATCH", `${this.key}-*`, "COUNT", count); + + cursor = Number(lastCursor); + lastResult = incomerKeys.length; + + yield incomerKeys; + + continue; + } + while (lastResult !== 0 && cursor !== 0); + + const [, incomerKeys] = await this.redis.scan(cursor, "MATCH", `${this.key}-*`, "COUNT", count); + + return incomerKeys; + } + async getIncomers(): Promise> { - const [, incomerKeys] = await this.redis.scan(0, "MATCH", `${this.key}-*`); + const incomers: Set = new Set(); + + for await (const incomerKeys of this.incomerLazyFetch()) { + const foundIncomers = await Promise.all(incomerKeys.map( + (incomerKey) => this.getValue(incomerKey) + )); + + for (const incomer of foundIncomers) { + incomers.add(incomer); + } + } - return new Set([...await Promise.all(incomerKeys.map( - (incomerKey) => this.getValue(incomerKey) - ))]); + return incomers; } async updateIncomer(incomer: RegisteredIncomer) { diff --git a/src/class/store/transaction.class.ts b/src/class/store/transaction.class.ts index 7f12a78f..e51198b4 100644 --- a/src/class/store/transaction.class.ts +++ b/src/class/store/transaction.class.ts @@ -95,46 +95,27 @@ export class TransactionStore< } async* transactionLazyFetch() { - let index = 0; - const count = 10; + const count = 1000; let cursor = 0; let lastResult = count; - while (lastResult !== 0 && index <= 10) { + do { const [lastCursor, elements] = await this.redis.scan(cursor, "MATCH", `${this.key}-*`, "COUNT", count); cursor = Number(lastCursor); lastResult = elements.length; - index++; yield elements; continue; } + while (lastResult !== 0 && cursor !== 0); const [, elements] = await this.redis.scan(cursor, "MATCH", `${this.key}-*`, "COUNT", count); return elements; } - async getAllTransactions(): Promise> { - const mappedTransactions = new Map(); - - const [, transactionKeys] = await this.redis.scan(0, "MATCH", `${this.key}-*`); - - const transactions = await Promise.all(transactionKeys.map( - (transactionKey) => this.getValue(transactionKey) - )); - - for (const transaction of transactions) { - if (transaction !== null) { - mappedTransactions.set(transaction.redisMetadata.transactionId, transaction); - } - } - - return mappedTransactions; - } - async getTransactions(): Promise> { const mappedTransactions: Transactions = new Map();