Skip to content

Commit

Permalink
Reusing redis client
Browse files Browse the repository at this point in the history
  • Loading branch information
berhalak committed Nov 7, 2024
1 parent 65daf08 commit 8f6e76b
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions app/server/lib/AuditLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class AuditLogger implements IAuditLogger {
"AuditLogger ",
(requestOrSession) => getLogMeta(requestOrSession)
);
private _redisSubscriber: RedisClient | undefined;
private _redisClient: RedisClient | null = null;
private _redisChannel = `${getPubSubPrefix()}-audit-logger-streaming-destinations:change`;
private _createdPromises: Set<Promise<any>>|null = new Set();
private _closed = false;
Expand All @@ -113,6 +113,7 @@ export class AuditLogger implements IAuditLogger {
}

public async close(timeout = 10_000) {
// Wait 10 seconds for all promises to complete
const start = Date.now();
this._closed = true;
while(this._createdPromises?.size) {
Expand All @@ -126,10 +127,17 @@ export class AuditLogger implements IAuditLogger {
this._logger.error(null, "failed to close audit logger", error);
});
}

// Remove the set, it will prevent from adding new promises. This is just sanity check, as
// code already tests for this._closed before adding new promises.
this._createdPromises = null;

// Clear up TTL Maps that have timers associated with them.
this._installStreamingDestinations.clear();
this._orgStreamingDestinations.clear();
await this._redisSubscriber?.quitAsync();

// Close the redis clients if they weren't already.
await this._redisClient?.quitAsync();
}

/**
Expand Down Expand Up @@ -327,13 +335,13 @@ export class AuditLogger implements IAuditLogger {
return;
}

this._redisSubscriber = createClient(process.env.REDIS_URL);
this._redisSubscriber.subscribe(this._redisChannel);
this._redisSubscriber.on("message", async (message) => {
this._redisClient ??= createClient(process.env.REDIS_URL);
this._redisClient.subscribe(this._redisChannel);
this._redisClient.on("message", async (message) => {
const { orgId } = JSON.parse(message);
this._invalidateStreamingDestinations(orgId);
});
this._redisSubscriber.on("error", async (error) => {
this._redisClient.on("error", async (error) => {
this._logger.error(
null,
`encountered error while subscribed to channel ${this._redisChannel}`,
Expand All @@ -346,10 +354,9 @@ export class AuditLogger implements IAuditLogger {
if (!process.env.REDIS_URL) {
return;
}

const redis = createClient(process.env.REDIS_URL);
this._redisClient ??= createClient(process.env.REDIS_URL);
try {
await redis.publishAsync(this._redisChannel, JSON.stringify({ orgId }));
await this._redisClient.publishAsync(this._redisChannel, JSON.stringify({ orgId }));
} catch (error) {
this._logger.error(
null,
Expand All @@ -359,8 +366,6 @@ export class AuditLogger implements IAuditLogger {
orgId,
}
);
} finally {
await redis.quitAsync();
}
}

Expand Down

0 comments on commit 8f6e76b

Please sign in to comment.