diff --git a/src/kvraft/client.go b/src/kvraft/client.go index 7e47579..0d79ad4 100644 --- a/src/kvraft/client.go +++ b/src/kvraft/client.go @@ -3,10 +3,14 @@ package kvraft import "6.824/labrpc" import "crypto/rand" import "math/big" + + type Clerk struct { + clientId int64 servers []*labrpc.ClientEnd + seqNum int // You will have to modify this struct. } @@ -17,9 +21,16 @@ func nrand() int64 { return x } +func (ck *Clerk) scheduleNextServer(clientId int) int { + return (clientId + 1) % len(ck.servers) +} + func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { + ck := new(Clerk) + ck.clientId = nrand() ck.servers = servers + ck.seqNum = 0 // You'll have to add code here. return ck } @@ -39,9 +50,23 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { func (ck *Clerk) Get(key string) string { // You will have to modify this function. - return "" + getRequest := GetArgs{ck.clientId, ck.seqNum, key} + getReply := GetReply{} + serverId := 0 + + for getReply.Err != OK { + ok := ck.servers[serverId].Call("KVServer.Get", &getRequest, &getReply) + + if !ok { + // Network issues/etc + } + serverId = ck.scheduleNextServer(serverId) + } + ck.seqNum++ + return getReply.Value } + // // shared by Put and Append. // @@ -54,6 +79,19 @@ func (ck *Clerk) Get(key string) string { // func (ck *Clerk) PutAppend(key string, value string, op string) { // You will have to modify this function. + putRequest := PutAppendArgs{ck.clientId, ck.seqNum, key, value, op} + putResponse := PutAppendReply{} + serverId := 0 + + for putResponse.Err != OK { + + ok := ck.servers[serverId].Call("KVServer.PutAppend", &putRequest, &putResponse) + if !ok { + + } + serverId = ck.scheduleNextServer(serverId) + } + ck.seqNum++ } func (ck *Clerk) Put(key string, value string) { @@ -62,3 +100,5 @@ func (ck *Clerk) Put(key string, value string) { func (ck *Clerk) Append(key string, value string) { ck.PutAppend(key, value, "Append") } + + diff --git a/src/kvraft/common.go b/src/kvraft/common.go index e5ee442..e6dd473 100644 --- a/src/kvraft/common.go +++ b/src/kvraft/common.go @@ -5,11 +5,12 @@ const ( ErrNoKey = "ErrNoKey" ErrWrongLeader = "ErrWrongLeader" ) - type Err string // Put or Append type PutAppendArgs struct { + ClientId int64 + SeqNum int Key string Value string Op string // "Put" or "Append" @@ -23,6 +24,8 @@ type PutAppendReply struct { } type GetArgs struct { + ClientId int64 + SeqNum int Key string // You'll have to add definitions here. } diff --git a/src/kvraft/server.go b/src/kvraft/server.go index 01f12c6..271853d 100644 --- a/src/kvraft/server.go +++ b/src/kvraft/server.go @@ -7,6 +7,7 @@ import ( "log" "sync" "sync/atomic" + "fmt" ) const Debug = false @@ -23,6 +24,17 @@ type Op struct { // Your definitions here. // Field names must start with capital letters, // otherwise RPC will break. + ClientId int64 + SeqNum int + OpType string + Key string + Value string +} + +// Put() commands should have an empty string as value +type DedupEntry struct { + sequenceNum int + value string } type KVServer struct { @@ -32,18 +44,120 @@ type KVServer struct { applyCh chan raft.ApplyMsg dead int32 // set by Kill() + getConsensusChan chan Op + putConensusChan chan Op maxraftstate int // snapshot if log grows this big - // Your definitions here. + store map[string]string + dedupTable map[int64]DedupEntry } +// Operations on the dedupTable +func (kv *KVServer) getClientEntry(clientId int64) (DedupEntry, bool) { + entry, exists := kv.dedupTable[clientId] + return entry, exists +} + +func (kv *KVServer) upsertClientEntry(clientId int64, sequenceNum int, value string) { + kv.dedupTable[clientId] = DedupEntry{sequenceNum, value} +} func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { - // Your code here. + + + if !kv.isLeader() { + reply.Err = ErrWrongLeader + return + } + + kv.mu.Lock() + + clientEntry, exist := kv.getClientEntry(args.ClientId) + + if exist && clientEntry.sequenceNum >= args.SeqNum { + reply.Value = clientEntry.value + kv.mu.Unlock() + return + } + + opMsg := Op{args.ClientId, args.SeqNum, "Get", args.Key,""} + kv.rf.Start(opMsg) + + kv.mu.Unlock() + + //TODO Loop/range till uuid found? + getMsg := <- kv.getConsensusChan + + kv.mu.Lock() + if getMsg.Key == args.Key { + reply.Value = kv.store[getMsg.Key] + } + kv.mu.Unlock() + + //fmt.Printf("Get request processed by leader %d \n", kv.me) + reply.Err = OK + return +} + +func serializePutAppendArgs(args *PutAppendArgs) string { + s := fmt.Sprintf("(K: %s, V: %s, OP: %s)", args.Key, args.Value, args.Op) + return s +} + +func serializeOpMsg(msg Op) string { + s := fmt.Sprintf("(K: %s, V: %s, OpTye: %s)", msg.Key, msg.Value, msg.OpType) + return s } func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { // Your code here. + + + if !kv.isLeader() { + reply.Err = ErrWrongLeader + return + } + + //fmt.Printf("Append request %s received by leader %d \n", serializePutAppendArgs(args), kv.me) + opType := args.Op + kv.mu.Lock() + + // Dedup + clientEntry, exist := kv.getClientEntry(args.ClientId) + if exist && clientEntry.sequenceNum >= args.SeqNum { + kv.mu.Unlock() + return + } + + opLog := Op{args.ClientId, args.SeqNum, opType, args.Key, args.Value} + kv.rf.Start(opLog) + kv.mu.Unlock() + + + putAppendMsg := <- kv.putConensusChan + + + kv.mu.Lock() + + if putAppendMsg.OpType == args.Op && putAppendMsg.Key == args.Key && putAppendMsg.Value == args.Value { + switch putAppendMsg.OpType { + case "Put": + kv.store[putAppendMsg.Key] = putAppendMsg.Value + case "Append": + kv.store[putAppendMsg.Key] += putAppendMsg.Value + } + } + + reply.Err = OK + kv.mu.Unlock() + + + return +} + +func (kv *KVServer) isLeader() bool { + _, isLeader := kv.rf.GetState() + return isLeader } // @@ -67,6 +181,26 @@ func (kv *KVServer) killed() bool { return z == 1 } +func (kv *KVServer) readFromApplyCh() { + for kv.killed() == false { + appliedMsg := <- kv.rf.GetApplyCh() + // commitedOperation := appliedMsg.Command + commitedOpLog := appliedMsg.Command.(Op) + // fmt.Println(operationLog.OpType) + // fmt.Printf("Msg arrived! %s\n", serializeOpMsg(commitedOpLog)) + //DebugP(dKv, "Applied msg received! %s", commitedOpLog.OpType) + switch commitedOpLog.OpType { + case "Get": + kv.getConsensusChan <- commitedOpLog + case "Put": + kv.putConensusChan <- commitedOpLog + case "Append": + kv.putConensusChan <- commitedOpLog + } + kv.upsertClientEntry(commitedOpLog.ClientId, commitedOpLog.SeqNum, commitedOpLog.Value) + } +} + // // servers[] contains the ports of the set of // servers that will cooperate via Raft to @@ -85,17 +219,20 @@ func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persiste // call labgob.Register on structures you want // Go's RPC library to marshall/unmarshall. labgob.Register(Op{}) - kv := new(KVServer) kv.me = me kv.maxraftstate = maxraftstate // You may need initialization code here. - + kv.dedupTable = make(map[int64]DedupEntry) + kv.store = make(map[string]string) kv.applyCh = make(chan raft.ApplyMsg) + kv.getConsensusChan = make(chan Op) + kv.putConensusChan = make(chan Op) + kv.rf = raft.Make(servers, me, persister, kv.applyCh) // You may need initialization code here. - + go kv.readFromApplyCh() return kv } diff --git a/src/raft/raft.go b/src/raft/raft.go index a0796cd..eaa0bc2 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -1124,12 +1124,12 @@ func (rf *Raft) killed() bool { func (rf *Raft) dedicatedApplier(applyMsgs [] ApplyMsg) { for _, v := range applyMsgs { rf.applyCh <- v - rf.mu.Lock() - rf.lastApplied = v.CommandIndex - DebugP(dLast, "S%d commiting log, updating lastApplied to %d", rf.me, rf.lastApplied) + // rf.mu.Lock() + // rf.lastApplied = v.CommandIndex + // DebugP(dLast, "S%d commiting log, updating lastApplied to %d", rf.me, rf.lastApplied) - DebugP(dCommit, "S%d applying log [(%s)] at term %d, CI %d, isLeader: %v", rf.me, v.Command , rf.currentTerm, rf.lastApplied, rf.nodeStatus == Leader) - rf.mu.Unlock() + // DebugP(dSnap, "S%d applying log [(%s)] at term %d, CI %d, isLeader: %v, msgLen: %d", rf.me, v.Command , rf.currentTerm, rf.lastApplied, rf.nodeStatus == Leader, len(applyMsgs)) + // rf.mu.Unlock() } } @@ -1148,6 +1148,7 @@ func (rf *Raft) lifeCycleManager() { DebugP(dLast,"S%d lastApplied is %d", rf.me, rf.lastApplied) for lastApplied < rf.commitIndex { lastApplied++ + rf.lastApplied++ logEntryToCommit := rf.getLogAtIndex(lastApplied) applyMsg := ApplyMsg{true, logEntryToCommit.Command, lastApplied, false,nil, 0,0} @@ -1359,6 +1360,10 @@ func (rf * Raft) bootStrapState(hostServerId int) { rf.lastIncludedIdx = 0 rf.lastIncludedTerm = 0 } + +func (rf *Raft) GetApplyCh() chan ApplyMsg { + return rf.applyCh +} // // the service or tester wants to create a Raft server. the ports // of all the Raft servers (including this one) are in peers[]. this