verifier: allow to tolerate data loss
This mode allows verifying Redpanda when write caching is enabled. In
addition to tolerating data loss, we also record, and export via the
/status endpoint, the number of offsets/records that are considered lost
from the verifier's point of view.
nvartolomei committed Mar 28, 2024
1 parent e35dc1d commit 99de2ad
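A minimal sketch of the mechanism described above, before the per-file diffs. It assumes franz-go's kgo.ErrDataLoss error (the Partition, ConsumedTo, and ResetTo fields it carries are the ones the read workers below consume); the handleFetchError helper and its recordLost/rewindTo callbacks are illustrative stand-ins for the verifier's RecordLostOffsets and SetMonotonicityTestStateForPartition, not part of this change:

```go
package sketch

import (
	"errors"

	"github.com/twmb/franz-go/pkg/kgo"
)

// handleFetchError sketches the tolerate-data-loss path: when franz-go reports
// that the consumed log was truncated (data loss), count the missing offsets
// and rewind the monotonicity check instead of treating the fetch as fatal.
func handleFetchError(
	tolerate bool,
	partition int32,
	err error,
	recordLost func(p int32, n int64),
	rewindTo func(p int32, o int64),
) error {
	var loss *kgo.ErrDataLoss
	if tolerate && errors.As(err, &loss) {
		// Offsets in [ResetTo, ConsumedTo) are considered lost.
		recordLost(loss.Partition, loss.ConsumedTo-loss.ResetTo)
		// Resume monotonicity checking just before the reset point.
		rewindTo(partition, loss.ResetTo-1)
		return nil
	}
	// Any other fetch error remains fatal to the read loop.
	return err
}
```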
Showing 8 changed files with 201 additions and 15 deletions.
3 changes: 3 additions & 0 deletions cmd/kgo-verifier/main.go
@@ -69,6 +69,8 @@ var (

compressionType = flag.String("compression-type", "", "One of none, gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
compressiblePayload = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")

tolerateDataLoss = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
)

func makeWorkerConfig() worker.WorkerConfig {
@@ -86,6 +88,7 @@ func makeWorkerConfig() worker.WorkerConfig {
Transactions: *useTransactions,
CompressionType: *compressionType,
CompressiblePayload: *compressiblePayload,
TolerateDataLoss: *tolerateDataLoss,
Continuous: *continuous,
}

9 changes: 8 additions & 1 deletion pkg/worker/verifier/group_read_worker.go
@@ -2,6 +2,7 @@ package verifier

import (
"context"
"errors"
"fmt"
"os"
"sync"
@@ -253,7 +254,13 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
log.Warnf(
"fiber %v: Consumer group fetch %s/%d e=%v...",
fiberId, t, p, err)
r_err = err
var lossErr *kgo.ErrDataLoss
if grw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
grw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
grw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
} else {
r_err = err
}
})

if r_err != nil {
41 changes: 33 additions & 8 deletions pkg/worker/verifier/offset_ranges.go
@@ -42,6 +42,8 @@ type OffsetRange struct {

type OffsetRanges struct {
Ranges []OffsetRange

TolerateDataLoss bool
}

func (ors *OffsetRanges) Insert(o int64) {
@@ -52,18 +54,41 @@ func (ors *OffsetRanges) Insert(o int64) {
return
}

{
last := ors.Ranges[len(ors.Ranges)-1]

// Handle out of order inserts.
if o < last.Upper {
if ors.TolerateDataLoss {
// Truncate the ranges to the last offset.
for i, r := range ors.Ranges {
if o >= r.Lower && o < r.Upper {
// If the offset is within the range, truncate the range
// and remove all subsequent ranges.
ors.Ranges = ors.Ranges[:i+1]
ors.Ranges[i].Upper = o
break
} else if o < r.Lower {
// If the offset is before the range, truncate the range and all subsequent ranges.
ors.Ranges = ors.Ranges[:i]
break
}
}
} else {
// TODO: more flexible structure for out of order inserts, at the moment
// we rely on franz-go callbacks being invoked in order.
panic(fmt.Sprintf("Out of order offset %d (vs %d %d)", o, last.Lower, last.Upper))
}
}
}

last := &ors.Ranges[len(ors.Ranges)-1]
if o >= last.Lower && o == last.Upper {
// Extend the last range if the offset is the next one.
last.Upper += 1
return
} else {
if o < last.Upper {
// TODO: more flexible structure for out of order inserts, at the moment
// we rely on franz-go callbacks being invoked in order.
util.Die("Out of order offset %d (vs %d %d)", o, last.Lower, last.Upper)
} else {
ors.Ranges = append(ors.Ranges, OffsetRange{Lower: o, Upper: o + 1})
}
// Otherwise, create a new range.
ors.Ranges = append(ors.Ranges, OffsetRange{Lower: o, Upper: o + 1})
}
}
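For concreteness, a hedged trace of the new tolerate-data-loss branch of Insert, using only the exported API exercised by the tests that follow (ranges are half-open [Lower, Upper)):

```go
package main

import (
	"fmt"

	"github.com/redpanda-data/kgo-verifier/pkg/worker/verifier"
)

func main() {
	r := verifier.OffsetRanges{TolerateDataLoss: true}
	for _, o := range []int64{0, 4, 5, 6, 10} {
		r.Insert(o)
	}
	// At this point the ranges are {0 1} {4 7} {10 11}.
	r.Insert(5) // simulated data loss: the log was rewound to offset 5
	// Truncation drops everything past offset 5, then the insert extends
	// the surviving range again, leaving {0 1} {4 6}.
	fmt.Println(r.Ranges)
}
```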

109 changes: 109 additions & 0 deletions pkg/worker/verifier/offset_ranges_test.go
@@ -0,0 +1,109 @@
package verifier_test

import (
"reflect"
"testing"

"github.com/redpanda-data/kgo-verifier/pkg/worker/verifier"
)

func TestOffsetRanges(t *testing.T) {
r := verifier.OffsetRanges{}
r.Insert(0)
r.Insert(1)
r.Insert(2)
r.Insert(10)
}

func TestOffsetRangesOutOfOrder(t *testing.T) {
r := verifier.OffsetRanges{}
r.Insert(0)
r.Insert(4)
r.Insert(5)
r.Insert(6)
r.Insert(10)

func() {
defer func() {
if r := recover(); r == nil {
t.Errorf("Expected to panic on out of order insert")
}
}()
r.Insert(3)
}()

}

func TestOffsetRangesTolerateOutOfOrderContinuous(t *testing.T) {
r := verifier.OffsetRanges{
TolerateDataLoss: true,
}
r.Insert(0)
r.Insert(4)
r.Insert(5)
r.Insert(6)
r.Insert(10)

// This will truncate the ranges.
r.Insert(5)

expected := verifier.OffsetRanges{
TolerateDataLoss: true,
}
expected.Insert(0)
expected.Insert(4)
expected.Insert(5)

if !reflect.DeepEqual(expected, r) {
t.Errorf("Expected %v, got %v", expected, r.Ranges)
}
}

func TestOffsetRangesTolerateOutOfOrderGaps(t *testing.T) {
r := verifier.OffsetRanges{
TolerateDataLoss: true,
}
r.Insert(0)
r.Insert(3)
r.Insert(5)
r.Insert(7)
r.Insert(10)

// This will truncate the ranges.
r.Insert(5)

expected := verifier.OffsetRanges{
TolerateDataLoss: true,
}
expected.Insert(0)
expected.Insert(3)
expected.Insert(5)

if !reflect.DeepEqual(expected, r) {
t.Errorf("Expected %v, got %v", expected, r.Ranges)
}
}

func TestOffsetRangesTolerateOutOfOrderInsideGap(t *testing.T) {
r := verifier.OffsetRanges{
TolerateDataLoss: true,
}
r.Insert(0)
r.Insert(3)
r.Insert(7)
r.Insert(10)

// This will truncate the ranges.
r.Insert(5)

expected := verifier.OffsetRanges{
TolerateDataLoss: true,
}
expected.Insert(0)
expected.Insert(3)
expected.Insert(5)

if !reflect.DeepEqual(expected, r) {
t.Errorf("Expected %v, got %v", expected, r)
}
}
20 changes: 15 additions & 5 deletions pkg/worker/verifier/producer_worker.go
@@ -65,15 +65,25 @@ type ProducerWorker struct {
transactionSTMConfig worker.TransactionSTMConfig
transactionSTM *worker.TransactionSTM
churnProducers bool

tolerateDataLoss bool
}

func NewProducerWorker(cfg ProducerConfig) ProducerWorker {
validOffsets := LoadTopicOffsetRanges(cfg.workerCfg.Topic, cfg.nPartitions)
if cfg.workerCfg.TolerateDataLoss {
for ix := range validOffsets.PartitionRanges {
validOffsets.PartitionRanges[ix].TolerateDataLoss = true
}
}

return ProducerWorker{
config: cfg,
Status: NewProducerWorkerStatus(cfg.workerCfg.Topic),
validOffsets: LoadTopicOffsetRanges(cfg.workerCfg.Topic, cfg.nPartitions),
payload: cfg.valueGenerator.Generate(),
churnProducers: cfg.messagesPerProducerId > 0,
config: cfg,
Status: NewProducerWorkerStatus(cfg.workerCfg.Topic),
validOffsets: validOffsets,
payload: cfg.valueGenerator.Generate(),
churnProducers: cfg.messagesPerProducerId > 0,
tolerateDataLoss: cfg.workerCfg.TolerateDataLoss,
}
}

9 changes: 8 additions & 1 deletion pkg/worker/verifier/seq_read_worker.go
@@ -2,6 +2,7 @@ package verifier

import (
"context"
"errors"
"sync"

worker "github.com/redpanda-data/kgo-verifier/pkg/worker"
@@ -139,7 +140,13 @@ func (srw *SeqReadWorker) sequentialReadInner(ctx context.Context, startAt []int
var r_err error
fetches.EachError(func(t string, p int32, err error) {
log.Warnf("Sequential fetch %s/%d e=%v...", t, p, err)
r_err = err
var lossErr *kgo.ErrDataLoss
if srw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
srw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
srw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
} else {
r_err = err
}
})

if r_err != nil {
24 changes: 24 additions & 0 deletions pkg/worker/verifier/validator_status.go
@@ -37,6 +37,8 @@ type ValidatorStatus struct {
// The highest valid offset consumed throughout the consumer's lifetime
MaxOffsetsConsumed map[int32]int64 `json:"max_offsets_consumed"`

LostOffsets map[int32]int64 `json:"lost_offsets"`

// Concurrent access happens when doing random reads
// with multiple reader fibers
lock sync.Mutex
@@ -103,13 +105,35 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
cs.lastOffsetConsumed[r.Partition] = r.Offset
}

func (cs *ValidatorStatus) RecordLostOffsets(p int32, count int64) {
cs.lock.Lock()
defer cs.lock.Unlock()

if cs.LostOffsets == nil {
cs.LostOffsets = make(map[int32]int64)
}

cs.LostOffsets[p] += count
}

func (cs *ValidatorStatus) ResetMonotonicityTestState() {
cs.lock.Lock()
defer cs.lock.Unlock()

cs.lastOffsetConsumed = make(map[int32]int64)
}

func (cs *ValidatorStatus) SetMonotonicityTestStateForPartition(partition int32, offset int64) {
cs.lock.Lock()
defer cs.lock.Unlock()

if cs.lastOffsetConsumed == nil {
cs.lastOffsetConsumed = make(map[int32]int64)
}

cs.lastOffsetConsumed[partition] = offset
}

func (cs *ValidatorStatus) Checkpoint() {
log.Infof("Validator status: %s", cs.String())
}
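The new counter is exported through the existing /status payload under the lost_offsets key. A minimal sketch of the resulting JSON shape, assuming only the added field (statusSketch is illustrative; the real ValidatorStatus carries many more counters):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statusSketch mirrors only the newly added ValidatorStatus field.
type statusSketch struct {
	LostOffsets map[int32]int64 `json:"lost_offsets"`
}

func main() {
	s := statusSketch{LostOffsets: map[int32]int64{}}
	// Two data-loss events on partition 0, losing 20 and then 10 offsets.
	s.LostOffsets[0] += 20
	s.LostOffsets[0] += 10
	b, _ := json.Marshal(s)
	fmt.Println(string(b)) // {"lost_offsets":{"0":30}}
}
```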
1 change: 1 addition & 0 deletions pkg/worker/worker.go
@@ -157,6 +157,7 @@ type WorkerConfig struct {
// incompressible payload.
CompressiblePayload bool

TolerateDataLoss bool
Continuous bool
}

