From 46425a250b6b22f2d7c959c955f683a373f7956b Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Fri, 18 Oct 2024 15:58:05 +0100
Subject: [PATCH 1/2] assert monotonic leader epochs

---
 pkg/worker/verifier/validator_status.go | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/pkg/worker/verifier/validator_status.go b/pkg/worker/verifier/validator_status.go
index d319587..15f21f5 100644
--- a/pkg/worker/verifier/validator_status.go
+++ b/pkg/worker/verifier/validator_status.go
@@ -48,14 +48,22 @@ type ValidatorStatus struct {
     // Last consumed offset per partition. Used to assert monotonicity and check for gaps.
     lastOffsetConsumed map[int32]int64
+
+    // Last leader epoch per partition. Used to assert monotonicity.
+    lastLeaderEpoch map[int32]int32
 }
 
 func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffsetRanges) {
     expect_header_value := fmt.Sprintf("%06d.%018d", 0, r.Offset)
-    log.Debugf("Consumed %s on p=%d at o=%d", r.Key, r.Partition, r.Offset)
+    log.Debugf("Consumed %s on p=%d at o=%d leaderEpoch=%d", r.Key, r.Partition, r.Offset, r.LeaderEpoch)
     cs.lock.Lock()
     defer cs.lock.Unlock()
 
+    if cs.lastLeaderEpoch[r.Partition] < r.LeaderEpoch {
+        log.Fatalf("Out of order leader epoch on p=%d at o=%d leaderEpoch=%d. Previous leaderEpoch=%d",
+            r.Partition, r.Offset, r.LeaderEpoch, cs.lastLeaderEpoch[r.Partition])
+    }
+
     var got_header_value string
     if len(r.Headers) > 0 {
         got_header_value = string(r.Headers[0].Value)
@@ -102,12 +110,16 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
     if cs.lastOffsetConsumed == nil {
         cs.lastOffsetConsumed = make(map[int32]int64)
     }
+    if cs.lastLeaderEpoch == nil {
+        cs.lastLeaderEpoch = make(map[int32]int32)
+    }
 
     if r.Offset > cs.MaxOffsetsConsumed[r.Partition] {
         cs.MaxOffsetsConsumed[r.Partition] = r.Offset
     }
 
     cs.lastOffsetConsumed[r.Partition] = r.Offset
+    cs.lastLeaderEpoch[r.Partition] = r.LeaderEpoch
 }
 
 func (cs *ValidatorStatus) RecordLostOffsets(p int32, count int64) {

From fab9660131cd4e6426eda6cec93ecfe1f5c0529b Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Fri, 18 Oct 2024 15:58:53 +0100
Subject: [PATCH 2/2] always validate and record consumed offsets

Previously we would record offsets only when consuming expected/valid
records. This was overly restrictive: the offset validation, and now the
leader epoch (term) validation, checks log semantics (monotonicity) that
must hold for every consumed record, not only for records with the
expected payload.
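
The invariant asserted by this series, restated as a standalone sketch
(illustrative only, not code from this patch): for any single partition,
the leader epoch observed while consuming records in offset order must
never decrease. The record type below is a stand-in for the fields of
kgo.Record that the verifier uses, and the helper name is hypothetical.

    package main

    import "fmt"

    // record stands in for the subset of kgo.Record fields used here.
    type record struct {
        Partition   int32
        Offset      int64
        LeaderEpoch int32
    }

    // firstEpochRegression reports the first record whose leader epoch
    // moves backwards on its partition, mirroring the check added to
    // ValidatorStatus.ValidateRecord (with an error instead of a panic).
    func firstEpochRegression(records []record) error {
        last := map[int32]int32{}
        for _, r := range records {
            if prev, ok := last[r.Partition]; ok && r.LeaderEpoch < prev {
                return fmt.Errorf("out of order leader epoch on p=%d at o=%d: %d < %d",
                    r.Partition, r.Offset, r.LeaderEpoch, prev)
            }
            last[r.Partition] = r.LeaderEpoch
        }
        return nil
    }

    func main() {
        recs := []record{
            {Partition: 0, Offset: 41, LeaderEpoch: 1},
            {Partition: 0, Offset: 42, LeaderEpoch: 0}, // epoch went backwards: reported
        }
        fmt.Println(firstEpochRegression(recs))
    }
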
---
 go.mod                                       |   4 +
 pkg/worker/verifier/validator_status.go      |  42 ++++---
 pkg/worker/verifier/validator_status_test.go | 126 +++++++++++++++++++
 3 files changed, 153 insertions(+), 19 deletions(-)
 create mode 100644 pkg/worker/verifier/validator_status_test.go

diff --git a/go.mod b/go.mod
index f7e39ed..54fcbbb 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
     github.com/google/uuid v1.1.1
     github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
     github.com/sirupsen/logrus v1.8.1
+    github.com/stretchr/testify v1.7.0
     github.com/twmb/franz-go v1.15.4
     github.com/twmb/franz-go/pkg/kadm v0.0.0-20211116225244-e97ad6b8ef3e
     github.com/twmb/franz-go/pkg/kmsg v1.7.0
@@ -15,6 +16,7 @@ require (
+    github.com/davecgh/go-spew v1.1.1 // indirect
     github.com/fsnotify/fsnotify v1.4.9 // indirect
     github.com/hashicorp/hcl v1.0.0 // indirect
     github.com/icza/dyno v0.0.0-20200205103839-49cb13720835 // indirect
@@ -24,6 +26,7 @@ require (
     github.com/mitchellh/mapstructure v1.4.1 // indirect
     github.com/pelletier/go-toml v1.2.0 // indirect
     github.com/pierrec/lz4/v4 v4.1.19 // indirect
+    github.com/pmezard/go-difflib v1.0.0 // indirect
     github.com/spf13/afero v1.6.0 // indirect
     github.com/spf13/cast v1.3.0 // indirect
     github.com/spf13/cobra v1.1.3 // indirect
@@ -37,4 +40,5 @@
     golang.org/x/text v0.14.0 // indirect
     gopkg.in/ini.v1 v1.51.0 // indirect
     gopkg.in/yaml.v2 v2.4.0 // indirect
+    gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )
diff --git a/pkg/worker/verifier/validator_status.go b/pkg/worker/verifier/validator_status.go
index 15f21f5..c358d22 100644
--- a/pkg/worker/verifier/validator_status.go
+++ b/pkg/worker/verifier/validator_status.go
@@ -59,51 +59,53 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
     cs.lock.Lock()
     defer cs.lock.Unlock()
 
-    if cs.lastLeaderEpoch[r.Partition] < r.LeaderEpoch {
-        log.Fatalf("Out of order leader epoch on p=%d at o=%d leaderEpoch=%d. Previous leaderEpoch=%d",
+    if r.LeaderEpoch < cs.lastLeaderEpoch[r.Partition] {
+        log.Panicf("Out of order leader epoch on p=%d at o=%d leaderEpoch=%d. Previous leaderEpoch=%d",
             r.Partition, r.Offset, r.LeaderEpoch, cs.lastLeaderEpoch[r.Partition])
     }
 
+    currentMax, present := cs.lastOffsetConsumed[r.Partition]
+    if present {
+        if currentMax < r.Offset {
+            expected := currentMax + 1
+            if r.Offset != expected {
+                log.Warnf("Gap detected in consumed offsets. Expected %d, but got %d", expected, r.Offset)
+            }
+        } else {
+            log.Panicf("Out of order read. Max consumed offset(partition=%d)=%d; Current record offset=%d", r.Partition, currentMax, r.Offset)
+        }
+    }
+
     var got_header_value string
     if len(r.Headers) > 0 {
         got_header_value = string(r.Headers[0].Value)
     }
 
-    if expect_header_value != got_header_value {
+    recordExpected := expect_header_value == got_header_value
+    if !recordExpected {
         shouldBeValid := validRanges.Contains(r.Partition, r.Offset)
 
         if shouldBeValid {
            cs.InvalidReads += 1
-            util.Die("Bad read at offset %d on partition %s/%d. Expect '%s', found '%s'", r.Offset, r.Topic, r.Partition, expect_header_value, got_header_value)
+            log.Panicf("Bad read at offset %d on partition %s/%d. Expect '%s', found '%s'", r.Offset, r.Topic, r.Partition, expect_header_value, got_header_value)
         } else {
             cs.OutOfScopeInvalidReads += 1
             log.Infof("Ignoring read validation at offset outside valid range %s/%d %d", r.Topic, r.Partition, r.Offset)
         }
     } else {
-        currentMax, present := cs.lastOffsetConsumed[r.Partition]
-        if present {
-            if currentMax < r.Offset {
-                expected := currentMax + 1
-                if r.Offset != expected {
-                    log.Warnf("Gap detected in consumed offsets. Expected %d, but got %d", expected, r.Offset)
-                }
-            } else {
-                log.Fatalf("Out of order read. Max consumed offset(partition=%d)=%d; Current record offset=%d", r.Partition, currentMax, r.Offset)
-            }
-        }
-        cs.recordOffset(r)
-
         cs.ValidReads += 1
         log.Debugf("Read OK (%s) on p=%d at o=%d", r.Headers[0].Value, r.Partition, r.Offset)
     }
 
+    cs.recordOffset(r, recordExpected)
+
     if time.Since(cs.lastCheckpoint) > time.Second*5 {
         cs.Checkpoint()
         cs.lastCheckpoint = time.Now()
     }
 }
 
-func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
+func (cs *ValidatorStatus) recordOffset(r *kgo.Record, recordExpected bool) {
     if cs.MaxOffsetsConsumed == nil {
         cs.MaxOffsetsConsumed = make(map[int32]int64)
     }
@@ -114,7 +116,8 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
         cs.lastLeaderEpoch = make(map[int32]int32)
     }
 
-    if r.Offset > cs.MaxOffsetsConsumed[r.Partition] {
+    // We bump highest offset only for valid records.
+    if r.Offset > cs.MaxOffsetsConsumed[r.Partition] && recordExpected {
         cs.MaxOffsetsConsumed[r.Partition] = r.Offset
     }
 
@@ -138,6 +141,7 @@ func (cs *ValidatorStatus) ResetMonotonicityTestState() {
     defer cs.lock.Unlock()
 
     cs.lastOffsetConsumed = make(map[int32]int64)
+    cs.lastLeaderEpoch = make(map[int32]int32)
 }
 
 func (cs *ValidatorStatus) SetMonotonicityTestStateForPartition(partition int32, offset int64) {
diff --git a/pkg/worker/verifier/validator_status_test.go b/pkg/worker/verifier/validator_status_test.go
new file mode 100644
index 0000000..e177fa8
--- /dev/null
+++ b/pkg/worker/verifier/validator_status_test.go
@@ -0,0 +1,126 @@
+package verifier_test
+
+import (
+    "testing"
+
+    "github.com/redpanda-data/kgo-verifier/pkg/worker/verifier"
+    "github.com/sirupsen/logrus"
+    "github.com/stretchr/testify/assert"
+    "github.com/twmb/franz-go/pkg/kgo"
+)
+
+func TestValidatorStatus_ValidateRecordHappyPath(t *testing.T) {
+    validator := verifier.NewValidatorStatus()
+    validRanges := verifier.NewTopicOffsetRanges("topic", 1)
+    validRanges.Insert(0, 41)
+    validRanges.Insert(0, 42)
+
+    validator.ValidateRecord(&kgo.Record{
+        Offset:      41,
+        LeaderEpoch: 0,
+        Headers:     []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000041")}},
+    }, &validRanges)
+
+    validator.ValidateRecord(&kgo.Record{
+        Offset:      42,
+        LeaderEpoch: 0,
+        Headers:     []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000042")}},
+    }, &validRanges)
+
+    validator.ValidateRecord(&kgo.Record{
+        Offset:      43,
+        LeaderEpoch: 1,
+    }, &validRanges)
+
+    assert.Equal(t, int64(2), validator.ValidReads)
+    assert.Equal(t, int64(0), validator.InvalidReads)
+    assert.Equal(t, int64(1), validator.OutOfScopeInvalidReads)
+
+    // Only valid reads increment the max offset consumed.
+    assert.Equal(t, int64(42), validator.MaxOffsetsConsumed[0])
+
+}
+
+func TestValidatorStatus_ValidateRecordInvalidRead(t *testing.T) {
+    validator := verifier.NewValidatorStatus()
+    validRanges := verifier.NewTopicOffsetRanges("topic", 1)
+    validRanges.Insert(0, 41)
+
+    // Mismatch between the expected offset as recorded in the header and the actual offset.
+    func() {
+        defer func() {
+            if r := recover(); r != nil {
+                assert.Equal(t, "Bad read at offset 41 on partition /0. Expect '000000.000000000000000041', found '000000.000000000000000040'", r.(*logrus.Entry).Message)
+            }
+        }()
+
+        validator.ValidateRecord(&kgo.Record{
+            Offset:      41,
+            LeaderEpoch: 0,
+            Headers:     []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000040")}},
+        }, &validRanges)
+    }()
+}
+
+func TestValidatorStatus_ValidateRecordNonMonotonicOffset(t *testing.T) {
+    validator := verifier.NewValidatorStatus()
+    validRanges := verifier.NewTopicOffsetRanges("topic", 1)
+
+    validator.ValidateRecord(&kgo.Record{
+        Offset:      41,
+        LeaderEpoch: 0,
+    }, &validRanges)
+
+    // Same offset read again.
+    func() {
+        defer func() {
+            if r := recover(); r != nil {
+                assert.Equal(t, "Out of order read. Max consumed offset(partition=0)=41; Current record offset=41", r.(*logrus.Entry).Message)
+            }
+        }()
+
+        validator.ValidateRecord(&kgo.Record{
+            Offset:      41,
+            LeaderEpoch: 0,
+        }, &validRanges)
+    }()
+
+    // Lower offset read after a higher offset.
+    func() {
+        defer func() {
+            if r := recover(); r != nil {
+                assert.Equal(t, "Out of order read. Max consumed offset(partition=0)=41; Current record offset=40", r.(*logrus.Entry).Message)
+            }
+        }()
+
+        validator.ValidateRecord(&kgo.Record{
+            Offset:      40,
+            LeaderEpoch: 0,
+        }, &validRanges)
+    }()
+}
+
+
+func TestValidatorStatus_ValidateRecordNonMonotonicLeaderEpoch(t *testing.T) {
+    validator := verifier.NewValidatorStatus()
+    validRanges := verifier.NewTopicOffsetRanges("topic", 1)
+
+    validator.ValidateRecord(&kgo.Record{
+        Offset:      41,
+        LeaderEpoch: 1,
+    }, &validRanges)
+
+    func() {
+        defer func() {
+            if r := recover(); r != nil {
+                assert.Equal(t, "Out of order leader epoch on p=0 at o=42 leaderEpoch=0. Previous leaderEpoch=1", r.(*logrus.Entry).Message)
+            }
+        }()
+
+        validator.ValidateRecord(&kgo.Record{
+            Offset:      42,
+            LeaderEpoch: 0,
+        }, &validRanges)
+    }()
+
+}
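
A sketch of how the validator can be driven from a plain franz-go consume
loop (illustrative only; the kgo-verifier workers wire this up differently,
and the broker address and topic name below are placeholders). With this
series, ValidateRecord panics via logrus on non-monotonic offsets or leader
epochs, and records every consumed offset whether or not the payload
matched the expected header value.

    package main

    import (
        "context"

        "github.com/redpanda-data/kgo-verifier/pkg/worker/verifier"
        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        client, err := kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder broker
            kgo.ConsumeTopics("topic"),        // placeholder topic
        )
        if err != nil {
            panic(err)
        }
        defer client.Close()

        validator := verifier.NewValidatorStatus()
        validRanges := verifier.NewTopicOffsetRanges("topic", 1)

        for {
            fetches := client.PollFetches(context.Background())
            if fetches.IsClientClosed() {
                return
            }
            // Each record is checked for header/offset consistency and for
            // offset and leader-epoch monotonicity.
            fetches.EachRecord(func(r *kgo.Record) {
                validator.ValidateRecord(r, &validRanges)
            })
        }
    }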