Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch activity fetch failures and retry #1954

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 53 additions & 44 deletions imports/server/gdriveActivityFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Meteor } from "meteor/meteor";
import Flags from "../Flags";
import Logger from "../Logger";
import {
ACTIVITY_GRANULARITY,
ACTIVITY_SEGMENTS,
Expand Down Expand Up @@ -160,57 +161,65 @@ async function featureFlagChanged() {

async function fetchActivityLoop() {
while (true) {
// Loop until the feature flag is disabled (i.e. the disabler is not
// disabled)
while (true) {
if (!(await Flags.activeAsync(FEATURE_FLAG_NAME))) {
break;
try {
// Loop until the feature flag is disabled (i.e. the disabler is not
// disabled)
while (true) {
if (!(await Flags.activeAsync(FEATURE_FLAG_NAME))) {
break;
}
await featureFlagChanged();
}
await featureFlagChanged();
}

await withLock("drive-activity", async (renew) => {
// Ensure that we continue to hold the lock as long as we're alive.
let renewInterval;
try {
const renewalFailure = new Promise<boolean>((r) => {
renewInterval = Meteor.setInterval(async () => {
try {
await renew();
} catch (e) {
// We failed to renew the lock
r(true);
}
}, PREEMPT_TIMEOUT / 2);
});
await withLock("drive-activity", async (renew) => {
// Ensure that we continue to hold the lock as long as we're alive.
let renewInterval;
try {
const renewalFailure = new Promise<boolean>((r) => {
renewInterval = Meteor.setInterval(async () => {
try {
await renew();
} catch (e) {
// We failed to renew the lock
r(true);
}
}, PREEMPT_TIMEOUT / 2);
});

// As long as we are alive and the feature flag is not active, hold the
// lock and keep looping
while (true) {
if (await Flags.activeAsync(FEATURE_FLAG_NAME)) {
return; // from withLock
}
// As long as we are alive and the feature flag is not active, hold the
// lock and keep looping
while (true) {
if (await Flags.activeAsync(FEATURE_FLAG_NAME)) {
return; // from withLock
}

await fetchDriveActivity();
await fetchDriveActivity();

// Wake up every 5 seconds (+/- 1 second of jitter)
const sleep = new Promise<boolean>((r) => {
Meteor.setTimeout(
() => r(false),
4 * 1000 + Math.random() * 2 * 1000,
);
});
const renewalFailed = await Promise.race([sleep, renewalFailure]);
if (renewalFailed) {
return; // from withLock
// Wake up every 5 seconds (+/- 1 second of jitter)
const sleep = new Promise<boolean>((r) => {
Meteor.setTimeout(
() => r(false),
4 * 1000 + Math.random() * 2 * 1000,
);
});
const renewalFailed = await Promise.race([sleep, renewalFailure]);
if (renewalFailed) {
return; // from withLock
}
}
} finally {
if (renewInterval) {
Meteor.clearInterval(renewInterval);
}
}
} finally {
if (renewInterval) {
Meteor.clearInterval(renewInterval);
}
}
});
});
} catch (error) {
Logger.error("Error fetching drive activity", { error });
// Sleep for 5 seconds before retrying
await new Promise((r) => {
Meteor.setTimeout(r, 5000);
});
}
}
}

Expand Down