Skip to content

Commit

Permalink
assert monotonic leader epochs
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Oct 18, 2024
1 parent a4dff21 commit 46425a2
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion pkg/worker/verifier/validator_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,22 @@ type ValidatorStatus struct {

// Last consumed offset per partition. Used to assert monotonicity and check for gaps.
lastOffsetConsumed map[int32]int64

// Last leader epoch per partition. Used to assert monotonicity.
lastLeaderEpoch map[int32]int32
}

func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffsetRanges) {
expect_header_value := fmt.Sprintf("%06d.%018d", 0, r.Offset)
log.Debugf("Consumed %s on p=%d at o=%d", r.Key, r.Partition, r.Offset)
log.Debugf("Consumed %s on p=%d at o=%d leaderEpoch=%d", r.Key, r.Partition, r.Offset, r.LeaderEpoch)
cs.lock.Lock()
defer cs.lock.Unlock()

if cs.lastLeaderEpoch[r.Partition] < r.LeaderEpoch {
log.Fatalf("Out of order leader epoch on p=%d at o=%d leaderEpoch=%d. Previous leaderEpoch=%d",
r.Partition, r.Offset, r.LeaderEpoch, cs.lastLeaderEpoch[r.Partition])
}

var got_header_value string
if len(r.Headers) > 0 {
got_header_value = string(r.Headers[0].Value)
Expand Down Expand Up @@ -102,12 +110,16 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
if cs.lastOffsetConsumed == nil {
cs.lastOffsetConsumed = make(map[int32]int64)
}
if cs.lastLeaderEpoch == nil {
cs.lastLeaderEpoch = make(map[int32]int32)
}

if r.Offset > cs.MaxOffsetsConsumed[r.Partition] {
cs.MaxOffsetsConsumed[r.Partition] = r.Offset
}

cs.lastOffsetConsumed[r.Partition] = r.Offset
cs.lastLeaderEpoch[r.Partition] = r.LeaderEpoch
}

func (cs *ValidatorStatus) RecordLostOffsets(p int32, count int64) {
Expand Down

0 comments on commit 46425a2

Please sign in to comment.