crash instantly on data loss errors
We (@nvartolomei and @bash) found that GroupReadWorker wouldn't report
data loss. The reason is that we didn't fail the test instantly when
data loss was detected; instead, we relied on monotonicity validation to
fail, i.e. we would only crash once we noticed that we were about to
consume an earlier offset than one we had already consumed.

However, in the GroupReadWorker we reset the monotonicity validation
state on any error, because the next consume attempt is almost certain
to re-read offsets that were already consumed. As a result, we wouldn't
catch monotonicity issues at all.

In retrospect, it would have been better to fail instantly when franz-go
detects data loss. So this commit achieves exactly that.
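
For context, a minimal sketch (not part of this commit) of how franz-go surfaces this condition: data loss arrives as a *kgo.ErrDataLoss inside the fetch errors, so a poll loop can fail fast instead of waiting for offset validation to trip later. The function name pollAndCheck, the tolerateDataLoss flag, and the use of the standard library logger below are illustrative stand-ins; an already-configured client is assumed.

// Illustrative sketch: fail fast when franz-go reports data loss.
// Assumes cl is an already-configured *kgo.Client consuming the topic
// under test; tolerateDataLoss mirrors the worker's TolerateDataLoss flag.
package main

import (
	"context"
	"errors"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func pollAndCheck(ctx context.Context, cl *kgo.Client, tolerateDataLoss bool) {
	fetches := cl.PollFetches(ctx)
	fetches.EachError(func(t string, p int32, err error) {
		var lossErr *kgo.ErrDataLoss
		if errors.As(err, &lossErr) {
			if tolerateDataLoss {
				// Record/tolerate the loss and keep consuming.
				log.Printf("tolerated data loss on %s/%d: %v", t, p, lossErr)
				return
			}
			// Crash instantly: don't rely on monotonicity validation to
			// notice re-consumed offsets later.
			log.Fatalf("Unexpected data loss detected: %v", lossErr)
		}
		// Other fetch errors would be handled or propagated here.
	})
	fetches.EachRecord(func(r *kgo.Record) {
		// Offset and payload validation would happen here.
		_ = r
	})
}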
nvartolomei committed Nov 28, 2024
1 parent 27986ea commit 7f0f3a9
Showing 2 changed files with 14 additions and 6 deletions.
pkg/worker/verifier/group_read_worker.go (7 additions, 3 deletions)
@@ -259,9 +259,13 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
 				"fiber %v: Consumer group fetch %s/%d e=%v...",
 				fiberId, t, p, err)
 			var lossErr *kgo.ErrDataLoss
-			if grw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
-				grw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
-				grw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+			if errors.As(err, &lossErr) {
+				if grw.config.workerCfg.TolerateDataLoss {
+					grw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
+					grw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+				} else {
+					log.Fatalf("Unexpected data loss detected: %v", lossErr)
+				}
 			} else {
 				r_err = err
 			}
pkg/worker/verifier/seq_read_worker.go (7 additions, 3 deletions)
@@ -154,9 +154,13 @@ func (srw *SeqReadWorker) sequentialReadInner(ctx context.Context, startAt []int
 		fetches.EachError(func(t string, p int32, err error) {
 			log.Warnf("Sequential fetch %s/%d e=%v...", t, p, err)
 			var lossErr *kgo.ErrDataLoss
-			if srw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
-				srw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
-				srw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+			if errors.As(err, &lossErr) {
+				if srw.config.workerCfg.TolerateDataLoss {
+					srw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
+					srw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+				} else {
+					log.Fatalf("Unexpected data loss detected: %v", lossErr)
+				}
 			} else {
 				r_err = err
 			}
