From d058a5f323dbf58ed303c4a7349b65517018f8c7 Mon Sep 17 00:00:00 2001 From: JckXia Date: Tue, 19 Dec 2023 15:09:55 -0500 Subject: [PATCH 01/13] Init commit --- src/kvraft/client.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/kvraft/client.go b/src/kvraft/client.go index 7e47579..9effc1d 100644 --- a/src/kvraft/client.go +++ b/src/kvraft/client.go @@ -39,9 +39,20 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { func (ck *Clerk) Get(key string) string { // You will have to modify this function. - return "" + getRequest := GetArgs{key} + getReply := GetReply{} + for { + for i := range ck.servers { + ok := cd.servers[i].Call("KVServer.Get", &getRequest, &getReply) + if ok { + break + } + } + } + return getReply.Value } + // // shared by Put and Append. // @@ -54,6 +65,17 @@ func (ck *Clerk) Get(key string) string { // func (ck *Clerk) PutAppend(key string, value string, op string) { // You will have to modify this function. + putRequest := PutAppendArgs{key, value, op} + putResponse := PutAppendReply{} + + for { + for i:= range ck.servers { + ok := cd.servers[i].Call("KVServer.PutAppend", &putRequest. &putResponse) + if !ok { + break + } + } + } } func (ck *Clerk) Put(key string, value string) { From d392224e73c7df931681d6b6ece6104b1f4f8a29 Mon Sep 17 00:00:00 2001 From: JckXia Date: Tue, 19 Dec 2023 15:18:02 -0500 Subject: [PATCH 02/13] Add line to return if kv server isn't leader --- src/kvraft/server.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 01f12c6..05bfb8d 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -38,12 +38,20 @@ type KVServer struct { } -func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { +func (kv *KVServer) Get(args *GetArgs, reply *GetReply) bool { // Your code here. + if !kv.isLeader() { return false } + } -func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { +func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) bool{ // Your code here. + if !kv.isLeader() { return false } +} + +func (kv *KVServer) isLeader() bool { + _, isLeader := kv.rf.GetState() + return isLeader } // From 9402607bd422f3edd1463b7c739e2577eedd79e6 Mon Sep 17 00:00:00 2001 From: JckXia Date: Tue, 19 Dec 2023 15:28:49 -0500 Subject: [PATCH 03/13] Bug fixes to have it working --- src/kvraft/client.go | 9 +++++---- src/kvraft/server.go | 22 ++++++++++++++++++---- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/kvraft/client.go b/src/kvraft/client.go index 9effc1d..13abcc9 100644 --- a/src/kvraft/client.go +++ b/src/kvraft/client.go @@ -3,6 +3,7 @@ package kvraft import "6.824/labrpc" import "crypto/rand" import "math/big" + type Clerk struct { @@ -43,8 +44,8 @@ func (ck *Clerk) Get(key string) string { getReply := GetReply{} for { for i := range ck.servers { - ok := cd.servers[i].Call("KVServer.Get", &getRequest, &getReply) - if ok { + ck.servers[i].Call("KVServer.Get", &getRequest, &getReply) + if getReply.Err == OK { break } } @@ -70,8 +71,8 @@ func (ck *Clerk) PutAppend(key string, value string, op string) { for { for i:= range ck.servers { - ok := cd.servers[i].Call("KVServer.PutAppend", &putRequest. &putResponse) - if !ok { + ck.servers[i].Call("KVServer.PutAppend", &putRequest, &putResponse) + if putResponse.Err == OK { break } } diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 05bfb8d..20039a1 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -23,6 +23,9 @@ type Op struct { // Your definitions here. // Field names must start with capital letters, // otherwise RPC will break. + OpType string + Key string + Value string } type KVServer struct { @@ -38,15 +41,26 @@ type KVServer struct { } -func (kv *KVServer) Get(args *GetArgs, reply *GetReply) bool { +func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { // Your code here. - if !kv.isLeader() { return false } + if !kv.isLeader() { + reply.Err = ErrWrongLeader + return + } + reply.Err = OK + return } -func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) bool{ +func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { // Your code here. - if !kv.isLeader() { return false } + if !kv.isLeader() { + reply.Err = ErrWrongLeader + return + } + + reply.Err = OK + return } func (kv *KVServer) isLeader() bool { From 8326ebf5adb39cb961c485aaaa1232c4682d248b Mon Sep 17 00:00:00 2001 From: JckXia Date: Wed, 20 Dec 2023 12:24:16 -0500 Subject: [PATCH 04/13] Save progress --- src/kvraft/server.go | 136 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 132 insertions(+), 4 deletions(-) diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 20039a1..506edea 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -7,6 +7,7 @@ import ( "log" "sync" "sync/atomic" + "fmt" ) const Debug = false @@ -35,30 +36,136 @@ type KVServer struct { applyCh chan raft.ApplyMsg dead int32 // set by Kill() + getConsensusChan chan Op + putConensusChan chan Op maxraftstate int // snapshot if log grows this big + store map[string]string // Your definitions here. } func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { - // Your code here. + + if !kv.isLeader() { reply.Err = ErrWrongLeader return } + // Your code here. + fmt.Printf("Get request received by leader %d \n", kv.me) + kv.mu.Lock() + + opMsg := Op{"Get", args.Key,""} + kv.rf.Start(opMsg) + + kv.mu.Unlock() + + //TODO Loop/range till uuid found? +// getMsg := <- kv.getConsensusChan + + + for { + select { + case getMsg := <- kv.getConsensusChan: + kv.mu.Lock() + //fmt.Println("Message!") + if getMsg.Key == args.Key { + reply.Value = kv.store[getMsg.Key] + } + reply.Err = OK + kv.mu.Unlock() + default: + + return + } + } + + // for getMsg := range kv.getConsensusChan { + // kv.mu.Lock() + // if getMsg.Key == args.Key { + // reply.Value = kv.store[getMsg.Key] + // } + // kv.mu.Unlock() + // } + + // kv.mu.Lock() + // reply.Value = kv.store[getMsg.Key] + + // kv.mu.Unlock() + fmt.Printf("Get request processed by leader %d \n", kv.me) reply.Err = OK return } +func serializePutAppendArgs(args *PutAppendArgs) string { + s := fmt.Sprintf("(K: %s, V: %s, OP: %s)", args.Key, args.Value, args.Op) + return s +} + +func serializeOpMsg(msg Op) string { + s := fmt.Sprintf("(K: %s, V: %s, OpTye: %s)", msg.Key, msg.Value, msg.OpType) + return s +} + func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { // Your code here. + + if !kv.isLeader() { reply.Err = ErrWrongLeader return } + fmt.Printf("Append request %s received by leader %d \n", serializePutAppendArgs(args), kv.me) + + opType := args.Op + kv.mu.Lock() + + + opLog := Op{opType, args.Key, args.Value} + kv.rf.Start(opLog) + kv.mu.Unlock() + + fmt.Println("Wait for append msg") + // putAppendMsg := <- kv.putConensusChan + + // TODO: Not sure if this is a good idea tbh. + + for { + select { + case putAppendMsg := <- kv.putConensusChan: + kv.mu.Lock() + fmt.Println("Message!") + switch putAppendMsg.OpType { + case "Put": + kv.store[args.Key] = args.Value + case "Append": + kv.store[args.Key] += args.Value + } + reply.Err = OK + kv.mu.Unlock() + default: + + return + } + } + // for putAppendMsg := range kv.putConensusChan { + // kv.mu.Lock() + // switch putAppendMsg.OpType { + // case "Put": + // kv.store[args.Key] = args.Value + // case "Append": + // kv.store[args.Key] += args.Value + // } + // kv.mu.Unlock() + // } + + + + + fmt.Printf("Append request processed by leader %d \n", kv.me) reply.Err = OK return } @@ -89,6 +196,25 @@ func (kv *KVServer) killed() bool { return z == 1 } +func (kv *KVServer) readFromApplyCh() { + for kv.killed() == false { + appliedMsg := <- kv.rf.GetApplyCh() + // commitedOperation := appliedMsg.Command + commitedOpLog := appliedMsg.Command.(Op) + // fmt.Println(operationLog.OpType) + fmt.Printf("Msg arrived! %s\n", serializeOpMsg(commitedOpLog)) + //DebugP(dKv, "Applied msg received! %s", commitedOpLog.OpType) + switch commitedOpLog.OpType { + case "Get": + kv.getConsensusChan <- commitedOpLog + case "Put": + kv.putConensusChan <- commitedOpLog + case "Append": + kv.putConensusChan <- commitedOpLog + } + } +} + // // servers[] contains the ports of the set of // servers that will cooperate via Raft to @@ -107,17 +233,19 @@ func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persiste // call labgob.Register on structures you want // Go's RPC library to marshall/unmarshall. labgob.Register(Op{}) - kv := new(KVServer) kv.me = me kv.maxraftstate = maxraftstate // You may need initialization code here. - + kv.store = make(map[string]string) kv.applyCh = make(chan raft.ApplyMsg) + kv.getConsensusChan = make(chan Op) + kv.putConensusChan = make(chan Op) + kv.rf = raft.Make(servers, me, persister, kv.applyCh) // You may need initialization code here. - + go kv.readFromApplyCh() return kv } From a13d54a6469c075cc888411a2567e64f7cda4c30 Mon Sep 17 00:00:00 2001 From: JckXia Date: Wed, 20 Dec 2023 13:23:00 -0500 Subject: [PATCH 05/13] Revert to a previous soln --- src/kvraft/server.go | 111 ++++++++++++++++++++++--------------------- 1 file changed, 58 insertions(+), 53 deletions(-) diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 506edea..c8c4030 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -62,31 +62,32 @@ func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { kv.mu.Unlock() //TODO Loop/range till uuid found? -// getMsg := <- kv.getConsensusChan - - - for { - select { - case getMsg := <- kv.getConsensusChan: - kv.mu.Lock() - //fmt.Println("Message!") - if getMsg.Key == args.Key { - reply.Value = kv.store[getMsg.Key] - } - reply.Err = OK - kv.mu.Unlock() - default: - - return - } - } + getMsg := <- kv.getConsensusChan + + + // for { + // select { + // case getMsg := <- kv.getConsensusChan: + // kv.mu.Lock() + // //fmt.Println("Message!") + // if getMsg.Key == args.Key { + // reply.Value = kv.store[getMsg.Key] + // } + // reply.Err = OK + // kv.mu.Unlock() + // default: + // fmt.Println("Returning to get caller") + // reply.Err = OK + // return + // } + // } // for getMsg := range kv.getConsensusChan { - // kv.mu.Lock() - // if getMsg.Key == args.Key { - // reply.Value = kv.store[getMsg.Key] - // } - // kv.mu.Unlock() + kv.mu.Lock() + if getMsg.Key == args.Key { + reply.Value = kv.store[getMsg.Key] + } + kv.mu.Unlock() // } // kv.mu.Lock() @@ -129,42 +130,46 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { kv.mu.Unlock() fmt.Println("Wait for append msg") - // putAppendMsg := <- kv.putConensusChan + putAppendMsg := <- kv.putConensusChan - // TODO: Not sure if this is a good idea tbh. - for { - select { - case putAppendMsg := <- kv.putConensusChan: - kv.mu.Lock() - fmt.Println("Message!") - switch putAppendMsg.OpType { - case "Put": - kv.store[args.Key] = args.Value - case "Append": - kv.store[args.Key] += args.Value - } - reply.Err = OK - kv.mu.Unlock() - default: - - return - } + kv.mu.Lock() + fmt.Println("PUT MSG received") + if putAppendMsg.OpType == args.Op && putAppendMsg.Key == args.Key && putAppendMsg.Value == args.Value { + switch putAppendMsg.OpType { + case "Put": + kv.store[putAppendMsg.Key] = putAppendMsg.Value + case "Append": + kv.store[putAppendMsg.Key] += putAppendMsg.Value + } } - // for putAppendMsg := range kv.putConensusChan { - // kv.mu.Lock() - // switch putAppendMsg.OpType { - // case "Put": - // kv.store[args.Key] = args.Value - // case "Append": - // kv.store[args.Key] += args.Value + + reply.Err = OK + kv.mu.Unlock() + + // TODO: Not sure if this is a good idea tbh. + + // for { + // case putAppendMsg := <- kv.putConensusChan: + // kv.mu.Lock() + // fmt.Println("PUT MSG received") + // if putAppendMsg.OpType == args.Op && putAppendMsg.Key == args.Key && putAppendMsg.Value == args.Value { + // switch putAppendMsg.OpType { + // case "Put": + // kv.store[putAppendMsg.Key] = putAppendMsg.Value + // case "Append": + // kv.store[putAppendMsg.Key] += putAppendMsg.Value + // } + // } + + // reply.Err = OK + // kv.mu.Unlock() + // default: + // reply.Err = OK + // return // } - // kv.mu.Unlock() // } - - - fmt.Printf("Append request processed by leader %d \n", kv.me) reply.Err = OK return From c7bd78647d5d7330e273fcb74e206bf3048f523d Mon Sep 17 00:00:00 2001 From: JckXia Date: Wed, 20 Dec 2023 14:18:19 -0500 Subject: [PATCH 06/13] Working 3ABasic --- src/kvraft/client.go | 12 +++++++++--- src/raft/raft.go | 15 ++++++++++----- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/kvraft/client.go b/src/kvraft/client.go index 13abcc9..6825c7c 100644 --- a/src/kvraft/client.go +++ b/src/kvraft/client.go @@ -3,6 +3,7 @@ package kvraft import "6.824/labrpc" import "crypto/rand" import "math/big" +import "fmt" @@ -45,12 +46,15 @@ func (ck *Clerk) Get(key string) string { for { for i := range ck.servers { ck.servers[i].Call("KVServer.Get", &getRequest, &getReply) + if getReply.Err == OK { - break + fmt.Println("CLI: Get response returned and is ok") + return getReply.Value } } } - return getReply.Value + + return "" } @@ -73,10 +77,12 @@ func (ck *Clerk) PutAppend(key string, value string, op string) { for i:= range ck.servers { ck.servers[i].Call("KVServer.PutAppend", &putRequest, &putResponse) if putResponse.Err == OK { - break + fmt.Println("CLI: Put returned and is ok") + return } } } +// fmt.Println("PUT returned!") } func (ck *Clerk) Put(key string, value string) { diff --git a/src/raft/raft.go b/src/raft/raft.go index a0796cd..eaa0bc2 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -1124,12 +1124,12 @@ func (rf *Raft) killed() bool { func (rf *Raft) dedicatedApplier(applyMsgs [] ApplyMsg) { for _, v := range applyMsgs { rf.applyCh <- v - rf.mu.Lock() - rf.lastApplied = v.CommandIndex - DebugP(dLast, "S%d commiting log, updating lastApplied to %d", rf.me, rf.lastApplied) + // rf.mu.Lock() + // rf.lastApplied = v.CommandIndex + // DebugP(dLast, "S%d commiting log, updating lastApplied to %d", rf.me, rf.lastApplied) - DebugP(dCommit, "S%d applying log [(%s)] at term %d, CI %d, isLeader: %v", rf.me, v.Command , rf.currentTerm, rf.lastApplied, rf.nodeStatus == Leader) - rf.mu.Unlock() + // DebugP(dSnap, "S%d applying log [(%s)] at term %d, CI %d, isLeader: %v, msgLen: %d", rf.me, v.Command , rf.currentTerm, rf.lastApplied, rf.nodeStatus == Leader, len(applyMsgs)) + // rf.mu.Unlock() } } @@ -1148,6 +1148,7 @@ func (rf *Raft) lifeCycleManager() { DebugP(dLast,"S%d lastApplied is %d", rf.me, rf.lastApplied) for lastApplied < rf.commitIndex { lastApplied++ + rf.lastApplied++ logEntryToCommit := rf.getLogAtIndex(lastApplied) applyMsg := ApplyMsg{true, logEntryToCommit.Command, lastApplied, false,nil, 0,0} @@ -1359,6 +1360,10 @@ func (rf * Raft) bootStrapState(hostServerId int) { rf.lastIncludedIdx = 0 rf.lastIncludedTerm = 0 } + +func (rf *Raft) GetApplyCh() chan ApplyMsg { + return rf.applyCh +} // // the service or tester wants to create a Raft server. the ports // of all the Raft servers (including this one) are in peers[]. this From 0014b980ebe718ea32df1f9d3ce55389679ab305 Mon Sep 17 00:00:00 2001 From: JckXia Date: Wed, 20 Dec 2023 14:40:03 -0500 Subject: [PATCH 07/13] Clean up --- src/kvraft/client.go | 6 ++-- src/kvraft/common.go | 8 +++++- src/kvraft/server.go | 65 ++++++-------------------------------------- 3 files changed, 18 insertions(+), 61 deletions(-) diff --git a/src/kvraft/client.go b/src/kvraft/client.go index 6825c7c..ac3aef0 100644 --- a/src/kvraft/client.go +++ b/src/kvraft/client.go @@ -3,7 +3,7 @@ package kvraft import "6.824/labrpc" import "crypto/rand" import "math/big" -import "fmt" +// import "fmt" @@ -48,7 +48,7 @@ func (ck *Clerk) Get(key string) string { ck.servers[i].Call("KVServer.Get", &getRequest, &getReply) if getReply.Err == OK { - fmt.Println("CLI: Get response returned and is ok") + // fmt.Println("CLI: Get response returned and is ok") return getReply.Value } } @@ -77,7 +77,7 @@ func (ck *Clerk) PutAppend(key string, value string, op string) { for i:= range ck.servers { ck.servers[i].Call("KVServer.PutAppend", &putRequest, &putResponse) if putResponse.Err == OK { - fmt.Println("CLI: Put returned and is ok") + // fmt.Println("CLI: Put returned and is ok") return } } diff --git a/src/kvraft/common.go b/src/kvraft/common.go index e5ee442..5bba5c0 100644 --- a/src/kvraft/common.go +++ b/src/kvraft/common.go @@ -5,9 +5,15 @@ const ( ErrNoKey = "ErrNoKey" ErrWrongLeader = "ErrWrongLeader" ) - type Err string + +const ( + Append = "Append" + Get = "Get" + Put = "Put" +) + // Put or Append type PutAppendArgs struct { Key string diff --git a/src/kvraft/server.go b/src/kvraft/server.go index c8c4030..32007a3 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -53,7 +53,7 @@ func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { return } // Your code here. - fmt.Printf("Get request received by leader %d \n", kv.me) + //fmt.Printf("Get request received by leader %d \n", kv.me) kv.mu.Lock() opMsg := Op{"Get", args.Key,""} @@ -64,38 +64,14 @@ func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { //TODO Loop/range till uuid found? getMsg := <- kv.getConsensusChan - - // for { - // select { - // case getMsg := <- kv.getConsensusChan: - // kv.mu.Lock() - // //fmt.Println("Message!") - // if getMsg.Key == args.Key { - // reply.Value = kv.store[getMsg.Key] - // } - // reply.Err = OK - // kv.mu.Unlock() - // default: - // fmt.Println("Returning to get caller") - // reply.Err = OK - // return - // } - // } - - // for getMsg := range kv.getConsensusChan { - kv.mu.Lock() + kv.mu.Lock() if getMsg.Key == args.Key { reply.Value = kv.store[getMsg.Key] } - kv.mu.Unlock() - // } + kv.mu.Unlock() - // kv.mu.Lock() - // reply.Value = kv.store[getMsg.Key] - - // kv.mu.Unlock() - fmt.Printf("Get request processed by leader %d \n", kv.me) + //fmt.Printf("Get request processed by leader %d \n", kv.me) reply.Err = OK return } @@ -119,8 +95,7 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { return } - fmt.Printf("Append request %s received by leader %d \n", serializePutAppendArgs(args), kv.me) - + //fmt.Printf("Append request %s received by leader %d \n", serializePutAppendArgs(args), kv.me) opType := args.Op kv.mu.Lock() @@ -129,12 +104,12 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { kv.rf.Start(opLog) kv.mu.Unlock() - fmt.Println("Wait for append msg") + putAppendMsg := <- kv.putConensusChan kv.mu.Lock() - fmt.Println("PUT MSG received") + if putAppendMsg.OpType == args.Op && putAppendMsg.Key == args.Key && putAppendMsg.Value == args.Value { switch putAppendMsg.OpType { case "Put": @@ -147,31 +122,7 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { reply.Err = OK kv.mu.Unlock() - // TODO: Not sure if this is a good idea tbh. - // for { - // case putAppendMsg := <- kv.putConensusChan: - // kv.mu.Lock() - // fmt.Println("PUT MSG received") - // if putAppendMsg.OpType == args.Op && putAppendMsg.Key == args.Key && putAppendMsg.Value == args.Value { - // switch putAppendMsg.OpType { - // case "Put": - // kv.store[putAppendMsg.Key] = putAppendMsg.Value - // case "Append": - // kv.store[putAppendMsg.Key] += putAppendMsg.Value - // } - // } - - // reply.Err = OK - // kv.mu.Unlock() - // default: - // reply.Err = OK - // return - // } - // } - - fmt.Printf("Append request processed by leader %d \n", kv.me) - reply.Err = OK return } @@ -207,7 +158,7 @@ func (kv *KVServer) readFromApplyCh() { // commitedOperation := appliedMsg.Command commitedOpLog := appliedMsg.Command.(Op) // fmt.Println(operationLog.OpType) - fmt.Printf("Msg arrived! %s\n", serializeOpMsg(commitedOpLog)) + // fmt.Printf("Msg arrived! %s\n", serializeOpMsg(commitedOpLog)) //DebugP(dKv, "Applied msg received! %s", commitedOpLog.OpType) switch commitedOpLog.OpType { case "Get": From 8ffc19dda8e53758a9924975326fc48e2a82b280 Mon Sep 17 00:00:00 2001 From: JckXia Date: Sat, 30 Dec 2023 16:29:30 -0500 Subject: [PATCH 08/13] Fix bug --- src/kvraft/common.go | 7 ------- src/kvraft/server.go | 1 - 2 files changed, 8 deletions(-) diff --git a/src/kvraft/common.go b/src/kvraft/common.go index 5bba5c0..344770d 100644 --- a/src/kvraft/common.go +++ b/src/kvraft/common.go @@ -7,13 +7,6 @@ const ( ) type Err string - -const ( - Append = "Append" - Get = "Get" - Put = "Put" -) - // Put or Append type PutAppendArgs struct { Key string diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 32007a3..c38c207 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -70,7 +70,6 @@ func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { } kv.mu.Unlock() - //fmt.Printf("Get request processed by leader %d \n", kv.me) reply.Err = OK return From 472c1af843504f762e41727af0cfc75181222e56 Mon Sep 17 00:00:00 2001 From: JckXia Date: Sat, 30 Dec 2023 17:32:48 -0500 Subject: [PATCH 09/13] Add a dedupTable entry for client IDs --- src/kvraft/server.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/kvraft/server.go b/src/kvraft/server.go index c38c207..92d8638 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -29,6 +29,12 @@ type Op struct { Value string } +// Put() commands should have an empty string as value +type DedupEntry struct { + sequenceNum int + value string +} + type KVServer struct { mu sync.Mutex me int @@ -41,9 +47,21 @@ type KVServer struct { maxraftstate int // snapshot if log grows this big store map[string]string + dedupTable map[int64]DedupEntry // Your definitions here. } +// Operations on the dedupTable +func (kv *KVServer) getClientEntry(clientId int64) (DedupEntry, bool) { + entry, exists := kv.dedupTable[clientId] + return entry, exists +} + +func (kv *KVServer) upsertClientEntry(clientId int64, sequenceNum int, value string) { + kv.dedupTable[clientId] = DedupEntry{sequenceNum, value} +} + + func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { @@ -115,7 +133,7 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { kv.store[putAppendMsg.Key] = putAppendMsg.Value case "Append": kv.store[putAppendMsg.Key] += putAppendMsg.Value - } + } } reply.Err = OK From f93f55d4c65907e4f74990079aeea2991e7140dc Mon Sep 17 00:00:00 2001 From: JckXia Date: Sat, 30 Dec 2023 17:55:47 -0500 Subject: [PATCH 10/13] Refactor client side structure --- src/kvraft/client.go | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/src/kvraft/client.go b/src/kvraft/client.go index ac3aef0..653ec65 100644 --- a/src/kvraft/client.go +++ b/src/kvraft/client.go @@ -19,6 +19,10 @@ func nrand() int64 { return x } +func (ck *Clerk) scheduleNextServer(clientId int) int { + return (clientId + 1) % len(ck.servers) +} + func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { ck := new(Clerk) ck.servers = servers @@ -43,20 +47,22 @@ func (ck *Clerk) Get(key string) string { // You will have to modify this function. getRequest := GetArgs{key} getReply := GetReply{} - for { - for i := range ck.servers { - ck.servers[i].Call("KVServer.Get", &getRequest, &getReply) - - if getReply.Err == OK { - // fmt.Println("CLI: Get response returned and is ok") - return getReply.Value - } + serverId := 0 + + for getReply.Err != OK { + ok := ck.servers[serverId].Call("KVServer.Get", &getRequest, &getReply) + + if !ok { + // Network issues/etc } + serverId = ck.scheduleNextServer(serverId) } - - return "" + + return getReply.Value } + + // // shared by Put and Append. @@ -72,17 +78,16 @@ func (ck *Clerk) PutAppend(key string, value string, op string) { // You will have to modify this function. putRequest := PutAppendArgs{key, value, op} putResponse := PutAppendReply{} + serverId := 0 + + for putResponse.Err != OK { + + ok := ck.servers[serverId].Call("KVServer.PutAppend", &putRequest, &putResponse) + if !ok { - for { - for i:= range ck.servers { - ck.servers[i].Call("KVServer.PutAppend", &putRequest, &putResponse) - if putResponse.Err == OK { - // fmt.Println("CLI: Put returned and is ok") - return - } } + serverId = ck.scheduleNextServer(serverId) } -// fmt.Println("PUT returned!") } func (ck *Clerk) Put(key string, value string) { @@ -91,3 +96,5 @@ func (ck *Clerk) Put(key string, value string) { func (ck *Clerk) Append(key string, value string) { ck.PutAppend(key, value, "Append") } + + From 5ff9c3283ee10d6069bc2fc283f76cd83db9e504 Mon Sep 17 00:00:00 2001 From: JckXia Date: Sat, 30 Dec 2023 18:14:23 -0500 Subject: [PATCH 11/13] Complete dedup impl on client's side --- src/kvraft/client.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/kvraft/client.go b/src/kvraft/client.go index 653ec65..0d79ad4 100644 --- a/src/kvraft/client.go +++ b/src/kvraft/client.go @@ -3,12 +3,14 @@ package kvraft import "6.824/labrpc" import "crypto/rand" import "math/big" -// import "fmt" + type Clerk struct { + clientId int64 servers []*labrpc.ClientEnd + seqNum int // You will have to modify this struct. } @@ -24,8 +26,11 @@ func (ck *Clerk) scheduleNextServer(clientId int) int { } func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { + ck := new(Clerk) + ck.clientId = nrand() ck.servers = servers + ck.seqNum = 0 // You'll have to add code here. return ck } @@ -45,7 +50,7 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { func (ck *Clerk) Get(key string) string { // You will have to modify this function. - getRequest := GetArgs{key} + getRequest := GetArgs{ck.clientId, ck.seqNum, key} getReply := GetReply{} serverId := 0 @@ -57,13 +62,11 @@ func (ck *Clerk) Get(key string) string { } serverId = ck.scheduleNextServer(serverId) } - + ck.seqNum++ return getReply.Value } - - // // shared by Put and Append. // @@ -76,7 +79,7 @@ func (ck *Clerk) Get(key string) string { // func (ck *Clerk) PutAppend(key string, value string, op string) { // You will have to modify this function. - putRequest := PutAppendArgs{key, value, op} + putRequest := PutAppendArgs{ck.clientId, ck.seqNum, key, value, op} putResponse := PutAppendReply{} serverId := 0 @@ -88,6 +91,7 @@ func (ck *Clerk) PutAppend(key string, value string, op string) { } serverId = ck.scheduleNextServer(serverId) } + ck.seqNum++ } func (ck *Clerk) Put(key string, value string) { From c0dcdbdc53bf548c69765477d953090e433dd61a Mon Sep 17 00:00:00 2001 From: JckXia Date: Sat, 30 Dec 2023 20:34:16 -0500 Subject: [PATCH 12/13] Add dedup logic in Get/PutAppend --- src/kvraft/common.go | 4 ++++ src/kvraft/server.go | 31 ++++++++++++++++++++++--------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/kvraft/common.go b/src/kvraft/common.go index 344770d..e6dd473 100644 --- a/src/kvraft/common.go +++ b/src/kvraft/common.go @@ -9,6 +9,8 @@ type Err string // Put or Append type PutAppendArgs struct { + ClientId int64 + SeqNum int Key string Value string Op string // "Put" or "Append" @@ -22,6 +24,8 @@ type PutAppendReply struct { } type GetArgs struct { + ClientId int64 + SeqNum int Key string // You'll have to add definitions here. } diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 92d8638..18a2e12 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -48,7 +48,6 @@ type KVServer struct { store map[string]string dedupTable map[int64]DedupEntry - // Your definitions here. } // Operations on the dedupTable @@ -61,30 +60,36 @@ func (kv *KVServer) upsertClientEntry(clientId int64, sequenceNum int, value str kv.dedupTable[clientId] = DedupEntry{sequenceNum, value} } - - func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { - + if !kv.isLeader() { reply.Err = ErrWrongLeader return } - // Your code here. - //fmt.Printf("Get request received by leader %d \n", kv.me) + kv.mu.Lock() + clientEntry, exist := kv.getClientEntry(args.ClientId) + + if exist && clientEntry.sequenceNum >= args.SeqNum { + reply.Value = clientEntry.value + kv.mu.Unlock() + return + } + opMsg := Op{"Get", args.Key,""} kv.rf.Start(opMsg) kv.mu.Unlock() - + //TODO Loop/range till uuid found? getMsg := <- kv.getConsensusChan kv.mu.Lock() if getMsg.Key == args.Key { reply.Value = kv.store[getMsg.Key] + kv.upsertClientEntry(args.ClientId, args.SeqNum, reply.Value) } kv.mu.Unlock() @@ -116,12 +121,18 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { opType := args.Op kv.mu.Lock() - + // Dedup + clientEntry, exist := kv.getClientEntry(args.ClientId) + if exist && clientEntry.sequenceNum >= args.SeqNum { + kv.mu.Unlock() + return + } + opLog := Op{opType, args.Key, args.Value} kv.rf.Start(opLog) kv.mu.Unlock() - + putAppendMsg := <- kv.putConensusChan @@ -134,6 +145,7 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { case "Append": kv.store[putAppendMsg.Key] += putAppendMsg.Value } + kv.upsertClientEntry(args.ClientId, args.SeqNum,"") } reply.Err = OK @@ -211,6 +223,7 @@ func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persiste kv.maxraftstate = maxraftstate // You may need initialization code here. + kv.dedupTable = make(map[int64]DedupEntry) kv.store = make(map[string]string) kv.applyCh = make(chan raft.ApplyMsg) kv.getConsensusChan = make(chan Op) From 6c3aea71bb5ecf7c5bbd154f8d517c3df6139518 Mon Sep 17 00:00:00 2001 From: JckXia Date: Sat, 30 Dec 2023 20:45:21 -0500 Subject: [PATCH 13/13] Save progress --- src/kvraft/server.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 18a2e12..271853d 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -24,6 +24,8 @@ type Op struct { // Your definitions here. // Field names must start with capital letters, // otherwise RPC will break. + ClientId int64 + SeqNum int OpType string Key string Value string @@ -78,7 +80,7 @@ func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { return } - opMsg := Op{"Get", args.Key,""} + opMsg := Op{args.ClientId, args.SeqNum, "Get", args.Key,""} kv.rf.Start(opMsg) kv.mu.Unlock() @@ -89,7 +91,6 @@ func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { kv.mu.Lock() if getMsg.Key == args.Key { reply.Value = kv.store[getMsg.Key] - kv.upsertClientEntry(args.ClientId, args.SeqNum, reply.Value) } kv.mu.Unlock() @@ -128,7 +129,7 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { return } - opLog := Op{opType, args.Key, args.Value} + opLog := Op{args.ClientId, args.SeqNum, opType, args.Key, args.Value} kv.rf.Start(opLog) kv.mu.Unlock() @@ -145,7 +146,6 @@ func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { case "Append": kv.store[putAppendMsg.Key] += putAppendMsg.Value } - kv.upsertClientEntry(args.ClientId, args.SeqNum,"") } reply.Err = OK @@ -197,6 +197,7 @@ func (kv *KVServer) readFromApplyCh() { case "Append": kv.putConensusChan <- commitedOpLog } + kv.upsertClientEntry(commitedOpLog.ClientId, commitedOpLog.SeqNum, commitedOpLog.Value) } }