Skip to content

Latest commit

 

History

History
307 lines (256 loc) · 12.3 KB

Lab2_Raft_2C.md

File metadata and controls

307 lines (256 loc) · 12.3 KB
title date category tags
MIT6.5840(6.824) Lab2: Raft 2C
2024-01-13 09:15:38 -0800
CS课程笔记
MIT6.5840(6.824) 2023
Lab笔记
分布式系统
Go

本文将介绍lab2C部分的实现, lab2C要求实现raft中的持久化功能, 相比lab2A,和lab2B, 本节的难度其实很小, 但复杂的是lab2A,和lab2B中的一些微小的bug会在2C中显现, 并且相对不太容易注意到。

Lab文档见: https://pdos.csail.mit.edu/6.824/labs/lab-raft.html

我的代码: https://github.com/GFX9/MIT6.5840/tree/lab2C

raft原论文

1 bug修复:重复的RPC

在之前的AppendEntries中有这样的代码:

if len(args.Entries) != 0 && len(rf.log) > args.PrevLogIndex+1 && rf.log[args.PrevLogIndex+1].Term != args.Entries[0].Term {
    // 发生了冲突, 移除冲突位置开始后面所有的内容
    DPrintf("server %v 的log与args发生冲突, 进行移除\n", rf.me)
    rf.log = rf.log[:args.PrevLogIndex+1]
}
rf.log = append(rf.log, args.Entries...)

这段代码所做的事情是, 如果将要追加的位置存在日志项, 且日志项与RPC中的日子切片的第一个发生冲突(Term不匹配), 则将冲突位置及其之后的日志项清除掉。

这段代码看起来没有问题,但在高并发的场景下,会存在如下问题:

  1. Leader先发送了AppendEntries RPC, 我们记为RPC1
  2. Follower收到RPC1, 发生上述代码描述的冲突, 将冲突部分的内容清除, 并追加RPC1中的日志切片
  3. 由于并发程度高, LeaderRPC1没有收到回复时又发送了下一个AppendEntries RPC, 由于nextIndexmatchIndex只有在收到回复后才会修改, 因此这个新的AppendEntries RPC, 我们记为RPC2, 与RPC1是一致的
  4. Follower收到RPC2, 由于RPC2RPC1完全相同, 因此其一定不会发生冲突, 结果是Follower将相同的一个日志项切片追加了2次!

在考虑另一个场景:

  1. Leader先发送了AppendEntries RPC, 我们记为RPC1
  2. 由于网络问题, RPC1没有即时到达Follower
  3. Leader又追加了新的log, 此时又发送了AppendEntries RPC, 我们记为RPC2
  4. 由于网络问题, 后发送的RPC2先到达Follower, FollowerRPC2的日志项追加
  5. 此时RPC1到达了Follower, FollowerRPC1的日志项强行追加时将导致log被缩短

解决方案: 也就是说, 除了考虑冲突外, 还需要考虑重复的RPC以及顺序颠倒的RPC, 因此需要检查每个位置的log是否匹配, 不匹配就覆盖, 否则不做更改。

这样对于重复的RPC就不会重复追加, 并且如果RPC顺序颠倒,也就是让Leader多通过一次心跳同步

for idx, log := range args.Entries {
	ridx := args.PrevLogIndex + 1 + idx
	if ridx < len(rf.log) && rf.log[ridx].Term != log.Term {
		// 某位置发生了冲突, 覆盖这个位置开始的所有内容
		rf.log = rf.log[:ridx]
		rf.log = append(rf.log, args.Entries[idx:]...)
		break
	} else if ridx == len(rf.log) {
		// 没有发生冲突但长度更长了, 直接拼接
		rf.log = append(rf.log, args.Entries[idx:]...)
		break
	}
}

这里的bug是完成lab3后才发现的bug, lab2分支的代码还没有进行这样的修改也能通过测例。原因是在我的设计中,Start并没有立即广播心跳, 因此不会存在RPC顺序颠倒的情况, 如果想看最新的代码修改, 参见: https://github.com/GFX9/MIT6.5840/blob/lab3A/src/raft/raft.go#L606

2 优化: 快速回退

在之前的回退实现中, 如果有Follower的日志不匹配, 每次RPC中, Leader会将其nextIndex自减1来重试, 但其在某些情况下会导致效率很低, 因此需要AppendEntries RPC的回复信息携带更多的字段以加速回退, 核心思想就是:Follower返回更多信息给Leader,使其可以以Term为单位来回退

教授在课堂上已经介绍了快速回退的实现机制, 可以看我整理的笔记

我的实现和课堂的介绍基本一致, 只是将XLen空白的Log槽位数改为Log的长度:

2.1 结构体定义

type AppendEntriesReply struct {
	// Your data here (2A).
	Term    int  // currentTerm, for leader to update itself
	Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
	XTerm   int  // Follower中与Leader冲突的Log对应的Term
	XIndex  int  // Follower中,对应Term为XTerm的第一条Log条目的索引
	XLen    int  // Follower的log的长度
}

2.2 Follower侧的AppendEntries

发现冲突时, 回复的逻辑为:

  1. 如果PrevLogIndex位置不存在日志项, 通过设置reply.XTerm = -1告知Leader, 并将reply.XLen设置为自身日志长度
  2. 如果PrevLogIndex位置日志项存在但Term冲突, 通过reply.XTermreply.XIndex分别告知冲突位置的Term和这个TermFollower中第一次出现的位置

具体代码如下:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
	defer rf.mu.Unlock()
    ...
	isConflict := false

	// 校验PrevLogIndex和PrevLogTerm不合法
	// 2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
	if args.PrevLogIndex >= len(rf.log) {
		// PrevLogIndex位置不存在日志项
		reply.XTerm = -1
		reply.XLen = len(rf.log) // Log长度
		isConflict = true
		DPrintf("server %v 的log在PrevLogIndex: %v 位置不存在日志项, Log长度为%v\n", rf.me, args.PrevLogIndex, reply.XLen)
	} else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
		// PrevLogIndex位置的日志项存在, 但term不匹配
		reply.XTerm = rf.log[args.PrevLogIndex].Term
		i := args.PrevLogIndex
		for rf.log[i].Term == reply.XTerm {
			i -= 1
		}
		reply.XIndex = i + 1
		isConflict = true
		DPrintf("server %v 的log在PrevLogIndex: %v 位置Term不匹配, args.Term=%v, 实际的term=%v\n", rf.me, args.PrevLogIndex, args.PrevLogTerm, reply.XTerm)
	}

	if isConflict {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}
    ...
}

2.3 leader侧的handleAppendEntries

如果需要回退, leader的处理逻辑是:

  1. 如果XTerm == -1, 表示PrevLogIndex位置在Follower中不存在log, 需要将nextIndex设置为Followerlog长度即XLen
  2. 如果XTerm != -1, 表示PrevLogIndex位置在Follower中存在log但其TermXTerm, 与prevLogTerm不匹配, 同时XIndex表示这个TermFollower中第一次出现的位置, 需要如下进行判断:
    1. 如果Follower中存在XTerm, 将nextIndex设置为Follower中最后一个term == XTerm的日志项的下一位
    2. 否则, 将nextIndex设置为XIndex

具体代码为:

func (rf *Raft) handleAppendEntries(serverTo int, args *AppendEntriesArgs) {
	// 目前的设计, 重试自动发生在下一次心跳函数, 所以这里不需要死循环

	...

	rf.mu.Lock()
	defer rf.mu.Unlock()
    
    ...

	if reply.Term == rf.currentTerm && rf.role == Leader {
		// term仍然相同, 且自己还是leader, 表名对应的follower在prevLogIndex位置没有与prevLogTerm匹配的项
		// 快速回退的处理
		if reply.XTerm == -1 {
			// PrevLogIndex这个位置在Follower中不存在
			DPrintf("leader %v 收到 server %v 的回退请求, 原因是log过短, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, serverTo, rf.nextIndex[serverTo], serverTo, reply.XLen)
			rf.nextIndex[serverTo] = reply.XLen
			return
		}

		i := rf.nextIndex[serverTo] - 1
		for i > 0 && rf.log[i].Term > reply.XTerm {
			i -= 1
		}
		if rf.log[i].Term == reply.XTerm {
			// 之前PrevLogIndex发生冲突位置时, Follower的Term自己也有

			DPrintf("leader %v 收到 server %v 的回退请求, 冲突位置的Term为%v, server的这个Term从索引%v开始, 而leader对应的最后一个XTerm索引为%v, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, reply.XTerm, reply.XIndex, i, serverTo, rf.nextIndex[serverTo], serverTo, i+1)
			rf.nextIndex[serverTo] = i + 1
		} else {
			// 之前PrevLogIndex发生冲突位置时, Follower的Term自己没有
			DPrintf("leader %v 收到 server %v 的回退请求, 冲突位置的Term为%v, server的这个Term从索引%v开始, 而leader对应的XTerm不存在, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, reply.XTerm, reply.XIndex, serverTo, rf.nextIndex[serverTo], serverTo, reply.XIndex)
			rf.nextIndex[serverTo] = reply.XIndex
		}
		return
	}
}

3 持久化

持久化的内容只包括: votedFor, currentTerm, log, 为什么只需要持久化这三个变量, 也可以参考课堂笔记

3.1 持久化函数

persist函数和readPersist函数很简单, 只需要根据注释的提示完成即可:

func (rf *Raft) persist() {
	// DPrintf("server %v 开始持久化, 最后一个持久化的log为: %v:%v", rf.me, len(rf.log)-1, rf.log[len(rf.log)-1].Cmd)

	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.votedFor)
	e.Encode(rf.currentTerm)
	e.Encode(rf.log)
	raftstate := w.Bytes()
	rf.persister.Save(raftstate, nil)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	if data == nil || len(data) == 0 {
		return
	}
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)

	var votedFor int
	var currentTerm int
	var log []Entry
	if d.Decode(&votedFor) != nil ||
		d.Decode(&currentTerm) != nil ||
		d.Decode(&log) != nil {
		DPrintf("readPersist failed\n")
	} else {
		rf.votedFor = votedFor
		rf.currentTerm = currentTerm
		rf.log = log
	}
}

3.2 持久化位置

lab的持久化方案很粗糙, 只要修改了votedFor, currentTerm, log中的任意一个, 则进行持久化, 因此只需要在相应位置调用persist即可, 这里旧不给出代码了, 感兴趣可以直接看我提供的仓库。

特别需要说明的是,当崩溃恢复时,其调用的函数仍然是Make函数, 而nextIndex需要在执行了readPersist后再初始化, 因为readPersist修改了log, 而nextIndex需初始化为log长度

3.3 持久化时是否需要锁?

按照我的理解, 持久化时不需要锁保护log, 原因如下:

  • Leader视角 Leader永远不会删除自己的log(此时没有快照), 因此不需要锁保护
  • Follower视角 尽管Follower可能截断log, 但永远不会截断在commitlog之前, 而持久化只需要保证已经commitlog, 因此也不需要锁

4 测试

4.1 常规测试

  1. 2B测试 由于我们实现了快速回退, 此时可以测试2B, 看看是否速度有显著提升: 执行测试命令
go test -v -run 2B

结果如下:

lab2-2C-test2B

相比于之前快了15s左右, 勉勉强强满足了任务书中的一份子以内... 看了实现还是不够精妙啊

  1. 2C测试 执行测试命令
go test -v -run 2C

结果如下:

lab2-2C-test2C

比官方的示例满了4s, 还不错

4.2 多次测试

raft的许多特性导致其一次测试并不准确, 有些bug需要多次测试才会出现, 编写如下脚本命名为manyTest_2B.sh:

#!/bin/bash

# 初始化计数器
count=0
success_count=0
fail_count=0

# 设置测试次数
max_tests=50

for ((i=1; i<=max_tests; i++))
do
    echo "Running test iteration $i of $max_tests..."
    
    # 运行 go 测试命令
    go test -v -run 2C &> output2C.log
    
    # 检查 go 命令的退出状态
    if [ "$?" -eq 0 ]; then
        # 测试成功
        success_count=$((success_count+1))
        echo "Test iteration $i passed."
        # 如果想保存通过的测试日志,取消下面行的注释
        # mv output2C.log "success_$i.log"
    else
        # 测试失败
        fail_count=$((fail_count+1))
        echo "Test iteration $i failed, check 'failure2C_$i.log' for details."
        mv output2C.log "failure2C_$i.log"
    fi
done

# 报告测试结果
echo "Testing completed: $max_tests iterations run."
echo "Successes: $success_count"
echo "Failures: $fail_count"

再次进行测试:

./manyTest_2C.sh

结果:

lab2-2B-test2