Skip to content

Commit

Permalink
Merge pull request #1064 from getodk/multiple-workers
Browse files Browse the repository at this point in the history
Run multiple worker loops
  • Loading branch information
matthew-white authored Dec 13, 2023
2 parents 4d501e2 + eb59599 commit 6efdd5b
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 125 deletions.
6 changes: 3 additions & 3 deletions lib/bin/run-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ const server = service.listen(config.get('default.server.port'), () => {


////////////////////////////////////////////////////////////////////////////////
// START WORKER
// START WORKERS

const { worker } = require('../worker/worker');
worker(container);
const { workerQueue } = require('../worker/worker');
workerQueue(container).loops(4);


////////////////////////////////////////////////////////////////////////////////
Expand Down
195 changes: 103 additions & 92 deletions lib/worker/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,65 @@ const { min } = Math;
const { inspect } = require('util');
const { head } = require('ramda');
const { sql } = require('slonik');
const { timebound, resolve, runSequentially } = require('../util/promise');
const { timebound, runSequentially } = require('../util/promise');
const defaultJobMap = require('./jobs').jobs;
const { noop } = require('../util/util');

// TODO: domain catch/restart? << investigated and seems unlikely.

// we'd love to report to sentry, but it's much more important that we don't fail
// in our error handling code. so we will do anything to pass this and move on.
const reporter = (Sentry) => (err) => {
/* eslint-disable */ // i don't like anything it suggests here.
try { Sentry.captureException(err); }
catch (ierr) {
try { process.stderr.write(inspect(err) + '\n'); process.stderr.write(inspect(ierr) + '\n'); }
catch (_) { /* too scared to try anything at this point */ }
}
/* eslint-enable */
};
// minimal record of what a worker loop last reported doing, and when it said so.
const stati = { idle: Symbol('idle'), check: Symbol('check'), run: Symbol('run') };
class Status {
  // a fresh status starts out idle; set() fills in both fields.
  constructor() {
    this.set(stati.idle);
  }

  // stores the given status along with the epoch-millisecond time of the report.
  set(status) {
    this.at = Date.now();
    this.status = status;
  }
}

const workerQueue = (container, jobMap = defaultJobMap) => {
const { Sentry, all, run } = container;

// given an event, attempts to run the appropriate jobs for the event and then
// calls the reschedule callback to continue the worker loop. works hard on error
// handling, and will attempt to unclaim the event if a failure occurs.
const runner = (container, jobMap) => {
const { run } = container;
const report = reporter(container.Sentry);
return (event, reschedule) => {
// reporting to sentry is nice to have, but error handling code must never itself
// fail. so each fallback below is guarded, and the last resort is to quietly give up.
const report = (err) => {
  /* eslint-disable */ // i don't like anything it suggests here.
  try {
    Sentry.captureException(err);
  } catch (sentryErr) {
    // sentry itself blew up: dump the original error and then the sentry failure
    // to stderr, in that order.
    try {
      for (const failure of [err, sentryErr]) process.stderr.write(inspect(failure) + '\n');
    } catch (_) { /* too scared to try anything at this point */ }
  }
  /* eslint-enable */
};

// given an event, attempts to run the appropriate jobs for the event,
// returning `true` immediately if there is a job to run and `false` if not.
// if there is a job, runJobs() will call the `done` callback once all jobs
// have been run, or once there has been an error. runJobs() works hard on
// error handling, and will attempt to unclaim the event if a failure occurs.
const runJobs = (event, done) => {
if (event == null) return false;
const jobs = jobMap[event.action];
if (jobs == null) return false;

const loggedAt = (event.loggedAt == null) ? '--' : event.loggedAt.toISOString();
const logname = `${event.action}::${loggedAt}::${event.acteeId}`;
process.stdout.write(`[${(new Date()).toISOString()}] start processing event ${logname} (${jobs.length} jobs)\n`);

// run sequentially because a job can start a child transaction and then other jobs can't execute queries via parent transaction.
container.transacting((tc) => timebound(runSequentially(jobs.map((f) => () => f(tc, event))))
.then(() => tc.run(sql`update audits set processed=clock_timestamp() where id=${event.id}`)))
.then(() => { process.stdout.write(`[${(new Date()).toISOString()}] finish processing event ${logname}\n`); })
.catch((err) => {
report(err);
return run(sql`update audits set claimed=null, failures=${event.failures + 1}, "lastFailure"=clock_timestamp() where id=${event.id}`)
.then(() => resolve());
.catch(noop);
})
.then(reschedule, reschedule);
.finally(done);

return true;
};
};

// using a CTE, attempts to atomically grab an available queue event for processing.
// does some work to avoid problematic events. returns (Audit?)
const checker = ({ all }) => () => all(sql`
// using a CTE, attempts to atomically grab an available queue event for processing.
// does some work to avoid problematic events. returns (Audit?)
const check = () => all(sql`
with q as
(select id from audits
where processed is null
Expand All @@ -70,78 +80,79 @@ with q as
limit 1
for update skip locked)
update audits set claimed=clock_timestamp() from q where audits.id=q.id returning *`)
.then(head);
.then(head);

// tiny struct thing to just store worker last report status below.
const stati = { idle: Symbol('idle'), check: Symbol('check'), run: Symbol('run') };
class Status {
constructor() { this.set(stati.idle); }
set(status) { this.status = status; this.at = (new Date()).getTime(); }
}
// main loop. kicks off a check and attempts to process the result of the check.
// if there was something to do, takes a break while that happens; runJobs() will
// call back into the scheduler when it's done.
// if there was nothing to do, immediately schedules a subsequent check at a capped
// exponential backoff rate.
const loop = (defaultDelay = 3000) => {
let enable = true; // we allow the caller to abort for testing.
const status = new Status();
const withStatus = (x, chain) => { status.set(x); return chain; };

// main loop. kicks off a check and attempts to process the result of the check.
// if there was something to do, takes a break while that happens; the runner will
// call back into the scheduler when it's done.
// if there was nothing to do, immediately schedules a subsequent check at a capped
// exponential backoff rate.
const worker = (container, jobMap = defaultJobMap, defaultDelay = 3000) => {
let enable = true; // we allow the caller to abort for testing.
const check = checker(container);
const run = runner(container, jobMap);
const status = new Status();
const withStatus = (x, chain) => { status.set(x); return chain; };
const report = reporter(container.Sentry);

// this is the main loop, which should already try to hermetically catch its own
// failures and restart itself.
const now = (delay = defaultDelay) => {
if (!enable) return;
const wait = () => { waitFor(min(delay * 2, 25000)); }; // eslint-disable-line no-use-before-define
try {
withStatus(stati.check, check())
.then((event) => withStatus(stati.run, run(event, now)))
.then((running) => { if (!running) withStatus(stati.idle, wait()); })
.catch((err) => {
report(err);
process.stderr.write(`!! unexpected worker loop error: ${inspect(err)}\n`);
wait();
});
} catch (ex) {
report(ex);
process.stderr.write(`!! unexpected worker invocation error: ${inspect(ex)}\n`);
wait();
}
};
const waitFor = (amount) => { setTimeout(() => { now(amount); }, amount); }; // awkward..?
now();

// this is the watchdog timer, which ensures that the worker has reported back
// in a reasonable time for what it claims to be doing. if not, it starts a new
// check immediately. there is some theoretical chance if the worker was secretly
// fine we'll end up with extras, but it seems unlikely.
const woof = (which) => {
process.stderr.write(`!! unexpected worker loss in ${which} (${status.at})\n`);
// this is the main loop, which should already try to hermetically catch its own
// failures and restart itself.
const now = (delay = defaultDelay) => {
if (!enable) return;
const wait = () => { waitFor(min(delay * 2, 25000)); }; // eslint-disable-line no-use-before-define
try {
withStatus(stati.check, check())
.then((event) => withStatus(stati.run, runJobs(event, now)))
.then((running) => { if (!running) withStatus(stati.idle, wait()); })
.catch((err) => {
report(err);
process.stderr.write(`!! unexpected worker loop error: ${inspect(err)}\n`);
wait();
});
} catch (ex) {
report(ex);
process.stderr.write(`!! unexpected worker invocation error: ${inspect(ex)}\n`);
wait();
}
};
const waitFor = (amount) => { setTimeout(() => { now(amount); }, amount); }; // awkward..?
now();

// this is the watchdog timer, which ensures that the worker has reported back
// in a reasonable time for what it claims to be doing. if not, it starts a new
// check immediately. there is some theoretical chance if the worker was secretly
// fine we'll end up with extras, but it seems unlikely.
const woof = (which) => {
process.stderr.write(`!! unexpected worker loss in ${which} (${status.at})\n`);
now();
};
const watchdog = setInterval(() => {
const delta = (new Date()).getTime() - status.at;
if ((delta > 120000) && (status.status === stati.idle)) woof('idle');
else if ((delta > 120000) && (status.status === stati.check)) woof('check');
else if ((delta > 720000) && (status.status === stati.run)) woof('run');
}, 60000);

return () => { enable = false; clearInterval(watchdog); };
};

// starts `count` independent worker loops against the same queue.
// each loop() call hands back a cancel function that disables that loop and
// clears its watchdog interval. rather than dropping those on the floor (which
// would make the loops impossible to stop), we return a single function that
// cancels every loop started here. callers that ignore the return value are
// unaffected.
const loops = (count) => {
  const cancels = [];
  for (let i = 0; i < count; i += 1) cancels.push(loop());
  return () => { for (const cancel of cancels) cancel(); };
};
const watchdog = setInterval(() => {
const delta = (new Date()).getTime() - status.at;
if ((delta > 120000) && (status.status === stati.idle)) woof('idle');
else if ((delta > 120000) && (status.status === stati.check)) woof('check');
else if ((delta > 720000) && (status.status === stati.run)) woof('run');
}, 60000);

return () => { enable = false; clearInterval(watchdog); };
};

// for testing: chews through the event queue serially until there is nothing left to process.
const exhaust = async (container, jobMap = defaultJobMap) => {
const check = checker(container);
const run = runner(container, jobMap);
const runWait = (event) => new Promise((done) => {
if (!run(event, () => { done(true); })) done(false);
});
while (await check().then(runWait)); // eslint-disable-line no-await-in-loop
// for testing: drains the event queue one event at a time, finishing once a
// check turns up nothing that can be run.
const exhaust = async () => {
  // resolves true once the jobs for the event have completed, or false right
  // away if runJobs() says there was nothing to run.
  const processed = (event) => new Promise((settle) => {
    const started = runJobs(event, () => { settle(true); });
    if (!started) settle(false);
  });

  let busy = true;
  while (busy) busy = await check().then(processed); // eslint-disable-line no-await-in-loop
};

return {
loop, loops,
// for testing
exhaust, run: runJobs, check
};
};

module.exports = { runner, checker, worker, exhaust };
// for testing: builds a one-off worker queue over the container and runs it
// until the event queue is empty. the optional jobMap is passed straight
// through to workerQueue (which applies its own default when it is omitted),
// so callers of the previous exhaust(container, jobMap) signature keep their
// custom job map instead of silently getting the default jobs.
const exhaust = (container, jobMap) => workerQueue(container, jobMap).exhaust();

module.exports = { workerQueue, exhaust };

6 changes: 3 additions & 3 deletions test/integration/other/analytics-queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { createReadStream, readFileSync } = require('fs');

const { promisify } = require('util');
const testData = require('../../data/xml');
const { runner, exhaust } = require(appRoot + '/lib/worker/worker');
const { exhaust, workerQueue } = require(appRoot + '/lib/worker/worker');

const geoForm = `<h:html xmlns="http://www.w3.org/2002/xforms" xmlns:ev="http://www.w3.org/2001/xml-events" xmlns:h="http://www.w3.org/1999/xhtml" xmlns:jr="http://openrosa.org/javarosa" xmlns:odk="http://www.opendatakit.org/xforms" xmlns:orx="http://openrosa.org/xforms" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<h:head>
Expand Down Expand Up @@ -305,7 +305,7 @@ describe('analytics task queries', function () {
let event = (await container.Audits.getLatestByAction('submission.attachment.update')).get();
// eslint-disable-next-line prefer-promise-reject-errors
const jobMap = { 'submission.attachment.update': [ () => Promise.reject({ uh: 'oh' }) ] };
await promisify(runner(container, jobMap))(event);
await promisify(workerQueue(container, jobMap).run)(event);

// should still be 0 because the failure count is only at 1, needs to be at 5 to count
event = (await container.Audits.getLatestByAction('submission.attachment.update')).get();
Expand Down Expand Up @@ -377,7 +377,7 @@ describe('analytics task queries', function () {
// eslint-disable-next-line prefer-promise-reject-errors
const jobMap = { 'submission.attachment.update': [ () => Promise.reject({ uh: 'oh' }) ] };
const eventOne = (await container.Audits.getLatestByAction('submission.attachment.update')).get();
await promisify(runner(container, jobMap))(eventOne);
await promisify(workerQueue(container, jobMap).run)(eventOne);

// making this look like it failed 5 times
await asAlice.post('/v1/projects/1/submission')
Expand Down
Loading

0 comments on commit 6efdd5b

Please sign in to comment.