Skip to content

Commit

Permalink
fix(): have to lazy-fetch as is lap on every key (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossb0b authored Oct 3, 2023
1 parent b364611 commit 6460c17
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 32 deletions.
4 changes: 2 additions & 2 deletions src/class/eventManagement/dispatcher.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,8 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
instance: "incomer"
});

const transactions = await transactionStore.getAllTransactions();
const recentPingTransactionKeys = Object.keys(await transactionStore.getAllTransactions())
const transactions = await transactionStore.getTransactions();
const recentPingTransactionKeys = Object.keys(await transactionStore.getTransactions())
.filter((transactionKey) => {
const transaction = transactions.get(transactionKey);

Expand Down
8 changes: 4 additions & 4 deletions src/class/eventManagement/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ export class Incomer <
private logAbortError() {
// eslint-disable-next-line no-invalid-this
this.logger.warn({ error: kCancelTask.signal.reason });
kCancelTask.signal.removeEventListener("abort", this.logAbortError);
kCancelTask.signal.removeEventListener("abort", () => this.logAbortError());
}

private async checkDispatcherState() {
Expand All @@ -162,7 +162,7 @@ export class Incomer <

this.dispatcherIsAlive = true;

kCancelTask.signal.addEventListener("abort", this.logAbortError, { once: true });
kCancelTask.signal.addEventListener("abort", () => this.logAbortError(), { once: true });

await Promise.race([this.updateTransactionsStateTimeout(), this.updateTransactionsState()]);
}
Expand Down Expand Up @@ -336,7 +336,7 @@ export class Incomer <
try {
await timers.setTimeout(this.publishInterval, undefined, { signal: kCancelTimeout.signal });
kCancelTask.abort("Dispatcher state check before publishing more..");
kCancelTask.signal.removeEventListener("abort", this.logAbortError);
kCancelTask.signal.removeEventListener("abort", () => this.logAbortError());
}
catch {
// Ignore
Expand Down Expand Up @@ -364,7 +364,7 @@ export class Incomer <
}
finally {
kCancelTimeout.abort();
kCancelTask.signal.removeEventListener("abort", this.logAbortError);
kCancelTask.signal.removeEventListener("abort", () => this.logAbortError());
}
}

Expand Down
38 changes: 34 additions & 4 deletions src/class/store/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,42 @@ export class IncomerStore extends KVPeer<RegisteredIncomer> {
return providedUUID;
}

async* incomerLazyFetch() {
const count = 1000;
let cursor = 0;
let lastResult = count;

do {
const [lastCursor, incomerKeys] = await this.redis.scan(cursor, "MATCH", `${this.key}-*`, "COUNT", count);

cursor = Number(lastCursor);
lastResult = incomerKeys.length;

yield incomerKeys;

continue;
}
while (lastResult !== 0 && cursor !== 0);

const [, incomerKeys] = await this.redis.scan(cursor, "MATCH", `${this.key}-*`, "COUNT", count);

return incomerKeys;
}

async getIncomers(): Promise<Set<RegisteredIncomer>> {
const [, incomerKeys] = await this.redis.scan(0, "MATCH", `${this.key}-*`);
const incomers: Set<RegisteredIncomer> = new Set();

for await (const incomerKeys of this.incomerLazyFetch()) {
const foundIncomers = await Promise.all(incomerKeys.map(
(incomerKey) => this.getValue(incomerKey)
));

for (const incomer of foundIncomers) {
incomers.add(incomer);
}
}

return new Set([...await Promise.all(incomerKeys.map(
(incomerKey) => this.getValue(incomerKey)
))]);
return incomers;
}

async updateIncomer(incomer: RegisteredIncomer) {
Expand Down
25 changes: 3 additions & 22 deletions src/class/store/transaction.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,46 +95,27 @@ export class TransactionStore<
}

async* transactionLazyFetch() {
let index = 0;
const count = 10;
const count = 1000;
let cursor = 0;
let lastResult = count;

while (lastResult !== 0 && index <= 10) {
do {
const [lastCursor, elements] = await this.redis.scan(cursor, "MATCH", `${this.key}-*`, "COUNT", count);

cursor = Number(lastCursor);
lastResult = elements.length;
index++;

yield elements;

continue;
}
while (lastResult !== 0 && cursor !== 0);

const [, elements] = await this.redis.scan(cursor, "MATCH", `${this.key}-*`, "COUNT", count);

return elements;
}

async getAllTransactions(): Promise<Transactions<T>> {
const mappedTransactions = new Map();

const [, transactionKeys] = await this.redis.scan(0, "MATCH", `${this.key}-*`);

const transactions = await Promise.all(transactionKeys.map(
(transactionKey) => this.getValue(transactionKey)
));

for (const transaction of transactions) {
if (transaction !== null) {
mappedTransactions.set(transaction.redisMetadata.transactionId, transaction);
}
}

return mappedTransactions;
}

async getTransactions(): Promise<Transactions<T>> {
const mappedTransactions: Transactions<T> = new Map();

Expand Down

0 comments on commit 6460c17

Please sign in to comment.