Skip to content

Commit

Permalink
Adding a shutdown method to the AttachmentsFileManager (#1428)
Browse files Browse the repository at this point in the history
## Context

During ActiveDoc shutdown process the AttachmentsFileManager's internal
loop wasn't interrupted. This prevented server to shutdown gracefully.

## Proposed solution

The manager class is now closed properly during ActiveDoc shutdown
process. This still can be improved by implementing:
- Proper close method for AttachmentsStoreProviders (currently it is
dead code)
- Propagating the AbortSignal to the External/Internal store providers
that should abort the upload process

## Has this been tested?

- [x] 👍 yes, I added tests to the test suite

#1428
  • Loading branch information
berhalak authored Feb 12, 2025
1 parent 499df94 commit 86caf39
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 7 deletions.
5 changes: 5 additions & 0 deletions app/server/lib/ActiveDoc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2170,6 +2170,7 @@ export class ActiveDoc extends EventEmitter {
};

try {

this.setMuted();
this._inactivityTimer.disable();
if (this.docClients.clientCount() > 0) {
Expand All @@ -2181,6 +2182,10 @@ export class ActiveDoc extends EventEmitter {

this._triggers.shutdown();

// attachmentFileManager needs to shut down before DocStorage, to allow transfers to finish.
await safeCallAndWait('attachmentFileManager',
this._attachmentFileManager.shutdown.bind(this._attachmentFileManager));

this._redisSubscriber?.quitAsync()
.catch(e => this._log.warn(docSession, "Failed to quit redis subscriber", e));

Expand Down
27 changes: 22 additions & 5 deletions app/server/lib/AttachmentFileManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import log from 'app/server/lib/log';
import {LogMethods} from 'app/server/lib/LogMethods';
import {MemoryWritableStream} from 'app/server/utils/MemoryWritableStream';
import {Readable} from 'node:stream';
import {AbortController} from 'node-abort-controller';


export interface AddFileResult {
fileIdent: string;
Expand Down Expand Up @@ -89,6 +91,8 @@ export class AttachmentFileManager {
// in which case nothing will happen. Map ensures new requests override older pending transfers.
private _pendingFileTransfers: Map<string, AttachmentStoreId | undefined> = new Map();
private _transferJob?: TransferJob;
private _loopAbortController: AbortController = new AbortController();
private _loopAbort = this._loopAbortController?.signal as globalThis.AbortSignal;

/**
* @param _docStorage - Storage of this manager's document.
Expand All @@ -106,6 +110,15 @@ export class AttachmentFileManager {
this._docPoolId = _docInfo ? getDocPoolIdFromDocInfo(_docInfo) : null;
}

public async shutdown(): Promise<void> {
if (this._loopAbort.aborted) {
throw new Error("shutdown already in progress");
}
this._pendingFileTransfers.clear();
this._loopAbortController.abort();
await this._transferJob?.catch(reason => this._log.error({}, `Error during shutdown: ${reason}`));
}

// This attempts to add the attachment to the given store.
// If the file already exists in another store, it doesn't take any action.
// Therefore, there isn't a guarantee that the file exists in the given store, even if this method doesn't error.
Expand Down Expand Up @@ -140,6 +153,9 @@ export class AttachmentFileManager {
}

public async startTransferringAllFilesToOtherStore(newStoreId: AttachmentStoreId | undefined): Promise<void> {
if (this._loopAbort.aborted) {
throw new Error("AttachmentFileManager was shut down");
}
// Take a "snapshot" of the files we want to transfer, and schedule those files for transfer.
// It's possibly that other code will modify the file statuses / list during this process.
// As a consequence, after this process completes, some files may still be in their original
Expand Down Expand Up @@ -240,7 +256,7 @@ export class AttachmentFileManager {

private async _performPendingTransfers() {
try {
while (this._pendingFileTransfers.size > 0) {
while (this._pendingFileTransfers.size > 0 && !this._loopAbort.aborted) {
// Map.entries() will always return the most recent key/value from the map, even after a long async delay
// Meaning we can safely iterate here and know the transfer is up to date.
for (const [fileIdent, targetStoreId] of this._pendingFileTransfers.entries()) {
Expand All @@ -257,7 +273,9 @@ export class AttachmentFileManager {
}
}
} finally {
await this._docStorage.requestVacuum();
if (!this._loopAbort.aborted) {
await this._docStorage.requestVacuum();
}
}
}

Expand Down Expand Up @@ -427,15 +445,14 @@ export class AttachmentFileManager {
// Uploads the file to an attachment store, overwriting the current DB record for the file if successful.
private async _storeFileInAttachmentStore(
store: IAttachmentStore, fileIdent: string, fileData: Buffer
): Promise<string> {
): Promise<void> {

// The underlying store should guarantee the file exists if this method doesn't error,
// so no extra validation is needed here.
await store.upload(this._getDocPoolId(), fileIdent, Readable.from(fileData));

// Insert (or overwrite) the entry for this file in the document database.
await this._docStorage.attachOrUpdateFile(fileIdent, undefined, store.id);

return fileIdent;
}

private async _getFileDataFromAttachmentStore(store: IAttachmentStore, fileIdent: string): Promise<Buffer> {
Expand Down
102 changes: 100 additions & 2 deletions test/server/lib/AttachmentFileManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import {
StoreNotAvailableError,
StoresNotConfiguredError
} from "app/server/lib/AttachmentFileManager";
import { getDocPoolIdFromDocInfo } from "app/server/lib/AttachmentStore";
import { AttachmentStoreProvider, IAttachmentStoreProvider } from "app/server/lib/AttachmentStoreProvider";
import { getDocPoolIdFromDocInfo, IAttachmentStore } from "app/server/lib/AttachmentStore";
import {
AttachmentStoreProvider,
IAttachmentStoreConfig,
IAttachmentStoreProvider,
} from "app/server/lib/AttachmentStoreProvider";
import { makeTestingFilesystemStoreConfig } from "test/server/lib/FilesystemAttachmentStore";
import {waitForIt} from 'test/server/wait';
import { assert } from "chai";
import * as sinon from "sinon";
import * as stream from "node:stream";
Expand Down Expand Up @@ -81,11 +86,56 @@ function createDocStorageFake(docName: string): DocStorage {
return new DocStorageFake(docName) as unknown as DocStorage;
}

interface TestAttachmentStore extends IAttachmentStore {
uploads(): number;
finish(): void;
}


/** Creates a fake storage provider that doesn't finish the upload */
export async function makeTestingControlledStore(
name: string = "unfinished"
): Promise<IAttachmentStoreConfig> {
let resolve: () => void;
const createProm = () => new Promise<void>((_resolve) => { resolve = _resolve; });
let promise = createProm();
let uploads = 0;
const neverStore = {
id: name,
async exists() { return false; },
async upload() {
uploads++;
// Every time file is uploaded, we create a new promise that can be resolved to finish the upload.
resolve();
promise = createProm();
return promise;
},
async download() { return; },
async delete() { return; },
async removePool() { return; },
async close() { return; },
finish() { resolve(); },
uploads() { return uploads; },
};
return {
label: name,
spec: {
name: "unfinished",
create: async () => {
return neverStore;
}
},
};
}

const UNFINISHED_STORE = "TEST-INSTALLATION-UUID-unfinished";

async function createFakeAttachmentStoreProvider(): Promise<IAttachmentStoreProvider> {
return new AttachmentStoreProvider(
[
await makeTestingFilesystemStoreConfig("filesystem"),
await makeTestingFilesystemStoreConfig("filesystem-alt"),
await makeTestingControlledStore("unfinished"),
],
"TEST-INSTALLATION-UUID"
);
Expand All @@ -95,11 +145,59 @@ describe("AttachmentFileManager", function() {
let defaultProvider: IAttachmentStoreProvider;
let defaultDocStorageFake: DocStorage;

this.timeout('4s');

beforeEach(async function() {
defaultProvider = await createFakeAttachmentStoreProvider();
defaultDocStorageFake = createDocStorageFake(defaultTestDocName);
});

it("should stop the loop when aborted", async function() {
const manager = new AttachmentFileManager(
defaultDocStorageFake,
defaultProvider,
{ id: "Unimportant", trunkId: null },
);

// Add some file.
await manager.addFile(undefined, ".txt", Buffer.from("first file"));
await manager.addFile(undefined, ".txt", Buffer.from("second file"));
await manager.addFile(undefined, ".txt", Buffer.from("third file"));

// Move this file to a store that never finish uploads.
await manager.startTransferringAllFilesToOtherStore(defaultProvider.listAllStoreIds()[2]);

// Make sure we are running.
assert.isTrue(manager.transferStatus().isRunning);

// Get the store to test if the upload was called. The store is created only once.
const store = await defaultProvider.getStore(UNFINISHED_STORE).then((store) => store as TestAttachmentStore);

// Wait for the first upload to be called.
await waitForIt(() => assert.equal(store.uploads(), 1));

// Finish the upload and go to the next one.
store.finish();

// Wait for the second one to be called.
await waitForIt(() => assert.equal(store.uploads(), 2));

// Now shutdown the manager.
const shutdownProm = manager.shutdown();

// Manager will wait for the loop to finish, so allow the second file to be uploaded.
store.finish();

// Now manager can finish.
await shutdownProm;

// Make sure we are not running.
assert.isFalse(manager.transferStatus().isRunning);

// And make sure that the upload was only called twice.
assert.equal(store.uploads(), 2);
});

it("should throw if uses an external store when no document pool id is available", async function () {
const manager = new AttachmentFileManager(
defaultDocStorageFake,
Expand Down

0 comments on commit 86caf39

Please sign in to comment.