diff --git a/docs/commands/rhoas_kafka_topic_consume.md b/docs/commands/rhoas_kafka_topic_consume.md index bf23d4887..02114a281 100644 --- a/docs/commands/rhoas_kafka_topic_consume.md +++ b/docs/commands/rhoas_kafka_topic_consume.md @@ -53,7 +53,7 @@ $ rhoas kafka topic consume --name=topic-1 --format=json | jq -rc .value --limit int32 Maximum number of messages to consume from topic (default 20) --name string Topic name --offset string Consume messages from an offset equal to or greater than the specified value - --partition int32 Consume messages from specified partition (value must be a positive integer) + --partition int32 Consume messages from specified partition (value must be a positive integer) (default -1) --wait Wait for messages to consume from topic ``` diff --git a/pkg/cmd/kafka/topic/consume/consume.go b/pkg/cmd/kafka/topic/consume/consume.go index 60d3e109d..edd1351fb 100644 --- a/pkg/cmd/kafka/topic/consume/consume.go +++ b/pkg/cmd/kafka/topic/consume/consume.go @@ -8,8 +8,6 @@ import ( kafkaflagutil "github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/flagutil" - "strings" - "github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/kafkacmdutil" "github.com/redhat-developer/app-services-cli/pkg/core/cmdutil/flagutil" "github.com/redhat-developer/app-services-cli/pkg/core/ioutil/dump" @@ -26,6 +24,7 @@ const ( DefaultOffset = "" DefaultLimit = 20 DefaultTimestamp = "" + DefaultPartition = -1 FormatKeyValue = "key-value" ) @@ -84,7 +83,7 @@ func NewConsumeTopicCommand(f *factory.Factory) *cobra.Command { flags := kafkaflagutil.NewFlagSet(cmd, f.Localizer) flags.StringVar(&opts.topicName, "name", "", f.Localizer.MustLocalize("kafka.topic.common.flag.name.description")) - flags.Int32Var(&opts.partition, "partition", 0, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description")) + flags.Int32Var(&opts.partition, "partition", DefaultPartition, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description")) flags.StringVar(&opts.date, "from-date", DefaultTimestamp, f.Localizer.MustLocalize("kafka.topic.consume.flag.date.description")) flags.StringVar(&opts.timestamp, "from-timestamp", DefaultTimestamp, f.Localizer.MustLocalize("kafka.topic.consume.flag.timestamp.description")) flags.BoolVar(&opts.wait, "wait", false, f.Localizer.MustLocalize("kafka.topic.consume.flag.wait.description")) @@ -191,8 +190,11 @@ func consumeAndWait(opts *options, api *kafkainstanceclient.APIClient, kafkaInst func consume(opts *options, api *kafkainstanceclient.APIClient, kafkaInstance *kafkamgmtclient.KafkaRequest) (*kafkainstanceclient.RecordList, error) { - request := api.RecordsApi.ConsumeRecords(opts.f.Context, opts.topicName).Limit(opts.limit).Partition(opts.partition) - + request := api.RecordsApi.ConsumeRecords(opts.f.Context, opts.topicName).Limit(opts.limit) + if opts.partition != DefaultPartition { + opts.f.Logger.Info(opts.f.Localizer.MustLocalize("kafka.topic.consume.partition.value", localize.NewEntry("Partition", opts.partition))) + request = request.Partition(opts.partition) + } if opts.offset != DefaultOffset { intOffset, err := strconv.ParseInt(opts.offset, 10, 64) if err != nil { @@ -284,6 +286,10 @@ func outputRecords(opts *options, records *kafkainstanceclient.RecordList) { } else { opts.f.Logger.Info(fmt.Sprintf("Key: %v\nMessage: %v", row.Key, row.Value)) } + opts.f.Logger.Info(fmt.Sprintf("Offset: %v", row.Offset)) + if opts.partition == DefaultPartition { + opts.f.Logger.Info(fmt.Sprintf("Partition: %v", row.Partition)) + } } else { _ = dump.Formatted(opts.f.IOStreams.Out, format, row) opts.f.Logger.Info("") @@ -299,10 +305,10 @@ func mapRecordsToRows(topic string, records *[]kafkainstanceclient.Record) []kaf record := &(*records)[i] row := kafkaRow{ Topic: topic, - Key: *record.Key, - Value: strings.TrimSuffix(record.Value, "\n"), // trailing new line gives weird printing of table - Partition: *record.Partition, - Offset: *record.Offset, + Key: record.GetKey(), + Value: record.Value, + Partition: record.GetPartition(), + Offset: record.GetOffset(), } rows[i] = row diff --git a/pkg/core/localize/locales/en/cmd/kafka.en.toml b/pkg/core/localize/locales/en/cmd/kafka.en.toml index 8e2ea1c9b..b835ae182 100644 --- a/pkg/core/localize/locales/en/cmd/kafka.en.toml +++ b/pkg/core/localize/locales/en/cmd/kafka.en.toml @@ -637,6 +637,11 @@ one = 'Consume only messages with a timestamp later than the specified value (re [kafka.topic.consume.flag.wait.description] one = 'Wait for messages to consume from topic' +[kafka.topic.consume.partition.value] +one = 'Consuming messages from partition {{.Partition}}' + + + [kafka.topic.consume.flag.offset.description] one = 'Consume messages from an offset equal to or greater than the specified value'