-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstate.go
476 lines (407 loc) · 11 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
package memberlist
import (
"fmt"
"github.com/hashicorp/memberlist/pkg"
"math"
"math/rand"
"net"
"strings"
"sync/atomic"
"time"
)
type NodeStateType int
const (
StateAlive NodeStateType = iota
StateSuspect
StateDead
StateLeft
)
// Node 体现了一个集群中节点的状态
type Node struct {
Name string
Addr net.IP
Port uint16
Meta []byte // 节点委托的元信息
State NodeStateType // State of the node.
PMin uint8 // Minimum 协议版本 this understands
PMax uint8 // Maximum 协议版本 this understands
PCur uint8 // Current version node is speaking
DMin uint8 // Min 协议版本 for the delegate to understand
DMax uint8 // Max 协议版本 for the delegate to understand
DCur uint8 // Current version delegate is speaking
}
// Address 返回host:Port
func (n *Node) Address() string {
return pkg.JoinHostPort(n.Addr.String(), n.Port)
}
// FullAddress 返回Address
func (n *Node) FullAddress() pkg.Address {
return pkg.Address{
Addr: pkg.JoinHostPort(n.Addr.String(), n.Port),
Name: n.Name,
}
}
// String returns the node name
func (n *Node) String() string {
return n.Name
}
// NodeState 管理其他节点的状态视图
type NodeState struct {
Node
Incarnation uint32 // Last known incarnation number
State NodeStateType // 当前的状态
StateChange time.Time // Time last state change happened
}
// Address returns the host:Port form of a node's Address, suitable for use
// with a Transport.
func (n *NodeState) Address() string {
return n.Node.Address()
}
// FullAddress returns the node name and host:Port form of a node's Address,
// suitable for use with a Transport.
func (n *NodeState) FullAddress() pkg.Address {
return n.Node.FullAddress()
}
func (n *NodeState) DeadOrLeft() bool {
return n.State == StateDead || n.State == StateLeft
}
// AckHandler 注册确认、否认处理函数
type AckHandler struct {
ackFn func([]byte, time.Time)
nackFn func()
timer *time.Timer
}
// NoPingResponseError is used to indicate a 'Ping' packet was
// successfully issued but no response was received
type NoPingResponseError struct {
node string
}
func (f NoPingResponseError) Error() string {
return fmt.Sprintf("No response from node %s", f.node)
}
// Schedule 开启定时器
func (m *Members) Schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
if len(m.tickers) > 0 {
return
}
stopCh := make(chan struct{})
// 开始定时探活
if m.Config.ProbeInterval > 0 {
t := time.NewTicker(m.Config.ProbeInterval) // Config.go:190 1s
go m.triggerFunc(m.Config.ProbeInterval, t.C, stopCh, m.Probe)
//m.Probe()
m.tickers = append(m.tickers, t)
}
if m.Config.PushPullInterval > 0 {
// 开始 push、pull 触发器
go m.PushPullTrigger(stopCh)
}
// gossip 定时器
if m.Config.GossipInterval > 0 && m.Config.GossipNodes > 0 { // 每隔100ms,将消息随机发送到3个节点
t := time.NewTicker(m.Config.GossipInterval)
go m.triggerFunc(m.Config.GossipInterval, t.C, stopCh, m.Gossip)
m.tickers = append(m.tickers, t)
}
if len(m.tickers) > 0 {
m.stopTickCh = stopCh
}
}
// triggerFunc 没收到一条消息就执行f(),知道收到stop信号
func (m *Members) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger))
// 开始时 随机睡眠0~stagger
select {
case <-time.After(randStagger):
case <-stop:
return
}
for {
select {
case <-C:
f()
case <-stop:
return
}
}
}
// PushPullTrigger 周期性的push\pull 直到收到stop信号。不使用triggerFunc,因为触发时间不固定,基于集群大小避免网络堵塞
func (m *Members) PushPullTrigger(stop <-chan struct{}) {
interval := m.Config.PushPullInterval
// 开始时 随机睡眠0~randStagger
randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
select {
case <-time.After(randStagger):
case <-stop:
return
}
// 使用动态的ticker
for {
tickTime := PushPullScale(interval, m.EstNumNodes())
select {
case <-time.After(tickTime):
m.PushPull()
case <-stop:
return
}
}
}
// 停止后台循环
func (m *Members) deschedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// 如果我们没有定时器,那么我们就没有被调度。
if len(m.tickers) == 0 {
return
}
// 关闭stop channel,使所有的ticker监听者停止。
close(m.stopTickCh)
// 关闭定时器
for _, t := range m.tickers {
t.Stop()
}
m.tickers = nil
}
// Probe 用于对所有节点的探活
func (m *Members) Probe() {
// 探测过的数量
numCheck := 0
START:
m.NodeLock.RLock()
// 当所有的节点都检查完了一遍
if numCheck >= len(m.Nodes) {
m.NodeLock.RUnlock()
return
}
if m.probeIndex >= len(m.Nodes) {
m.NodeLock.RUnlock()
m.ResetNodes() // 清理无效节点,打乱顺序
m.probeIndex = 0
numCheck++
goto START
}
// 是否跳过节点探测
skip := false
var node NodeState
node = *m.Nodes[m.probeIndex]
// 本机 或 节点在Dead、left的状态都跳过
if node.Name == m.Config.Name {
skip = true
} else if node.DeadOrLeft() {
skip = true
}
m.NodeLock.RUnlock()
m.probeIndex++
if skip {
numCheck++
goto START
}
m.ProbeNode(&node)
}
// ProbeNodeByAddr only by test
func (m *Members) ProbeNodeByAddr(Addr string) {
m.NodeLock.RLock()
n := m.NodeMap[Addr]
m.NodeLock.RUnlock()
m.ProbeNode(n)
}
// FailedRemote 检查错误并决定它是否表明另一端有故障。
func FailedRemote(err error) bool {
switch t := err.(type) {
case *net.OpError:
if strings.HasPrefix(t.Net, "tcp") {
switch t.Op {
case "dial", "read", "write":
return true
}
} else if strings.HasPrefix(t.Net, "udp") {
switch t.Op {
case "write":
return true
}
}
}
return false
}
// ResetNodes 清除Dead节点,并将节点列表刷新
func (m *Members) ResetNodes() {
m.NodeLock.Lock()
defer m.NodeLock.Unlock()
// 移除Dead node ,超过了Dead interval的
DeadIdx := MoveDeadNodes(m.Nodes, m.Config.GossipToTheDeadTime)
// 第一个在m.Nodes Dead的节点的索引
for i := DeadIdx; i < len(m.Nodes); i++ {
delete(m.NodeMap, m.Nodes[i].Name)
m.Nodes[i] = nil
}
// 修剪节点以排除死节点
m.Nodes = m.Nodes[0:DeadIdx]
// 更新集群节点数量
atomic.StoreUint32(&m.numNodes, uint32(DeadIdx))
// 打乱节点列表
ShuffleNodes(m.Nodes)
}
// NextIncarnation 以线程安全的方式返回下一个incarnation的编号。
func (m *Members) NextIncarnation() uint32 {
return atomic.AddUint32(&m.incarnation, 1)
}
func (m *Members) CurIncarnation() uint32 {
return m.incarnation
}
// skipIncarnation incarnation number添加偏移量
func (m *Members) skipIncarnation(offset uint32) uint32 {
return atomic.AddUint32(&m.incarnation, offset)
}
// EstNumNodes 用于获得当前估计的节点数
func (m *Members) EstNumNodes() int {
return int(atomic.LoadUint32(&m.numNodes))
}
type AckMessage struct {
Complete bool
Payload []byte
Timestamp time.Time
}
// SetProbeChannels 当消息确认时 发到ackCh。如果超时 complete字段会是false
// 不需要确认,发送空struct或nil
func (m *Members) SetProbeChannels(seqNo uint32, ackCh chan AckMessage, nackCh chan struct{}, timeout time.Duration) {
ackFn := func(payload []byte, timestamp time.Time) {
select {
case ackCh <- AckMessage{true, payload, timestamp}:
default:
}
}
nackFn := func() {
select {
case nackCh <- struct{}{}:
default:
}
}
ah := &AckHandler{ackFn, nackFn, nil}
m.AckLock.Lock()
m.AckHandlers[seqNo] = ah
m.AckLock.Unlock()
ah.timer = time.AfterFunc(timeout, func() {
m.AckLock.Lock()
delete(m.AckHandlers, seqNo)
m.AckLock.Unlock()
select {
case ackCh <- AckMessage{false, nil, time.Now()}:
default:
}
})
}
// SetAckHandler 是用来附加一个处理程序,当收到一个给定序列号的ack时被调用。
// 序列号的ack时调用的处理程序。如果达到超时,处理程序将被删除。这用于间接ping,
// 所以不配置nacks的函数。
func (m *Members) SetAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
ah := &AckHandler{ackFn, nil, nil}
m.AckLock.Lock()
m.AckHandlers[seqNo] = ah
m.AckLock.Unlock()
ah.timer = time.AfterFunc(timeout, func() {
m.AckLock.Lock()
delete(m.AckHandlers, seqNo)
m.AckLock.Unlock()
})
}
// Refute 当收到传来的关于本节点被怀疑或死亡的信息时,会发送一个Alive gossip message。
// 它将确保incarnation超过给定的 accusedInc 值,或者你可以提供 0 来获取下一个incarnation。
// 这将改变传入的节点状态,所以必须在持有NodeLock情况下调用这个。
func (m *Members) Refute(me *NodeState, accusedInc uint32) {
inc := m.NextIncarnation()
if accusedInc >= inc {
inc = m.skipIncarnation(accusedInc - inc + 1)
}
me.Incarnation = inc
// 减少health,因为我们被要求反驳一个问题。
m.Awareness.ApplyDelta(1)
a := Alive{
Incarnation: inc,
Node: me.Name,
Addr: me.Addr,
Port: me.Port,
Meta: me.Meta,
Vsn: []uint8{
me.PMin, me.PMax, me.PCur,
me.DMin, me.DMax, me.DCur,
},
}
m.EncodeBroadcast(me.Addr.String(), AliveMsg, a)
}
// VerifyProtocol 验证远端传来的NodeState 的协议版本与委托版本
func (m *Members) VerifyProtocol(remote []PushNodeState) error {
m.NodeLock.RLock()
defer m.NodeLock.RUnlock()
var maxpmin, minpmax uint8
var maxdmin, mindmax uint8
minpmax = math.MaxUint8
mindmax = math.MaxUint8
for _, rn := range remote {
if rn.State != StateAlive {
continue
}
if len(rn.Vsn) == 0 {
continue
}
if rn.Vsn[0] > maxpmin {
maxpmin = rn.Vsn[0]
}
if rn.Vsn[1] < minpmax {
minpmax = rn.Vsn[1]
}
if rn.Vsn[3] > maxdmin {
maxdmin = rn.Vsn[3]
}
if rn.Vsn[4] < mindmax {
mindmax = rn.Vsn[4]
}
}
for _, n := range m.Nodes {
if n.State != StateAlive {
continue
}
if n.PMin > maxpmin {
maxpmin = n.PMin
}
if n.PMax < minpmax {
minpmax = n.PMax
}
if n.DMin > maxdmin {
maxdmin = n.DMin
}
if n.DMax < mindmax {
mindmax = n.DMax
}
}
for _, n := range remote {
var nPCur, nDCur uint8
if len(n.Vsn) > 0 {
nPCur = n.Vsn[2]
nDCur = n.Vsn[5]
}
if nPCur < maxpmin || nPCur > minpmax {
return fmt.Errorf("Node '%s' 协议版本 (%d) is incompatible: [%d, %d]", n.Name, nPCur, maxpmin, minpmax)
}
if nDCur < maxdmin || nDCur > mindmax {
return fmt.Errorf("Node '%s' delegate 协议版本 (%d) is incompatible: [%d, %d]", n.Name, nDCur, maxdmin, mindmax)
}
}
for _, n := range m.Nodes {
nPCur := n.PCur
nDCur := n.DCur
if nPCur < maxpmin || nPCur > minpmax {
return fmt.Errorf(
"Node '%s' 协议版本 (%d) is incompatible: [%d, %d]",
n.Name, nPCur, maxpmin, minpmax)
}
if nDCur < maxdmin || nDCur > mindmax {
return fmt.Errorf("Node '%s' delegate 协议版本 (%d) is incompatible: [%d, %d]", n.Name, nDCur, maxdmin, mindmax)
}
}
return nil
}
// NextSeqNo 以线程安全的方式返回一个可用的序列号
func (m *Members) NextSeqNo() uint32 {
return atomic.AddUint32(&m.SequenceNum, 1)
}