Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer: combine offset map from multiple partially successful replies #59

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 46 additions & 33 deletions pkg/worker/verifier/client_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,53 @@ import (
"errors"
"time"

"github.com/redpanda-data/kgo-verifier/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/vectorizedio/redpanda/src/go/rpk/pkg/kafka"
)

// OffsetResult holds the outcome of a ListOffsets query for a single
// partition: either a fetched offset, or the error that prevented
// fetching it (err == nil means offset is valid).
type OffsetResult struct {
	offset int64 // listed offset for the partition; meaningful only when err == nil
	err    error // per-partition failure, or nil on success
}

// Try to get offsets, with a retry loop in case any partitions are not
// in a position to respond. This is useful to avoid terminating if e.g.
// the cluster is subject to failure injection while workload runs.
func GetOffsets(client *kgo.Client, topic string, nPartitions int32, t int64) []int64 {
wait_t := 2 * time.Second
combinedResult := make([]int64, nPartitions)
haveResult := make([]bool, nPartitions)

req := formOffsetsReq(topic, nPartitions, t)
for {
result, err := getOffsetsInner(client, topic, nPartitions, t)
if err != nil {
log.Warnf("Retrying getOffsets in %v", wait_t)
time.Sleep(wait_t)
} else {
return result
result := attemptGetOffsets(client, topic, nPartitions, req)
var seenPartitions = int32(0)
for i := 0; i < int(nPartitions); i++ {
if result[i].err == nil {
// update even if seen before
combinedResult[i] = result[i].offset
haveResult[i] = true
}
if haveResult[i] {
seenPartitions += 1
}
}

if seenPartitions == nPartitions {
return combinedResult
}
log.Warnf(
"Got offsets for %d/%d partitions, retrying attemptGetOffsets in %v",
seenPartitions, nPartitions, wait_t)
time.Sleep(wait_t)
}
}

func getOffsetsInner(client *kgo.Client, topic string, nPartitions int32, t int64) ([]int64, error) {
func formOffsetsReq(topic string, nPartitions int32, t int64) *kmsg.ListOffsetsRequest {
log.Infof("Loading offsets for topic %s t=%d...", topic, t)
pOffsets := make([]int64, nPartitions)

req := kmsg.NewPtrListOffsetsRequest()
req.ReplicaID = -1
Expand All @@ -45,37 +65,30 @@ func getOffsetsInner(client *kgo.Client, topic string, nPartitions int32, t int6
}

req.Topics = append(req.Topics, reqTopic)
return req
}

func attemptGetOffsets(client *kgo.Client, topic string, nPartitions int32, req *kmsg.ListOffsetsRequest) []OffsetResult {
pOffsets := make([]OffsetResult, nPartitions)
for i := range pOffsets {
pOffsets[i].err = errors.New("no result")
}

seenPartitions := int32(0)
shards := client.RequestSharded(context.Background(), req)
var r_err error
allFailed := kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
if shard.Err != nil {
r_err = shard.Err
return
}
kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
util.Chk(shard.Err, "kafka.EachShard called processor fn on an error result")
nvartolomei marked this conversation as resolved.
Show resolved Hide resolved
resp := shard.Resp.(*kmsg.ListOffsetsResponse)
for _, partition := range resp.Topics[0].Partitions {
if partition.ErrorCode != 0 {
log.Warnf("error fetching %s/%d metadata: %v", topic, partition.Partition, kerr.ErrorForCode(partition.ErrorCode))
r_err = kerr.ErrorForCode(partition.ErrorCode)
err := kerr.ErrorForCode(partition.ErrorCode)
pOffsets[partition.Partition].err = err
log.Warnf("error fetching %s/%d metadata: %v", topic, partition.Partition, err)
} else {
pOffsets[partition.Partition] = OffsetResult{offset: partition.Offset, err: nil}
log.Debugf("Partition %d offset %d", partition.Partition, partition.Offset)
}
pOffsets[partition.Partition] = partition.Offset
seenPartitions += 1
log.Debugf("Partition %d offset %d", partition.Partition, pOffsets[partition.Partition])
}
})

if allFailed {
return nil, errors.New("All offset requests failed")
}

if seenPartitions < nPartitions {
// The results may be partial, simply omitting some partitions while not
// raising any error. We transform this into an error to avoid wrongly
// returning a 0 offset for any missing partitions
return nil, errors.New("Didn't get data for all partitions")
}

return pOffsets, r_err
return pOffsets
}
Loading