From 8ac4000274773b216eb1840d40bb280e7b6918bd Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Tue, 15 Oct 2024 13:22:19 +0100
Subject: [PATCH] crash instantly on data loss errors

We (nvartolomei and bash) found that GroupReadWorker wouldn't report
data loss. The reason is that we didn't fail the test instantly when
data loss was detected; instead, we relied on monotonicity validation
to fail, i.e. we would crash only when we detected that we were about
to consume an offset earlier than one we had already consumed.

However, in GroupReadWorker we reset the monotonicity validation state
on any error, because the next consume attempt is almost certain to
re-read offsets that were already consumed. As a result, we wouldn't
catch monotonicity issues.

In retrospect, it would have been better to fail instantly when
franz-go detects data loss, which is exactly what this commit does.
---
 pkg/worker/verifier/group_read_worker.go | 10 +++++++---
 pkg/worker/verifier/seq_read_worker.go   | 10 +++++++---
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/pkg/worker/verifier/group_read_worker.go b/pkg/worker/verifier/group_read_worker.go
index 0f50a6e..b860763 100644
--- a/pkg/worker/verifier/group_read_worker.go
+++ b/pkg/worker/verifier/group_read_worker.go
@@ -259,9 +259,13 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
 				"fiber %v: Consumer group fetch %s/%d e=%v...",
 				fiberId, t, p, err)
 			var lossErr *kgo.ErrDataLoss
-			if grw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
-				grw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
-				grw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+			if errors.As(err, &lossErr) {
+				if grw.config.workerCfg.TolerateDataLoss {
+					grw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
+					grw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+				} else {
+					log.Fatalf("Unexpected data loss detected: %v", lossErr)
+				}
 			} else {
 				r_err = err
 			}
diff --git a/pkg/worker/verifier/seq_read_worker.go b/pkg/worker/verifier/seq_read_worker.go
index 5a17b1c..797b5df 100644
--- a/pkg/worker/verifier/seq_read_worker.go
+++ b/pkg/worker/verifier/seq_read_worker.go
@@ -154,9 +154,13 @@ func (srw *SeqReadWorker) sequentialReadInner(ctx context.Context, startAt []int
 	fetches.EachError(func(t string, p int32, err error) {
 		log.Warnf("Sequential fetch %s/%d e=%v...", t, p, err)
 		var lossErr *kgo.ErrDataLoss
-		if srw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
-			srw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
-			srw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+		if errors.As(err, &lossErr) {
+			if srw.config.workerCfg.TolerateDataLoss {
+				srw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
+				srw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+			} else {
+				log.Fatalf("Unexpected data loss detected: %v", lossErr)
+			}
 		} else {
 			r_err = err
 		}