title | date | category | tags | |||||
---|---|---|---|---|---|---|---|---|
MIT6.5840(6.824) Lab2: Raft 2C |
2024-01-13 09:15:38 -0800 |
|
|
本文将介绍lab2C
部分的实现, lab2C
要求实现raft
中的持久化功能, 相比lab2A
,和lab2B
, 本节的难度其实很小, 但复杂的是lab2A
,和lab2B
中的一些微小的bug
会在2C
中显现, 并且相对不太容易注意到。
Lab文档
见: https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
我的代码: https://github.com/GFX9/MIT6.5840/tree/lab2C
在之前的AppendEntries
中有这样的代码:
if len(args.Entries) != 0 && len(rf.log) > args.PrevLogIndex+1 && rf.log[args.PrevLogIndex+1].Term != args.Entries[0].Term {
// 发生了冲突, 移除冲突位置开始后面所有的内容
DPrintf("server %v 的log与args发生冲突, 进行移除\n", rf.me)
rf.log = rf.log[:args.PrevLogIndex+1]
}
rf.log = append(rf.log, args.Entries...)
这段代码所做的事情是, 如果将要追加的位置存在日志项, 且日志项与RPC
中的日子切片的第一个发生冲突(Term
不匹配), 则将冲突位置及其之后的日志项清除掉。
这段代码看起来没有问题,但在高并发的场景下,会存在如下问题:
Leader
先发送了AppendEntries RPC
, 我们记为RPC1
Follower
收到RPC1
, 发生上述代码描述的冲突, 将冲突部分的内容清除, 并追加RPC1
中的日志切片- 由于并发程度高,
Leader
在RPC1
没有收到回复时又发送了下一个AppendEntries RPC
, 由于nextIndex
和matchIndex
只有在收到回复后才会修改, 因此这个新的AppendEntries RPC
, 我们记为RPC2
, 与RPC1
是一致的 Follower
收到RPC2
, 由于RPC2
和RPC1
完全相同, 因此其一定不会发生冲突, 结果是Follower
将相同的一个日志项切片追加了2次!
在考虑另一个场景:
Leader
先发送了AppendEntries RPC
, 我们记为RPC1
- 由于网络问题,
RPC1
没有即时到达Follower
Leader
又追加了新的log
, 此时又发送了AppendEntries RPC
, 我们记为RPC2
- 由于网络问题, 后发送的
RPC2
先到达Follower
,Follower
把RPC2
的日志项追加 - 此时
RPC1
到达了Follower
,Follower
把RPC1
的日志项强行追加时将导致log
被缩短
解决方案:
也就是说, 除了考虑冲突外, 还需要考虑重复的RPC
以及顺序颠倒的RPC
, 因此需要检查每个位置的log
是否匹配, 不匹配就覆盖, 否则不做更改。
这样对于重复的RPC
就不会重复追加, 并且如果RPC
顺序颠倒,也就是让Leader
多通过一次心跳同步
for idx, log := range args.Entries {
ridx := args.PrevLogIndex + 1 + idx
if ridx < len(rf.log) && rf.log[ridx].Term != log.Term {
// 某位置发生了冲突, 覆盖这个位置开始的所有内容
rf.log = rf.log[:ridx]
rf.log = append(rf.log, args.Entries[idx:]...)
break
} else if ridx == len(rf.log) {
// 没有发生冲突但长度更长了, 直接拼接
rf.log = append(rf.log, args.Entries[idx:]...)
break
}
}
这里的bug是完成
lab3
后才发现的bug
,lab2
分支的代码还没有进行这样的修改也能通过测例。原因是在我的设计中,Start
并没有立即广播心跳, 因此不会存在RPC
顺序颠倒的情况, 如果想看最新的代码修改, 参见: https://github.com/GFX9/MIT6.5840/blob/lab3A/src/raft/raft.go#L606
在之前的回退实现中, 如果有Follower
的日志不匹配, 每次RPC
中, Leader
会将其nextIndex
自减1来重试, 但其在某些情况下会导致效率很低, 因此需要AppendEntries RPC
的回复信息携带更多的字段以加速回退, 核心思想就是:Follower
返回更多信息给Leader
,使其可以以Term
为单位来回退
教授在课堂上已经介绍了快速回退的实现机制, 可以看我整理的笔记
我的实现和课堂的介绍基本一致, 只是将XLen
从空白的Log槽位数
改为Log的长度
:
type AppendEntriesReply struct {
// Your data here (2A).
Term int // currentTerm, for leader to update itself
Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
XTerm int // Follower中与Leader冲突的Log对应的Term
XIndex int // Follower中,对应Term为XTerm的第一条Log条目的索引
XLen int // Follower的log的长度
}
发现冲突时, 回复的逻辑为:
- 如果
PrevLogIndex
位置不存在日志项, 通过设置reply.XTerm = -1
告知Leader
, 并将reply.XLen
设置为自身日志长度 - 如果
PrevLogIndex
位置日志项存在但Term
冲突, 通过reply.XTerm
和reply.XIndex
分别告知冲突位置的Term
和这个Term
在Follower
中第一次出现的位置
具体代码如下:
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
...
isConflict := false
// 校验PrevLogIndex和PrevLogTerm不合法
// 2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if args.PrevLogIndex >= len(rf.log) {
// PrevLogIndex位置不存在日志项
reply.XTerm = -1
reply.XLen = len(rf.log) // Log长度
isConflict = true
DPrintf("server %v 的log在PrevLogIndex: %v 位置不存在日志项, Log长度为%v\n", rf.me, args.PrevLogIndex, reply.XLen)
} else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
// PrevLogIndex位置的日志项存在, 但term不匹配
reply.XTerm = rf.log[args.PrevLogIndex].Term
i := args.PrevLogIndex
for rf.log[i].Term == reply.XTerm {
i -= 1
}
reply.XIndex = i + 1
isConflict = true
DPrintf("server %v 的log在PrevLogIndex: %v 位置Term不匹配, args.Term=%v, 实际的term=%v\n", rf.me, args.PrevLogIndex, args.PrevLogTerm, reply.XTerm)
}
if isConflict {
reply.Term = rf.currentTerm
reply.Success = false
return
}
...
}
如果需要回退, leader
的处理逻辑是:
- 如果
XTerm == -1
, 表示PrevLogIndex
位置在Follower
中不存在log
, 需要将nextIndex
设置为Follower
的log
长度即XLen
- 如果
XTerm != -1
, 表示PrevLogIndex
位置在Follower
中存在log
但其Term
为XTerm
, 与prevLogTerm
不匹配, 同时XIndex
表示这个Term
在Follower
中第一次出现的位置, 需要如下进行判断:- 如果
Follower
中存在XTerm
, 将nextIndex
设置为Follower
中最后一个term == XTerm
的日志项的下一位 - 否则, 将
nextIndex
设置为XIndex
- 如果
具体代码为:
func (rf *Raft) handleAppendEntries(serverTo int, args *AppendEntriesArgs) {
// 目前的设计, 重试自动发生在下一次心跳函数, 所以这里不需要死循环
...
rf.mu.Lock()
defer rf.mu.Unlock()
...
if reply.Term == rf.currentTerm && rf.role == Leader {
// term仍然相同, 且自己还是leader, 表名对应的follower在prevLogIndex位置没有与prevLogTerm匹配的项
// 快速回退的处理
if reply.XTerm == -1 {
// PrevLogIndex这个位置在Follower中不存在
DPrintf("leader %v 收到 server %v 的回退请求, 原因是log过短, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, serverTo, rf.nextIndex[serverTo], serverTo, reply.XLen)
rf.nextIndex[serverTo] = reply.XLen
return
}
i := rf.nextIndex[serverTo] - 1
for i > 0 && rf.log[i].Term > reply.XTerm {
i -= 1
}
if rf.log[i].Term == reply.XTerm {
// 之前PrevLogIndex发生冲突位置时, Follower的Term自己也有
DPrintf("leader %v 收到 server %v 的回退请求, 冲突位置的Term为%v, server的这个Term从索引%v开始, 而leader对应的最后一个XTerm索引为%v, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, reply.XTerm, reply.XIndex, i, serverTo, rf.nextIndex[serverTo], serverTo, i+1)
rf.nextIndex[serverTo] = i + 1
} else {
// 之前PrevLogIndex发生冲突位置时, Follower的Term自己没有
DPrintf("leader %v 收到 server %v 的回退请求, 冲突位置的Term为%v, server的这个Term从索引%v开始, 而leader对应的XTerm不存在, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, reply.XTerm, reply.XIndex, serverTo, rf.nextIndex[serverTo], serverTo, reply.XIndex)
rf.nextIndex[serverTo] = reply.XIndex
}
return
}
}
持久化的内容只包括: votedFor
, currentTerm
, log
, 为什么只需要持久化这三个变量, 也可以参考课堂笔记
persist
函数和readPersist
函数很简单, 只需要根据注释的提示完成即可:
func (rf *Raft) persist() {
// DPrintf("server %v 开始持久化, 最后一个持久化的log为: %v:%v", rf.me, len(rf.log)-1, rf.log[len(rf.log)-1].Cmd)
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.votedFor)
e.Encode(rf.currentTerm)
e.Encode(rf.log)
raftstate := w.Bytes()
rf.persister.Save(raftstate, nil)
}
// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
if data == nil || len(data) == 0 {
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var votedFor int
var currentTerm int
var log []Entry
if d.Decode(&votedFor) != nil ||
d.Decode(¤tTerm) != nil ||
d.Decode(&log) != nil {
DPrintf("readPersist failed\n")
} else {
rf.votedFor = votedFor
rf.currentTerm = currentTerm
rf.log = log
}
}
lab
的持久化方案很粗糙, 只要修改了votedFor
, currentTerm
, log
中的任意一个, 则进行持久化, 因此只需要在相应位置调用persist
即可, 这里旧不给出代码了, 感兴趣可以直接看我提供的仓库。
特别需要说明的是,当崩溃恢复时,其调用的函数仍然是Make
函数, 而nextIndex
需要在执行了readPersist
后再初始化, 因为readPersist
修改了log
, 而nextIndex
需初始化为log
长度
按照我的理解, 持久化时不需要锁保护log
, 原因如下:
Leader
视角Leader
永远不会删除自己的log
(此时没有快照), 因此不需要锁保护Follower
视角 尽管Follower
可能截断log
, 但永远不会截断在commit
的log
之前, 而持久化只需要保证已经commit
的log
, 因此也不需要锁
2B
测试 由于我们实现了快速回退, 此时可以测试2B
, 看看是否速度有显著提升: 执行测试命令
go test -v -run 2B
结果如下:
相比于之前快了15s左右, 勉勉强强满足了任务书中的一份子以内... 看了实现还是不够精妙啊
2C
测试 执行测试命令
go test -v -run 2C
结果如下:
比官方的示例满了4s, 还不错
raft
的许多特性导致其一次测试并不准确, 有些bug需要多次测试才会出现, 编写如下脚本命名为manyTest_2B.sh
:
#!/bin/bash
# 初始化计数器
count=0
success_count=0
fail_count=0
# 设置测试次数
max_tests=50
for ((i=1; i<=max_tests; i++))
do
echo "Running test iteration $i of $max_tests..."
# 运行 go 测试命令
go test -v -run 2C &> output2C.log
# 检查 go 命令的退出状态
if [ "$?" -eq 0 ]; then
# 测试成功
success_count=$((success_count+1))
echo "Test iteration $i passed."
# 如果想保存通过的测试日志,取消下面行的注释
# mv output2C.log "success_$i.log"
else
# 测试失败
fail_count=$((fail_count+1))
echo "Test iteration $i failed, check 'failure2C_$i.log' for details."
mv output2C.log "failure2C_$i.log"
fi
done
# 报告测试结果
echo "Testing completed: $max_tests iterations run."
echo "Successes: $success_count"
echo "Failures: $fail_count"
再次进行测试:
./manyTest_2C.sh
结果: