Skip to content

Commit

Permalink
Add bitmap and/or/andnot
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Nov 5, 2019
1 parent 42ff789 commit 11486c3
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 73 deletions.
75 changes: 5 additions & 70 deletions pkg/proxy/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package proxy

import (
"bytes"
"fmt"
"strconv"

"github.com/deepfabric/elasticell/pkg/pb/raftcmdpb"
"github.com/fagongzi/goetty/protocol/redis"
)

var (
Expand All @@ -18,14 +16,16 @@ var (
type aggregationReq struct {
reply int
parts []*raftcmdpb.Response
mergeFn func(...*raftcmdpb.Response) *raftcmdpb.Response
mergeFn func([][]byte, ...*raftcmdpb.Response) *raftcmdpb.Response
args [][]byte
}

func newAggregationReq(n int, mergeFn func(...*raftcmdpb.Response) *raftcmdpb.Response) *aggregationReq {
func newAggregationReq(n int, mergeFn func([][]byte, ...*raftcmdpb.Response) *raftcmdpb.Response, args [][]byte) *aggregationReq {
return &aggregationReq{
reply: n,
parts: make([]*raftcmdpb.Response, n, n),
mergeFn: mergeFn,
args: args,
}
}

Expand All @@ -37,72 +37,7 @@ func (req *aggregationReq) addPart(index int, rsp *raftcmdpb.Response) bool {
}

func (req *aggregationReq) merge() *raftcmdpb.Response {
return req.mergeFn(req.parts...)
}

func (p *RedisProxy) doMGet(rs *redisSession, cmd redis.Command) (bool, error) {
n := len(cmd.Args())

if n < 2 {
return false, nil
}

id := newID()
rs.addAggregation(id, newAggregationReq(n, p.doMGetMerge))

for idx, key := range cmd.Args() {
cmd := redis.Command([][]byte{cmdGet, key})
p.addToForward(newReqUUID(bytes.Join([][]byte{id, []byte(fmt.Sprintf("%d", idx))}, sep), cmd, rs))
}

return true, nil
}

func (p *RedisProxy) doMGetMerge(rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
n := len(rsps)
value := make([][]byte, n, n)
for idx, rsp := range rsps {
value[idx] = rsp.BulkResult
}

return &raftcmdpb.Response{
SliceArrayResult: value,
}
}

func (p *RedisProxy) doMSet(rs *redisSession, cmd redis.Command) (bool, error) {
n := len(cmd.Args())
if n < 2 {
return false, nil
}

if n%2 != 0 {
return false, fmt.Errorf("error args count: %d", n)
}

id := newID()
rs.addAggregation(id, newAggregationReq(n/2, p.doMSetMerge))

for i := 0; i < n; i += 2 {
cmd := redis.Command([][]byte{cmdSet, cmd.Args()[i], cmd.Args()[i+1]})
p.addToForward(newReqUUID(bytes.Join([][]byte{id, []byte(fmt.Sprintf("%d", i/2))}, sep), cmd, rs))
}

return true, nil
}

func (p *RedisProxy) doMSetMerge(rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
var value *raftcmdpb.Response

for _, rsp := range rsps {
if len(rsp.ErrorResult) > 0 {
return rsp
}

value = rsp
}

return value
return req.mergeFn(req.args, req.parts...)
}

func isAggregationPart(id []byte) bool {
Expand Down
195 changes: 195 additions & 0 deletions pkg/proxy/aggregation_bitmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package proxy

import (
"bytes"
"strings"

"github.com/deepfabric/elasticell/pkg/pb/raftcmdpb"
"github.com/fagongzi/goetty/protocol/redis"
"github.com/fagongzi/log"
"github.com/fagongzi/util/format"
"github.com/fagongzi/util/hack"
"github.com/pilosa/pilosa/roaring"
)

const (
optionWithBM = "WITHBM"
)

// bmand [withbm|start count] bm1 bm2 [bm3 bm4]
func (p *RedisProxy) doBMAnd(rs *redisSession, cmd redis.Command) (bool, error) {
return p.doBMAggregation(rs, cmd, p.doBMAndMerge)
}

// bmor [withbm|start count] bm1 bm2 [bm3 bm4]
func (p *RedisProxy) doBMOr(rs *redisSession, cmd redis.Command) (bool, error) {
return p.doBMAggregation(rs, cmd, p.doBMOrMerge)
}

// bmandnot [withbm|start count] bm1 bm2 [bm3 bm4]
func (p *RedisProxy) doBMAndNot(rs *redisSession, cmd redis.Command) (bool, error) {
return p.doBMAggregation(rs, cmd, p.doBMAndNotMerge)
}

func (p *RedisProxy) doBMAggregation(rs *redisSession, cmd redis.Command, mergeFn func([][]byte, ...*raftcmdpb.Response) *raftcmdpb.Response) (bool, error) {
offset := 1
n := len(cmd.Args())
if n < 3 {
return false, nil
}

if strings.ToUpper(hack.SliceToString(cmd.Args()[0])) != optionWithBM {
if n < 4 {
return false, nil
}

offset = 2
}

id := newID()
rs.addAggregation(id, newAggregationReq(n-offset, mergeFn, cmd.Args()[0:offset]))

for idx, key := range cmd.Args()[offset:] {
cmd := redis.Command([][]byte{cmdGet, key})
p.addToForward(newReqUUID(bytes.Join([][]byte{id, format.UInt64ToString(uint64(idx))}, sep), cmd, rs))
}

return true, nil
}

func (p *RedisProxy) doBMAndMerge(args [][]byte, rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
bm := roaring.NewBTreeBitmap()
tmp := roaring.NewBTreeBitmap()
var target *roaring.Bitmap
for idx, rsp := range rsps {
if len(rsp.BulkResult) > 0 {
tmp.Containers.Reset()

if idx == 0 {
target = bm
} else {
target = tmp
}

_, _, err := target.ImportRoaringBits(rsp.BulkResult, false, false, 0)
if err != nil {
return &raftcmdpb.Response{
ErrorResult: hack.StringToSlice(err.Error()),
}
}

if idx > 0 {
bm = bm.Intersect(tmp)
}
}
}

return p.buildResult(bm, args)
}

func (p *RedisProxy) doBMOrMerge(args [][]byte, rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
bm := roaring.NewBTreeBitmap()
tmp := roaring.NewBTreeBitmap()
var target *roaring.Bitmap
for idx, rsp := range rsps {
if len(rsp.BulkResult) > 0 {
tmp.Containers.Reset()

if idx == 0 {
target = bm
} else {
target = tmp
}

_, _, err := target.ImportRoaringBits(rsp.BulkResult, false, false, 0)
if err != nil {
return &raftcmdpb.Response{
ErrorResult: hack.StringToSlice(err.Error()),
}
}

if idx > 0 {
bm = bm.Union(tmp)
}
}
}

return p.buildResult(bm, args)
}

func (p *RedisProxy) doBMAndNotMerge(args [][]byte, rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
bm := roaring.NewBTreeBitmap()
tmp := roaring.NewBTreeBitmap()
var target *roaring.Bitmap
for idx, rsp := range rsps {
if len(rsp.BulkResult) > 0 {
tmp.Containers.Reset()

if idx == 0 {
target = bm
} else {
target = tmp
}

_, _, err := target.ImportRoaringBits(rsp.BulkResult, false, false, 0)
if err != nil {
return &raftcmdpb.Response{
ErrorResult: hack.StringToSlice(err.Error()),
}
}

if idx > 0 {
bm = bm.Xor(tmp)
}
}
}

return p.buildResult(bm, args)
}

func (p *RedisProxy) buildResult(bm *roaring.Bitmap, args [][]byte) *raftcmdpb.Response {
log.Debugf("bm and args: %+v", args)
if len(args) < 2 {
buf := bytes.NewBuffer(nil)
bm.WriteTo(buf)
return &raftcmdpb.Response{
BulkResult: buf.Bytes(),
}
}

start, err := format.ParseStrUInt64(hack.SliceToString(args[0]))
if err != nil {
return &raftcmdpb.Response{
ErrorResult: hack.StringToSlice(err.Error()),
}
}

limit, err := format.ParseStrUInt64(hack.SliceToString(args[1]))
if err != nil {
return &raftcmdpb.Response{
ErrorResult: hack.StringToSlice(err.Error()),
}
}

rsp := &raftcmdpb.Response{}
var values [][]byte
count := uint64(0)
itr := bm.Iterator()
itr.Seek(start)
for {
value, eof := itr.Next()
if eof {
break
}

values = append(values, format.UInt64ToString(value))
count++

if count >= limit {
break
}
}
rsp.SliceArrayResult = values
rsp.HasEmptySliceArrayResult = len(values) == 0
return rsp
}
39 changes: 39 additions & 0 deletions pkg/proxy/aggregation_string.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package proxy

import (
"bytes"
"fmt"

"github.com/deepfabric/elasticell/pkg/pb/raftcmdpb"
"github.com/fagongzi/goetty/protocol/redis"
)

func (p *RedisProxy) doMGet(rs *redisSession, cmd redis.Command) (bool, error) {
n := len(cmd.Args())

if n < 2 {
return false, nil
}

id := newID()
rs.addAggregation(id, newAggregationReq(n, p.doMGetMerge, nil))

for idx, key := range cmd.Args() {
cmd := redis.Command([][]byte{cmdGet, key})
p.addToForward(newReqUUID(bytes.Join([][]byte{id, []byte(fmt.Sprintf("%d", idx))}, sep), cmd, rs))
}

return true, nil
}

func (p *RedisProxy) doMGetMerge(args [][]byte, rsps ...*raftcmdpb.Response) *raftcmdpb.Response {
n := len(rsps)
value := make([][]byte, n, n)
for idx, rsp := range rsps {
value[idx] = rsp.BulkResult
}

return &raftcmdpb.Response{
SliceArrayResult: value,
}
}
7 changes: 6 additions & 1 deletion pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,13 @@ func (p *RedisProxy) initSupportCMDs() {
p.supportCmds[cmd] = struct{}{}
}

// kv
p.aggregationCmds["mget"] = p.doMGet
p.aggregationCmds["mset"] = p.doMSet

// bitmap
p.aggregationCmds["bmand"] = p.doBMAnd
p.aggregationCmds["bmor"] = p.doBMOr
p.aggregationCmds["bmandnot"] = p.doBMAndNot
}

func (p *RedisProxy) refreshStores() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ func (rs *redisSession) resp(rsp *raftcmdpb.Response) {
return
}

log.Debugf("read a aggregation part: %+v", rsp.UUID)
id, index := parseAggregationPart(rsp.UUID)
log.Debugf("parsed a aggregation part: %+v, %d", id, index)
rs.aggLock.RLock()
if req, ok := rs.aggregations[string(id)]; ok {
if req, ok := rs.aggregations[hack.SliceToString(id)]; ok {
if req.addPart(index, rsp) {
rs.resps.Put(req.merge())
}
Expand Down
5 changes: 4 additions & 1 deletion quickstart-cfgs/proxy.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@
"bmcontains",
"bmdel",
"bmcount",
"bmrange"
"bmrange",
"bmand",
"bmor",
"bmandnot"
]
}

0 comments on commit 11486c3

Please sign in to comment.