-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcontinuous.go
365 lines (323 loc) · 8.5 KB
/
continuous.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
package continuous
import (
	"crypto/tls"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"os/signal"
	"path/filepath"
	"sync"
	"syscall"

	gnet "github.com/facebookgo/grace/gracenet"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)
// Continuous is the interface a server must implement to be managed by Cont.
type Continuous interface {
// Serve is called in its own goroutine with the listener to serve on.
Serve(lis net.Listener) error
// Stop is invoked by Cont.Stop.
Stop() error
// GracefulStop is invoked by Cont.GracefulStop.
GracefulStop() error
}
// Cont keeps servers that implement Continuous running continuously: it owns
// their listeners, reacts to signals in Serve and can start a new process.
type Cont struct {
// net creates the listeners and starts the new process during an upgrade.
net gnet.Net
// name is the process name; New defaults it to os.Args[0].
name string
// pid is the pid of this process, written to pidfile by Serve.
pid int
// child is the pid of the process started by the last successful upgrade.
child int
// pidfile is the path of the pid file.
pidfile string
// cwd is the working directory used to build the default pidfile path.
cwd string
logger *zap.Logger
// servers holds every server registered through AddServer.
servers []*ContServer
// state is the current state; the zero value is Running.
state ContState
// wg tracks the serving goroutines started by serve.
wg sync.WaitGroup
// doneChan is closed before servers are stopped or listeners are closed so
// that serving goroutines treat the resulting Serve error as expected.
doneChan chan struct{}
}
// ContState describes what a Cont is currently doing.
type ContState int

// The states a Cont can be in. Running is the zero value.
const (
	Running ContState = iota
	Ready
	Stopped
)

// String returns the lower-case name of the state, or the empty string for a
// value that is not one of the declared states.
func (cs ContState) String() string {
	name := ""
	if cs == Running {
		name = "running"
	} else if cs == Ready {
		name = "ready"
	} else if cs == Stopped {
		name = "stopped"
	}
	return name
}
// ListenOn names the network and address a server listens on. Both values
// are passed unchanged to the Listen call that creates the listener.
type ListenOn struct {
Network string
Address string
}
// ContServer combines a listener, its address and a Continuous server.
type ContServer struct {
// lis is the listener handed to srv.Serve.
lis net.Listener
// srv is the server being managed.
srv Continuous
// listenOn is the network/address used to open (and reopen) lis.
listenOn *ListenOn
// tlsConfig, when non-nil, makes the listener a TLS listener.
tlsConfig *tls.Config
// upgrader, when non-nil, wraps the listener before it is served.
upgrader func(lis net.Listener) net.Listener
}
// Option customizes a Cont; New applies each Option in order to the Cont it
// is building.
type Option func(cont *Cont)
// ProcName returns an Option that sets the process name. When the option is
// not used, New keeps os.Args[0] as the name.
func ProcName(name string) Option {
	setName := func(c *Cont) {
		c.name = name
	}
	return setName
}
// WorkDir returns an Option that sets the working directory. When the option
// is not used, New keeps the directory reported by os.Getwd().
func WorkDir(path string) Option {
	setDir := func(c *Cont) {
		c.cwd = path
	}
	return setDir
}
// LoggerOutput returns an Option that redirects the log to out. The logger's
// core is replaced by a JSON core (production encoder config) that writes to
// out at Info level and above.
func LoggerOutput(out io.Writer) Option {
	return func(cont *Cont) {
		encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
		sink := zapcore.AddSync(out)
		level := zap.NewAtomicLevelAt(zapcore.InfoLevel)
		jsonCore := zapcore.NewCore(encoder, sink, level)

		// The existing core is discarded, not wrapped.
		swapCore := zap.WrapCore(func(zapcore.Core) zapcore.Core {
			return jsonCore
		})
		cont.logger = cont.logger.WithOptions(swapCore)
	}
}
// PidFile returns an Option that sets the path of the pid file. When the
// option is not used (or filename is empty), New derives a default path.
func PidFile(filename string) Option {
	setPidFile := func(c *Cont) {
		c.pidfile = filename
	}
	return setPidFile
}
// New creates a Cont object which upgrades the binary continuously.
//
// Defaults, each overridable through an Option: the process name is
// os.Args[0], the working directory is os.Getwd() and the logger is a zap
// production logger tagged with the current pid. If no pid file is set, it
// defaults to "<name>.pid", resolved against the working directory unless the
// name is already an absolute path.
//
// If the production logger cannot be built, the error is printed to stderr
// and the process exits.
func New(opts ...Option) *Cont {
	logger, err := zap.NewProduction(zap.AddCaller())
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(-1)
	}

	pid := os.Getpid()
	dir, cwdErr := os.Getwd()
	cont := &Cont{
		name:   os.Args[0],
		cwd:    dir,
		pid:    pid,
		logger: logger.With(zap.Int("pid", pid)),
	}
	for _, o := range opts {
		o(cont)
	}

	// Only report the Getwd failure when no WorkDir option replaced it.
	if cont.cwd == "" && cwdErr != nil {
		cont.logger.Warn("get working directory failed", zap.Error(cwdErr))
	}

	if cont.pidfile == "" {
		pidfile := cont.name + ".pid"
		// os.Args[0] is often an absolute path; prefixing it with the working
		// directory would point at a directory that does not exist.
		if !filepath.IsAbs(pidfile) {
			pidfile = filepath.Join(cont.cwd, pidfile)
		}
		cont.pidfile = pidfile
	}
	return cont
}
// ServerOption customizes a ContServer; AddServer applies each ServerOption
// before the listener is created.
type ServerOption func(cs *ContServer)
// TLSConfig returns a ServerOption that stores c as the server's TLS
// configuration. A non-nil configuration makes the server's listener a TLS
// listener.
func TLSConfig(c *tls.Config) ServerOption {
	setTLS := func(server *ContServer) {
		server.tlsConfig = c
	}
	return setTLS
}
// ListenerUpgrader returns a ServerOption that stores upgrader, a function
// that turns a raw listener into a higher-level listener.
func ListenerUpgrader(upgrader func(lis net.Listener) net.Listener) ServerOption {
	setUpgrader := func(server *ContServer) {
		server.upgrader = upgrader
	}
	return setUpgrader
}
// AddServer adds a server which implements the Continuous interface.
//
// The listener for listenOn is opened immediately, but the server only
// accepts connections once Serve has been called. When a TLS configuration is
// set, the listener is wrapped with TLS first; a listener upgrader, if any,
// is applied on top of that.
//
// It returns an error if srv or listenOn is nil, or if the listener cannot
// be created.
func (cont *Cont) AddServer(srv Continuous, listenOn *ListenOn, opts ...ServerOption) error {
	if srv == nil {
		return fmt.Errorf("continuous: add server: nil server")
	}
	if listenOn == nil {
		return fmt.Errorf("continuous: add server: nil listen address")
	}

	cs := &ContServer{srv: srv, listenOn: listenOn}
	for _, o := range opts {
		o(cs)
	}

	lis, err := cont.net.Listen(listenOn.Network, listenOn.Address)
	if err != nil {
		return err
	}
	if cs.tlsConfig != nil {
		lis = tls.NewListener(lis, cs.tlsConfig)
	}
	if cs.upgrader != nil {
		lis = cs.upgrader(lis)
	}
	cs.lis = lis

	cont.servers = append(cont.servers, cs)
	return nil
}
// Serve writes the pid file, starts all the servers and then blocks handling
// signals until one of them ends the loop.
//
// Signals:
//   - SIGTERM, SIGINT: Stop, then return.
//   - SIGQUIT: GracefulStop, then return.
//   - SIGUSR1: toggle between Running (serving) and Ready (listeners closed).
//   - SIGUSR2: start a new process and keep serving.
//   - SIGHUP: start a new process, GracefulStop, then return.
//   - SIGCHLD: reap the process started by an upgrade and restore the pid file.
//
// A non-nil error is returned only when the pid file cannot be written or
// the servers cannot be started. Errors from stopping are logged.
func (cont *Cont) Serve() error {
	cont.logger.Debug("continuous serving")
	if err := cont.writePid(); err != nil {
		return err
	}
	if err := cont.serve(); err != nil {
		return err
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR2, syscall.SIGUSR1, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGCHLD)
	// Stop relaying signals once Serve returns.
	defer signal.Stop(c)

	cont.logger.Debug("waiting for signals")
	for {
		sig := <-c
		cont.logger.Info("got signal", zap.Stringer("value", sig))
		switch sig {
		case syscall.SIGTERM, syscall.SIGINT:
			if err := cont.Stop(); err != nil {
				cont.logger.Error("stop failed", zap.Error(err))
			}
			return nil
		case syscall.SIGQUIT:
			if err := cont.GracefulStop(); err != nil {
				cont.logger.Error("graceful stop failed", zap.Error(err))
			}
			return nil
		case syscall.SIGUSR1:
			cont.toggleListeners()
		case syscall.SIGUSR2:
			if err := cont.upgrade(); err != nil {
				cont.logger.Error("upgrade binary failed", zap.Error(err))
			}
		case syscall.SIGHUP:
			if err := cont.upgrade(); err != nil {
				cont.logger.Error("upgrade binary failed", zap.Error(err))
				continue
			}
			if err := cont.GracefulStop(); err != nil {
				cont.logger.Error("graceful stop failed", zap.Error(err))
				continue
			}
			return nil
		case syscall.SIGCHLD:
			cont.reapChild()
		}
	}
}

// toggleListeners handles SIGUSR1: a Running Cont closes its listeners and
// becomes Ready; a Ready Cont reopens them and serves again.
func (cont *Cont) toggleListeners() {
	switch cont.state {
	case Running:
		cont.state = Ready
		cont.closeListeners()
	case Ready:
		cont.wg.Wait() // wait for the server goroutines to exit
		// listen and serve again
		if err := cont.openListeners(); err != nil {
			cont.logger.Error("open listeners failed", zap.Error(err))
			return
		}
		if err := cont.serve(); err != nil {
			cont.logger.Error("start serve failed", zap.Error(err))
			return
		}
		cont.state = Running
	}
}

// reapChild handles SIGCHLD: it waits for the process started by upgrade so
// it does not become a zombie, then moves pidfile.old back to pidfile.
func (cont *Cont) reapChild() {
	if cont.child == 0 {
		// No upgraded process is being tracked, so there is nothing to wait
		// for and no pid file to recover.
		return
	}
	// NOTE(review): Wait blocks until the tracked child exits; a SIGCHLD
	// caused by some other child process would stall the signal loop here.
	// Confirm whether the embedding program can spawn other children.
	p, err := os.FindProcess(cont.child)
	if err != nil {
		cont.logger.Error("find process failed", zap.Error(err))
	} else if status, err := p.Wait(); err != nil {
		cont.logger.Error("wait child process to exit failed", zap.Error(err))
	} else if status.Success() {
		cont.logger.Info("child exited", zap.Stringer("status", status))
	} else {
		cont.logger.Error("child exited failed", zap.Stringer("status", status))
	}
	// The child has been handled; do not wait on a stale pid next time.
	cont.child = 0

	// recover pidfile.old to pidfile
	if err := os.Rename(cont.pidfile+".old", cont.pidfile); err != nil {
		cont.logger.Error("recover pid file failed", zap.Error(err))
	}
}
// Stop stops every server immediately.
//
// doneChan is closed first (unless it is nil or already closed) so that the
// serving goroutines treat the error returned by their Serve call as
// expected. Every server is asked to stop even when an earlier one fails; the
// first error is returned. The state becomes Stopped only when all servers
// stopped without error.
func (cont *Cont) Stop() error {
	if cont.doneChan != nil {
		select {
		case <-cont.doneChan:
			// Already closed; closing a closed channel would panic.
		default:
			close(cont.doneChan)
		}
	}

	var firstErr error
	for _, server := range cont.servers {
		if err := server.srv.Stop(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	if firstErr != nil {
		return firstErr
	}
	cont.state = Stopped
	return nil
}
// GracefulStop stops every server by calling its GracefulStop method.
//
// doneChan is closed first (unless it is nil or already closed) so that the
// serving goroutines treat the error returned by their Serve call as
// expected. Every server is asked to stop even when an earlier one fails; the
// first error is returned. The state becomes Stopped only when all servers
// stopped without error.
func (cont *Cont) GracefulStop() error {
	if cont.doneChan != nil {
		select {
		case <-cont.doneChan:
			// Already closed; closing a closed channel would panic.
		default:
			close(cont.doneChan)
		}
	}

	var firstErr error
	for _, server := range cont.servers {
		if err := server.srv.GracefulStop(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	if firstErr != nil {
		return firstErr
	}
	cont.state = Stopped
	return nil
}
// upgrade starts a new process and records its pid as the child.
//
// The current pid file is first renamed to pidfile.old; a failure to rename
// is only logged. If the new process cannot be started, the rename is undone
// so the pid file of the running process is not lost, and the start error is
// returned.
func (cont *Cont) upgrade() error {
	oldPidfile := cont.pidfile + ".old"

	// rename pidfile to pidfile.old
	renamed := true
	if err := os.Rename(cont.pidfile, oldPidfile); err != nil {
		renamed = false
		cont.logger.Warn("rename pid file failed", zap.Error(err))
	}

	pid, err := cont.net.StartProcess()
	if err != nil {
		if renamed {
			// No child was started, so nothing else will restore the pid file.
			if rerr := os.Rename(oldPidfile, cont.pidfile); rerr != nil {
				cont.logger.Error("recover pid file failed", zap.Error(rerr))
			}
		}
		return err
	}

	cont.logger.Info("new process started", zap.Int("child", pid))
	cont.child = pid
	return nil
}
// closeListeners closes the listener of every server and resets the gracenet
// state. Close errors are logged, not returned.
func (cont *Cont) closeListeners() {
	// Close the channel so serving goroutines ignore the error their Serve
	// call returns once the listener is closed. Skip it when the channel is
	// already closed, because closing a closed channel would panic.
	if cont.doneChan != nil {
		select {
		case <-cont.doneChan:
		default:
			close(cont.doneChan)
		}
	}

	for _, server := range cont.servers {
		if err := server.lis.Close(); err != nil {
			cont.logger.Error("close listener failed", zap.Error(err), zap.String("listenon", server.listenOn.Address))
		}
	}

	// gracenet internally stores all the active listeners. Closing listeners
	// here does not tell gracenet about it, so it would keep the closed
	// listeners and try to pass them to a child process, which causes errors.
	// Reinitialize net to drop them.
	cont.net = gnet.Net{}
}
// openListeners opens a new listener for every server from its stored
// network and address. The listener is wrapped in the same order AddServer
// uses: TLS first (when a TLS configuration is set), then the listener
// upgrader (when one is set), so a reopened listener is layered exactly like
// the initial one.
//
// It returns the first Listen error; servers handled before the failure keep
// their new listeners.
func (cont *Cont) openListeners() error {
	for _, server := range cont.servers {
		lis, err := cont.net.Listen(server.listenOn.Network, server.listenOn.Address)
		if err != nil {
			return err
		}
		if server.tlsConfig != nil {
			lis = tls.NewListener(lis, server.tlsConfig)
		}
		if server.upgrader != nil {
			lis = server.upgrader(lis)
		}
		server.lis = lis
	}
	return nil
}
// serve creates a fresh doneChan, starts one goroutine per server that calls
// the server's Serve with its listener, and sets the state to Running. It
// does not wait for the goroutines and always returns nil.
func (cont *Cont) serve() error {
	cont.doneChan = make(chan struct{})
	for _, cs := range cont.servers {
		cont.wg.Add(1)
		go func(cs *ContServer) {
			defer cont.wg.Done()

			err := cs.srv.Serve(cs.lis)
			if err == nil {
				return
			}
			select {
			case <-cont.doneChan:
				// doneChan is closed: the error was caused by a stop or by
				// closing the listeners, so it is not reported as a failure.
				cont.logger.Debug("serve close", zap.String("listen", cs.listenOn.Address))
			default:
				cont.logger.Error("serve failed", zap.Error(err), zap.String("listen", cs.listenOn.Address))
			}
		}(cs)
	}
	cont.state = Running
	return nil
}
// writePid writes the decimal pid of this process to the pid file with mode
// 0644 and returns the write error, if any.
func (cont *Cont) writePid() error {
	content := []byte(fmt.Sprint(cont.pid))
	return ioutil.WriteFile(cont.pidfile, content, 0644)
}
// Status reports the state the Cont is currently in.
func (cont *Cont) Status() ContState {
	current := cont.state
	return current
}