forked from odota/core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparseManager.js
77 lines (73 loc) · 2.77 KB
/
parseManager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
var processParse = require('./processParse');
var r = require('./redis');
var redis = r.client;
var jobs = r.jobs;
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;
var config = require('./config');
var buildSets = require('./buildSets');
var parsers;
//Bootstrap: test runs and cluster workers start right away; the real
//master must rebuild the Redis sets first so start() can read "parsers".
if (config.NODE_ENV === "test" || !cluster.isMaster) {
    start();
}
else {
    buildSets(function() {
        start();
    });
}
/**
 * Reads the list of parse-worker URLs from Redis (key "parsers") and starts
 * one job consumer per URL.  On a Redis error or a missing key it logs and
 * retries every 10 seconds.  The master process (outside tests) starts one
 * worker per parser URL; any other process starts a single worker.
 */
function start() {
    redis.get("parsers", function(err, result) {
        if (err || !result) {
            console.log('failed to get parsers from redis, retrying');
            //retry the whole startup after 10s rather than crashing
            return setTimeout(start, 10000);
        }
        parsers = JSON.parse(result);
        var capacity = parsers.length;
        //Forking one OS process per parse core is supported (forkWorker) but
        //disabled: running the consumers in parallel on a single thread uses
        //less memory.  Flip this constant to re-enable process-per-core.
        var useForkedWorkers = false;
        if (cluster.isMaster && config.NODE_ENV !== "test") {
            console.log("[PARSEMANAGER] starting master");
            for (var i = 0; i < capacity; i++) {
                if (useForkedWorkers) {
                    //fork a worker for each available parse core
                    forkWorker(i);
                }
                else {
                    //run workers in parallel in a single thread (uses less memory)
                    runWorker(i);
                }
            }
        }
        else {
            runWorker(0);
        }
        /**
         * Forks a cluster worker pinned to parsers[i] via the PARSER_URL env
         * var, and respawns a replacement (same index) whenever it exits.
         * @param {number} i - index into the parsers array
         */
        function forkWorker(i) {
            var worker = cluster.fork({
                PARSER_URL: parsers[i]
            });
            worker.on("exit", function() {
                console.log("Worker crashed! Spawning a replacement of worker %s", worker.id);
                forkWorker(i);
            });
        }
        /**
         * Registers an in-process consumer for the 'parse' job queue; each
         * job is assigned a parser URL and handed to processParse.
         * @param {number} i - preferred index into the parsers array
         */
        function runWorker(i) {
            console.log("[PARSEMANAGER] starting worker with pid %s", process.pid);
            jobs.process('parse', function(job, ctx, cb) {
                console.log("starting parse job: %s", job.id);
                job.parser_url = getParserUrl(job);
                //TODO check if the assigned url is active
                //if not, use ctx to pause and cb(err) (this consumes a retry)
                //keep checking status and resume the worker when the parse worker is alive again
                return processParse(job, ctx, cb);
            });
            /**
             * Picks the parser URL for a job: an explicit config override,
             * else this worker's own slot, else a random parser as fallback.
             * @param {Object} job - queue job (currently unused in selection)
             * @returns {string} parser URL
             */
            function getParserUrl(job) {
                //node <0.12 doesn't have RR cluster scheduling, so parsing on remote workers may cause us to lose a request if the remote is crashed by another job using the same core/thread
                //can process parse requests on localhost to avoid issue
                /*
                if (job.data.payload.request) {
                    return job.parser_url = "http://localhost:5200?key=" + config.RETRIEVER_SECRET;
                }
                */
                return config.PARSER_URL || parsers[i] || parsers[Math.floor(Math.random() * parsers.length)];
            }
        }
    });
}