Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feat/achievements
Browse files Browse the repository at this point in the history
  • Loading branch information
LomyW committed Dec 18, 2023
2 parents 14ce44a + 6b6c233 commit c42ff66
Show file tree
Hide file tree
Showing 14 changed files with 304 additions and 173 deletions.
16 changes: 14 additions & 2 deletions graphql/admin.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
import { executeJob } from '../jobs/manualExecution';
import { Arg, Authorized, Mutation, Resolver } from 'type-graphql';
import { Role } from './authorizations';
import { clearFilestore } from './files';
import { rateLimitSummary, resetRateLimits } from './rate-limit';
import { jobExists } from '../jobs/list';
import { UserInputError } from 'apollo-server-express';
import { runJob } from '../jobs/execute';
import { Doc } from './util';

// Mutations for managing the backend, should usually only be used for testing purposes

@Resolver()
export class AdminMutationsResolver {
@Mutation((returns) => Boolean)
@Authorized(Role.ADMIN)
@Doc('Schedules a Job for immediate Execution - The request might time out while the job is still running')
async _executeJob(@Arg('job') job: string) {
await executeJob(job);
if (!jobExists(job)) {
throw new UserInputError(`No Job named '${job}'`);
}

const success = await runJob(job);
if (!success) {
throw new Error(`Job Execution failed`);
}

return true;
}

Expand Down
1 change: 1 addition & 0 deletions graphql/authorizations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ export const authorizationEnhanceMap: Required<ResolversEnhanceMap> = {
Achievement_template: allAdmin,
User_achievement: allAdmin,
Achievement_event: allAdmin,
Job_run: { _all: nobody },
};

/* Some entities are generally accessible by multiple users, however some fields of them are
Expand Down
15 changes: 14 additions & 1 deletion graphql/user/fields.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,20 @@ export class UserFieldsResolver {
if (studentQuery) {
const students = await prisma.student.findMany({
select: { firstname: true, lastname: true, email: true, id: true },
where: { ...studentQuery, active: true, verification: null },
where: {
AND: [
studentQuery,
{
active: true,
verification: null,
},
// For now we exclude unscreened helpers, as they wont be interested
// in most of our marketing campaigns anyways
{
OR: [{ screening: { success: true } }, { instructor_screening: { success: true } }],
},
],
},
});
result.push(...students.map(userForStudent));
}
Expand Down
29 changes: 28 additions & 1 deletion integration-tests/10_admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { randomBytes } from 'crypto';
import { test } from './base';
import { adminClient } from './base/clients';
import { pupilOne, studentOne } from './01_user';
import { prisma } from '../common/prisma';

/* eslint-disable */

Expand Down Expand Up @@ -222,7 +223,7 @@ void test('Admin Manage Notifications', async () => {

assert.ok(!scheduled.find((it) => it.notificationID === id), 'Concrete notification already visible?');

await adminClient.request(`mutation { _executeJob(job: "Notification") }`);
await adminClient.request(`mutation { _executeJob(job: "checkReminders") }`);

const {
me: { concreteNotifications: sent },
Expand Down Expand Up @@ -284,3 +285,29 @@ void test('Admin Manage Notifications', async () => {
assert.strictEqual(notification2.message.body, 'TEST');
assert.strictEqual(notification2.message.navigateTo, null);
});

void test('Job Synchronization', async () => {
await adminClient.requestShallFail(`mutation RunNonExistentJob { _executeJob(job: "FindTheAnswerToTheUniverse") }`);

const timeBefore = new Date();

const results = await Promise.allSettled([
adminClient.request(`mutation RunJob1 { _executeJob(job: "NOTHING_DO_NOT_USE") }`),
adminClient.request(`mutation RunJob2 { _executeJob(job: "NOTHING_DO_NOT_USE") }`),
adminClient.request(`mutation RunJob3 { _executeJob(job: "NOTHING_DO_NOT_USE") }`),
adminClient.request(`mutation RunJob4 { _executeJob(job: "NOTHING_DO_NOT_USE") }`),
adminClient.request(`mutation RunJob5 { _executeJob(job: "NOTHING_DO_NOT_USE") }`),
]);

const succeeded = results.filter((it) => it.status === 'fulfilled').length;
assert.strictEqual(1, succeeded, 'Expected only one concurrent run to succeed');

const successBooked = await prisma.job_run.findMany({
where: { job_name: 'NOTHING_DO_NOT_USE', startedAt: { gt: timeBefore } },
});

assert.strictEqual(successBooked.length, 1, 'Expected only one job to be booked to the database');
assert.ok(!!successBooked[0].endedAt, 'Expected endedAt to be set after job execution');

await adminClient.request(`mutation RunJob6 { _executeJob(job: "NOTHING_DO_NOT_USE") }`);
});
135 changes: 135 additions & 0 deletions jobs/execute.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { JobName, allJobs } from './list';
import tracer from '../common/logger/tracing';
import { getLogger } from '../common/logger/logger';
import { metrics, metricsRouter } from '../common/logger/metrics';
import { prisma } from '../common/prisma';
import { Prisma, job_run } from '@prisma/client';
import assert from 'assert';

const logger = getLogger('Job Execution');

enum LockStatus {
// We failed to aquire the lock as the transaction was rolled back by the database
// (due to a conflict), but we don't yet know whether another job is currently running
// Best to retry soon
ROLLBACK = 1,
// Failed to aquire a lock because the same job is already running
CONFLICT = 2,
AQUIRED = 3,
}

export async function runJob(jobName: JobName): Promise<boolean> {
let success = false;

try {
logger.info(`Starting to run Job '${jobName}'`, { jobName });

// ---------- AQUIRE --------------
// Prevent Job Runs running concurrently (across dynos), as jobs usually lack synchronization internally
// To synchronize we use the 'job_run' table in our Postgres
// During insert we need transaction level SERIALIZABLE to prevent two jobs from inserting a new job run
// at the same time

let jobRun: job_run;
let lockStatus: LockStatus = LockStatus.ROLLBACK as LockStatus;
let lockRetries = 5;

do {
try {
// Wait between 0 and 1000ms to reduce the likelihood of transaction deadlocks
// (as a lot of Cron Jobs fire at exactly the same time)
await new Promise((resolve) => setTimeout(resolve, Math.floor(Math.random() * 1000)));

jobRun = await prisma.$transaction(
async (jobPrisma) => {
const runningJob = await jobPrisma.job_run.findFirst({
where: {
job_name: jobName,
endedAt: { equals: null },
},
});

if (runningJob) {
logger.error(
`Cannot concurrently execute Job '${jobName}' as it is already running on '${runningJob.worker}' since ${runningJob.startedAt}`,
undefined,
{ jobName, runningJob }
);
lockStatus = LockStatus.CONFLICT;
return undefined;
}

lockStatus = LockStatus.AQUIRED;

return await jobPrisma.job_run.create({
data: { job_name: jobName, worker: process.env.DYNO ?? '?' },
});
// It is important that the transaction ends here and the INSERT above is commited
// Otherwise we would continue execution, and the commit would be rolled back after the job actually executed
},
{ isolationLevel: Prisma.TransactionIsolationLevel.Serializable }
);
} catch (error) {
logger.warn(`Aquiring Lock failed - ${error.message}`, { jobName, error });
// The transaction was aborted, likely because the DB rolled back the deadlock
lockStatus = LockStatus.ROLLBACK;
lockRetries -= 1;
}
} while (lockStatus === LockStatus.ROLLBACK && lockRetries > 0);

if (lockStatus === LockStatus.CONFLICT) {
return false;
}

if (lockStatus === LockStatus.ROLLBACK) {
logger.error(`Failed to aquire Lock after at most 5 retries - This might leave the system in a locked state requiring manual cleanup!`, undefined, {
jobName,
});
return false;
}

assert.ok(lockStatus === LockStatus.AQUIRED);
assert.ok(runJob != null);

logger.info(`Aquired Table Lock to run Job '${jobName}'`, { jobName });

// ---------- RUN ----------------

const span = tracer.startSpan(jobName);
await tracer.scope().activate(span, async () => {
let hasError = false;
try {
const job = allJobs[jobName];
await job();
success = true;
} catch (e) {
logger.error(`Can't execute job: ${jobName} due to error`, e);
logger.debug(e);
hasError = true;
}

metrics.JobCountExecuted.inc({ hasError: `${hasError}`, name: jobName });

span.finish();
});

logger.info(`Finished Running Job '${jobName}', releasing table lock`, { jobName });

// ---------- RELEASE -------------
await prisma.job_run.update({
where: { job_name_startedAt: { startedAt: jobRun.startedAt, job_name: jobRun.job_name } },
data: { endedAt: new Date() },
});

logger.info(`Finished Job '${jobName}'`, { jobName });
} catch (error) {
logger.error(error.message);
logger.error(`Failure during Job Scheduling - This might leave the system in a locked state requiring manual cleanup!`, error, { jobName });
success = false;
// Eventually we now have a job run in the job_run table that has no endedAt,
// but which will never finish. To unlock this again, simply delete this entry
// (This should only happen in the rare case that the Dyno is killed (!) during execution)
}

return success;
}
39 changes: 30 additions & 9 deletions jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import moment from 'moment-timezone';
import { getLogger } from '../common/logger/logger';
import { scheduleJobs } from './scheduler';
import * as scheduler from './scheduler';
import { allJobs } from './list';
import { configureGracefulShutdown } from './shutdown';
import { executeJob } from './manualExecution';
import { jobExists, regularJobs } from './list';
import { runJob } from './execute';
import express from 'express';
import { metricsRouter } from '../common/logger/metrics';
import http from 'http';

// Ensure Notification hooks are always loaded
import './../common/notification/hooks';
Expand All @@ -22,21 +25,39 @@ log.info('Backend started');
moment.locale('de'); //set global moment date format
moment.tz.setDefault('Europe/Berlin'); //set global timezone (which is then used also for cron job scheduling and moment.format calls)

// SETUP: Metrics registration
registerAchievementMetrics();

//SETUP: schedule jobs

//SETUP: Add a graceful shutdown to the scheduler used
configureGracefulShutdown(scheduler);
registerAchievementMetrics();

// Add Metrics Server to Jobs Dyno
async function startMetricsServer() {
const app = express();
app.use('/metrics', metricsRouter);

const port = process.env.PORT || 5100;

const server = http.createServer(app);

// Start listening
await new Promise<void>((res) => server.listen(port, res));
log.info(`Server listening on port ${port}`);
}

if (process.env.METRICS_SERVER_ENABLED === 'true') {
startMetricsServer().catch((e) => log.error('Failed to setup metrics server', e));
}

// Manual job execution via npm run jobs -- --execute <name>
if (process.argv.length >= 4 && process.argv[2] === '--execute') {
const job = process.argv[3];
if (!jobExists(job)) {
throw new Error(`No Job named '${job}'`);
}

log.info(`Manually executing ${job}, creating DB connection`);
void executeJob(job);

void runJob(job);
} else {
log.info('To directly run one of the jobs, use --execute <name>, we now schedule Cron Jobs to run in the future');
void scheduleJobs(allJobs);
scheduleJobs(regularJobs);
}
68 changes: 45 additions & 23 deletions jobs/list.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,61 @@
import { CSCronJob } from './types';

//import the jobs
import * as Notification from '../common/notification';
import { cleanupSecrets } from '../common/secret';
import dropOldNotificationContexts from './periodic/drop-old-notification-contexts';
import { runInterestConfirmations } from '../common/match/pool';
import { checkReminders } from '../common/notification';
import { cleanupSecrets } from '../common/secret';
import anonymiseAttendanceLog from './periodic/anonymise-attendance-log';
import syncToWebflow from './periodic/sync-to-webflow';
import { postStatisticsToSlack } from './slack-statistics';
import dropOldNotificationContexts from './periodic/drop-old-notification-contexts';
import flagInactiveConversationsAsReadonly from './periodic/flag-old-conversations';
import redactInactiveAccounts from './periodic/redact-inactive-accounts';
import { sendInactivityNotification } from './periodic/redact-inactive-accounts/send-inactivity-notification';
import { deactivateInactiveAccounts } from './periodic/redact-inactive-accounts/deactivate-inactive-accounts';
import { sendInactivityNotification } from './periodic/redact-inactive-accounts/send-inactivity-notification';
import syncToWebflow from './periodic/sync-to-webflow';
import { postStatisticsToSlack } from './slack-statistics';
import notificationsEndedYesterday from './periodic/notification-courses-ended-yesterday';

export const allJobs = {
cleanupSecrets,
dropOldNotificationContexts,
runInterestConfirmations,
anonymiseAttendanceLog,
syncToWebflow,
postStatisticsToSlack,
redactInactiveAccounts,
sendInactivityNotification,
deactivateInactiveAccounts,
flagInactiveConversationsAsReadonly,
notificationsEndedYesterday,
checkReminders,

// For Integration Tests only:
NOTHING_DO_NOT_USE: async () => {
await new Promise((resolve) => setTimeout(resolve, 1000));
},
} as const;

export type JobName = keyof typeof allJobs;
export const jobExists = (name: string): name is JobName => name in allJobs;

// A list of all jobs that should be scheduled at the moment
export const allJobs: CSCronJob[] = [
export type ScheduledJob = { cronTime: string; name: JobName };
export const regularJobs: ScheduledJob[] = [
// every morning, quite early (but only on Monday and Thursday)
// { cronTime: "00 55 07 * * 1,4", jobFunction: initialInterestConfirmationRequests},
{ cronTime: '00 55 07 * * 1,4', jobFunction: runInterestConfirmations, name: 'runInterestConfirmations' },
// { cronTime: "00 56 08 * * *", jobFunction: tutoringMatchMaking}, // only scheduled manually, at the moment
{ cronTime: '00 55 07 * * 1,4', name: 'runInterestConfirmations' },
// every morning, but a little bit later
// every 10 minutes during the day (to distribute load and send out notifications faster)
{ cronTime: '00 */10 * * * *', jobFunction: Notification.checkReminders, name: 'checkReminders' },
{ cronTime: '00 */10 * * * *', name: 'checkReminders' },
// each night - database cleanups
{ cronTime: '00 00 05 * * *', jobFunction: anonymiseAttendanceLog, name: 'anonymiseAttendanceLog' },
{ cronTime: '00 00 04 * * *', jobFunction: cleanupSecrets, name: 'cleanupSecrets' },
{ cronTime: '00 00 01 * * *', jobFunction: dropOldNotificationContexts, name: 'dropOldNotificationContexts' },
{ cronTime: '00 00 05 * * *', name: 'anonymiseAttendanceLog' },
{ cronTime: '00 00 04 * * *', name: 'cleanupSecrets' },
{ cronTime: '00 00 01 * * *', name: 'dropOldNotificationContexts' },
// Account redaction
{ cronTime: '00 00 01 * * *', jobFunction: deactivateInactiveAccounts, name: 'deactivateInactiveAccounts' },
{ cronTime: '00 00 02 * * *', jobFunction: redactInactiveAccounts, name: 'redactInactiveAccounts' },
{ cronTime: '00 00 02 * * *', jobFunction: sendInactivityNotification, name: 'sendInactivityNotification' },
{ cronTime: '00 00 01 * * *', name: 'deactivateInactiveAccounts' },
{ cronTime: '00 00 02 * * *', name: 'redactInactiveAccounts' },
{ cronTime: '00 00 02 * * *', name: 'sendInactivityNotification' },
// Synch DB data to webflow CMS
{ cronTime: '00 */15 * * * *', jobFunction: syncToWebflow, name: 'syncToWebflow' },
{ cronTime: '00 */15 * * * *', name: 'syncToWebflow' },
// Send Slack Messages monthly:
{ cronTime: '00 00 10 01 * *', jobFunction: postStatisticsToSlack, name: 'postStatisticsToSlack' },
{ cronTime: '00 00 10 01 * *', name: 'postStatisticsToSlack' },
// Disable old chats on a daily basis:
{ cronTime: '00 00 10 * * *', name: 'flagInactiveConversationsAsReadonly' },
// Every night, trigger actions for courses that ended yesterday
{ cronTime: '00 00 10 * * *', jobFunction: notificationsEndedYesterday, name: 'notificationsEndedYesterday' },
{ cronTime: '00 00 10 * * *', name: 'notificationsEndedYesterday' },
];
Loading

0 comments on commit c42ff66

Please sign in to comment.