Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding close method to AuditLogger to free up resources. #1292

Merged
merged 10 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 66 additions & 13 deletions app/server/lib/AuditLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ export interface IAuditLogger {
requestOrSession: RequestOrSession,
properties: AuditEventProperties<Action>
): Promise<void>;

/**
* Close any resources used by the logger.
*/
close(): Promise<void>;
}

export interface AuditEventProperties<
Expand Down Expand Up @@ -94,8 +99,10 @@ export class AuditLogger implements IAuditLogger {
"AuditLogger ",
(requestOrSession) => getLogMeta(requestOrSession)
);
private _redisSubscriber: RedisClient | undefined;
private _redisClient: RedisClient | null = null;
private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`;
private _createdPromises: Set<Promise<any>>|null = new Set();
private _closed = false;

constructor(
private _db: HomeDBManager,
Expand All @@ -105,14 +112,46 @@ export class AuditLogger implements IAuditLogger {
this._subscribeToStreamingDestinations();
}

public async close(timeout = 10_000) {
// Wait 10 seconds for all promises to complete
const start = Date.now();
this._closed = true;
while(this._createdPromises?.size) {
if (Date.now() - start > timeout) {
this._logger.error(null, "timed out waiting for promises created by audit logger to complete");
break;
}
const promises = Array.from(this._createdPromises);
this._createdPromises.clear();
await Promise.allSettled(promises).catch((error) => {
this._logger.error(null, "failed to close audit logger", error);
});
}

// Remove the set, it will prevent from adding new promises. This is just sanity check, as
// code already tests for this._closed before adding new promises.
this._createdPromises = null;

// Clear up TTL Maps that have timers associated with them.
this._installStreamingDestinations.clear();
this._orgStreamingDestinations.clear();

// Close the redis clients if they weren't already.
await this._redisClient?.quitAsync();
}

/**
* Logs an audit event.
*/
public logEvent(
requestOrSession: RequestOrSession,
properties: AuditEventProperties
): void {
this.logEventOrThrow(requestOrSession, properties).catch((error) => {
if (this._closed) {
throw new Error("audit logger is closed");
}

this._track(this.logEventOrThrow(requestOrSession, properties)).catch((error) => {
this._logger.error(requestOrSession, `failed to log audit event`, error);
this._logger.warn(
requestOrSession,
Expand All @@ -131,6 +170,9 @@ export class AuditLogger implements IAuditLogger {
requestOrSession: RequestOrSession,
properties: AuditEventProperties
) {
if (this._closed) {
throw new Error("audit logger is closed");
}
const event = this._buildEventFromProperties(requestOrSession, properties);
const destinations = await this._getOrSetStreamingDestinations(event);
const requests = await Promise.allSettled(
Expand All @@ -152,6 +194,10 @@ export class AuditLogger implements IAuditLogger {
}
}

public length() {
return this._createdPromises?.size ?? 0;
}

private _buildEventFromProperties(
requestOrSession: RequestOrSession,
properties: AuditEventProperties
Expand All @@ -174,10 +220,10 @@ export class AuditLogger implements IAuditLogger {
const orgId = event.context.site?.id;
const destinations = await Promise.all([
Copy link
Contributor Author

@berhalak berhalak Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To recreate similar issue just promises just put those lines here and run the DocApi2 test.

      new Promise<AuditLogStreamingDestination[] | null>((resolve) => setTimeout(resolve, 10 * 1000, null)),
      new Promise<AuditLogStreamingDestination[] | null>((res, reject) => setTimeout(reject, 1, null)),

mapGetOrSet(this._installStreamingDestinations, true, () =>
this._fetchStreamingDestinations()
this._track(this._fetchStreamingDestinations()),
),
!orgId ? null : mapGetOrSet(this._orgStreamingDestinations, orgId, () =>
this._fetchStreamingDestinations(orgId)
this._track(this._fetchStreamingDestinations(orgId))
),
]);
return destinations
Expand Down Expand Up @@ -289,13 +335,13 @@ export class AuditLogger implements IAuditLogger {
return;
}

this._redisSubscriber = createClient(process.env.REDIS_URL);
this._redisSubscriber.subscribe(this._redisChannel);
this._redisSubscriber.on("message", async (message) => {
this._redisClient ??= createClient(process.env.REDIS_URL);
this._redisClient.subscribe(this._redisChannel);
this._redisClient.on("message", async (message) => {
const { orgId } = JSON.parse(message);
this._invalidateStreamingDestinations(orgId);
});
this._redisSubscriber.on("error", async (error) => {
this._redisClient.on("error", async (error) => {
this._logger.error(
null,
`encountered error while subscribed to channel ${this._redisChannel}`,
Expand All @@ -308,10 +354,9 @@ export class AuditLogger implements IAuditLogger {
if (!process.env.REDIS_URL) {
return;
}

const redis = createClient(process.env.REDIS_URL);
this._redisClient ??= createClient(process.env.REDIS_URL);
try {
await redis.publishAsync(this._redisChannel, JSON.stringify({ orgId }));
await this._redisClient.publishAsync(this._redisChannel, JSON.stringify({ orgId }));
} catch (error) {
this._logger.error(
null,
Expand All @@ -321,10 +366,18 @@ export class AuditLogger implements IAuditLogger {
orgId,
}
);
} finally {
await redis.quitAsync();
}
}

private _track(prom: Promise<any>) {
if (!this._createdPromises) {
throw new Error("audit logger is closed");
}
this._createdPromises.add(prom);
return prom.finally(() => {
this._createdPromises?.delete(prom);
});
}
}

export class LogAuditEventError extends Error {
Expand Down
1 change: 1 addition & 0 deletions app/server/lib/FlexServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,7 @@ export class FlexServer implements GristServer {
// Do this after _shutdown, since DocWorkerMap is used during shutdown.
if (this._docWorkerMap) { await this._docWorkerMap.close(); }
if (this._sessionStore) { await this._sessionStore.close(); }
if (this._auditLogger) { await this._auditLogger.close(); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about what happens if there are slow audit sinks and this close takes time. Not much we can do. We could have started the close earlier, in parallel with other actions - but perhaps we want to audit those :).

Let's leave it so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will go away with jobs. Then we would just abort it, and resume later.

}

public addDocApiForwarder() {
Expand Down
1 change: 1 addition & 0 deletions app/server/lib/GristServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ export function createDummyAuditLogger(): IAuditLogger {
return {
logEvent() { /* do nothing */ },
logEventOrThrow() { return Promise.resolve(); },
close() { return Promise.resolve(); },
};
}

Expand Down
3 changes: 0 additions & 3 deletions test/nbrowser/ColumnOps.ntest.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ describe('ColumnOps.ntest', function() {
await gu.supportOldTimeyTestCode();
await gu.useFixtureDoc(cleanup, "World.grist", true);
await gu.toggleSidePanel('left', 'close');
// The banner appearing mid-test occasionally interferes with the rest of
// the tests (for some unknown reason), so wait for it to appear first.
await $('.test-banner-element').wait();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This banner was only shown on internal CLI (probably), we will sort it differently.

});

afterEach(function() {
Expand Down
3 changes: 0 additions & 3 deletions test/nbrowser/SavePosition.ntest.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ describe('SavePosition.ntest', function() {
this.timeout(Math.max(this.timeout(), 20000)); // Long-running test, unfortunately
await gu.supportOldTimeyTestCode();
await gu.useFixtureDoc(cleanup, "World.grist", true);
// The banner appearing mid-test occasionally interferes with the rest of
// the tests (for some unknown reason), so wait for it to appear first.
await $('.test-banner-element').wait();
});

afterEach(function() {
Expand Down
122 changes: 112 additions & 10 deletions test/server/lib/AuditLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import {
isCreateSiteEvent,
} from "test/server/lib/helpers/AuditLoggerUtils";
import { EnvironmentSnapshot, setTmpLogLevel } from "test/server/testUtils";
import { waitForIt } from 'test/server/wait';

const MAX_CONCURRENT_REQUESTS = 10;

describe("AuditLogger", function () {
this.timeout(10000);
this.timeout('10s');
setTmpLogLevel("error");

let oldEnv: EnvironmentSnapshot;
Expand All @@ -31,7 +34,7 @@ describe("AuditLogger", function () {
process.env.TYPEORM_DATABASE = ":memory:";
process.env.GRIST_DEFAULT_EMAIL = chimpyEmail;
sandbox.stub(Deps, "CACHE_TTL_MS").value(0);
sandbox.stub(Deps, "MAX_CONCURRENT_REQUESTS").value(10);
sandbox.stub(Deps, "MAX_CONCURRENT_REQUESTS").value(MAX_CONCURRENT_REQUESTS);
server = new TestServer(this);
homeUrl = await server.start();
oid = (await server.dbManager.testGetId("NASA")) as number;
Expand All @@ -43,21 +46,18 @@ describe("AuditLogger", function () {
});
});

afterEach(async function () {
await auditLogger.close();
});

after(async function () {
sandbox.restore();
oldEnv.restore();
await server.stop();
});

describe("logEventOrThrow", function () {
beforeEach(async function () {
// Ignore "config.*" audit events; some tests call the `/configs`
// endpoint as part of setup, which triggers them.
nock("https://audit.example.com")
.persist()
.post(/\/events\/.*/, (body) => body.action?.startsWith("config."))
.reply(200);
});
beforeEach(ignoreConfigEvents);

afterEach(function () {
nock.abortPendingRequests();
Expand Down Expand Up @@ -295,4 +295,106 @@ describe("AuditLogger", function () {
);
});
});
describe('closes resources properly', function() {
before(async function() {
// Create the AuditLogger instance that we will close eventually.
logger = new AuditLogger(server.dbManager, {
formatters: [new GenericEventFormatter()],
});

ignoreConfigEvents();

// Wire up the destinations.
await axios.put(
`${homeUrl}/api/install/configs/audit_log_streaming_destinations`,
[
{
id: "62c9e725-1195-48e7-a9f6-0ba164128c20",
name: "other",
url: "https://audit.example.com/events/install",
},
],
chimpy
);
});

afterEach(function () {
nock.cleanAll();
});

after(async function() {
await logger.close();
});

// Start the first test. We test here that audit logger properly clears its queue, without
// closing.
it('on its own', async function() {
// Start a scope to track the requests.
const firstScope = installScope();

// Fire up MAX_CONCURRENT_REQUESTS events to test the logger.
repeat(sendEvent);

// Ensure the scope is done.
await waitForIt(() => assert.isTrue(firstScope.isDone(), 'Scope should be done'), 1000, 10);

// When the scope is done, the logger should clear all the pending requests, as they
// are done (event the destination fetchers)
await waitForIt(() => assert.equal(logger.length(), 0), 1000, 10);
});

// Now test the same but by closing the logger.
it('when closed', async function() {
// Start a scope to track the requests.
const secondScope = installScope();

// Send all events (without waiting for the result)
repeat(sendEvent);

// Now close the logger and wait for all created promises to resolve.
await logger.close();

// Scope should be done.
assert.isTrue(secondScope.isDone());

// And the logger should have cleared all the pending requests.
assert.equal(logger.length(), 0);
});

// Dummy destination creator.
const installScope = () => nock("https://audit.example.com")
.post("/events/install")
.times(MAX_CONCURRENT_REQUESTS)
.reply(200);

// The AuditLogger instance that we will close eventually.
let logger: AuditLogger;

// Helper to send events.
const sendEvent = () => logger.logEvent(null, {
action: "site.create",
details: {
site: {
id: oid,
name: "Grist Labs",
domain: "gristlabs",
},
},
});
});
});

function repeat(fn: (i: number) => void) {
for (let i = 0; i < MAX_CONCURRENT_REQUESTS; i++) {
fn(i);
}
}

function ignoreConfigEvents() {
// Ignore "config.*" audit events; some tests call the `/configs`
// endpoint as part of setup, which triggers them.
nock("https://audit.example.com")
.persist()
.post(/\/events\/.*/, (body) => body.action?.startsWith("config."))
.reply(200);
}
Loading
Loading