Merge pull request #52 from redpanda-data/nv/write-caching
verifier: tolerate data loss
nvartolomei authored Mar 29, 2024
2 parents 5bc1cfa + 1bfaab4 commit 8f4fdb7
Showing 14 changed files with 473 additions and 119 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/go.yml
@@ -0,0 +1,26 @@
name: Go

on:
push:
branches:
- main
pull_request: {}

jobs:
build:

runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ 'stable' ]

steps:
- uses: actions/checkout@v4
- name: Setup Go ${{ matrix.go-version }}
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
- name: Display Go version
run: go version
- name: Test & Build
run: make
3 changes: 2 additions & 1 deletion .gitignore
@@ -2,4 +2,5 @@ kgo-repeater
kgo-verifier
valid_offsets*.json

.idea
.idea
.vscode
5 changes: 4 additions & 1 deletion Makefile
@@ -1,8 +1,11 @@

all: build
all: test build

build: build-verifier build-repeater

test:
go test -v ./...

build-verifier:
go build -o kgo-verifier cmd/kgo-verifier/main.go

82 changes: 44 additions & 38 deletions cmd/kgo-verifier/main.go
@@ -48,11 +48,13 @@ var (
seqConsumeCount = flag.Int("seq_read_msgs", -1, "Seq/group consumer: set max number of records to consume")
batchMaxBytes = flag.Int("batch_max_bytes", 1048576, "the maximum batch size to allow per-partition (must be less than Kafka's max.message.bytes, producing)")
cgReaders = flag.Int("consumer_group_readers", 0, "Number of parallel readers in the consumer group")
cgName = flag.String("consumer_group_name", "", "The name of the consumer group. Generated randomly if not set.")
linger = flag.Duration("linger", 0, "if non-zero, linger to use when producing")
maxBufferedRecords = flag.Uint("max-buffered-records", 1024, "Producer buffer size: the default of 1 makes roughly one event per batch, useful for measurement. Set to something higher to make it easier to max out bandwidth.")
remote = flag.Bool("remote", false, "Remote control mode, driven by HTTP calls, for use in automated tests")
remotePort = flag.Uint("remote-port", 7884, "HTTP listen port for remote control/query")
loop = flag.Bool("loop", false, "For readers, run indefinitely until stopped via signal or HTTP call")
loop = flag.Bool("loop", false, "For readers, repeatedly consume from the beginning, looping to the beginning after hitting the end of the topic until stopped via signal")
continuous = flag.Bool("continuous", false, "For readers, wait for new messages to arrive after hitting the end of the topic until stopped via signal or HTTP call")
name = flag.String("client-name", "kgo", "Name of kafka client")
fakeTimestampMs = flag.Int64("fake-timestamp-ms", -1, "Producer: set artificial batch timestamps on an incrementing basis, starting from this number")
fakeTimestampStepMs = flag.Int64("fake-timestamp-step-ms", 1, "Producer: step size used to increment fake timestamp")
@@ -67,6 +69,8 @@ var (

compressionType = flag.String("compression-type", "", "One of none, gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
compressiblePayload = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")

tolerateDataLoss = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
)

func makeWorkerConfig() worker.WorkerConfig {
@@ -84,6 +88,8 @@ func makeWorkerConfig() worker.WorkerConfig {
Transactions: *useTransactions,
CompressionType: *compressionType,
CompressiblePayload: *compressiblePayload,
TolerateDataLoss: *tolerateDataLoss,
Continuous: *continuous,
}

return c
@@ -179,7 +185,11 @@ func main() {

mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
log.Info("Remote request /shutdown")
shutdownChan <- 1
select {
case shutdownChan <- 1:
default:
log.Warn("shutdown channel is full, skipping")
}
})

mux.HandleFunc("/last_pass", func(w http.ResponseWriter, r *http.Request) {
@@ -197,7 +207,11 @@
log.Warn("unable to parse timeout query param, skipping printing stack trace logs")
}
}
lastPassChan <- 1
select {
case lastPassChan <- 1:
default:
log.Warn("last_pass channel is full, skipping")
}
})

mux.HandleFunc("/print_stack", func(w http.ResponseWriter, r *http.Request) {
@@ -212,6 +226,21 @@ }
}
}()

if *loop && *continuous {
util.Die("Cannot use -loop and -continuous together")
}

ctx, cancel := context.WithCancel(context.Background())
loopState := util.NewLoopState(*loop)
go func() {
<-lastPassChan
if *continuous {
cancel()
} else {
loopState.RequestLastPass()
}
}()

if *pCount > 0 {
log.Info("Starting producer...")
pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId)
@@ -226,34 +255,23 @@
waitErr := pw.Wait()
util.Chk(waitErr, "Producer error: %v", waitErr)
log.Info("Finished producer.")
}

if *seqRead {
} else if *seqRead {
srw := verifier.NewSeqReadWorker(verifier.NewSeqReadConfig(
makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount,
(*consumeTputMb)*1024*1024,
))
workers = append(workers, &srw)

firstPass := true
lastPass := false
for firstPass || (!lastPass && *loop) {
for loopState.Next() {
log.Info("Starting sequential read pass")
firstPass = false
lastPass = len(lastPassChan) != 0
if lastPass {
log.Info("This will be the last pass")
}
waitErr := srw.Wait()
waitErr := srw.Wait(ctx)
if waitErr != nil {
// Proceed around the loop, to be tolerant of e.g. kafka client
// construct failures on unavailable cluster
log.Warnf("Error from sequeqntial read worker: %v", err)
}
}
}

if *cCount > 0 {
} else if *cCount > 0 {
var wg sync.WaitGroup
var randomWorkers []*verifier.RandomReadWorker
for i := 0; i < *parallelRead; i++ {
@@ -265,14 +283,7 @@
workers = append(workers, &worker)
}

firstPass := true
lastPass := false
for firstPass || (!lastPass && *loop) {
lastPass = len(lastPassChan) != 0
if lastPass {
log.Info("This will be the last pass")
}
firstPass = false
for loopState.Next() {
for _, w := range randomWorkers {
wg.Add(1)
go func(worker *verifier.RandomReadWorker) {
@@ -288,25 +299,20 @@
}
wg.Wait()
}
}
} else if *cgReaders > 0 {
if *loop && *cgName != "" {
util.Die("Cannot use -loop and -consumer_group_name together")
}

if *cgReaders > 0 {
grw := verifier.NewGroupReadWorker(
verifier.NewGroupReadConfig(
makeWorkerConfig(), "groupReader", nPartitions, *cgReaders,
makeWorkerConfig(), *cgName, nPartitions, *cgReaders,
*seqConsumeCount, (*consumeTputMb)*1024*1024))
workers = append(workers, &grw)

firstPass := true
lastPass := false
for firstPass || (!lastPass && *loop) {
for loopState.Next() {
log.Info("Starting group read pass")
lastPass = len(lastPassChan) != 0
if lastPass {
log.Info("This will be the last pass")
}
firstPass = false
waitErr := grw.Wait()
waitErr := grw.Wait(ctx)
util.Chk(waitErr, "Consumer error: %v", waitErr)
}
}
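Note on the /shutdown and /last_pass changes above: the bare channel sends were replaced with a select that has a default branch, so a repeated HTTP request is dropped with a warning instead of blocking the handler goroutine once the buffered channel already holds a pending signal. A minimal standalone sketch of this non-blocking-send idiom (the trySignal helper is illustrative, not part of the verifier):

package main

import log "github.com/sirupsen/logrus"

// trySignal performs a non-blocking send: if the buffered channel already
// holds an unconsumed value, the send is skipped rather than blocking the
// caller (e.g. an HTTP handler goroutine).
func trySignal(ch chan int) {
	select {
	case ch <- 1:
	default:
		log.Warn("signal channel is full, skipping")
	}
}

func main() {
	shutdownChan := make(chan int, 1)
	trySignal(shutdownChan) // delivered
	trySignal(shutdownChan) // skipped: the buffered slot is still occupied
	log.Infof("received signal %d", <-shutdownChan)
}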
56 changes: 53 additions & 3 deletions pkg/util/util.go
@@ -1,14 +1,13 @@

package util

import (
"fmt"
"os"
"sync"

log "github.com/sirupsen/logrus"
)


func Die(msg string, args ...interface{}) {
formatted := fmt.Sprintf(msg, args...)
log.Error(formatted)
@@ -19,4 +18,55 @@ func Chk(err error, msg string, args ...interface{}) {
if err != nil {
Die(msg, args...)
}
}
}

// loopState is a helper struct holding common state for managing consumer loops.
type loopState struct {
mu sync.RWMutex

// `loop` is set to false when looping should stop. Even if it is false
// initially, `Next()` must still return true at least once. To achieve that,
// we set `lastPass` to true once `Next()` has returned true while `loop` was
// false; on the following call we know we are done and fuse the state.
loop bool
lastPass bool

// fused is set to true after Next returns false. It is used to enforce
// the invariant that Next must not be called after it returned false
// previously.
fused bool
}

// NewLoopState creates a state object for managing a consumer loop.
func NewLoopState(loop bool) *loopState {
return &loopState{loop: loop}
}

// RequestLastPass requests that the loop run one more time before exiting.
func (ls *loopState) RequestLastPass() {
ls.mu.Lock()
defer ls.mu.Unlock()

ls.loop = false
}

// Next returns true if the current loop iteration should run.
func (ls *loopState) Next() bool {
ls.mu.Lock()
defer ls.mu.Unlock()

if ls.fused {
panic("invariant: Next must not be called after it returned false previously")
}

if ls.lastPass {
ls.fused = true
return false
} else if !ls.loop {
log.Info("This is the last pass.")
ls.lastPass = true
}

return true
}
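Sketched usage of the new helper, mirroring how main.go above wires it up (the worker body is elided and the variable names are illustrative):

package main

import (
	"github.com/redpanda-data/kgo-verifier/pkg/util"
	log "github.com/sirupsen/logrus"
)

func main() {
	loop := false // as if the verifier were run without -loop
	lastPassChan := make(chan int, 1)

	loopState := util.NewLoopState(loop)

	// Another goroutine (e.g. the /last_pass HTTP handler) can ask for one
	// final pass before the loop exits.
	go func() {
		<-lastPassChan
		loopState.RequestLastPass()
	}()

	for loopState.Next() {
		log.Info("Starting read pass")
		// ... run one worker pass here ...
	}
	// Calling loopState.Next() again at this point would panic: the state
	// is fused once it has returned false.
}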
62 changes: 62 additions & 0 deletions pkg/util/util_test.go
@@ -0,0 +1,62 @@
package util_test

import (
"testing"

"github.com/redpanda-data/kgo-verifier/pkg/util"
)

func TestLoopStateDefault(t *testing.T) {
s := util.NewLoopState(false)
if !s.Next() {
t.Error("Next must returns true on first pass")
}
if s.Next() {
t.Error("Next must return false after first pass")
}
}

func TestLoopStateDoLoop(t *testing.T) {
s := util.NewLoopState(true)
for i := 0; i < 3; i++ {
if !s.Next() {
t.Error("Next must return true as long as Loop is set to true")
}
}
s.RequestLastPass()
if !s.Next() {
t.Error("Next must return true after RequestLastPass")
}
if s.Next() {
t.Error("Next must return false after RequestLastPass")
}

func() {
defer func() {
if r := recover(); r == nil {
t.Error("Next must panic after RequestLastPass and third Next call")
}
}()
s.Next()
}()
}

func TestLoopStateDoLoopStopImmediately(t *testing.T) {
s := util.NewLoopState(true)
s.RequestLastPass()
if !s.Next() {
t.Error("Next must return true after RequestLastPass")
}
if s.Next() {
t.Error("Next must return false after RequestLastPass")
}

func() {
defer func() {
if r := recover(); r == nil {
t.Error("Next must panic after RequestLastPass and third Next call")
}
}()
s.Next()
}()
}
4 changes: 2 additions & 2 deletions pkg/worker/repeater/repeater_worker.go
@@ -260,7 +260,7 @@ func (v *Worker) ConsumeRecord(r *kgo.Record) {

v.globalStats.E2e_latency.Update(e2e_latency)

log.Debugf("Consume %s token %06d, total latency %s", v.config.workerCfg.Name, token, e2e_latency)
log.Debugf("Consume %s token %06d, total latency %v", v.config.workerCfg.Name, token, e2e_latency)
v.pending <- int64(token)
}

@@ -495,7 +495,7 @@ loop:
})

}
log.Debug("Consume %s dropping out", v.config.workerCfg.Name)
log.Debugf("Consume %s dropping out", v.config.workerCfg.Name)

sync_ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond)
v.client.CommitUncommittedOffsets(sync_ctx)
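For context on the second hunk: logrus's log.Debug does not apply format verbs — it concatenates its arguments fmt.Sprint-style — so the original call would have printed the literal %s and then appended the worker name, while Debugf is the Printf-style variant. (In the first hunk, %v prints any Go value sensibly, whereas %s only works for strings and fmt.Stringer implementations.) A tiny standalone illustration:

package main

import log "github.com/sirupsen/logrus"

func main() {
	log.SetLevel(log.DebugLevel)
	name := "repeater-0"

	// Debug concatenates its arguments, so the verb is printed literally:
	//   Consume %s dropping outrepeater-0
	log.Debug("Consume %s dropping out", name)

	// Debugf formats:
	//   Consume repeater-0 dropping out
	log.Debugf("Consume %s dropping out", name)
}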