Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lab 3 #7

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
42 changes: 41 additions & 1 deletion src/kvraft/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package kvraft
import "6.824/labrpc"
import "crypto/rand"
import "math/big"




type Clerk struct {
clientId int64
servers []*labrpc.ClientEnd
seqNum int
JckXia marked this conversation as resolved.
Show resolved Hide resolved
// You will have to modify this struct.
}

Expand All @@ -17,9 +21,16 @@ func nrand() int64 {
return x
}

// scheduleNextServer returns the index of the next server to try after
// serverId, wrapping round-robin over the replica list.
// (Renamed the parameter: it is a server index, not a client id — the old
// name `clientId` was misleading; callers pass the current serverId.)
func (ck *Clerk) scheduleNextServer(serverId int) int {
	return (serverId + 1) % len(ck.servers)
}

// MakeClerk builds a Clerk bound to the given replica servers, tagged with
// a fresh random client id and its request sequence counter starting at 0.
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	return &Clerk{
		clientId: nrand(),
		servers:  servers,
		seqNum:   0,
	}
}
Expand All @@ -39,9 +50,23 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
// Get fetches the current value for key, retrying servers round-robin until
// one acknowledges with OK. Returns "" when no server has the key.
//
// Fixes: (1) removed a stray `return ""` that made the whole retry loop
// unreachable; (2) a fresh GetReply is allocated on every attempt — labrpc
// expects a zero-valued reply, and reusing one lets a stale Err/Value from a
// previous call leak into the next.
func (ck *Clerk) Get(key string) string {
	getRequest := GetArgs{ck.clientId, ck.seqNum, key}
	serverId := 0

	for {
		getReply := GetReply{}
		ok := ck.servers[serverId].Call("KVServer.Get", &getRequest, &getReply)
		if ok && getReply.Err == OK {
			ck.seqNum++
			return getReply.Value
		}
		// RPC lost (network issues) or the server was not the leader:
		// move on and retry against the next replica.
		serverId = ck.scheduleNextServer(serverId)
	}
}


//
// shared by Put and Append.
//
Expand All @@ -54,6 +79,19 @@ func (ck *Clerk) Get(key string) string {
//
// PutAppend is shared by Put and Append: it sends op ("Put" or "Append") for
// key/value, retrying servers round-robin until one acknowledges with OK.
//
// Fix: a fresh PutAppendReply is allocated on every attempt — labrpc expects
// a zero-valued reply, and reusing one lets a stale Err from a previous call
// leak into the next attempt.
func (ck *Clerk) PutAppend(key string, value string, op string) {
	putRequest := PutAppendArgs{ck.clientId, ck.seqNum, key, value, op}
	serverId := 0

	for {
		putResponse := PutAppendReply{}
		ok := ck.servers[serverId].Call("KVServer.PutAppend", &putRequest, &putResponse)
		if ok && putResponse.Err == OK {
			ck.seqNum++
			return
		}
		// RPC lost (network issues) or wrong leader: try the next replica.
		serverId = ck.scheduleNextServer(serverId)
	}
}

func (ck *Clerk) Put(key string, value string) {
Expand All @@ -62,3 +100,5 @@ func (ck *Clerk) Put(key string, value string) {
// Append appends value to the value currently stored under key.
func (ck *Clerk) Append(key string, value string) {
	const operation = "Append"
	ck.PutAppend(key, value, operation)
}


5 changes: 4 additions & 1 deletion src/kvraft/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ const (
ErrNoKey = "ErrNoKey"
ErrWrongLeader = "ErrWrongLeader"
)

type Err string

// Put or Append
type PutAppendArgs struct {
ClientId int64
SeqNum int
Key string
Value string
Op string // "Put" or "Append"
Expand All @@ -23,6 +24,8 @@ type PutAppendReply struct {
}

// GetArgs is the request payload for KVServer.Get.
type GetArgs struct {
	// ClientId identifies the issuing clerk.
	ClientId int64
	// SeqNum is the clerk's per-request sequence number; the server uses
	// it for duplicate detection.
	SeqNum int
	Key    string
	// You'll have to add definitions here.
}
Expand Down
147 changes: 142 additions & 5 deletions src/kvraft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"sync"
"sync/atomic"
"fmt"
)

const Debug = false
Expand All @@ -23,6 +24,17 @@ type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
ClientId int64
SeqNum int
OpType string
Key string
Value string
}

// DedupEntry is a per-client record of the most recently applied request:
// the highest sequence number seen and the value that operation produced.
// Put() commands should have an empty string as value
type DedupEntry struct {
	sequenceNum int    // highest sequence number applied for this client
	value       string // cached result of that operation ("" for Put)
}

type KVServer struct {
Expand All @@ -32,18 +44,120 @@ type KVServer struct {
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()

getConsensusChan chan Op
putConensusChan chan Op
maxraftstate int // snapshot if log grows this big

// Your definitions here.
store map[string]string
dedupTable map[int64]DedupEntry
}

// Operations on the dedupTable
// getClientEntry looks up the dedup record for clientId; the second result
// reports whether one exists. Caller must hold kv.mu.
func (kv *KVServer) getClientEntry(clientId int64) (DedupEntry, bool) {
	e, ok := kv.dedupTable[clientId]
	return e, ok
}

// upsertClientEntry records the latest applied (sequenceNum, value) pair for
// clientId, creating or overwriting its dedup record.
func (kv *KVServer) upsertClientEntry(clientId int64, sequenceNum int, value string) {
	entry := DedupEntry{sequenceNum: sequenceNum, value: value}
	kv.dedupTable[clientId] = entry
}

// Get handles a client Get RPC: rejects non-leaders, answers duplicates from
// the dedup table, otherwise submits the read through Raft and replies with
// the stored value once the operation commits.
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	if !kv.isLeader() {
		reply.Err = ErrWrongLeader
		return
	}

	kv.mu.Lock()

	// Duplicate detection: if this (or a later) request from the client has
	// already been applied, answer from the cached value.
	clientEntry, exist := kv.getClientEntry(args.ClientId)
	if exist && clientEntry.sequenceNum >= args.SeqNum {
		reply.Value = clientEntry.value
		// BUG FIX: the clerk retries until Err == OK, so the dedup hit must
		// report success — otherwise the client spins forever.
		reply.Err = OK
		kv.mu.Unlock()
		return
	}

	opMsg := Op{args.ClientId, args.SeqNum, "Get", args.Key, ""}
	kv.rf.Start(opMsg)
	kv.mu.Unlock()

	// TODO(review): this assumes the next committed Get belongs to this RPC;
	// it should be matched by (ClientId, SeqNum) instead of blind receive.
	getMsg := <-kv.getConsensusChan

	kv.mu.Lock()
	if getMsg.Key == args.Key {
		reply.Value = kv.store[getMsg.Key]
	}
	kv.mu.Unlock()

	reply.Err = OK
}

// serializePutAppendArgs renders args as "(K: key, V: value, OP: op)"
// for debug logging.
func serializePutAppendArgs(args *PutAppendArgs) string {
	return fmt.Sprintf("(K: %s, V: %s, OP: %s)", args.Key, args.Value, args.Op)
}

// serializeOpMsg renders an Op as "(K: key, V: value, OpType: type)"
// for debug logging.
func serializeOpMsg(msg Op) string {
	// Fixed typo in the debug format string: "OpTye" -> "OpType".
	s := fmt.Sprintf("(K: %s, V: %s, OpType: %s)", msg.Key, msg.Value, msg.OpType)
	return s
}

// PutAppend handles a client Put or Append RPC: rejects non-leaders, drops
// duplicates via the dedup table, otherwise submits the write through Raft
// and applies it to the store once the operation commits.
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	if !kv.isLeader() {
		reply.Err = ErrWrongLeader
		return
	}

	opType := args.Op
	kv.mu.Lock()

	// Duplicate detection: an already-applied request must not be re-applied.
	clientEntry, exist := kv.getClientEntry(args.ClientId)
	if exist && clientEntry.sequenceNum >= args.SeqNum {
		// BUG FIX: the clerk retries until Err == OK, so a dedup hit must
		// still report success — otherwise the client spins forever.
		reply.Err = OK
		kv.mu.Unlock()
		return
	}

	opLog := Op{args.ClientId, args.SeqNum, opType, args.Key, args.Value}
	kv.rf.Start(opLog)
	kv.mu.Unlock()

	// TODO(review): assumes the next committed Put/Append belongs to this
	// RPC; should be matched by (ClientId, SeqNum) instead of blind receive.
	putAppendMsg := <-kv.putConensusChan

	kv.mu.Lock()
	// Apply the committed operation to the key/value store.
	if putAppendMsg.OpType == args.Op && putAppendMsg.Key == args.Key && putAppendMsg.Value == args.Value {
		switch putAppendMsg.OpType {
		case "Put":
			kv.store[putAppendMsg.Key] = putAppendMsg.Value
		case "Append":
			kv.store[putAppendMsg.Key] += putAppendMsg.Value
		}
	}
	reply.Err = OK
	kv.mu.Unlock()
}

// isLeader reports whether this server's Raft peer currently believes it is
// the leader.
func (kv *KVServer) isLeader() bool {
	_, leading := kv.rf.GetState()
	return leading
}

//
Expand All @@ -67,6 +181,26 @@ func (kv *KVServer) killed() bool {
return z == 1
}

// readFromApplyCh is a long-running goroutine that drains committed Raft
// entries, wakes the RPC handler waiting on the matching consensus channel,
// and records the operation in the per-client dedup table.
func (kv *KVServer) readFromApplyCh() {
	for !kv.killed() {
		appliedMsg := <-kv.rf.GetApplyCh()
		commitedOpLog := appliedMsg.Command.(Op)

		// Hand the committed op to whichever handler is blocked on it.
		switch commitedOpLog.OpType {
		case "Get":
			kv.getConsensusChan <- commitedOpLog
		case "Put", "Append":
			kv.putConensusChan <- commitedOpLog
		}

		// BUG FIX: dedupTable is read by Get/PutAppend while holding kv.mu,
		// so this write must take the same lock to avoid a data race.
		// NOTE(review): for Get ops Value is "", so a later dedup hit replays
		// an empty value — the applied read result should be cached instead.
		kv.mu.Lock()
		kv.upsertClientEntry(commitedOpLog.ClientId, commitedOpLog.SeqNum, commitedOpLog.Value)
		kv.mu.Unlock()
	}
}

//
// servers[] contains the ports of the set of
// servers that will cooperate via Raft to
Expand All @@ -85,17 +219,20 @@ func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persiste
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})

kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate

// You may need initialization code here.

kv.dedupTable = make(map[int64]DedupEntry)
kv.store = make(map[string]string)
kv.applyCh = make(chan raft.ApplyMsg)
kv.getConsensusChan = make(chan Op)
kv.putConensusChan = make(chan Op)

kv.rf = raft.Make(servers, me, persister, kv.applyCh)

// You may need initialization code here.

go kv.readFromApplyCh()
return kv
}
15 changes: 10 additions & 5 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,12 +1124,12 @@ func (rf *Raft) killed() bool {
func (rf *Raft) dedicatedApplier(applyMsgs [] ApplyMsg) {
for _, v := range applyMsgs {
rf.applyCh <- v
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: A rewrite is needed here

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential design:
-> a dedicated channel to notify whenever rf.commitIndex is updated (i.e. whenever rf.commitIndex > rf.lastApplied)

rf.mu.Lock()
rf.lastApplied = v.CommandIndex
DebugP(dLast, "S%d commiting log, updating lastApplied to %d", rf.me, rf.lastApplied)
// rf.mu.Lock()
// rf.lastApplied = v.CommandIndex
// DebugP(dLast, "S%d commiting log, updating lastApplied to %d", rf.me, rf.lastApplied)

DebugP(dCommit, "S%d applying log [(%s)] at term %d, CI %d, isLeader: %v", rf.me, v.Command , rf.currentTerm, rf.lastApplied, rf.nodeStatus == Leader)
rf.mu.Unlock()
// DebugP(dSnap, "S%d applying log [(%s)] at term %d, CI %d, isLeader: %v, msgLen: %d", rf.me, v.Command , rf.currentTerm, rf.lastApplied, rf.nodeStatus == Leader, len(applyMsgs))
// rf.mu.Unlock()
}
}

Expand All @@ -1148,6 +1148,7 @@ func (rf *Raft) lifeCycleManager() {
DebugP(dLast,"S%d lastApplied is %d", rf.me, rf.lastApplied)
for lastApplied < rf.commitIndex {
lastApplied++
rf.lastApplied++
logEntryToCommit := rf.getLogAtIndex(lastApplied)
applyMsg := ApplyMsg{true, logEntryToCommit.Command, lastApplied, false,nil, 0,0}

Expand Down Expand Up @@ -1359,6 +1360,10 @@ func (rf * Raft) bootStrapState(hostServerId int) {
rf.lastIncludedIdx = 0
rf.lastIncludedTerm = 0
}

// GetApplyCh returns the channel on which this Raft peer delivers committed
// entries, so the service layer above Raft can consume ApplyMsgs directly.
// NOTE(review): this exposes a bidirectional channel; returning
// <-chan ApplyMsg would better protect it from stray sends — confirm no
// caller needs to send before changing.
func (rf *Raft) GetApplyCh() chan ApplyMsg {
	return rf.applyCh
}
//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
Expand Down