Skip to content

Commit

Permalink
In the case of commands that don't contain keys we round robin across…
Browse files Browse the repository at this point in the history
… connections (#88)
  • Loading branch information
filipecosta90 authored Mar 31, 2023
1 parent 3bb69d4 commit 9150983
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions cmd/ftsb_redisearch/cmd_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (p *processor) Init(workerNumber int, _ bool, totalWorkers int) {
if password != "" {
opts = append(opts, radix.DialAuthPass(password))
}
opts = append(opts, radix.DialTimeout(time.Second*600))

customConnFunc := func(network, addr string) (radix.Conn, error) {
return radix.Dial(network, addr, opts...,
Expand Down Expand Up @@ -69,7 +70,7 @@ func connectionProcessor(p *processor, rateLimiter *rate.Limiter, useRateLimiter
replies := make([]interface{}, 0, 0)
clusterSlots := make([][2]uint16, 0, 0)
clusterAddr := make([]string, 0, 0)

clusterAddrLen := 0
slotP := 0
if !clusterMode {
cmdSlots = append(cmdSlots, make([]radix.CmdAction, 0, 0))
Expand All @@ -83,16 +84,28 @@ func connectionProcessor(p *processor, rateLimiter *rate.Limiter, useRateLimiter
clusterAddr = append(clusterAddr, ClusterNode.Addr)
}
}
clusterAddrLen = len(clusterSlots)
}

for row := range p.rows {
cmdType, cmdQueryId, keyPos, cmd, key, clusterSlot, docFields, bytelen, _ := preProcessCmd(row)
for i, sArr := range clusterSlots {
if clusterSlot >= sArr[0] && clusterSlot < sArr[1] {
slotP = i

if clusterSlot > -1 {
for i, sArr := range clusterSlots {
if clusterSlot >= int(sArr[0]) && clusterSlot < int(sArr[1]) {
slotP = i
}
}
} else {
// round robin slot
slotP++
if slotP >= clusterAddrLen {
slotP = 0
}
}

if debug > 2 {
fmt.Println(keyPos, key, clusterSlot, cmd, strings.Join(docFields, ","), slotP, clusterSlots)
fmt.Println(keyPos, slotP, key, clusterSlot, cmd, strings.Join(docFields, ","), clusterSlots)
}
if useRateLimiter {
r := rateLimiter.ReserveN(time.Now(), int(1))
Expand Down Expand Up @@ -204,7 +217,7 @@ func (p *processor) ProcessBatch(b benchmark_runner.Batch, doLoad bool, rateLimi
func (p *processor) Close(_ bool) {
}

func preProcessCmd(row string) (cmdType string, cmdQueryId string, keyPos int, cmd string, key string, clusterSlot uint16, args []string, bytelen uint64, err error) {
func preProcessCmd(row string) (cmdType string, cmdQueryId string, keyPos int, cmd string, key string, clusterSlot int, args []string, bytelen uint64, err error) {
reader := csv.NewReader(strings.NewReader(row))
argsStr, err := reader.Read()
if err != nil {
Expand All @@ -215,13 +228,17 @@ func preProcessCmd(row string) (cmdType string, cmdQueryId string, keyPos int, c
if len(argsStr) >= 3 {
cmdType = argsStr[0]
cmdQueryId = argsStr[1]
keyPos, _ = strconv.Atoi(argsStr[2])
keyPos = keyPos + 3
initialPos, _ := strconv.Atoi(argsStr[2])

keyPos = initialPos + 3
cmd = argsStr[3]
clusterSlot = -1
if len(argsStr) > 4 {
args = argsStr[4:]
key = argsStr[keyPos]
clusterSlot = radix.ClusterSlot([]byte(key))
}
if initialPos >= 0 {
clusterSlot = int(radix.ClusterSlot([]byte(key)))
}
bytelen = uint64(len(row)) - uint64(len(cmdType))
} else {
Expand Down

0 comments on commit 9150983

Please sign in to comment.