Skip to content

Commit

Permalink
COMPLETE project2B
Browse files Browse the repository at this point in the history
  • Loading branch information
Metafora072 committed Jul 17, 2024
1 parent f35e1c7 commit 28e1760
Show file tree
Hide file tree
Showing 10 changed files with 567 additions and 17 deletions.
315 changes: 315 additions & 0 deletions kv/raftstore/peer_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package raftstore

import (
"fmt"
"github.com/pingcap-incubator/tinykv/kv/raftstore/meta"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
pb "github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
"time"

"github.com/Connor1996/badger/y"
Expand Down Expand Up @@ -38,11 +41,297 @@ func newPeerMsgHandler(peer *peer, ctx *GlobalContext) *peerMsgHandler {
}
}

// HandleRaftReady 处理 rawNode 传递来的 Ready,对这些 entries 进行 apply
// 每执行完一次 apply,都需要对 proposals 中的相应 Index 的 proposal 进行 callback 回应(调用 cb.Done()),然后从中删除这个 proposal
func (d *peerMsgHandler) HandleRaftReady() {
if d.stopped {
return
}
// Your Code Here (2B).

// 判断是否 Ready
if d.RaftGroup.HasReady() == false {
return
}

ready := d.RaftGroup.Ready()

// 调用 SaveReadyState 将 Ready 中需要持久化的内容保存到 badger
_, err := d.peerStorage.SaveReadyState(&ready)
if err != nil {
log.Panic(err)
}

// 调用 d.Send() 方法将 Ready 中的 Msg 发送出去
d.Send(d.ctx.trans, ready.Messages)

// 处理待 apply 的日志
if len(ready.CommittedEntries) > 0 {
entries := ready.CommittedEntries

writeBatch := &engine_util.WriteBatch{}
for _, entry := range entries {
writeBatch = d.processCommittedEntry(&entry,writeBatch)
// 节点有可能在 processCommittedEntry 返回之后就销毁了,如果销毁了需要直接返回,保证对这个节点而言不会再 DB 中写入数据
if d.stopped {
return
}
}

/*
type RaftApplyState struct {
// Record the applied index of the state machine to make sure
// not apply any index twice after restart.
AppliedIndex uint64 `protobuf:"varint,1,opt,name=applied_index,json=appliedIndex,proto3" json:"applied_index,omitempty"`
// Record the index and term of the last raft log that have been truncated. (Used in 2C)
TruncatedState *RaftTruncatedState `protobuf:"bytes,2,opt,name=truncated_state,json=truncatedState" json:"truncated_state,omitempty"`
}
*/
// 更新 peer_storage.applyState
d.peerStorage.applyState.AppliedIndex = entries[len(entries)-1].Index

// ??
err := writeBatch.SetMeta(meta.ApplyStateKey(d.regionId),d.peerStorage.applyState)
if err != nil {
log.Panic(err)
}

// 一次性执行所有的 Command 操作和 ApplyState 更新操作
writeBatch.MustWriteToDB(d.peerStorage.Engines.Kv)

}

// 调用 d.RaftGroup.Advance() 推进 RawNode,更新 raft 状态
d.RaftGroup.Advance(ready)
}

func (d *peerMsgHandler) processCommittedEntry(entry *pb.Entry,writeBatch *engine_util.WriteBatch) *engine_util.WriteBatch {
//fmt.Println("processCommittedEntry called!")
//EntryType_EntryNormal (值为 0): 这种类型表示普通的日志条目,通常用于存储客户端的请求或命令。这些命令会被应用到状态机中,以保持集群的一致性。
//EntryType_EntryConfChange (值为 1): 这种类型表示配置变更日志条目,用于集群配置的更改,例如添加或删除节点。当这种类型的条目被提交时,Raft 节点会应用配置变更,以更新集群的成员信息。

if entry.EntryType == pb.EntryType_EntryConfChange { // 日志条目是配置变更条目
confChange := &pb.ConfChange{}

err := confChange.Unmarshal(entry.Data)
if err != nil {
log.Panic(err)
}
log.Infof("EntryType_EntryConfChange")

return d.processConfChange(entry,confChange,writeBatch)
}
// 反序列化 entry.Data 中的数据
request := &raft_cmdpb.RaftCmdRequest{}

err := request.Unmarshal(entry.Data)
if err != nil {
log.Panic(err)
}
// TODO
//if request.AdminRequest != nil {
// // return d.processAdminRequest(entry, requests, kvWB)
//} else {
// return d.processRequest(entry, request, writeBatch)
//}

return d.processRequest(entry, request, writeBatch)
}

// processConfChange 处理配置变更类型的日志条目
func (d *peerMsgHandler) processConfChange(entry *pb.Entry,confChange *pb.ConfChange,writeBatch *engine_util.WriteBatch) *engine_util.WriteBatch {
// 反序列化 entry.Data 中的数据
request := &raft_cmdpb.RaftCmdRequest{}
err := request.Unmarshal(entry.Data)
if err != nil {
log.Panic(err)
}
// ------------------------------------------------------------------------------- ?
// 检查 Command Request 中的 RegionEpoch 是否是过期的,以此判定是不是一个重复的请求
// 实验指导书中提到,测试程序可能会多次提交同一个 ConfChange 直到 ConfChange 被应用
// CheckRegionEpoch 检查 RaftCmdRequest 头部携带的 RegionEpoch 是不是和 currentRegionEpoch 匹配
if err, ok := util.CheckRegionEpoch(request, d.Region(), true).(*util.ErrEpochNotMatch); ok {
log.Infof("[processConfChange] %v RegionEpoch not match", d.PeerId())
d.processProposal(entry, ErrResp(err))
return writeBatch
}
// ---------------------------------------------------------------------------------

/*
type ConfChangeType int32
const (
ConfChangeType_AddNode ConfChangeType = 0
ConfChangeType_RemoveNode ConfChangeType = 1
)
ConfChangeType_AddNode (值为 0): 表示添加节点操作,用于将一个新节点添加到 Raft 集群中。这通常用于扩展集群的规模,提高系统的容错能力和可用性。
ConfChangeType_RemoveNode (值为 1): 表示移除节点操作,用于从 Raft 集群中删除一个节点。这通常用于缩减集群规模或移除故障节点,以保持集群的健康状态.
*/
switch confChange.ChangeType {
case pb.ConfChangeType_AddNode: // 添加节点操作,用于将一个新节点添加到 Raft 集群中。
log.Infof("[AddNode] %v add %v", d.PeerId(), confChange.NodeId)
// 待添加的节点必须原先在 Region 中不存在
if d.getPeerIndex(confChange.NodeId) == uint64(len(d.Region().Peers)) { // 不存在
d.Region().Peers = append(d.Region().Peers,)
}
case pb.ConfChangeType_RemoveNode: // 移除节点操作,用于从 Raft 集群中删除一个节点。
// TODO
}

return writeBatch

}

// getPeerIndex 根据需要添加或者删除的 Peer id,找到 region 中是否已经存在这个 Peer,不存在则返回Peer数组的len
func (d *peerMsgHandler) getPeerIndex(nodeId uint64) uint64 {
for idx,peer := range d.peerStorage.region.Peers {
if peer.GetId() == nodeId {
return uint64(idx)
}
}
return uint64(len(d.peerStorage.region.Peers))
}

// processRequest 处理 commit 的 Put/Get/Delete/Snap 类型 command
func (d *peerMsgHandler) processRequest(entry *pb.Entry,request *raft_cmdpb.RaftCmdRequest,writeBatch *engine_util.WriteBatch) *engine_util.WriteBatch {
raftCmdResponse := &raft_cmdpb.RaftCmdResponse{
Header: &raft_cmdpb.RaftResponseHeader{},
Responses: make([]*raft_cmdpb.Response, 0),
}

// **`CmdType_Invalid` (值为 0)**: 表示无效的命令类型,通常用于初始化或错误处理。
// **`CmdType_Get` (值为 1)**: 表示获取操作,用于从存储中读取数据。
// **`CmdType_Put` (值为 3)**: 表示存储操作,用于将数据写入存储。
// **`CmdType_Delete` (值为 4)**: 表示删除操作,用于从存储中删除数据。
// **`CmdType_Snap` (值为 5)**: 表示快照操作,用于创建存储的快照,以便进行备份或恢复。
for _, curRequest := range request.Requests {
switch curRequest.CmdType {
case raft_cmdpb.CmdType_Invalid:
continue
case raft_cmdpb.CmdType_Get: // 表示获取操作,用于从存储中读取数据。
err := util.CheckKeyInRegion(curRequest.Get.Key,d.Region())
if err != nil {
//These errors are mainly related to Region. So it is also a member of RaftResponseHeader of RaftCmdResponse.
//When proposing a request or applying a command, there may be some errors.
//If that, you should return the raft command response with the error, then the error will be further passed to gRPC response.
//You can use BindRespError provided in kv/raftstore/cmd_resp.go to convert these errors to errors defined in errorpb.proto when returning the response with an error.
BindRespError(raftCmdResponse, err)
continue
}

// Get 和 Snap 请求需要先将之前的结果写到 DB ??
writeBatch.MustWriteToDB(d.peerStorage.Engines.Kv)

writeBatch = &engine_util.WriteBatch{}

value, _ := engine_util.GetCF(d.peerStorage.Engines.Kv,curRequest.Get.Cf,curRequest.Get.Key)
/*
type GetResponse struct {
Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
*/
raftCmdResponse.Responses = append(raftCmdResponse.Responses, &raft_cmdpb.Response{
CmdType: raft_cmdpb.CmdType_Get,
Get: &raft_cmdpb.GetResponse{Value: value},
})
case raft_cmdpb.CmdType_Put: // 表示存储操作,用于将数据写入存储。
err := util.CheckKeyInRegion(curRequest.Put.Key,d.Region())
if err != nil {
//These errors are mainly related to Region. So it is also a member of RaftResponseHeader of RaftCmdResponse.
//When proposing a request or applying a command, there may be some errors.
//If that, you should return the raft command response with the error, then the error will be further passed to gRPC response.
//You can use BindRespError provided in kv/raftstore/cmd_resp.go to convert these errors to errors defined in errorpb.proto when returning the response with an error.
BindRespError(raftCmdResponse, err)
continue
}

writeBatch.SetCF(curRequest.Put.Cf,curRequest.Put.Key,curRequest.Put.Value)
/*
type PutResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
*/
raftCmdResponse.Responses = append(raftCmdResponse.Responses, &raft_cmdpb.Response{
CmdType: raft_cmdpb.CmdType_Put,
Put: &raft_cmdpb.PutResponse{},
})
case raft_cmdpb.CmdType_Delete: // 表示删除操作,用于从存储中删除数据。
err := util.CheckKeyInRegion(curRequest.Delete.Key,d.Region())
if err != nil {
//These errors are mainly related to Region. So it is also a member of RaftResponseHeader of RaftCmdResponse.
//When proposing a request or applying a command, there may be some errors.
//If that, you should return the raft command response with the error, then the error will be further passed to gRPC response.
//You can use BindRespError provided in kv/raftstore/cmd_resp.go to convert these errors to errors defined in errorpb.proto when returning the response with an error.
BindRespError(raftCmdResponse, err)
continue
}
writeBatch.DeleteCF(curRequest.Delete.Cf,curRequest.Delete.Key)
/*
type DeleteResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
*/
raftCmdResponse.Responses = append(raftCmdResponse.Responses, &raft_cmdpb.Response{
CmdType: raft_cmdpb.CmdType_Delete,
Delete: &raft_cmdpb.DeleteResponse{},
})
case raft_cmdpb.CmdType_Snap: // 表示快照操作,用于创建存储的快照,以便进行备份或恢复。
if request.Header.GetRegionEpoch().GetVersion() != d.Region().GetRegionEpoch().GetVersion() {
err := &util.ErrEpochNotMatch{}
BindRespError(raftCmdResponse,err)
continue
}
// Get 和 Snap 请求需要先将结果写到 DB,否 则的话如果有多个 entry 同时被 apply,客户端无法及时看到写入的结果 ??
writeBatch.MustWriteToDB(d.peerStorage.Engines.Kv)
writeBatch = &engine_util.WriteBatch{}
/*
type SnapResponse struct {
Region *metapb.Region `protobuf:"bytes,1,opt,name=region" json:"region,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
*/
raftCmdResponse.Responses = append(raftCmdResponse.Responses, &raft_cmdpb.Response{
CmdType: raft_cmdpb.CmdType_Snap,
Snap: &raft_cmdpb.SnapResponse{Region: d.Region()},
})
// Snap TODO
}
}

d.processProposal(entry, raftCmdResponse)
return writeBatch
}


// processProposal 找到等待 entry 的回调(proposal),存入操作的执行结果(raftCmdResponse)
func (d *peerMsgHandler) processProposal(entry *pb.Entry, raftCmdResponse *raft_cmdpb.RaftCmdResponse) {
for len(d.proposals) > 0 {
curProposal := d.proposals[0]
if curProposal.term < entry.Term || curProposal.index < entry.Index {
NotifyStaleReq(curProposal.term,curProposal.cb)
d.proposals = d.proposals[1:]
continue
}

if curProposal.term == entry.Term && curProposal.index == entry.Index {
if curProposal.cb != nil {
curProposal.cb.Txn = d.peerStorage.Engines.Kv.NewTransaction(false)
}
curProposal.cb.Done(raftCmdResponse)
d.proposals = d.proposals[1:]
continue
}

return
}
}

func (d *peerMsgHandler) HandleMsg(msg message.Msg) {
Expand Down Expand Up @@ -107,13 +396,39 @@ func (d *peerMsgHandler) preProposeRaftCommand(req *raft_cmdpb.RaftCmdRequest) e
return err
}

//将 client 的请求包装成 entry 传递给 raft 层
func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *message.Callback) {
err := d.preProposeRaftCommand(msg)
if err != nil {
cb.Done(ErrResp(err))
return
}
// Your Code Here (2B).
if msg.Requests != nil {
// 封装回调函数 callback
curProposal := &proposal{
index: d.RaftGroup.Raft.RaftLog.LastIndex() + 1,
term: d.RaftGroup.Raft.Term,
cb: cb,
}

d.proposals = append(d.proposals,curProposal)

// 将 RaftCmdRequest 序列化为字节流
marshalRes, err := msg.Marshal()
if err != nil {
log.Panic(err)
}

// 将序列化的字节流包装成 entry 传递给 raft 层的 MessageType_MsgPropose
err = d.RaftGroup.Propose(marshalRes)
if err != nil {
log.Panic(err)
}
} else {
// TODO
}

}

func (d *peerMsgHandler) onTick() {
Expand Down
Loading

0 comments on commit 28e1760

Please sign in to comment.