Skip to content

Commit

Permalink
pkg/etcd: support do some operations in one transaction (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Dec 2, 2019
1 parent 451c58d commit e8675e7
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 0 deletions.
86 changes: 86 additions & 0 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package etcd
import (
"context"
"crypto/tls"
"fmt"
"path"
"strings"
"time"
Expand All @@ -30,6 +31,36 @@ type Node struct {
Childs map[string]*Node
}

// OpType is operation's type in etcd
type OpType string

var (
// CreateOp is create operation type
CreateOp OpType = "create"

// UpdateOp is update operation type
UpdateOp OpType = "update"

// DeleteOp is delete operation type
DeleteOp OpType = "delete"
)

// Operation represents an operation in etcd, include create, update and delete.
type Operation struct {
Tp OpType
Key string
Value string
TTL int64
WithPrefix bool

Opts []clientv3.OpOption
}

// String implements Stringer interface.
func (o *Operation) String() string {
return fmt.Sprintf("{Tp: %s, Key: %s, Value: %s, TTL: %d, WithPrefix: %v, Opts: %v}", o.Tp, o.Key, o.Value, o.TTL, o.WithPrefix, o.Opts)
}

// Client is a wrapped etcd client that support some simple method
type Client struct {
client *clientv3.Client
Expand Down Expand Up @@ -217,6 +248,61 @@ func (e *Client) Watch(ctx context.Context, prefix string, revision int64) clien
return e.client.Watch(ctx, prefix, clientv3.WithPrefix())
}

// DoTxn does some operation in one transaction.
// Note: should only have one opereration for one key, otherwise will get duplicate key error.
func (e *Client) DoTxn(ctx context.Context, operations []*Operation) (int64, error) {
cmps := make([]clientv3.Cmp, 0, len(operations))
ops := make([]clientv3.Op, 0, len(operations))

for _, operation := range operations {
operation.Key = keyWithPrefix(e.rootPath, operation.Key)

if operation.TTL > 0 {
if operation.Tp == DeleteOp {
return 0, errors.Errorf("unexpected TTL in delete operation")
}

lcr, err := e.client.Lease.Grant(ctx, operation.TTL)
if err != nil {
return 0, errors.Trace(err)
}
operation.Opts = append(operation.Opts, clientv3.WithLease(lcr.ID))
}

if operation.WithPrefix {
operation.Opts = append(operation.Opts, clientv3.WithPrefix())
}

switch operation.Tp {
case CreateOp:
cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(operation.Key), "=", 0))
ops = append(ops, clientv3.OpPut(operation.Key, operation.Value, operation.Opts...))
case UpdateOp:
cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(operation.Key), ">", 0))
ops = append(ops, clientv3.OpPut(operation.Key, operation.Value, operation.Opts...))
case DeleteOp:
ops = append(ops, clientv3.OpDelete(operation.Key, operation.Opts...))
default:
return 0, errors.Errorf("unknown operation type %s", operation.Tp)
}
}

txnResp, err := e.client.KV.Txn(ctx).If(
cmps...,
).Then(
ops...,
).Commit()
if err != nil {
return 0, errors.Trace(err)
}

if !txnResp.Succeeded {
return 0, errors.Errorf("do transaction failed, operations: %+v", operations)
}

return txnResp.Header.Revision, nil
}

func parseToDirTree(root *Node, path string) *Node {
pathDirs := strings.Split(path, "/")
current := root
Expand Down
183 changes: 183 additions & 0 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,189 @@ func (t *testEtcdSuite) TestDelete(c *C) {
c.Assert(root.Childs, HasLen, 0)
}

func (t *testEtcdSuite) TestDoTxn(c *C) {
// case1: create two keys in one transaction
ops := []*Operation{
{
Tp: CreateOp,
Key: "test1",
Value: "1",
}, {
Tp: CreateOp,
Key: "test2",
Value: "2",
},
}
revision, err := etcdCli.DoTxn(context.Background(), ops)
c.Assert(err, IsNil)

value1, revision1, err := etcdCli.Get(context.Background(), "test1")
c.Assert(err, IsNil)
c.Assert(string(value1), Equals, "1")
c.Assert(revision1, Equals, revision)

value2, revision2, err := etcdCli.Get(context.Background(), "test2")
c.Assert(err, IsNil)
c.Assert(string(value2), Equals, "2")
c.Assert(revision2, Equals, revision)

// case2: delete, update and create in one transaction
ops = []*Operation{
{
Tp: DeleteOp,
Key: "test1",
}, {
Tp: UpdateOp,
Key: "test2",
Value: "22",
}, {
Tp: CreateOp,
Key: "test3",
Value: "3",
},
}

revision, err = etcdCli.DoTxn(context.Background(), ops)
c.Assert(err, IsNil)

value1, _, err = etcdCli.Get(context.Background(), "test1")
c.Assert(err, ErrorMatches, ".* not found")

value2, revision2, err = etcdCli.Get(context.Background(), "test2")
c.Assert(err, IsNil)
c.Assert(string(value2), Equals, "22")
c.Assert(revision2, Equals, revision)

value3, revision3, err := etcdCli.Get(context.Background(), "test3")
c.Assert(err, IsNil)
c.Assert(string(value3), Equals, "3")
c.Assert(revision3, Equals, revision)

// case3: create keys with TTL
ops = []*Operation{
{
Tp: CreateOp,
Key: "test4",
Value: "4",
TTL: 1,
}, {
Tp: CreateOp,
Key: "test5",
Value: "5",
},
}
revision, err = etcdCli.DoTxn(context.Background(), ops)
c.Assert(err, IsNil)

value4, revision4, err := etcdCli.Get(context.Background(), "test4")
c.Assert(err, IsNil)
c.Assert(string(value4), Equals, "4")
c.Assert(revision4, Equals, revision)

value5, revision5, err := etcdCli.Get(context.Background(), "test5")
c.Assert(err, IsNil)
c.Assert(string(value5), Equals, "5")
c.Assert(revision5, Equals, revision)

// sleep 2 seconds and this key will be deleted
time.Sleep(2 * time.Second)
_, _, err = etcdCli.Get(context.Background(), "test4")
c.Assert(err, ErrorMatches, ".* not found")

// case4: do transaction failed because key is deleted, so can't update
ops = []*Operation{
{
Tp: CreateOp,
Key: "test4",
Value: "4",
}, {
Tp: UpdateOp, // key test1 is deleted, so will update failed
Key: "test1",
Value: "11",
},
}

revision, err = etcdCli.DoTxn(context.Background(), ops)
c.Assert(err, ErrorMatches, "do transaction failed.*")

_, _, err = etcdCli.Get(context.Background(), "test4")
c.Assert(err, ErrorMatches, ".* not found")

// case5: do transaction failed because can't operate one key in one transaction
ops = []*Operation{
{
Tp: CreateOp,
Key: "test6",
Value: "6",
}, {
Tp: UpdateOp,
Key: "test6",
Value: "66",
},
}

_, err = etcdCli.DoTxn(context.Background(), ops)
c.Assert(err, ErrorMatches, "etcdserver: duplicate key given in txn request")

_, _, err = etcdCli.Get(context.Background(), "test6")
c.Assert(err, ErrorMatches, ".* not found")

// case6: do transaction failed because can't create an existing key
ops = []*Operation{
{
Tp: CreateOp,
Key: "test2", // already exist
Value: "222",
}, {
Tp: UpdateOp,
Key: "test5",
Value: "555",
},
}

_, err = etcdCli.DoTxn(context.Background(), ops)
c.Assert(err, ErrorMatches, "do transaction failed.*")

value2, _, err = etcdCli.Get(context.Background(), "test2")
c.Assert(err, IsNil)
c.Assert(string(value2), Equals, "22")

value5, _, err = etcdCli.Get(context.Background(), "test5")
c.Assert(err, IsNil)
c.Assert(string(value5), Equals, "5")

// case7: delete not exist key but will do transaction success
ops = []*Operation{
{
Tp: DeleteOp,
Key: "test7", // not exist
}, {
Tp: CreateOp,
Key: "test8",
Value: "8",
},
}

_, err = etcdCli.DoTxn(context.Background(), ops)
c.Assert(err, IsNil)

value8, _, err := etcdCli.Get(context.Background(), "test8")
c.Assert(err, IsNil)
c.Assert(string(value8), Equals, "8")

// case8: do transaction failed because can't set TTL for delete operation
ops = []*Operation{
{
Tp: DeleteOp,
Key: "test8",
TTL: 1,
},
}

_, err = etcdCli.DoTxn(context.Background(), ops)
c.Assert(err, ErrorMatches, "unexpected TTL in delete operation")
}

func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
etcd := NewClient(cluster.RandClient(), "binlog")
Expand Down

0 comments on commit e8675e7

Please sign in to comment.