Skip to content

Commit

Permalink
always validate and record consumed offsets
Browse files Browse the repository at this point in the history
Previously we would record offsets only when consuming expected/valid
records.  This is overly restrictive as the offset validation and now
term validation is validating log semantics (monotonicity).
  • Loading branch information
nvartolomei committed Oct 21, 2024
1 parent 46425a2 commit 96e9d5b
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions pkg/worker/verifier/validator_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
cs.lock.Lock()
defer cs.lock.Unlock()

if cs.lastLeaderEpoch[r.Partition] < r.LeaderEpoch {
if r.LeaderEpoch < cs.lastLeaderEpoch[r.Partition] {
log.Fatalf("Out of order leader epoch on p=%d at o=%d leaderEpoch=%d. Previous leaderEpoch=%d",
r.Partition, r.Offset, r.LeaderEpoch, cs.lastLeaderEpoch[r.Partition])
}
Expand All @@ -69,7 +69,8 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
got_header_value = string(r.Headers[0].Value)
}

if expect_header_value != got_header_value {
recordExpected := expect_header_value == got_header_value
if !recordExpected {
shouldBeValid := validRanges.Contains(r.Partition, r.Offset)

if shouldBeValid {
Expand All @@ -91,19 +92,20 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
log.Fatalf("Out of order read. Max consumed offset(partition=%d)=%d; Current record offset=%d", r.Partition, currentMax, r.Offset)
}
}
cs.recordOffset(r)

cs.ValidReads += 1
log.Debugf("Read OK (%s) on p=%d at o=%d", r.Headers[0].Value, r.Partition, r.Offset)
}

cs.recordOffset(r, recordExpected)

if time.Since(cs.lastCheckpoint) > time.Second*5 {
cs.Checkpoint()
cs.lastCheckpoint = time.Now()
}
}

func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
func (cs *ValidatorStatus) recordOffset(r *kgo.Record, recordExpected bool) {
if cs.MaxOffsetsConsumed == nil {
cs.MaxOffsetsConsumed = make(map[int32]int64)
}
Expand All @@ -114,7 +116,8 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
cs.lastLeaderEpoch = make(map[int32]int32)
}

if r.Offset > cs.MaxOffsetsConsumed[r.Partition] {
// We bump highest offset only for valid records.
if r.Offset > cs.MaxOffsetsConsumed[r.Partition] && recordExpected {
cs.MaxOffsetsConsumed[r.Partition] = r.Offset
}

Expand All @@ -138,6 +141,7 @@ func (cs *ValidatorStatus) ResetMonotonicityTestState() {
defer cs.lock.Unlock()

cs.lastOffsetConsumed = make(map[int32]int64)
cs.lastLeaderEpoch = make(map[int32]int32)
}

func (cs *ValidatorStatus) SetMonotonicityTestStateForPartition(partition int32, offset int64) {
Expand Down

0 comments on commit 96e9d5b

Please sign in to comment.