From e8675e78a036de08afb059d1bf299693166939be Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 2 Dec 2019 12:33:42 +0800 Subject: [PATCH] pkg/etcd: support do some operations in one transaction (#296) --- pkg/etcd/etcd.go | 86 ++++++++++++++++++++ pkg/etcd/etcd_test.go | 183 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index d2b65fde2..1e4d0957b 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -16,6 +16,7 @@ package etcd import ( "context" "crypto/tls" + "fmt" "path" "strings" "time" @@ -30,6 +31,36 @@ type Node struct { Childs map[string]*Node } +// OpType is operation's type in etcd +type OpType string + +var ( + // CreateOp is create operation type + CreateOp OpType = "create" + + // UpdateOp is update operation type + UpdateOp OpType = "update" + + // DeleteOp is delete operation type + DeleteOp OpType = "delete" +) + +// Operation represents an operation in etcd, include create, update and delete. +type Operation struct { + Tp OpType + Key string + Value string + TTL int64 + WithPrefix bool + + Opts []clientv3.OpOption +} + +// String implements Stringer interface. +func (o *Operation) String() string { + return fmt.Sprintf("{Tp: %s, Key: %s, Value: %s, TTL: %d, WithPrefix: %v, Opts: %v}", o.Tp, o.Key, o.Value, o.TTL, o.WithPrefix, o.Opts) +} + // Client is a wrapped etcd client that support some simple method type Client struct { client *clientv3.Client @@ -217,6 +248,61 @@ func (e *Client) Watch(ctx context.Context, prefix string, revision int64) clien return e.client.Watch(ctx, prefix, clientv3.WithPrefix()) } +// DoTxn does some operation in one transaction. +// Note: should only have one opereration for one key, otherwise will get duplicate key error. +func (e *Client) DoTxn(ctx context.Context, operations []*Operation) (int64, error) { + cmps := make([]clientv3.Cmp, 0, len(operations)) + ops := make([]clientv3.Op, 0, len(operations)) + + for _, operation := range operations { + operation.Key = keyWithPrefix(e.rootPath, operation.Key) + + if operation.TTL > 0 { + if operation.Tp == DeleteOp { + return 0, errors.Errorf("unexpected TTL in delete operation") + } + + lcr, err := e.client.Lease.Grant(ctx, operation.TTL) + if err != nil { + return 0, errors.Trace(err) + } + operation.Opts = append(operation.Opts, clientv3.WithLease(lcr.ID)) + } + + if operation.WithPrefix { + operation.Opts = append(operation.Opts, clientv3.WithPrefix()) + } + + switch operation.Tp { + case CreateOp: + cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(operation.Key), "=", 0)) + ops = append(ops, clientv3.OpPut(operation.Key, operation.Value, operation.Opts...)) + case UpdateOp: + cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(operation.Key), ">", 0)) + ops = append(ops, clientv3.OpPut(operation.Key, operation.Value, operation.Opts...)) + case DeleteOp: + ops = append(ops, clientv3.OpDelete(operation.Key, operation.Opts...)) + default: + return 0, errors.Errorf("unknown operation type %s", operation.Tp) + } + } + + txnResp, err := e.client.KV.Txn(ctx).If( + cmps..., + ).Then( + ops..., + ).Commit() + if err != nil { + return 0, errors.Trace(err) + } + + if !txnResp.Succeeded { + return 0, errors.Errorf("do transaction failed, operations: %+v", operations) + } + + return txnResp.Header.Revision, nil +} + func parseToDirTree(root *Node, path string) *Node { pathDirs := strings.Split(path, "/") current := root diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index faa978e05..9c8810955 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -193,6 +193,189 @@ func (t *testEtcdSuite) TestDelete(c *C) { c.Assert(root.Childs, HasLen, 0) } +func (t *testEtcdSuite) TestDoTxn(c *C) { + // case1: create two keys in one transaction + ops := []*Operation{ + { + Tp: CreateOp, + Key: "test1", + Value: "1", + }, { + Tp: CreateOp, + Key: "test2", + Value: "2", + }, + } + revision, err := etcdCli.DoTxn(context.Background(), ops) + c.Assert(err, IsNil) + + value1, revision1, err := etcdCli.Get(context.Background(), "test1") + c.Assert(err, IsNil) + c.Assert(string(value1), Equals, "1") + c.Assert(revision1, Equals, revision) + + value2, revision2, err := etcdCli.Get(context.Background(), "test2") + c.Assert(err, IsNil) + c.Assert(string(value2), Equals, "2") + c.Assert(revision2, Equals, revision) + + // case2: delete, update and create in one transaction + ops = []*Operation{ + { + Tp: DeleteOp, + Key: "test1", + }, { + Tp: UpdateOp, + Key: "test2", + Value: "22", + }, { + Tp: CreateOp, + Key: "test3", + Value: "3", + }, + } + + revision, err = etcdCli.DoTxn(context.Background(), ops) + c.Assert(err, IsNil) + + value1, _, err = etcdCli.Get(context.Background(), "test1") + c.Assert(err, ErrorMatches, ".* not found") + + value2, revision2, err = etcdCli.Get(context.Background(), "test2") + c.Assert(err, IsNil) + c.Assert(string(value2), Equals, "22") + c.Assert(revision2, Equals, revision) + + value3, revision3, err := etcdCli.Get(context.Background(), "test3") + c.Assert(err, IsNil) + c.Assert(string(value3), Equals, "3") + c.Assert(revision3, Equals, revision) + + // case3: create keys with TTL + ops = []*Operation{ + { + Tp: CreateOp, + Key: "test4", + Value: "4", + TTL: 1, + }, { + Tp: CreateOp, + Key: "test5", + Value: "5", + }, + } + revision, err = etcdCli.DoTxn(context.Background(), ops) + c.Assert(err, IsNil) + + value4, revision4, err := etcdCli.Get(context.Background(), "test4") + c.Assert(err, IsNil) + c.Assert(string(value4), Equals, "4") + c.Assert(revision4, Equals, revision) + + value5, revision5, err := etcdCli.Get(context.Background(), "test5") + c.Assert(err, IsNil) + c.Assert(string(value5), Equals, "5") + c.Assert(revision5, Equals, revision) + + // sleep 2 seconds and this key will be deleted + time.Sleep(2 * time.Second) + _, _, err = etcdCli.Get(context.Background(), "test4") + c.Assert(err, ErrorMatches, ".* not found") + + // case4: do transaction failed because key is deleted, so can't update + ops = []*Operation{ + { + Tp: CreateOp, + Key: "test4", + Value: "4", + }, { + Tp: UpdateOp, // key test1 is deleted, so will update failed + Key: "test1", + Value: "11", + }, + } + + revision, err = etcdCli.DoTxn(context.Background(), ops) + c.Assert(err, ErrorMatches, "do transaction failed.*") + + _, _, err = etcdCli.Get(context.Background(), "test4") + c.Assert(err, ErrorMatches, ".* not found") + + // case5: do transaction failed because can't operate one key in one transaction + ops = []*Operation{ + { + Tp: CreateOp, + Key: "test6", + Value: "6", + }, { + Tp: UpdateOp, + Key: "test6", + Value: "66", + }, + } + + _, err = etcdCli.DoTxn(context.Background(), ops) + c.Assert(err, ErrorMatches, "etcdserver: duplicate key given in txn request") + + _, _, err = etcdCli.Get(context.Background(), "test6") + c.Assert(err, ErrorMatches, ".* not found") + + // case6: do transaction failed because can't create an existing key + ops = []*Operation{ + { + Tp: CreateOp, + Key: "test2", // already exist + Value: "222", + }, { + Tp: UpdateOp, + Key: "test5", + Value: "555", + }, + } + + _, err = etcdCli.DoTxn(context.Background(), ops) + c.Assert(err, ErrorMatches, "do transaction failed.*") + + value2, _, err = etcdCli.Get(context.Background(), "test2") + c.Assert(err, IsNil) + c.Assert(string(value2), Equals, "22") + + value5, _, err = etcdCli.Get(context.Background(), "test5") + c.Assert(err, IsNil) + c.Assert(string(value5), Equals, "5") + + // case7: delete not exist key but will do transaction success + ops = []*Operation{ + { + Tp: DeleteOp, + Key: "test7", // not exist + }, { + Tp: CreateOp, + Key: "test8", + Value: "8", + }, + } + + _, err = etcdCli.DoTxn(context.Background(), ops) + c.Assert(err, IsNil) + + value8, _, err := etcdCli.Get(context.Background(), "test8") + c.Assert(err, IsNil) + c.Assert(string(value8), Equals, "8") + + // case8: do transaction failed because can't set TTL for delete operation + ops = []*Operation{ + { + Tp: DeleteOp, + Key: "test8", + TTL: 1, + }, + } + + _, err = etcdCli.DoTxn(context.Background(), ops) + c.Assert(err, ErrorMatches, "unexpected TTL in delete operation") +} + func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) etcd := NewClient(cluster.RandClient(), "binlog")