From 237efd0e6de874c3525567ca5fe2f867b9930280 Mon Sep 17 00:00:00 2001
From: Alexey Bashtanov
Date: Tue, 22 Oct 2024 10:59:02 +0100
Subject: [PATCH] consumer: combine offset map from multiple partially
 successful replies

If the number of partitions is high and leadership transfers happen
often, we may be unable to get a single all-successful result for a
long time. Instead, tolerate partially successful replies and combine
a full result out of them.
---
 pkg/worker/verifier/client_helpers.go | 79 ++++++++++++++++-----------
 1 file changed, 46 insertions(+), 33 deletions(-)

diff --git a/pkg/worker/verifier/client_helpers.go b/pkg/worker/verifier/client_helpers.go
index b905393..9256c11 100644
--- a/pkg/worker/verifier/client_helpers.go
+++ b/pkg/worker/verifier/client_helpers.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"time"
 
+	"github.com/redpanda-data/kgo-verifier/pkg/util"
 	log "github.com/sirupsen/logrus"
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
@@ -12,26 +13,45 @@ import (
 	"github.com/vectorizedio/redpanda/src/go/rpk/pkg/kafka"
 )
 
+type OffsetResult struct {
+	offset int64
+	err    error
+}
+
 // Try to get offsets, with a retry loop in case any partitions are not
 // in a position to respond. This is useful to avoid terminating if e.g.
 // the cluster is subject to failure injection while workload runs.
 func GetOffsets(client *kgo.Client, topic string, nPartitions int32, t int64) []int64 {
 	wait_t := 2 * time.Second
+	combinedResult := make([]int64, nPartitions)
+	haveResult := make([]bool, nPartitions)
+
+	req := formOffsetsReq(topic, nPartitions, t)
 	for {
-		result, err := getOffsetsInner(client, topic, nPartitions, t)
-		if err != nil {
-			log.Warnf("Retrying getOffsets in %v", wait_t)
-			time.Sleep(wait_t)
-		} else {
-			return result
+		result := attemptGetOffsets(client, topic, nPartitions, req)
+		var seenPartitions = int32(0)
+		for i := 0; i < int(nPartitions); i++ {
+			if result[i].err == nil {
+				// update even if seen before
+				combinedResult[i] = result[i].offset
+				haveResult[i] = true
+			}
+			if haveResult[i] {
+				seenPartitions += 1
+			}
 		}
-
+		if seenPartitions == nPartitions {
+			return combinedResult
+		}
+		log.Warnf(
+			"Got offsets for %d/%d partitions, retrying attemptGetOffsets in %v",
+			seenPartitions, nPartitions, wait_t)
+		time.Sleep(wait_t)
 	}
 }
 
-func getOffsetsInner(client *kgo.Client, topic string, nPartitions int32, t int64) ([]int64, error) {
+func formOffsetsReq(topic string, nPartitions int32, t int64) *kmsg.ListOffsetsRequest {
 	log.Infof("Loading offsets for topic %s t=%d...", topic, t)
-	pOffsets := make([]int64, nPartitions)
 
 	req := kmsg.NewPtrListOffsetsRequest()
 	req.ReplicaID = -1
@@ -45,37 +65,30 @@ func getOffsetsInner(client *kgo.Client, topic string, nPartitions int32, t int6
 	}
 
 	req.Topics = append(req.Topics, reqTopic)
+	return req
+}
+
+func attemptGetOffsets(client *kgo.Client, topic string, nPartitions int32, req *kmsg.ListOffsetsRequest) []OffsetResult {
+	pOffsets := make([]OffsetResult, nPartitions)
+	for i := range pOffsets {
+		pOffsets[i].err = errors.New("no result")
+	}
 
-	seenPartitions := int32(0)
 	shards := client.RequestSharded(context.Background(), req)
-	var r_err error
-	allFailed := kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
-		if shard.Err != nil {
-			r_err = shard.Err
-			return
-		}
+	kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
+		util.Chk(shard.Err, "kafka.EachShard called processor fn on an error result")
 		resp := shard.Resp.(*kmsg.ListOffsetsResponse)
 		for _, partition := range resp.Topics[0].Partitions {
 			if partition.ErrorCode != 0 {
-				log.Warnf("error fetching %s/%d metadata: %v", topic, partition.Partition, kerr.ErrorForCode(partition.ErrorCode))
-				r_err = kerr.ErrorForCode(partition.ErrorCode)
+				err := kerr.ErrorForCode(partition.ErrorCode)
+				pOffsets[partition.Partition].err = err
+				log.Warnf("error fetching %s/%d metadata: %v", topic, partition.Partition, err)
+			} else {
+				pOffsets[partition.Partition] = OffsetResult{offset: partition.Offset, err: nil}
+				log.Debugf("Partition %d offset %d", partition.Partition, partition.Offset)
			}
-			pOffsets[partition.Partition] = partition.Offset
-			seenPartitions += 1
-			log.Debugf("Partition %d offset %d", partition.Partition, pOffsets[partition.Partition])
 		}
 	})
 
-	if allFailed {
-		return nil, errors.New("All offset requests failed")
-	}
-
-	if seenPartitions < nPartitions {
-		// The results may be partial, simply omitting some partitions while not
-		// raising any error. We transform this into an error to avoid wrongly
-		// returning a 0 offset for any missing partitions
-		return nil, errors.New("Didn't get data for all partitions")
-	}
-
-	return pOffsets, r_err
+	return pOffsets
 }
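
For illustration, a minimal standalone sketch of the combining strategy the
commit message describes: each attempt may return offsets for only a subset
of partitions, and successes are folded into the accumulated result until
every partition has reported at least once. The combine helper and the
canned replies below are hypothetical, not part of the patch; the real code
issues sharded ListOffsets requests via franz-go instead.

// Sketch of combining partially successful per-partition replies.
// combine and the simulated attempts are hypothetical, illustrative names.
package main

import (
	"errors"
	"fmt"
)

type OffsetResult struct {
	offset int64
	err    error
}

// combine folds one attempt's per-partition results into the accumulated
// combined/have state and returns how many partitions have reported so far.
func combine(combined []int64, have []bool, attempt []OffsetResult) int32 {
	var seen int32
	for i, r := range attempt {
		if r.err == nil {
			// Update even if seen before: a later reply supersedes.
			combined[i] = r.offset
			have[i] = true
		}
		if have[i] {
			seen++
		}
	}
	return seen
}

func main() {
	const nPartitions = 3
	combined := make([]int64, nPartitions)
	have := make([]bool, nPartitions)

	// Two partially successful replies: partition 1 fails on the first
	// attempt (e.g. mid leadership transfer) and succeeds on the second,
	// while partition 2 fails on the second; their union covers everything.
	attempts := [][]OffsetResult{
		{{offset: 10}, {err: errors.New("NOT_LEADER_FOR_PARTITION")}, {offset: 30}},
		{{offset: 11}, {offset: 21}, {err: errors.New("NOT_LEADER_FOR_PARTITION")}},
	}

	for n, attempt := range attempts {
		seen := combine(combined, have, attempt)
		fmt.Printf("attempt %d: offsets for %d/%d partitions\n", n+1, seen, nPartitions)
		if seen == nPartitions {
			fmt.Println("combined result:", combined) // [11 21 30]
			return
		}
	}
}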