From c5071307f1a291aead18c0de076dfdabb8c412f4 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 10 Apr 2024 11:26:25 +0100 Subject: [PATCH 1/3] repeater: remove redundant consumer group option This is set few lines before. Setting an empty string is no-op. No need to check if it non-empty. --- pkg/worker/repeater/repeater_worker.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/worker/repeater/repeater_worker.go b/pkg/worker/repeater/repeater_worker.go index 333a2ab..165065e 100644 --- a/pkg/worker/repeater/repeater_worker.go +++ b/pkg/worker/repeater/repeater_worker.go @@ -276,10 +276,6 @@ func (v *Worker) Init() { kgo.ConsumerGroup(v.config.Group), }...) - if v.config.Group != "" { - opts = append(opts, kgo.ConsumerGroup(v.config.Group)) - } - client, err := kgo.NewClient(opts...) util.Chk(err, "unable to initialize client: %v", err) v.client = client From d55c9b974092cfc1655712c53fcd720db7b2fb56 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 10 Apr 2024 11:28:54 +0100 Subject: [PATCH 2/3] repeater: remove dead code Leftovers from previous refactor /shrug --- cmd/kgo-repeater/main.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cmd/kgo-repeater/main.go b/cmd/kgo-repeater/main.go index d095f5c..5f55b8c 100644 --- a/cmd/kgo-repeater/main.go +++ b/cmd/kgo-repeater/main.go @@ -115,13 +115,6 @@ func main() { topicsList = strings.Split(*topic, ",") } - wConfig := worker.NewWorkerConfig( - "kgo", *brokers, *trace, topicsList[0], *linger, *maxBufferedRecords, *useTransactions, *compressionType, *compressiblePayload, *username, *password, *enableTls) - opts := wConfig.MakeKgoOpts() - opts = append(opts, []kgo.Opt{ - kgo.ProducerBatchMaxBytes(1024 * 1024), - }...) - dataInFlightPerWorker := (*initialDataMb * 1024 * 1024) / uint64(*workers) if dataInFlightPerWorker / *payloadSize <= 0 { From 96366a178a31fb9f498efbea7f2336b1d5132837 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 10 Apr 2024 11:30:10 +0100 Subject: [PATCH 3/3] repeater: panic if group is not set Repeater does not work without a consumer group. It will start but not do any work. --- cmd/kgo-repeater/main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/kgo-repeater/main.go b/cmd/kgo-repeater/main.go index 5f55b8c..071f795 100644 --- a/cmd/kgo-repeater/main.go +++ b/cmd/kgo-repeater/main.go @@ -115,6 +115,10 @@ func main() { topicsList = strings.Split(*topic, ",") } + if *group == "" { + panic("A consumer group must be provided via the -group flag") + } + dataInFlightPerWorker := (*initialDataMb * 1024 * 1024) / uint64(*workers) if dataInFlightPerWorker / *payloadSize <= 0 {