-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplugin.js
156 lines (134 loc) · 4.27 KB
/
plugin.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/// <reference path="./global.d.ts" />
'use strict'
// Needed to work with dates & postgresql
// See https://node-postgres.com/features/types/
process.env.TZ = 'UTC'
const cronPlugin = require('./lib/cron')
const Executor = require('./lib/executor')
const { scheduler } = require('timers/promises')
const { on } = require('events')
/** @param {import('fastify').FastifyInstance} app */
module.exports = async function (app, options) {
const lock = Number(options.lock) || 42
/* c8 ignore next 1 */
const leaderPoll = Number(options.leaderPoll) || 10000
app.log.info('Locking cron plugin to advisory lock %d', lock)
const dummyExecutor = {
execute () {},
updateTimer () {
const { db, sql } = app.platformatic
db.query(sql`
NOTIFY "update_timer";
`)
/* c8 ignore next 3 */
.catch((err) => {
app.log.error({ err }, 'Error in dummy updateTimer')
})
},
stop () {}
}
let executor = dummyExecutor
let elected = false
const abortController = new AbortController()
async function amITheLeader () {
const { db, sql } = app.platformatic
await db.task(async (t) => {
while (!abortController.signal.aborted) {
const [{ leader }] = await t.query(sql`
SELECT pg_try_advisory_lock(${lock}) as leader;
`)
if (leader && !elected) {
app.log.info('This instance is the leader')
executor = new Executor(app)
executor.execute()
elected = true
;(async () => {
await t.query(sql`
LISTEN "update_timer";
`)
for await (const notification of on(t._driver.client, 'notification', { signal: abortController.signal })) {
app.log.debug({ notification }, 'Received notification')
try {
await executor.execute()
/* c8 ignore next 3 */
} catch (err) {
app.log.warn({ err }, 'error while processing notification')
}
// TODO: write automated tests for this
}
/* c8 ignore next 19 */
})()
.catch((err) => {
if (err.name !== 'AbortError') {
// an error occurred, and it's expected
app.log.error({ err }, 'Error in notification loop')
} else {
abortController.abort()
}
})
} else if (leader && elected) {
app.log.debug('This instance is still the leader')
} else if (!leader && elected) {
// this should never happen
app.log.warn('This instance was the leader but is not anymore')
await executor.stop()
executor = dummyExecutor
elected = false
} else {
app.log.debug('This instance is not the leader')
executor = dummyExecutor
}
try {
await scheduler.wait(leaderPoll, { signal: abortController.signal })
} catch {
break
}
}
})
app.log.debug('leader loop stopped')
}
let leaderLoop = amITheLeader()
retryLeaderLoop(leaderLoop)
/* c8 ignore next 10 */
function retryLeaderLoop () {
leaderLoop.catch((err) => {
app.log.error({ err }, 'Error in leader loop')
return executor.stop()
}).then(() => {
if (!abortController.signal.aborted) {
leaderLoop = amITheLeader()
retryLeaderLoop(leaderLoop)
}
})
}
app.platformatic.addEntityHooks('message', {
async insert (original, { inputs, ...rest }) {
const now = new Date() // now
for (const input of inputs) {
input.when = now
}
const res = await original({ inputs, ...rest })
for (const input of inputs) {
const date = new Date(input.when)
executor.updateTimer(date)
}
return res
},
async save (original, { input, ...rest }) {
if (!input.when) {
input.when = new Date() // now
}
const res = await original({ input, ...rest })
const date = new Date(input.when)
executor.updateTimer(date)
return res
}
})
await app.register(cronPlugin)
await executor.execute()
app.addHook('onClose', async () => {
abortController.abort()
await leaderLoop
executor.stop()
})
}