Skip to content

Commit

Permalink
metrics format changed
Browse files Browse the repository at this point in the history
  • Loading branch information
gmbyapa committed Jul 9, 2019
1 parent af00cad commit fc25c0b
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 15 deletions.
18 changes: 13 additions & 5 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type Consumer interface {
Rebalanced() <-chan Allocation
Partitions(tps []string, handler ReBalanceHandler) (chan Partition, error)
Errors() <-chan *Error
//PartitionConsumer() (PartitionConsumer, error)
Commit() error
Mark(record *Record)
CommitPartition(topic string, partition int32, offset int64) error
Expand Down Expand Up @@ -98,7 +97,7 @@ func NewConsumer(config *Config) (Consumer, error) {
stopping: make(chan bool, 1),
stopped: make(chan bool, 1),
reBalanceNotified: sync.NewCond(new(sync.Mutex)),
partitionMap: newPartitionMap(config.MetricsReporter, config.Logger),
partitionMap: newPartitionMap(config.GroupId, config.MetricsReporter, config.Logger),
}

pMeta, err := newPartitionMeta(config, config.Logger)
Expand All @@ -117,9 +116,18 @@ func NewConsumer(config *Config) (Consumer, error) {
c.context.ctx = ctx
c.context.cancel = cancel

c.metrics.commitLatency = config.MetricsReporter.Observer(`k_stream_consumer_commit_latency_microseconds`, nil)
c.metrics.reBalanceLatency = config.MetricsReporter.Observer(`k_stream_consumer_re_balance_latency_microseconds`, nil)
c.metrics.reBalancing = config.MetricsReporter.Gauge(`k_stream_consumer_rebalancing`, nil)
c.metrics.commitLatency = config.MetricsReporter.Observer(metrics.MetricConf{
Path: `k_stream_consumer_commit_latency_microseconds`,
ConstLabels: map[string]string{`group`: c.config.GroupId},
})
c.metrics.reBalanceLatency = config.MetricsReporter.Observer(metrics.MetricConf{
Path: `k_stream_consumer_re_balance_latency_microseconds`,
ConstLabels: map[string]string{`group`: c.config.GroupId},
})
c.metrics.reBalancing = config.MetricsReporter.Gauge(metrics.MetricConf{
Path: `k_stream_consumer_rebalancing`,
ConstLabels: map[string]string{`group`: c.config.GroupId},
})

return c, nil
}
Expand Down
15 changes: 12 additions & 3 deletions consumer/partition-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,18 @@ func NewPartitionConsumer(c *PartitionConsumerConfig) (PartitionConsumer, error)
}

labels := []string{`topic`, `partition`}
pc.metrics.consumerBuffer = c.MetricsReporter.Gauge(`k_stream_partition_consumer_buffer`, append(labels, []string{`type`}...))
pc.metrics.consumerBufferMax = c.MetricsReporter.Gauge(`k_stream_partition_consumer_buffer_max`, append(labels, []string{`type`}...))
pc.metrics.endToEndLatency = c.MetricsReporter.Observer(`k_stream_partition_consumer_end_to_end_latency_microseconds`, labels)
pc.metrics.consumerBuffer = c.MetricsReporter.Gauge(metrics.MetricConf{
Path: `k_stream_partition_consumer_buffer`,
Labels: append(labels, []string{`type`}...),
})
pc.metrics.consumerBufferMax = c.MetricsReporter.Gauge(metrics.MetricConf{
Path: `k_stream_partition_consumer_buffer_max`,
Labels: append(labels, []string{`type`}...),
})
pc.metrics.endToEndLatency = c.MetricsReporter.Observer(metrics.MetricConf{
Path: `k_stream_partition_consumer_end_to_end_latency_microseconds`,
Labels: labels,
})

return pc, nil
}
Expand Down
1 change: 0 additions & 1 deletion consumer/partition-metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (m *groupOffsetMeta) Wait(tps []TopicPartition, consumerHost string) error
return requestPartitionMeta(meta, m.logger)
}


func (m *groupOffsetMeta) fetchMeta(tps []TopicPartition, consumerHost string) (map[string][]TopicPartition, error) {

saramaTps := make(map[string][]int32)
Expand Down
20 changes: 16 additions & 4 deletions consumer/partition_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ import (
)

// PartitionMap tracks the live partitions assigned to a consumer group
// member and fans newly created partitions out to consumers.
type PartitionMap struct {
	// id is the consumer group id; it is attached to per-partition
	// metrics as a const label (see the `group` ConstLabels below in
	// this file).
	id string
	// partitions maps TopicPartition.String() keys to their partition
	// objects (stored/loaded concurrently, hence sync.Map).
	partitions *sync.Map
	// metricsReporter creates the per-partition buffer gauges and
	// end-to-end latency observer.
	metricsReporter metrics.Reporter
	logger          logger.Logger
	// partitionsBuffer receives each newly created partition so callers
	// can consume them as they are assigned.
	partitionsBuffer chan Partition
	// wg — purpose not visible in this diff fragment; presumably used to
	// wait for per-partition goroutines to finish. TODO(review): confirm.
	wg *sync.WaitGroup
}

func newPartitionMap(metricsReporter metrics.Reporter, logger logger.Logger) *PartitionMap {
func newPartitionMap(group string, metricsReporter metrics.Reporter, logger logger.Logger) *PartitionMap {
return &PartitionMap{
id: group,
partitions: new(sync.Map),
metricsReporter: metricsReporter,
logger: logger,
Expand All @@ -47,9 +49,19 @@ func (m *PartitionMap) partition(tp TopicPartition, saramaPartition sarama.Parti
stopped: make(chan bool, 1),
done: make(chan bool, 1),
}
p.metrics.consumerBuffer = m.metricsReporter.Gauge(`k_stream_consumer_buffer`, nil)
p.metrics.consumerBufferMax = m.metricsReporter.Gauge(`k_stream_consumer_buffer_max`, nil)
p.metrics.endToEndLatency = m.metricsReporter.Observer(`k_stream_consumer_end_to_end_latency_microseconds`, []string{`topic`, `partition`})
p.metrics.consumerBuffer = m.metricsReporter.Gauge(metrics.MetricConf{
Path: `k_stream_consumer_buffer`,
ConstLabels: map[string]string{`group`: m.id},
})
p.metrics.consumerBufferMax = m.metricsReporter.Gauge(metrics.MetricConf{
Path: `k_stream_consumer_buffer_max`,
ConstLabels: map[string]string{`group`: m.id},
})
p.metrics.endToEndLatency = m.metricsReporter.Observer(metrics.MetricConf{
Path: `k_stream_consumer_end_to_end_latency_microseconds`,
ConstLabels: map[string]string{`group`: m.id},
Labels: []string{`topic`, `partition`},
})
m.partitions.Store(tp.String(), p)
m.partitionsBuffer <- p
} else {
Expand Down
12 changes: 10 additions & 2 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,16 @@ func NewProducer(configs *Config) (Producer, error) {
saramaProducer: prd,
logger: configs.Logger,
metrics: &metricsReporter{
produceLatency: configs.MetricsReporter.Observer(`k_stream_producer_produced_latency_microseconds`, labels),
batchProduceLatency: configs.MetricsReporter.Observer(`k_stream_producer_batch_produced_latency_microseconds`, append(labels, `size`)),
produceLatency: configs.MetricsReporter.Observer(metrics.MetricConf{
Path: `k_stream_producer_produced_latency_microseconds`,
Labels: labels,
ConstLabels: map[string]string{`producer_id`: configs.Id},
}),
batchProduceLatency: configs.MetricsReporter.Observer(metrics.MetricConf{
Path: `k_stream_producer_batch_produced_latency_microseconds`,
Labels: append(labels, `size`),
ConstLabels: map[string]string{`producer_id`: configs.Id},
}),
},
}, nil
}
Expand Down

0 comments on commit fc25c0b

Please sign in to comment.