forked from odota/core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoperations.js
101 lines (97 loc) · 3.14 KB
/
operations.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
var db = require('./db');
var utility = require('./utility');
var convert64to32 = utility.convert64to32;
var generateJob = utility.generateJob;
var async = require('async');
var r = require("./redis");
var jobs = r.jobs;
var updatePlayerCaches = require('./updatePlayerCaches');
var redis = r.client;
/**
 * Passes a match to updatePlayerCaches and then decides whether to queue it for parsing.
 *
 * Callers must explicitly set match.parse_status = 0 to have the match queued;
 * any other value (including a missing field, which per the original author's
 * note can happen for matches obtained via full history) leaves the match unqueued.
 *
 * @param {Object} match - match data; match.priority and match.attempts are
 *   overwritten on this object when match.request is truthy and the match is queued.
 * @param {Function} cb - called as cb(err) on failure or when nothing was queued,
 *   and as cb(err, job) with the result of queueReq when a parse was queued.
 */
function insertMatch(match, cb) {
    var cacheOptions = {
        type: "api"
    };

    // Single step run through async.series so the decision below receives its error.
    function cacheMatch(next) {
        updatePlayerCaches(match, cacheOptions, next);
    }

    function decideParse(err) {
        if (err) {
            // the caching step failed
            return cb(err);
        }
        if (match.parse_status !== 0) {
            // Not parsing this match. This is not an error, but no job is handed
            // back, which lets a requester be told the match was not queued.
            cb(err);
            return;
        }
        if (match.request) {
            // user requests get higher priority and only a single attempt
            match.priority = "high";
            match.attempts = 1;
        }
        // queue the parse and hand the resulting job to the caller
        return queueReq("parse", match, function(queueErr, parseJob) {
            cb(queueErr, parseJob);
        });
    }

    async.series([cacheMatch], decideParse);
}
/**
 * Inserts a match and mirrors the progress of any resulting parse job onto `job`.
 *
 * @param {Object} match - match data forwarded to insertMatch.
 * @param {Object} job - the job to report on; must expose progress(done, total, message).
 * @param {Function} cb - called with an error when insertion or parsing fails,
 *   and with no arguments once the match is handled (parse finished, or no parse
 *   job was created).
 */
function insertMatchProgress(match, job, cb) {
    // forward the parse job's progress value, always out of 100
    function relayProgress(percent) {
        job.progress(percent, 100);
    }

    function onParseFailed(failure) {
        cb(failure);
    }

    function onParseComplete() {
        job.progress(100, 100, "Parse complete!");
        cb();
    }

    function onInserted(err, parseJob) {
        if (err) {
            return cb(err);
        }
        if (parseJob) {
            // a parse job exists: track it through its events until it settles
            job.progress(0, 100, "Parsing replay...");
            parseJob.on('progress', relayProgress);
            parseJob.on('failed', onParseFailed);
            parseJob.on('complete', onParseComplete);
            return;
        }
        // insertion succeeded but no parse job was produced for this replay
        job.progress(100, 100, "This replay is unavailable.");
        cb();
    }

    insertMatch(match, onInserted);
}
/**
 * Upserts a player document keyed by account_id.
 *
 * The account_id is Number(convert64to32(player.steamid)). The passed-in player
 * object is stamped with last_summaries_update = now (it is mutated) and then
 * written in full via $set.
 *
 * @param {Object} player - player data; must carry steamid.
 * @param {Function} cb - called with only the error (if any) from the update.
 */
function insertPlayer(player, cb) {
    var selector = {
        account_id: Number(convert64to32(player.steamid))
    };
    // record when this player's summary data was last written
    player.last_summaries_update = new Date();
    var changes = {
        $set: player
    };
    var options = {
        upsert: true
    };
    db.players.update(selector, changes, options, function onUpdated(err) {
        // only the error is forwarded; the update result is dropped
        cb(err);
    });
}
/**
 * Builds a job with generateJob and enqueues it on the `jobs` queue.
 *
 * The job retries up to payload.attempts times (15 when unset or 0), backs off
 * exponentially starting from a 60 second delay, is removed once complete, and
 * runs at payload.priority ('normal' when unset).
 *
 * @param {string} type - job type handed to generateJob.
 * @param {Object} payload - job payload; optional attempts and priority fields
 *   tune the queue settings.
 * @param {Function} cb - called as cb(err, job) once the save attempt finishes.
 */
function queueReq(type, payload, cb) {
    var job = generateJob(type, payload);
    // Configure the job first and keep a reference to it before saving, so the
    // save callback never reads a variable that is still being assigned (which
    // would throw if save ever invoked its callback synchronously).
    // Relies on each configuration call returning the job itself for chaining,
    // as the original single-chain expression already did.
    var kuejob = jobs.create(job.type, job)
        .attempts(payload.attempts || 15)
        .backoff({
            delay: 60 * 1000,
            type: 'exponential'
        })
        .removeOnComplete(true)
        .priority(payload.priority || 'normal');
    kuejob.save(function(err) {
        if (err) {
            // do not claim the job was created when the save failed
            console.error("[KUE] failed to create job: %s", err);
        }
        else {
            console.log("[KUE] created jobid: %s", kuejob.id);
        }
        cb(err, kuejob);
    });
}
// Public API of this module: player/match insertion helpers and the job-queue helper.
module.exports = {
insertPlayer: insertPlayer,
insertMatch: insertMatch,
insertMatchProgress: insertMatchProgress,
queueReq: queueReq
};