Skip to content

Commit

Permalink
COMPLETE project4B & PASS
Browse files Browse the repository at this point in the history
  • Loading branch information
Metafora072 committed Jul 22, 2024
1 parent 916978e commit 7c02f10
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 4 deletions.
332 changes: 328 additions & 4 deletions kv/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"github.com/pingcap-incubator/tinykv/kv/transaction/mvcc"

"github.com/pingcap-incubator/tinykv/kv/coprocessor"
"github.com/pingcap-incubator/tinykv/kv/storage"
Expand Down Expand Up @@ -47,20 +48,343 @@ func (server *Server) Snapshot(stream tinykvpb.TinyKv_SnapshotServer) error {
return server.storage.(*raft_storage.RaftStorage).Snapshot(stream)
}

// Transactional API.
// KvGet Transactional API.
// KvGet 在提供的时间戳处从数据库中读取一个值。如果在 KvGet 请求的时候,要读取的 key 被另一个事务锁定,那么 TinyKV 应该返回一个错误。
// 否则,TinyKV 必须搜索该 key 的版本以找到最新的、有效的值。
func (server *Server) KvGet(_ context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
// Your Code Here (4B).
return nil, nil
/*
type GetRequest struct {
Context *Context // 一个指向 Context 结构体的指针,包含请求的上下文信息。
Key []byte // 一个字节切片,表示要读取的键。
Version uint64 // 一个无符号 64 位整数,表示读取的时间点(版本)。
}
*/
/*
type GetResponse struct {
RegionError *errorpb.Error ''
Error *KeyError
Value []byte
// True if the requested key doesn't exist; another error will not be signalled.
NotFound bool
}
*/
response := &kvrpcpb.GetResponse{}

// 获取 Reader
// 如果获取 Reader 时发生区域错误(RegionError),则将错误信息返回给客户端。
reader, err := server.storage.Reader(req.GetContext())
// 使用类型断言将 err 转换为 *raft_storage.RegionError 类型,并将结果赋值给 regionErr。
// 同时,ok 是一个布尔值,表示类型断言是否成功。如果 err 是 *raft_storage.RegionError 类型,ok 将为 true,否则为 false。
regionError, ok := err.(*raft_storage.RegionError)
if ok { // 发生区域错误(RegionError)
/*
type RegionError struct {
RequestErr *errorpb.Error
}
*/
response.RegionError = regionError.RequestErr
return response, nil
}

defer reader.Close()
// 创建事务,获取 Lock
txn := mvcc.NewMvccTxn(reader,req.GetVersion())
lock, err := txn.GetLock(req.GetKey())
regionError, ok = err.(*raft_storage.RegionError)
if ok {
response.RegionError = regionError.RequestErr
return response, nil
}

// 如果锁存在且请求的版本号大于或等于锁的时间戳,则返回锁信息,表示需要等待锁释放。
if lock != nil && req.Version >= lock.Ts {
keyError := &kvrpcpb.KeyError{
Locked: &kvrpcpb.LockInfo{
Key: req.Key,
PrimaryLock: lock.Primary,
LockVersion: lock.Ts,
LockTtl: lock.Ttl,
},
}
response.Error = keyError
}

// 尝试获取指定键的值。如果获取值时发生区域错误,则将错误信息返回给客户端。如果值不存在,则设置 NotFound 标志。
value, err := txn.GetValue(req.GetKey())
if err != nil {
regionError, ok = err.(*raft_storage.RegionError)
if ok {
response.RegionError = regionError.RequestErr
return response, nil
}
return nil, err
}

if value == nil {
response.NotFound = true
response.Value = nil
} else {
response.NotFound = false
response.Value = value
}

return response, nil
}

// KvPrewrite 是一个值被实际写入数据库的地方。一个 key 被锁定,一个 key 被存储。我们必须检查另一个事务没有锁定或写入同一个 key 。
func (server *Server) KvPrewrite(_ context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
// Your Code Here (4B).
return nil, nil
/*
type PrewriteRequest struct {
Context *Context
// 一个 Mutation 结构体的切片,包含客户端希望作为事务一部分进行的所有写操作(变更)。
Mutations []*Mutation
// Key of the primary lock.
PrimaryLock []byte
StartVersion uint64
LockTtl uint64
}
type PrewriteResponse struct {
RegionError *errorpb.Error
Errors []*KeyError
}
type Mutation struct {
Op
Key []byte
Value []byte
}
const (
Op_Put Op = 0
Op_Del Op = 1
Op_Rollback Op = 2
// Used by TinySQL but not TinyKV.
Op_Lock Op = 3
)
*/
response := &kvrpcpb.PrewriteResponse{
Errors: make([]*kvrpcpb.KeyError, 0),
}

// 获取 Reader
// 如果获取 Reader 时发生区域错误(RegionError),则将错误信息返回给客户端。
reader, err := server.storage.Reader(req.GetContext())
// 使用类型断言将 err 转换为 *raft_storage.RegionError 类型,并将结果赋值给 regionErr。
// 同时,ok 是一个布尔值,表示类型断言是否成功。如果 err 是 *raft_storage.RegionError 类型,ok 将为 true,否则为 false。
regionError, ok := err.(*raft_storage.RegionError)
if ok { // 发生区域错误(RegionError)
/*
type RegionError struct {
RequestErr *errorpb.Error
}
*/
response.RegionError = regionError.RequestErr
return response, nil
}

defer reader.Close()
// 创建事务并检测冲突
txn := mvcc.NewMvccTxn(reader,req.GetStartVersion())
for _, mutation := range req.Mutations {
// MostRecentWrite 查询传入 key 的最新 Write
write, commitTS, err := txn.MostRecentWrite(mutation.GetKey())
if err != nil {
regionError, ok := err.(*raft_storage.RegionError)
if ok {
response.RegionError = regionError.RequestErr
return response, nil
}
return nil, err
}

// 如果这两个条件都满足,说明在当前事务开始之后已经有提交的写操作,这会导致写冲突。
if write != nil && req.GetStartVersion() <= commitTS {
keyError := &kvrpcpb.KeyError{
Conflict: &kvrpcpb.WriteConflict{
StartTs: req.StartVersion,
ConflictTs: commitTS,
Key: mutation.GetKey(),
Primary: req.GetPrimaryLock(),
},
}
response.Errors = append(response.Errors, keyError)
continue
}

// 检测 Key 是否有 Lock 锁住,如果有的话则说明别的事务可能正在修改
lock, err := txn.GetLock(mutation.GetKey())
if err != nil {
regionError, ok := err.(*raft_storage.RegionError)
if ok {
response.RegionError = regionError.RequestErr
return response, nil
}
return nil, err
}
if lock != nil {
keyError := &kvrpcpb.KeyError{
Locked: &kvrpcpb.LockInfo{
Key: mutation.GetKey(),
PrimaryLock: req.GetPrimaryLock(),
LockVersion: lock.Ts,
LockTtl: lock.Ttl,
},
}
response.Errors = append(response.Errors, keyError)
continue
}

var writeKind mvcc.WriteKind

switch mutation.GetOp() {
case kvrpcpb.Op_Put:
writeKind = mvcc.WriteKindPut
txn.PutValue(mutation.GetKey(), mutation.GetValue())
case kvrpcpb.Op_Del:
writeKind = mvcc.WriteKindDelete
txn.DeleteValue(mutation.GetKey())
case kvrpcpb.Op_Rollback:
continue
case kvrpcpb.Op_Lock:
continue
}

// 加锁
addLock := &mvcc.Lock{
Primary: req.GetPrimaryLock(),
Ts: req.GetStartVersion(),
Ttl: req.GetLockTtl(),
Kind: writeKind,
}
txn.PutLock(mutation.GetKey(), addLock)
}

// 如果有出错,需要 abort 事务
if len(response.Errors) > 0 {
return response, nil
}

// 写入事务中暂存的修改到 storage 中
err = server.storage.Write(req.GetContext(),txn.Writes())
if err != nil {
regionError, ok := err.(*raft_storage.RegionError)
if ok {
response.RegionError = regionError.RequestErr
return response, nil
}
return nil, err
}
return response, nil
}

func (server *Server) KvCommit(_ context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) {
// Your Code Here (4B).
return nil, nil
/*
type CommitRequest struct {
Context *Context
// Identifies the transaction, must match the start_version in the transaction's
// prewrite request.
StartVersion uint64
// Must match the keys mutated by the transaction's prewrite request.
Keys [][]byte
// Must be greater than start_version.
CommitVersion uint64
}
// Empty if the commit is successful.
type CommitResponse struct {
RegionError *errorpb.Error ''
Error *KeyError
}
*/
response := &kvrpcpb.CommitResponse{}

// 获取 Reader
// 如果获取 Reader 时发生区域错误(RegionError),则将错误信息返回给客户端。
reader, err := server.storage.Reader(req.GetContext())
// 使用类型断言将 err 转换为 *raft_storage.RegionError 类型,并将结果赋值给 regionErr。
// 同时,ok 是一个布尔值,表示类型断言是否成功。如果 err 是 *raft_storage.RegionError 类型,ok 将为 true,否则为 false。
regionError, ok := err.(*raft_storage.RegionError)
if ok { // 发生区域错误(RegionError)
/*
type RegionError struct {
RequestErr *errorpb.Error
}
*/
response.RegionError = regionError.RequestErr
return response, nil
}

defer reader.Close()
// 创建事务并等待锁
txn := mvcc.NewMvccTxn(reader,req.GetStartVersion())
// 等待所有需要提交的键的锁,确保事务的原子性。使用 defer 确保在函数结束时释放锁。
server.Latches.WaitForLatches(req.GetKeys())
defer server.Latches.ReleaseLatches(req.GetKeys())

for _, curKey := range req.GetKeys() {
// CurrentWrite 查询当前事务(根据 start timestamp)下,传入 key 的最新 Write。
write,_ ,err := txn.CurrentWrite(curKey)
if err != nil {
regionError, ok := err.(*raft_storage.RegionError)
if ok {
response.RegionError = regionError.RequestErr
return response, nil
}
return nil, err
}

// 检查是否重复提交
if write != nil && write.Kind != mvcc.WriteKindRollback && req.GetStartVersion() == write.StartTS {
return response, nil
}

// 检查 curKey 的 lock 是否还存在
lock, err := txn.GetLock(curKey)
if err != nil {
regionError, ok := err.(*raft_storage.RegionError)
if ok {
response.RegionError = regionError.RequestErr
return response, nil
}
return nil, err
}
if lock == nil {
keyError := &kvrpcpb.KeyError{
Retryable: "true",
}
response.Error = keyError
return response, nil
}

if lock.Ts != req.GetStartVersion() {
keyError := &kvrpcpb.KeyError{
Retryable: "true",
}
response.Error = keyError
return response, nil
}

// 正常提交事务
addWrite := &mvcc.Write{
StartTS: req.GetStartVersion(),
Kind: lock.Kind,
}
// 将提交版本的写入记录写入事务,并删除键的锁。
txn.PutWrite(curKey,req.GetCommitVersion(),addWrite)
txn.DeleteLock(curKey)
}

// 写入事务中暂存的修改到 storage 中
err = server.storage.Write(req.GetContext(),txn.Writes())
if err != nil {
regionError, ok := err.(*raft_storage.RegionError)
if ok {
response.RegionError = regionError.RequestErr
return response, nil
}
return nil, err
}
return response, nil
}

func (server *Server) KvScan(_ context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) {
Expand Down
Binary file added note/project4.assets/QQ截图20240722230151.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added note/project4.assets/QQ截图20240723002110.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added note/project4.assets/QQ截图20240723003649.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added note/project4.assets/QQ截图20240723022253.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added note/project4.assets/QQ截图20240723022822.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
37 changes: 37 additions & 0 deletions note/project4.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
## 介绍

**4A:** 本部分是对 mvcc 模块的实现,供 project4B/C 调用,主要涉及修改的代码文件是 transaction.go。需要利用对 CFLock, CFDefault 和 CFWrite 三个 CF 的一些操作来实现 mvcc。

**4B:** project4B 主要实现事务的两段提交,即 prewrite 和 commit,需要完善的文件是 server.go。要注意的是,这要需要通过 server.Latches 对 keys 进行加锁。

**4C:**



## 记录

4A `transaction.go`中的 CurrentWrite函数,为什么要判断?

![](project4.assets/QQ截图20240722220046.png)

4B 注意 Region error

![](project4.assets/QQ截图20240723002110.png)



4B KvGet方法,图中如果不判断 RegionError 会怎样?

![](project4.assets/QQ截图20240723003649.png)

4B KvCommit方法,图中如果不判断会怎样?

![](project4.assets/QQ截图20240723022253.png)

4B KvCommit方法,图中如果不判断会怎样?

![](project4.assets/QQ截图20240723022822.png)

## 说明

![ ](project4.assets/QQ截图20240722230151.png)

0 comments on commit 7c02f10

Please sign in to comment.