Skip to content

Commit

Permalink
chore: feedback produce consume (#1638)
Browse files Browse the repository at this point in the history
* fix: add feedback to produce consume

* fix: print partition for consumptions

* fix: doc fix

* fix: add partition when consuming from all partitions

* fix: partition as key
  • Loading branch information
wtrocki authored Jul 12, 2022
1 parent b6a8603 commit 2bd3dfd
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/commands/rhoas_kafka_topic_consume.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 15 additions & 9 deletions pkg/cmd/kafka/topic/consume/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

kafkaflagutil "github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/flagutil"

"strings"

"github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/kafkacmdutil"
"github.com/redhat-developer/app-services-cli/pkg/core/cmdutil/flagutil"
"github.com/redhat-developer/app-services-cli/pkg/core/ioutil/dump"
Expand All @@ -26,6 +24,7 @@ const (
DefaultOffset = ""
DefaultLimit = 20
DefaultTimestamp = ""
DefaultPartition = -1
FormatKeyValue = "key-value"
)

Expand Down Expand Up @@ -84,7 +83,7 @@ func NewConsumeTopicCommand(f *factory.Factory) *cobra.Command {
flags := kafkaflagutil.NewFlagSet(cmd, f.Localizer)

flags.StringVar(&opts.topicName, "name", "", f.Localizer.MustLocalize("kafka.topic.common.flag.name.description"))
flags.Int32Var(&opts.partition, "partition", 0, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description"))
flags.Int32Var(&opts.partition, "partition", DefaultPartition, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description"))
flags.StringVar(&opts.date, "from-date", DefaultTimestamp, f.Localizer.MustLocalize("kafka.topic.consume.flag.date.description"))
flags.StringVar(&opts.timestamp, "from-timestamp", DefaultTimestamp, f.Localizer.MustLocalize("kafka.topic.consume.flag.timestamp.description"))
flags.BoolVar(&opts.wait, "wait", false, f.Localizer.MustLocalize("kafka.topic.consume.flag.wait.description"))
Expand Down Expand Up @@ -191,8 +190,11 @@ func consumeAndWait(opts *options, api *kafkainstanceclient.APIClient, kafkaInst

func consume(opts *options, api *kafkainstanceclient.APIClient, kafkaInstance *kafkamgmtclient.KafkaRequest) (*kafkainstanceclient.RecordList, error) {

request := api.RecordsApi.ConsumeRecords(opts.f.Context, opts.topicName).Limit(opts.limit).Partition(opts.partition)

request := api.RecordsApi.ConsumeRecords(opts.f.Context, opts.topicName).Limit(opts.limit)
if opts.partition != DefaultPartition {
opts.f.Logger.Info(opts.f.Localizer.MustLocalize("kafka.topic.consume.partition.value", localize.NewEntry("Partition", opts.partition)))
request = request.Partition(opts.partition)
}
if opts.offset != DefaultOffset {
intOffset, err := strconv.ParseInt(opts.offset, 10, 64)
if err != nil {
Expand Down Expand Up @@ -284,6 +286,10 @@ func outputRecords(opts *options, records *kafkainstanceclient.RecordList) {
} else {
opts.f.Logger.Info(fmt.Sprintf("Key: %v\nMessage: %v", row.Key, row.Value))
}
opts.f.Logger.Info(fmt.Sprintf("Offset: %v", row.Offset))
if opts.partition == DefaultPartition {
opts.f.Logger.Info(fmt.Sprintf("Partition: %v", row.Partition))
}
} else {
_ = dump.Formatted(opts.f.IOStreams.Out, format, row)
opts.f.Logger.Info("")
Expand All @@ -299,10 +305,10 @@ func mapRecordsToRows(topic string, records *[]kafkainstanceclient.Record) []kaf
record := &(*records)[i]
row := kafkaRow{
Topic: topic,
Key: *record.Key,
Value: strings.TrimSuffix(record.Value, "\n"), // trailing new line gives weird printing of table
Partition: *record.Partition,
Offset: *record.Offset,
Key: record.GetKey(),
Value: record.Value,
Partition: record.GetPartition(),
Offset: record.GetOffset(),
}

rows[i] = row
Expand Down
5 changes: 5 additions & 0 deletions pkg/core/localize/locales/en/cmd/kafka.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,11 @@ one = 'Consume only messages with a timestamp later than the specified value (re
[kafka.topic.consume.flag.wait.description]
one = 'Wait for messages to consume from topic'

[kafka.topic.consume.partition.value]
one = 'Consuming messages from partition {{.Partition}}'



[kafka.topic.consume.flag.offset.description]
one = 'Consume messages from an offset equal to or greater than the specified value'

Expand Down

0 comments on commit 2bd3dfd

Please sign in to comment.