Skip to content

Commit

Permalink
Add Xor
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Nov 5, 2019
1 parent 11486c3 commit 215f996
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
5 changes: 1 addition & 4 deletions pkg/proxy/aggregation.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package proxy

import (
"bytes"
"strconv"

"github.com/deepfabric/elasticell/pkg/pb/raftcmdpb"
)

var (
sep = []byte("#")
cmdSet = []byte("SET")
cmdGet = []byte("GET")
)
Expand Down Expand Up @@ -45,8 +43,7 @@ func isAggregationPart(id []byte) bool {
}

func parseAggregationPart(id []byte) ([]byte, int) {
data := bytes.Split(id, sep)
return data[0], parseStrInt64(data[1])
return id[0:16], parseStrInt64(id[16:])
}

func parseStrInt64(data []byte) int {
Expand Down
39 changes: 35 additions & 4 deletions pkg/proxy/aggregation_bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func (p *RedisProxy) doBMOr(rs *redisSession, cmd redis.Command) (bool, error) {
return p.doBMAggregation(rs, cmd, p.doBMOrMerge)
}

// bmxor [withbm|start count] bm1 bm2 [bm3 bm4]
func (p *RedisProxy) doBMXor(rs *redisSession, cmd redis.Command) (bool, error) {
return p.doBMAggregation(rs, cmd, p.doBMXorMerge)
}

// bmandnot [withbm|start count] bm1 bm2 [bm3 bm4]
func (p *RedisProxy) doBMAndNot(rs *redisSession, cmd redis.Command) (bool, error) {
return p.doBMAggregation(rs, cmd, p.doBMAndNotMerge)
Expand All @@ -35,12 +40,12 @@ func (p *RedisProxy) doBMAggregation(rs *redisSession, cmd redis.Command, mergeF
offset := 1
n := len(cmd.Args())
if n < 3 {
return false, nil
return false, errInvalidCommand
}

if strings.ToUpper(hack.SliceToString(cmd.Args()[0])) != optionWithBM {
if n < 4 {
return false, nil
return false, errInvalidCommand
}

offset = 2
Expand All @@ -51,7 +56,7 @@ func (p *RedisProxy) doBMAggregation(rs *redisSession, cmd redis.Command, mergeF

for idx, key := range cmd.Args()[offset:] {
cmd := redis.Command([][]byte{cmdGet, key})
p.addToForward(newReqUUID(bytes.Join([][]byte{id, format.UInt64ToString(uint64(idx))}, sep), cmd, rs))
p.addToForward(newReqUUID(append(id, format.UInt64ToString(uint64(idx))...), cmd, rs))
}

return true, nil
Expand Down Expand Up @@ -117,7 +122,7 @@ func (p *RedisProxy) doBMOrMerge(args [][]byte, rsps ...*raftcmdpb.Response) *ra
return p.buildResult(bm, args)
}

func (p *RedisProxy) doBMAndNotMerge(args [][]byte, rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
func (p *RedisProxy) doBMXorMerge(args [][]byte, rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
bm := roaring.NewBTreeBitmap()
tmp := roaring.NewBTreeBitmap()
var target *roaring.Bitmap
Expand Down Expand Up @@ -147,6 +152,32 @@ func (p *RedisProxy) doBMAndNotMerge(args [][]byte, rsps ...*raftcmdpb.Response)
return p.buildResult(bm, args)
}

func (p *RedisProxy) doBMAndNotMerge(args [][]byte, rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
targets := make([]*roaring.Bitmap, 0, len(rsps))
for _, rsp := range rsps {
if len(rsp.BulkResult) > 0 {
bm := roaring.NewBTreeBitmap()
targets = append(targets, bm)
_, _, err := bm.ImportRoaringBits(rsp.BulkResult, false, false, 0)
if err != nil {
return &raftcmdpb.Response{
ErrorResult: hack.StringToSlice(err.Error()),
}
}
}
}

union := targets[0]
and := targets[0]

for _, bm := range targets[1:] {
union = union.Union(bm)
and = and.Intersect(bm)
}

return p.buildResult(union.Xor(and), args)
}

func (p *RedisProxy) buildResult(bm *roaring.Bitmap, args [][]byte) *raftcmdpb.Response {
log.Debugf("bm and args: %+v", args)
if len(args) < 2 {
Expand Down
13 changes: 8 additions & 5 deletions pkg/proxy/aggregation_string.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
package proxy

import (
"bytes"
"fmt"
"errors"

"github.com/deepfabric/elasticell/pkg/pb/raftcmdpb"
"github.com/fagongzi/goetty/protocol/redis"
"github.com/fagongzi/util/format"
)

var (
errInvalidCommand = errors.New("invalid command")
)

func (p *RedisProxy) doMGet(rs *redisSession, cmd redis.Command) (bool, error) {
n := len(cmd.Args())

if n < 2 {
return false, nil
return false, errInvalidCommand
}

id := newID()
rs.addAggregation(id, newAggregationReq(n, p.doMGetMerge, nil))

for idx, key := range cmd.Args() {
cmd := redis.Command([][]byte{cmdGet, key})
p.addToForward(newReqUUID(bytes.Join([][]byte{id, []byte(fmt.Sprintf("%d", idx))}, sep), cmd, rs))
p.addToForward(newReqUUID(append(id, format.UInt64ToString(uint64(idx))...), cmd, rs))
}

return true, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func (p *RedisProxy) initSupportCMDs() {
// bitmap
p.aggregationCmds["bmand"] = p.doBMAnd
p.aggregationCmds["bmor"] = p.doBMOr
p.aggregationCmds["bmxor"] = p.doBMXor
p.aggregationCmds["bmandnot"] = p.doBMAndNot
}

Expand Down
1 change: 1 addition & 0 deletions quickstart-cfgs/proxy.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
"bmrange",
"bmand",
"bmor",
"bmxor",
"bmandnot"
]
}

0 comments on commit 215f996

Please sign in to comment.