From b8f3a7aff5b5da6027279d5909b5b10b78d55ebd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Tue, 5 Nov 2024 15:28:28 +0100 Subject: [PATCH 01/10] Fixing in full --- app/server/lib/AuditLogger.ts | 87 ++++++++++++++++++++++++++--------- app/server/lib/FlexServer.ts | 1 + app/server/lib/GristServer.ts | 1 + 3 files changed, 68 insertions(+), 21 deletions(-) diff --git a/app/server/lib/AuditLogger.ts b/app/server/lib/AuditLogger.ts index 4e935aa857..50896ed2b2 100644 --- a/app/server/lib/AuditLogger.ts +++ b/app/server/lib/AuditLogger.ts @@ -44,6 +44,8 @@ export interface IAuditLogger { requestOrSession: RequestOrSession, properties: AuditEventProperties ): Promise; + + close(): Promise; } export interface AuditEventProperties< @@ -96,6 +98,7 @@ export class AuditLogger implements IAuditLogger { ); private _redisSubscriber: RedisClient | undefined; private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`; + private _jobs: Promise[] = []; constructor( private _db: HomeDBManager, @@ -105,6 +108,17 @@ export class AuditLogger implements IAuditLogger { this._subscribeToStreamingDestinations(); } + public async close() { + // for (const job of this._jobs) { + // await job.catch((error) => { + // this._logger.error(null, `failed to close audit logger`, error); + // }); + // } + this._jobs = []; + this._installStreamingDestinations.clear(); + this._orgStreamingDestinations.clear(); + } + /** * Logs an audit event. */ @@ -127,30 +141,43 @@ export class AuditLogger implements IAuditLogger { /** * Logs an audit event or throws an error on failure. */ - public async logEventOrThrow( + public logEventOrThrow( requestOrSession: RequestOrSession, properties: AuditEventProperties ) { - const event = this._buildEventFromProperties(requestOrSession, properties); - const destinations = await this._getOrSetStreamingDestinations(event); - const requests = await Promise.allSettled( - destinations.map((destination: AuditLogStreamingDestination) => - this._streamEventToDestination(event, destination) - ) - ); - const errors = requests - .filter( - (request): request is PromiseRejectedResult => - request.status === "rejected" - ) - .map(({ reason }) => reason); - if (errors.length > 0) { - throw new LogAuditEventError( - "encountered errors while streaming audit event", - { event, errors } + const work = this._logEventOrThrow(requestOrSession, properties); + this._jobs.push(work); + return work; + } + + /** + * Logs an audit event or throws an error on failure. + */ + private async _logEventOrThrow( + requestOrSession: RequestOrSession, + properties: AuditEventProperties + ) { + const event = this._buildEventFromProperties(requestOrSession, properties); + const destinations = await this._getOrSetStreamingDestinations(event); + const requests = await Promise.allSettled( + destinations.map((destination: AuditLogStreamingDestination) => + this._streamEventToDestination(event, destination) + ) ); + const errors = requests + .filter( + (request): request is PromiseRejectedResult => + request.status === "rejected" + ) + .map(({ reason }) => reason); + if (errors.length > 0) { + throw new LogAuditEventError( + "encountered errors while streaming audit event", + { event, errors } + ); + } } - } + private _buildEventFromProperties( requestOrSession: RequestOrSession, @@ -172,7 +199,7 @@ export class AuditLogger implements IAuditLogger { private async _getOrSetStreamingDestinations(event: AuditEvent) { const orgId = event.context.site?.id; - const destinations = await Promise.all([ + const destinations = await Promise.allSettled([ mapGetOrSet(this._installStreamingDestinations, true, () => this._fetchStreamingDestinations() ), @@ -180,7 +207,25 @@ export class AuditLogger implements IAuditLogger { this._fetchStreamingDestinations(orgId) ), ]); - return destinations + + + const rejectd = destinations.filter( + (d): d is PromiseRejectedResult => d.status === "rejected" + ); + if (rejectd.length > 0) { + throw new Error( + `failed to fetch streaming destinations: ${rejectd + .map(({ reason }) => reason.message) + .join(", ")}` + ); + } + const fulfilled = destinations.filter( + (d): d is PromiseFulfilledResult => + d.status === "fulfilled" + ).map(({ value }) => value); + + + return fulfilled .filter((d): d is AuditLogStreamingDestination[] => d !== null) .flat(); } diff --git a/app/server/lib/FlexServer.ts b/app/server/lib/FlexServer.ts index 92b70647bf..1447671a05 100644 --- a/app/server/lib/FlexServer.ts +++ b/app/server/lib/FlexServer.ts @@ -960,6 +960,7 @@ export class FlexServer implements GristServer { // Do this after _shutdown, since DocWorkerMap is used during shutdown. if (this._docWorkerMap) { await this._docWorkerMap.close(); } if (this._sessionStore) { await this._sessionStore.close(); } + if (this._auditLogger) { await this._auditLogger.close(); } } public addDocApiForwarder() { diff --git a/app/server/lib/GristServer.ts b/app/server/lib/GristServer.ts index c9b564e067..30aab6be60 100644 --- a/app/server/lib/GristServer.ts +++ b/app/server/lib/GristServer.ts @@ -175,6 +175,7 @@ export function createDummyAuditLogger(): IAuditLogger { return { logEvent() { /* do nothing */ }, logEventOrThrow() { return Promise.resolve(); }, + close() { return Promise.resolve(); }, }; } From 5f1ec3360aa3ae9bedf446b66bf5dcc052d697e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Tue, 5 Nov 2024 15:52:38 +0100 Subject: [PATCH 02/10] Small refactoring --- app/server/lib/AuditLogger.ts | 42 ++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/app/server/lib/AuditLogger.ts b/app/server/lib/AuditLogger.ts index 50896ed2b2..f033a12f33 100644 --- a/app/server/lib/AuditLogger.ts +++ b/app/server/lib/AuditLogger.ts @@ -45,6 +45,9 @@ export interface IAuditLogger { properties: AuditEventProperties ): Promise; + /** + * Close any resources used by the logger. + */ close(): Promise; } @@ -98,7 +101,7 @@ export class AuditLogger implements IAuditLogger { ); private _redisSubscriber: RedisClient | undefined; private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`; - private _jobs: Promise[] = []; + private _logsInProgress: Promise[] = []; constructor( private _db: HomeDBManager, @@ -109,14 +112,19 @@ export class AuditLogger implements IAuditLogger { } public async close() { - // for (const job of this._jobs) { - // await job.catch((error) => { - // this._logger.error(null, `failed to close audit logger`, error); - // }); - // } - this._jobs = []; this._installStreamingDestinations.clear(); this._orgStreamingDestinations.clear(); + const listToClear = this._logsInProgress.slice(); + this._logsInProgress = []; + for (const logCall of listToClear) { + // All log calls should be awaited already and errors should be logged. + await logCall.catch(() => {}); + } + if (this._logsInProgress.length > 0) { + this._logger.error(null, "logEvent called after close", { + count: this._logsInProgress.length, + }); + } } /** @@ -145,9 +153,9 @@ export class AuditLogger implements IAuditLogger { requestOrSession: RequestOrSession, properties: AuditEventProperties ) { - const work = this._logEventOrThrow(requestOrSession, properties); - this._jobs.push(work); - return work; + const logInProgress = this._logEventOrThrow(requestOrSession, properties); + this._logsInProgress.push(logInProgress); + return logInProgress; } /** @@ -177,7 +185,7 @@ export class AuditLogger implements IAuditLogger { ); } } - + private _buildEventFromProperties( requestOrSession: RequestOrSession, @@ -208,23 +216,17 @@ export class AuditLogger implements IAuditLogger { ), ]); - - const rejectd = destinations.filter( + const rejected = destinations.filter( (d): d is PromiseRejectedResult => d.status === "rejected" ); - if (rejectd.length > 0) { - throw new Error( - `failed to fetch streaming destinations: ${rejectd - .map(({ reason }) => reason.message) - .join(", ")}` - ); + if (rejected.length > 0) { + throw new Error(`failed to fetch streaming destinations: ${rejected.map(({ reason }) => reason)}`); } const fulfilled = destinations.filter( (d): d is PromiseFulfilledResult => d.status === "fulfilled" ).map(({ value }) => value); - return fulfilled .filter((d): d is AuditLogStreamingDestination[] => d !== null) .flat(); From 6b6cecf0cebe611a5cdde80fd0308c3f71bf3e5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Tue, 5 Nov 2024 20:11:05 +0100 Subject: [PATCH 03/10] Removing banner check --- app/server/lib/AuditLogger.ts | 110 ++++++++++++++----------------- test/nbrowser/ColumnOps.ntest.js | 3 - 2 files changed, 49 insertions(+), 64 deletions(-) diff --git a/app/server/lib/AuditLogger.ts b/app/server/lib/AuditLogger.ts index f033a12f33..22994bb237 100644 --- a/app/server/lib/AuditLogger.ts +++ b/app/server/lib/AuditLogger.ts @@ -101,7 +101,8 @@ export class AuditLogger implements IAuditLogger { ); private _redisSubscriber: RedisClient | undefined; private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`; - private _logsInProgress: Promise[] = []; + private _createdPromises: Promise[] = []; + private _closed = false; constructor( private _db: HomeDBManager, @@ -112,19 +113,14 @@ export class AuditLogger implements IAuditLogger { } public async close() { + this._closed = true; this._installStreamingDestinations.clear(); this._orgStreamingDestinations.clear(); - const listToClear = this._logsInProgress.slice(); - this._logsInProgress = []; - for (const logCall of listToClear) { - // All log calls should be awaited already and errors should be logged. - await logCall.catch(() => {}); - } - if (this._logsInProgress.length > 0) { - this._logger.error(null, "logEvent called after close", { - count: this._logsInProgress.length, - }); - } + const promises = this._createdPromises; + this._createdPromises = []; + await Promise.allSettled(promises).catch((error) => { + this._logger.error(null, "failed to close audit logger", error); + }); } /** @@ -134,7 +130,11 @@ export class AuditLogger implements IAuditLogger { requestOrSession: RequestOrSession, properties: AuditEventProperties ): void { - this.logEventOrThrow(requestOrSession, properties).catch((error) => { + if (this._closed) { + throw new Error("audit logger is closed"); + } + + this._track(this.logEventOrThrow(requestOrSession, properties)).catch((error) => { this._logger.error(requestOrSession, `failed to log audit event`, error); this._logger.warn( requestOrSession, @@ -149,43 +149,33 @@ export class AuditLogger implements IAuditLogger { /** * Logs an audit event or throws an error on failure. */ - public logEventOrThrow( + public async logEventOrThrow( requestOrSession: RequestOrSession, properties: AuditEventProperties ) { - const logInProgress = this._logEventOrThrow(requestOrSession, properties); - this._logsInProgress.push(logInProgress); - return logInProgress; - } - - /** - * Logs an audit event or throws an error on failure. - */ - private async _logEventOrThrow( - requestOrSession: RequestOrSession, - properties: AuditEventProperties - ) { - const event = this._buildEventFromProperties(requestOrSession, properties); - const destinations = await this._getOrSetStreamingDestinations(event); - const requests = await Promise.allSettled( - destinations.map((destination: AuditLogStreamingDestination) => - this._streamEventToDestination(event, destination) - ) + if (this._closed) { + throw new Error("audit logger is closed"); + } + const event = this._buildEventFromProperties(requestOrSession, properties); + const destinations = await this._getOrSetStreamingDestinations(event); + const requests = await Promise.allSettled( + destinations.map((destination: AuditLogStreamingDestination) => + this._streamEventToDestination(event, destination) + ) + ); + const errors = requests + .filter( + (request): request is PromiseRejectedResult => + request.status === "rejected" + ) + .map(({ reason }) => reason); + if (errors.length > 0) { + throw new LogAuditEventError( + "encountered errors while streaming audit event", + { event, errors } ); - const errors = requests - .filter( - (request): request is PromiseRejectedResult => - request.status === "rejected" - ) - .map(({ reason }) => reason); - if (errors.length > 0) { - throw new LogAuditEventError( - "encountered errors while streaming audit event", - { event, errors } - ); - } } - + } private _buildEventFromProperties( requestOrSession: RequestOrSession, @@ -207,27 +197,15 @@ export class AuditLogger implements IAuditLogger { private async _getOrSetStreamingDestinations(event: AuditEvent) { const orgId = event.context.site?.id; - const destinations = await Promise.allSettled([ + const destinations = await Promise.all([ mapGetOrSet(this._installStreamingDestinations, true, () => - this._fetchStreamingDestinations() + this._track(this._fetchStreamingDestinations()), ), !orgId ? null : mapGetOrSet(this._orgStreamingDestinations, orgId, () => - this._fetchStreamingDestinations(orgId) + this._track(this._fetchStreamingDestinations(orgId)) ), ]); - - const rejected = destinations.filter( - (d): d is PromiseRejectedResult => d.status === "rejected" - ); - if (rejected.length > 0) { - throw new Error(`failed to fetch streaming destinations: ${rejected.map(({ reason }) => reason)}`); - } - const fulfilled = destinations.filter( - (d): d is PromiseFulfilledResult => - d.status === "fulfilled" - ).map(({ value }) => value); - - return fulfilled + return destinations .filter((d): d is AuditLogStreamingDestination[] => d !== null) .flat(); } @@ -372,6 +350,16 @@ export class AuditLogger implements IAuditLogger { await redis.quitAsync(); } } + + private _track(prom: Promise) { + this._createdPromises.push(prom); + return prom.finally(() => { + const index = this._createdPromises.indexOf(prom); + if (index !== -1) { + this._createdPromises.splice(index, 1); + } + }); + } } export class LogAuditEventError extends Error { diff --git a/test/nbrowser/ColumnOps.ntest.js b/test/nbrowser/ColumnOps.ntest.js index db02881657..77221197ea 100644 --- a/test/nbrowser/ColumnOps.ntest.js +++ b/test/nbrowser/ColumnOps.ntest.js @@ -12,9 +12,6 @@ describe('ColumnOps.ntest', function() { await gu.supportOldTimeyTestCode(); await gu.useFixtureDoc(cleanup, "World.grist", true); await gu.toggleSidePanel('left', 'close'); - // The banner appearing mid-test occasionally interferes with the rest of - // the tests (for some unknown reason), so wait for it to appear first. - await $('.test-banner-element').wait(); }); afterEach(function() { From 22a4ab213c413b901f150ca13f506b89a650ce0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Tue, 5 Nov 2024 22:59:43 +0100 Subject: [PATCH 04/10] Removing banner check from SavePosition.ntest --- test/nbrowser/SavePosition.ntest.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/nbrowser/SavePosition.ntest.js b/test/nbrowser/SavePosition.ntest.js index 2622cea5ff..51392d0fd9 100644 --- a/test/nbrowser/SavePosition.ntest.js +++ b/test/nbrowser/SavePosition.ntest.js @@ -9,9 +9,6 @@ describe('SavePosition.ntest', function() { this.timeout(Math.max(this.timeout(), 20000)); // Long-running test, unfortunately await gu.supportOldTimeyTestCode(); await gu.useFixtureDoc(cleanup, "World.grist", true); - // The banner appearing mid-test occasionally interferes with the rest of - // the tests (for some unknown reason), so wait for it to appear first. - await $('.test-banner-element').wait(); }); afterEach(function() { From ae05ba9c10dd8f54edb9eae58661316871661235 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Wed, 6 Nov 2024 10:55:23 +0100 Subject: [PATCH 05/10] Adding test --- app/common/delay.ts | 17 +++++ app/server/lib/AuditLogger.ts | 4 ++ test/server/lib/AuditLogger.ts | 122 ++++++++++++++++++++++++++++++--- test/server/lib/GristJobs.ts | 18 +---- 4 files changed, 134 insertions(+), 27 deletions(-) diff --git a/app/common/delay.ts b/app/common/delay.ts index c509c42e0e..af92679e5e 100644 --- a/app/common/delay.ts +++ b/app/common/delay.ts @@ -1,3 +1,5 @@ +import {MaybePromise} from 'app/plugin/gutil'; + /** * Returns a promise that resolves in the given number of milliseconds. * (A replica of bluebird.delay using native promises.) @@ -5,3 +7,18 @@ export function delay(msec: number): Promise { return new Promise((resolve) => setTimeout(resolve, msec)); } + +export async function waitToPass(fn: () => MaybePromise, maxWaitMs: number = 2000) { + const start = Date.now(); + while (Date.now() - start < maxWaitMs) { + try { + await fn(); + return true; + } catch (e) { + // continue after a small delay. + await delay(10); + } + } + await fn(); + return true; +} diff --git a/app/server/lib/AuditLogger.ts b/app/server/lib/AuditLogger.ts index 22994bb237..c9673fdde0 100644 --- a/app/server/lib/AuditLogger.ts +++ b/app/server/lib/AuditLogger.ts @@ -177,6 +177,10 @@ export class AuditLogger implements IAuditLogger { } } + public length() { + return this._createdPromises.length; + } + private _buildEventFromProperties( requestOrSession: RequestOrSession, properties: AuditEventProperties diff --git a/test/server/lib/AuditLogger.ts b/test/server/lib/AuditLogger.ts index 7b326810a4..038804e01d 100644 --- a/test/server/lib/AuditLogger.ts +++ b/test/server/lib/AuditLogger.ts @@ -1,3 +1,4 @@ +import {waitToPass} from 'app/common/delay'; import { GenericEventFormatter } from "app/server/lib/AuditEventFormatter"; import { AuditLogger, Deps, IAuditLogger } from "app/server/lib/AuditLogger"; import axios from "axios"; @@ -12,8 +13,10 @@ import { } from "test/server/lib/helpers/AuditLoggerUtils"; import { EnvironmentSnapshot, setTmpLogLevel } from "test/server/testUtils"; +const MAX_CONCURRENT_REQUESTS = 10; + describe("AuditLogger", function () { - this.timeout(10000); + this.timeout('10s'); setTmpLogLevel("error"); let oldEnv: EnvironmentSnapshot; @@ -31,7 +34,7 @@ describe("AuditLogger", function () { process.env.TYPEORM_DATABASE = ":memory:"; process.env.GRIST_DEFAULT_EMAIL = chimpyEmail; sandbox.stub(Deps, "CACHE_TTL_MS").value(0); - sandbox.stub(Deps, "MAX_CONCURRENT_REQUESTS").value(10); + sandbox.stub(Deps, "MAX_CONCURRENT_REQUESTS").value(MAX_CONCURRENT_REQUESTS); server = new TestServer(this); homeUrl = await server.start(); oid = (await server.dbManager.testGetId("NASA")) as number; @@ -43,6 +46,10 @@ describe("AuditLogger", function () { }); }); + afterEach(async function () { + await auditLogger.close(); + }); + after(async function () { sandbox.restore(); oldEnv.restore(); @@ -50,14 +57,7 @@ describe("AuditLogger", function () { }); describe("logEventOrThrow", function () { - beforeEach(async function () { - // Ignore "config.*" audit events; some tests call the `/configs` - // endpoint as part of setup, which triggers them. - nock("https://audit.example.com") - .persist() - .post(/\/events\/.*/, (body) => body.action?.startsWith("config.")) - .reply(200); - }); + beforeEach(ignoreConfigEvents); afterEach(function () { nock.abortPendingRequests(); @@ -295,4 +295,106 @@ describe("AuditLogger", function () { ); }); }); + describe('closes resources properly', function() { + before(async function() { + // Create the AuditLogger instance that we will close eventually. + logger = new AuditLogger(server.dbManager, { + formatters: [new GenericEventFormatter()], + }); + + ignoreConfigEvents(); + + // Wire up the destinations. + await axios.put( + `${homeUrl}/api/install/configs/audit_log_streaming_destinations`, + [ + { + id: "62c9e725-1195-48e7-a9f6-0ba164128c20", + name: "other", + url: "https://audit.example.com/events/install", + }, + ], + chimpy + ); + }); + + afterEach(function () { + nock.cleanAll(); + }); + + after(async function() { + await logger.close(); + }); + + // Start the first test. We test here that audit logger properly clears its queue, without + // closing. + it('on its own', async function() { + // Start a scope to track the requests. + const firstScope = installScope(); + + // Fire up MAX_CONCURRENT_REQUESTS events to test the logger. + repeat(sendEvent); + + // Ensure the scope is done. + await waitToPass(() => assert.isTrue(firstScope.isDone(), 'Scope should be done')); + + // When the scope is done, the logger should clear all the pending requests, as they + // are done (event the destination fetchers) + await waitToPass(() => assert.equal(logger.length(), 0)); + }); + + // Now test the same but by closing the logger. + it('when closed', async function() { + // Start a scope to track the requests. + const secondScope = installScope(); + + // Send all events (without waiting for the result) + repeat(sendEvent); + + // Now close the logger and wait for all created promises to resolve. + await logger.close(); + + // Scope should be done. + assert.isTrue(secondScope.isDone()); + + // And the logger should have cleared all the pending requests. + assert.equal(logger.length(), 0); + }); + + // Dummy destination creator. + const installScope = () => nock("https://audit.example.com") + .post("/events/install") + .times(MAX_CONCURRENT_REQUESTS) + .reply(200); + + // The AuditLogger instance that we will close eventually. + let logger: AuditLogger; + + // Helper to send events. + const sendEvent = () => logger.logEvent(null, { + action: "site.create", + details: { + site: { + id: oid, + name: "Grist Labs", + domain: "gristlabs", + }, + }, + }); + }); }); + +function repeat(fn: (i: number) => void) { + for (let i = 0; i < MAX_CONCURRENT_REQUESTS; i++) { + fn(i); + } +} + +function ignoreConfigEvents() { + // Ignore "config.*" audit events; some tests call the `/configs` + // endpoint as part of setup, which triggers them. + nock("https://audit.example.com") + .persist() + .post(/\/events\/.*/, (body) => body.action?.startsWith("config.")) + .reply(200); +} diff --git a/test/server/lib/GristJobs.ts b/test/server/lib/GristJobs.ts index be03a4a0c7..4682ba94b2 100644 --- a/test/server/lib/GristJobs.ts +++ b/test/server/lib/GristJobs.ts @@ -1,4 +1,4 @@ -import { delay } from 'app/common/delay'; +import { delay, waitToPass } from 'app/common/delay'; import { GristBullMQJobs, GristJobs } from 'app/server/lib/GristJobs'; import { assert } from 'chai'; @@ -133,19 +133,3 @@ describe('GristJobs', function() { } }); }); - -async function waitToPass(fn: () => Promise, - maxWaitMs: number = 2000) { - const start = Date.now(); - while (Date.now() - start < maxWaitMs) { - try { - await fn(); - return true; - } catch (e) { - // continue after a small delay. - await delay(10); - } - } - await fn(); - return true; -} From 4bab205fce0132f7cbe6d81b16432b79435449d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Thu, 7 Nov 2024 11:06:06 +0100 Subject: [PATCH 06/10] Addressing comments --- app/common/delay.ts | 17 ----------------- app/server/lib/AuditLogger.ts | 13 +++++-------- test/server/lib/AuditLogger.ts | 6 +++--- test/server/lib/GristJobs.ts | 13 +++++++------ test/server/wait.ts | 28 ++++++++++++++++++++-------- 5 files changed, 35 insertions(+), 42 deletions(-) diff --git a/app/common/delay.ts b/app/common/delay.ts index af92679e5e..c509c42e0e 100644 --- a/app/common/delay.ts +++ b/app/common/delay.ts @@ -1,5 +1,3 @@ -import {MaybePromise} from 'app/plugin/gutil'; - /** * Returns a promise that resolves in the given number of milliseconds. * (A replica of bluebird.delay using native promises.) @@ -7,18 +5,3 @@ import {MaybePromise} from 'app/plugin/gutil'; export function delay(msec: number): Promise { return new Promise((resolve) => setTimeout(resolve, msec)); } - -export async function waitToPass(fn: () => MaybePromise, maxWaitMs: number = 2000) { - const start = Date.now(); - while (Date.now() - start < maxWaitMs) { - try { - await fn(); - return true; - } catch (e) { - // continue after a small delay. - await delay(10); - } - } - await fn(); - return true; -} diff --git a/app/server/lib/AuditLogger.ts b/app/server/lib/AuditLogger.ts index c9673fdde0..2d755bf3f9 100644 --- a/app/server/lib/AuditLogger.ts +++ b/app/server/lib/AuditLogger.ts @@ -101,7 +101,7 @@ export class AuditLogger implements IAuditLogger { ); private _redisSubscriber: RedisClient | undefined; private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`; - private _createdPromises: Promise[] = []; + private _createdPromises: Set> = new Set(); private _closed = false; constructor( @@ -117,7 +117,7 @@ export class AuditLogger implements IAuditLogger { this._installStreamingDestinations.clear(); this._orgStreamingDestinations.clear(); const promises = this._createdPromises; - this._createdPromises = []; + this._createdPromises = new Set(); await Promise.allSettled(promises).catch((error) => { this._logger.error(null, "failed to close audit logger", error); }); @@ -178,7 +178,7 @@ export class AuditLogger implements IAuditLogger { } public length() { - return this._createdPromises.length; + return this._createdPromises.size; } private _buildEventFromProperties( @@ -356,12 +356,9 @@ export class AuditLogger implements IAuditLogger { } private _track(prom: Promise) { - this._createdPromises.push(prom); + this._createdPromises.add(prom); return prom.finally(() => { - const index = this._createdPromises.indexOf(prom); - if (index !== -1) { - this._createdPromises.splice(index, 1); - } + this._createdPromises.delete(prom); }); } } diff --git a/test/server/lib/AuditLogger.ts b/test/server/lib/AuditLogger.ts index 038804e01d..079c8bd7f2 100644 --- a/test/server/lib/AuditLogger.ts +++ b/test/server/lib/AuditLogger.ts @@ -1,4 +1,3 @@ -import {waitToPass} from 'app/common/delay'; import { GenericEventFormatter } from "app/server/lib/AuditEventFormatter"; import { AuditLogger, Deps, IAuditLogger } from "app/server/lib/AuditLogger"; import axios from "axios"; @@ -12,6 +11,7 @@ import { isCreateSiteEvent, } from "test/server/lib/helpers/AuditLoggerUtils"; import { EnvironmentSnapshot, setTmpLogLevel } from "test/server/testUtils"; +import { waitForIt } from 'test/server/wait'; const MAX_CONCURRENT_REQUESTS = 10; @@ -336,11 +336,11 @@ describe("AuditLogger", function () { repeat(sendEvent); // Ensure the scope is done. - await waitToPass(() => assert.isTrue(firstScope.isDone(), 'Scope should be done')); + await waitForIt(() => assert.isTrue(firstScope.isDone(), 'Scope should be done')); // When the scope is done, the logger should clear all the pending requests, as they // are done (event the destination fetchers) - await waitToPass(() => assert.equal(logger.length(), 0)); + await waitForIt(() => assert.equal(logger.length(), 0)); }); // Now test the same but by closing the logger. diff --git a/test/server/lib/GristJobs.ts b/test/server/lib/GristJobs.ts index 4682ba94b2..f7a6656621 100644 --- a/test/server/lib/GristJobs.ts +++ b/test/server/lib/GristJobs.ts @@ -1,6 +1,7 @@ -import { delay, waitToPass } from 'app/common/delay'; -import { GristBullMQJobs, GristJobs } from 'app/server/lib/GristJobs'; -import { assert } from 'chai'; +import {delay} from 'app/common/delay'; +import {GristBullMQJobs, GristJobs} from 'app/server/lib/GristJobs'; +import {assert} from 'chai'; +import {waitForIt} from 'test/server/wait'; describe('GristJobs', function() { this.timeout(20000); @@ -26,17 +27,17 @@ describe('GristJobs', function() { defaultCt++; }); await q.add('add', {delta: 2}); - await waitToPass(async () => { + await waitForIt(async () => { assert.equal(ct, 2); assert.equal(defaultCt, 0); }); await q.add('add', {delta: 3}); - await waitToPass(async () => { + await waitForIt(async () => { assert.equal(ct, 5); assert.equal(defaultCt, 0); }); await q.add('badd', {delta: 4}); - await waitToPass(async () => { + await waitForIt(async () => { assert.equal(ct, 5); assert.equal(defaultCt, 1); }); diff --git a/test/server/wait.ts b/test/server/wait.ts index c9f9004358..04b825e653 100644 --- a/test/server/wait.ts +++ b/test/server/wait.ts @@ -1,18 +1,30 @@ -import * as bluebird from 'bluebird'; +import {delay} from 'app/common/delay'; +import {MaybePromise} from 'app/plugin/gutil'; /** - * Wait some time for a check to pass. Allow a pause between checks. + * A helper function that invokes a function until it passes without throwing an error. + * + * Notice: unlike `waitForPass` from `gristUtils`, this function doesn't use browser to delay + * execution, so it's suitable for server-side tests. + * + * @param fn Function that throws an error if the condition is not met. + * @param maxWaitMs Maximum time to wait for the condition to be met. + * @param stepWaitMs Time to wait between attempts to check the condition. */ -export async function waitForIt(check: () => Promise|void, maxWaitMs: number, - stepWaitMs: number = 1000) { +export async function waitForIt(fn: () => MaybePromise, maxWaitMs: number = 2000, + stepWaitMs: number = 100) { const start = Date.now(); - for (;;) { + const timePassed = () => Date.now() - start; + // eslint-disable-next-line no-constant-condition + while (true) { try { - await check(); + await fn(); return; } catch (e) { - if (Date.now() - start > maxWaitMs) { throw e; } + if (timePassed() > maxWaitMs) { + throw e; + } + await delay(stepWaitMs); } - await bluebird.delay(stepWaitMs); } } From 78e7702a8ac3007fbff43a708126e5426fd1bc79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Thu, 7 Nov 2024 13:04:06 +0100 Subject: [PATCH 07/10] Abort controller, but still test failes, not sure why --- app/server/lib/AuditLogger.ts | 5 +++++ app/server/lib/shutdown.js | 2 ++ test/server/lib/AuditLogger.ts | 4 ++-- test/server/lib/GristJobs.ts | 5 ++++- test/server/wait.ts | 2 +- 5 files changed, 14 insertions(+), 4 deletions(-) diff --git a/app/server/lib/AuditLogger.ts b/app/server/lib/AuditLogger.ts index 2d755bf3f9..79983ae5a7 100644 --- a/app/server/lib/AuditLogger.ts +++ b/app/server/lib/AuditLogger.ts @@ -28,6 +28,8 @@ import fetch from "node-fetch"; import { createClient, RedisClient } from "redis"; import { inspect } from "util"; import { v4 as uuidv4 } from "uuid"; +import {AbortController} from 'node-abort-controller'; + export interface IAuditLogger { /** @@ -103,6 +105,7 @@ export class AuditLogger implements IAuditLogger { private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`; private _createdPromises: Set> = new Set(); private _closed = false; + private _abortController = new AbortController(); constructor( private _db: HomeDBManager, @@ -113,6 +116,7 @@ export class AuditLogger implements IAuditLogger { } public async close() { + this._abortController.abort(); this._closed = true; this._installStreamingDestinations.clear(); this._orgStreamingDestinations.clear(); @@ -244,6 +248,7 @@ export class AuditLogger implements IAuditLogger { body: this._buildStreamingDestinationPayload(event, destination), agent: proxyAgent(new URL(url)), timeout: 10_000, + signal: this._abortController.signal, }); if (!resp.ok) { throw new Error( diff --git a/app/server/lib/shutdown.js b/app/server/lib/shutdown.js index 104a227e57..8e3f36e590 100644 --- a/app/server/lib/shutdown.js +++ b/app/server/lib/shutdown.js @@ -60,6 +60,8 @@ function runCleanupHandlers() { return Promise.try(handler.method.bind(handler.context)).timeout(handler.timeout) .catch(function(err) { log.warn(`Cleanup error for '${handler.name}' handler: ` + err); + console.error(`Cleanup error for '${handler.name}' handler:`); + console.error(err.stack || err); }); })); } diff --git a/test/server/lib/AuditLogger.ts b/test/server/lib/AuditLogger.ts index 079c8bd7f2..14dea4c199 100644 --- a/test/server/lib/AuditLogger.ts +++ b/test/server/lib/AuditLogger.ts @@ -336,11 +336,11 @@ describe("AuditLogger", function () { repeat(sendEvent); // Ensure the scope is done. - await waitForIt(() => assert.isTrue(firstScope.isDone(), 'Scope should be done')); + await waitForIt(() => assert.isTrue(firstScope.isDone(), 'Scope should be done'), 1000, 100); // When the scope is done, the logger should clear all the pending requests, as they // are done (event the destination fetchers) - await waitForIt(() => assert.equal(logger.length(), 0)); + await waitForIt(() => assert.equal(logger.length(), 0), 1000, 100); }); // Now test the same but by closing the logger. diff --git a/test/server/lib/GristJobs.ts b/test/server/lib/GristJobs.ts index f7a6656621..4eac23a2d7 100644 --- a/test/server/lib/GristJobs.ts +++ b/test/server/lib/GristJobs.ts @@ -1,7 +1,8 @@ import {delay} from 'app/common/delay'; import {GristBullMQJobs, GristJobs} from 'app/server/lib/GristJobs'; import {assert} from 'chai'; -import {waitForIt} from 'test/server/wait'; +import {partialRight} from 'lodash'; +import {waitForIt as origWaitForIt} from 'test/server/wait'; describe('GristJobs', function() { this.timeout(20000); @@ -134,3 +135,5 @@ describe('GristJobs', function() { } }); }); + +const waitForIt = partialRight(origWaitForIt, 2000, 10); diff --git a/test/server/wait.ts b/test/server/wait.ts index 04b825e653..480fd29338 100644 --- a/test/server/wait.ts +++ b/test/server/wait.ts @@ -12,7 +12,7 @@ import {MaybePromise} from 'app/plugin/gutil'; * @param stepWaitMs Time to wait between attempts to check the condition. */ export async function waitForIt(fn: () => MaybePromise, maxWaitMs: number = 2000, - stepWaitMs: number = 100) { + stepWaitMs: number = 1000) { const start = Date.now(); const timePassed = () => Date.now() - start; // eslint-disable-next-line no-constant-condition From 8391c0343ce2d3c69cb88c042b11e15729966aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Thu, 7 Nov 2024 13:38:32 +0100 Subject: [PATCH 08/10] Fixing test --- app/server/lib/AuditLogger.ts | 35 ++++++++++++++++++++-------------- app/server/lib/shutdown.js | 4 ++-- test/server/lib/AuditLogger.ts | 4 ++-- test/server/lib/GristJobs.ts | 10 ++++------ 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/app/server/lib/AuditLogger.ts b/app/server/lib/AuditLogger.ts index 79983ae5a7..7e88a9424c 100644 --- a/app/server/lib/AuditLogger.ts +++ b/app/server/lib/AuditLogger.ts @@ -28,8 +28,6 @@ import fetch from "node-fetch"; import { createClient, RedisClient } from "redis"; import { inspect } from "util"; import { v4 as uuidv4 } from "uuid"; -import {AbortController} from 'node-abort-controller'; - export interface IAuditLogger { /** @@ -103,9 +101,8 @@ export class AuditLogger implements IAuditLogger { ); private _redisSubscriber: RedisClient | undefined; private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`; - private _createdPromises: Set> = new Set(); + private _createdPromises: Set>|null = new Set(); private _closed = false; - private _abortController = new AbortController(); constructor( private _db: HomeDBManager, @@ -115,16 +112,24 @@ export class AuditLogger implements IAuditLogger { this._subscribeToStreamingDestinations(); } - public async close() { - this._abortController.abort(); + public async close(timeout = 10_000) { + const start = Date.now(); this._closed = true; + while(this._createdPromises?.size) { + if (Date.now() - start > timeout) { + this._logger.error(null, "timed out waiting for promises created by audit logger to complete"); + break; + } + const promises = Array.from(this._createdPromises); + this._createdPromises.clear(); + await Promise.allSettled(promises).catch((error) => { + this._logger.error(null, "failed to close audit logger", error); + }); + } + this._createdPromises = null; this._installStreamingDestinations.clear(); this._orgStreamingDestinations.clear(); - const promises = this._createdPromises; - this._createdPromises = new Set(); - await Promise.allSettled(promises).catch((error) => { - this._logger.error(null, "failed to close audit logger", error); - }); + await this._redisSubscriber?.quitAsync(); } /** @@ -182,7 +187,7 @@ export class AuditLogger implements IAuditLogger { } public length() { - return this._createdPromises.size; + return this._createdPromises?.size ?? 0; } private _buildEventFromProperties( @@ -248,7 +253,6 @@ export class AuditLogger implements IAuditLogger { body: this._buildStreamingDestinationPayload(event, destination), agent: proxyAgent(new URL(url)), timeout: 10_000, - signal: this._abortController.signal, }); if (!resp.ok) { throw new Error( @@ -361,9 +365,12 @@ export class AuditLogger implements IAuditLogger { } private _track(prom: Promise) { + if (!this._createdPromises) { + throw new Error("audit logger is closed"); + } this._createdPromises.add(prom); return prom.finally(() => { - this._createdPromises.delete(prom); + this._createdPromises?.delete(prom); }); } } diff --git a/app/server/lib/shutdown.js b/app/server/lib/shutdown.js index 8e3f36e590..34958b181c 100644 --- a/app/server/lib/shutdown.js +++ b/app/server/lib/shutdown.js @@ -60,8 +60,8 @@ function runCleanupHandlers() { return Promise.try(handler.method.bind(handler.context)).timeout(handler.timeout) .catch(function(err) { log.warn(`Cleanup error for '${handler.name}' handler: ` + err); - console.error(`Cleanup error for '${handler.name}' handler:`); - console.error(err.stack || err); + // console.error(`Cleanup error for '${handler.name}' handler:`); + // console.error(err.stack || err); }); })); } diff --git a/test/server/lib/AuditLogger.ts b/test/server/lib/AuditLogger.ts index 14dea4c199..f39cb0fd6a 100644 --- a/test/server/lib/AuditLogger.ts +++ b/test/server/lib/AuditLogger.ts @@ -336,11 +336,11 @@ describe("AuditLogger", function () { repeat(sendEvent); // Ensure the scope is done. - await waitForIt(() => assert.isTrue(firstScope.isDone(), 'Scope should be done'), 1000, 100); + await waitForIt(() => assert.isTrue(firstScope.isDone(), 'Scope should be done'), 1000, 10); // When the scope is done, the logger should clear all the pending requests, as they // are done (event the destination fetchers) - await waitForIt(() => assert.equal(logger.length(), 0), 1000, 100); + await waitForIt(() => assert.equal(logger.length(), 0), 1000, 10); }); // Now test the same but by closing the logger. diff --git a/test/server/lib/GristJobs.ts b/test/server/lib/GristJobs.ts index 4eac23a2d7..e8e4de72b3 100644 --- a/test/server/lib/GristJobs.ts +++ b/test/server/lib/GristJobs.ts @@ -1,8 +1,7 @@ import {delay} from 'app/common/delay'; import {GristBullMQJobs, GristJobs} from 'app/server/lib/GristJobs'; import {assert} from 'chai'; -import {partialRight} from 'lodash'; -import {waitForIt as origWaitForIt} from 'test/server/wait'; +import {waitForIt} from 'test/server/wait'; describe('GristJobs', function() { this.timeout(20000); @@ -31,17 +30,17 @@ describe('GristJobs', function() { await waitForIt(async () => { assert.equal(ct, 2); assert.equal(defaultCt, 0); - }); + }, 2000, 10); await q.add('add', {delta: 3}); await waitForIt(async () => { assert.equal(ct, 5); assert.equal(defaultCt, 0); - }); + }, 2000, 10); await q.add('badd', {delta: 4}); await waitForIt(async () => { assert.equal(ct, 5); assert.equal(defaultCt, 1); - }); + }, 2000, 10); } finally { await jobs.stop({obliterate: true}); } @@ -136,4 +135,3 @@ describe('GristJobs', function() { }); }); -const waitForIt = partialRight(origWaitForIt, 2000, 10); From af23948370719b1c4fa68681e208608acf1d135c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Thu, 7 Nov 2024 14:41:16 +0100 Subject: [PATCH 09/10] Reusing redis client --- app/server/lib/AuditLogger.ts | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/app/server/lib/AuditLogger.ts b/app/server/lib/AuditLogger.ts index 7e88a9424c..513401fd1b 100644 --- a/app/server/lib/AuditLogger.ts +++ b/app/server/lib/AuditLogger.ts @@ -99,7 +99,7 @@ export class AuditLogger implements IAuditLogger { "AuditLogger ", (requestOrSession) => getLogMeta(requestOrSession) ); - private _redisSubscriber: RedisClient | undefined; + private _redisClient: RedisClient | null = null; private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`; private _createdPromises: Set>|null = new Set(); private _closed = false; @@ -113,6 +113,7 @@ export class AuditLogger implements IAuditLogger { } public async close(timeout = 10_000) { + // Wait 10 seconds for all promises to complete const start = Date.now(); this._closed = true; while(this._createdPromises?.size) { @@ -126,10 +127,17 @@ export class AuditLogger implements IAuditLogger { this._logger.error(null, "failed to close audit logger", error); }); } + + // Remove the set, it will prevent from adding new promises. This is just sanity check, as + // code already tests for this._closed before adding new promises. this._createdPromises = null; + + // Clear up TTL Maps that have timers associated with them. this._installStreamingDestinations.clear(); this._orgStreamingDestinations.clear(); - await this._redisSubscriber?.quitAsync(); + + // Close the redis clients if they weren't already. + await this._redisClient?.quitAsync(); } /** @@ -327,13 +335,13 @@ export class AuditLogger implements IAuditLogger { return; } - this._redisSubscriber = createClient(process.env.REDIS_URL); - this._redisSubscriber.subscribe(this._redisChannel); - this._redisSubscriber.on("message", async (message) => { + this._redisClient ??= createClient(process.env.REDIS_URL); + this._redisClient.subscribe(this._redisChannel); + this._redisClient.on("message", async (message) => { const { orgId } = JSON.parse(message); this._invalidateStreamingDestinations(orgId); }); - this._redisSubscriber.on("error", async (error) => { + this._redisClient.on("error", async (error) => { this._logger.error( null, `encountered error while subscribed to channel ${this._redisChannel}`, @@ -346,10 +354,9 @@ export class AuditLogger implements IAuditLogger { if (!process.env.REDIS_URL) { return; } - - const redis = createClient(process.env.REDIS_URL); + this._redisClient ??= createClient(process.env.REDIS_URL); try { - await redis.publishAsync(this._redisChannel, JSON.stringify({ orgId })); + await this._redisClient.publishAsync(this._redisChannel, JSON.stringify({ orgId })); } catch (error) { this._logger.error( null, @@ -359,8 +366,6 @@ export class AuditLogger implements IAuditLogger { orgId, } ); - } finally { - await redis.quitAsync(); } } From 6de01d571615a388f8d751e4f3b9350f71148b0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Sadzi=C5=84ski?= Date: Fri, 8 Nov 2024 10:45:37 +0100 Subject: [PATCH 10/10] Removing leftovers from debugging --- app/server/lib/shutdown.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/app/server/lib/shutdown.js b/app/server/lib/shutdown.js index 34958b181c..104a227e57 100644 --- a/app/server/lib/shutdown.js +++ b/app/server/lib/shutdown.js @@ -60,8 +60,6 @@ function runCleanupHandlers() { return Promise.try(handler.method.bind(handler.context)).timeout(handler.timeout) .catch(function(err) { log.warn(`Cleanup error for '${handler.name}' handler: ` + err); - // console.error(`Cleanup error for '${handler.name}' handler:`); - // console.error(err.stack || err); }); })); }