Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add prometheus receiver #177

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,27 @@ receivers:
foo: bar
url: http://127.0.0.1:3100/loki/api/v1/push
```

# Prometheus

The Prometheus receiver emits event count metrics that Prometheus can scrape. Prometheus must
be configured to scrape the kubernetes-event-exporter metrics.

Resource kinds and event reasons must be specified. Metrics will be emitted for only those
resources and their associated events.

```yaml
receivers:
- name: "prometheus"
prometheus:
# Specify a prefix for all event count metrics
eventsMetricsNamePrefix: "metric_prefix_"
# Specify resource kinds and which event reasons to capture for each kind
# Only events with the given reasons for each kind will be emitted
reasonFilter:
Pod:
- "FailedMount"
- "Unhealthy"
Job:
- "DeadlineExceeded"
```
6 changes: 6 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,9 @@ receivers:
deliveryStreamName: "kubernetes-events"
region: "us-east-1"
deDot: true
- name: "prometheus"
prometheus:
- "Pod"
cherylfbrown marked this conversation as resolved.
Show resolved Hide resolved
reasonFilter:
Pod:
- "FailedMount"
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/prometheus/exporter-toolkit v0.10.0
github.com/rs/zerolog v1.28.0
github.com/slack-go/slack v0.12.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.9.0
google.golang.org/api v0.107.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
k8s.io/api v0.26.7
Expand Down Expand Up @@ -103,6 +103,7 @@ require (
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/xdg-go/scram v1.1.2
github.com/stretchr/objx v0.5.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.17.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand All @@ -304,6 +306,8 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
Expand Down
104 changes: 104 additions & 0 deletions pkg/sinks/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package sinks

import (
"context"
"strings"

"k8s.io/utils/strings/slices"

"github.com/prometheus/client_golang/prometheus"
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog/log"
)

func newGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *prometheus.GaugeVec {
v := prometheus.NewGaugeVec(opts, labelNames)
prometheus.MustRegister(v)
return v
}

type PrometheusConfig struct {
EventsMetricsNamePrefix string `yaml:"eventsMetricsNamePrefix"`
ReasonFilter map[string][]string `yaml:"reasonFilter"`
}

type PrometheusGaugeVec interface {
With(labels prometheus.Labels) prometheus.Gauge
Delete(labels prometheus.Labels) bool
}

type PrometheusSink struct {
cfg *PrometheusConfig
kinds []string
metricsByKind map[string]PrometheusGaugeVec
}

func NewPrometheusSink(config *PrometheusConfig) (Sink, error) {
if config.EventsMetricsNamePrefix == "" {
config.EventsMetricsNamePrefix = "event_exporter_"
}

metricsByKind := map[string]PrometheusGaugeVec{}

log.Info().Msgf("Initializing new Prometheus sink...")
kinds := []string{}
for kind := range config.ReasonFilter {
kinds = append(kinds, kind)
metricName := config.EventsMetricsNamePrefix + strings.ToLower(kind) + "_event_count"
metricLabels := []string{strings.ToLower(kind), "namespace", "reason"}
metricsByKind[kind] = newGaugeVec(
prometheus.GaugeOpts{
Name: metricName,
Help: "Event counts for " + kind + " resources.",
}, metricLabels)

log.Info().Msgf("Created metric: %s, will emit events: %v with additional labels: %v", kind, config.ReasonFilter[kind], metricLabels)
}

return &PrometheusSink{
cfg: config,
kinds: kinds,
metricsByKind: metricsByKind,
}, nil
}

func (o *PrometheusSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
cherylfbrown marked this conversation as resolved.
Show resolved Hide resolved
kind := ev.InvolvedObject.Kind
if slices.Contains(o.kinds, kind) {
for _, reason := range o.cfg.ReasonFilter[kind] {
cherylfbrown marked this conversation as resolved.
Show resolved Hide resolved
if ev.Reason == reason {
SetEventCount(o.metricsByKind[kind], ev.InvolvedObject, reason, ev.Count)
} else {
DeleteEventCount(o.metricsByKind[kind], ev.InvolvedObject, reason)
}
}
}

return nil
}

func (o *PrometheusSink) Close() {
// No-op
}

func getMetricLabels(obj kube.EnhancedObjectReference, reason string) prometheus.Labels {
prometheusLabels := prometheus.Labels{
strings.ToLower(obj.Kind): obj.Name,
"namespace": obj.Namespace,
"reason": reason,
}

return prometheusLabels
}

func SetEventCount(metric PrometheusGaugeVec, obj kube.EnhancedObjectReference, reason string, count int32) {
labels := getMetricLabels(obj, reason)
log.Info().Msgf("Setting event count metric with labels: %v", labels)
metric.With(labels).Set(float64(count))
}

func DeleteEventCount(metric PrometheusGaugeVec, obj kube.EnhancedObjectReference, reason string) {
labels := getMetricLabels(obj, reason)
log.Info().Msgf("Deleting event count metric with labels: %v", labels)
metric.Delete(labels)
}
137 changes: 137 additions & 0 deletions pkg/sinks/prometheus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package sinks

import (
"context"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/stretchr/testify/mock"
)

type mockGauge struct {
mock.Mock
prometheus.Gauge
}

func (m *mockGauge) Set(count float64) {
m.Called(count)
}

type mockGuageVec struct {
mock.Mock
*prometheus.GaugeVec
}

func (v *mockGuageVec) With(labels prometheus.Labels) prometheus.Gauge {
withArgs := v.Called(labels)
return withArgs.Get(0).(prometheus.Gauge)
}

func (v *mockGuageVec) Delete(labels prometheus.Labels) bool {
deleteArgs := v.Called(labels)
return deleteArgs.Get(0).(bool)
}

func mockEvent(kind string, name string, namespace string, reason string, count int32) *kube.EnhancedEvent {
ev := &kube.EnhancedEvent{}
ev.Reason = reason
ev.Count = count
ev.InvolvedObject.Kind = kind
ev.InvolvedObject.Name = name
ev.InvolvedObject.Namespace = namespace

return ev
}

func TestPrometheusSink_Send(t *testing.T) {
configKind := "Pod"
configReason := "Starting"
testEvent := mockEvent("Pod", "testpod", "testnamespace", "Starting", 1)

tests := []struct {
name string
configKind string
configReason string
ev *kube.EnhancedEvent
wantPrometheusLabels prometheus.Labels
wantErr bool
wantSetCalled bool
wantDeleteCalled bool
}{
{
name: "emits desired resource event",
configKind: configKind,
configReason: configReason,
ev: testEvent,
wantPrometheusLabels: prometheus.Labels{
strings.ToLower(configKind): testEvent.InvolvedObject.Name,
"namespace": testEvent.InvolvedObject.Namespace,
"reason": configReason,
},
wantErr: false,
wantSetCalled: true,
wantDeleteCalled: false,
},
{
name: "deletes desired resource event",
configKind: configKind,
configReason: "Creating",
ev: testEvent,
wantPrometheusLabels: prometheus.Labels{
strings.ToLower(configKind): testEvent.InvolvedObject.Name,
"namespace": testEvent.InvolvedObject.Namespace,
"reason": "Creating",
},
wantErr: false,
wantSetCalled: false,
wantDeleteCalled: true,
},
{
name: "does nothing if kind is not expected",
configKind: "ReplicaSet",
configReason: "SuccessfulCreate",
ev: testEvent,
wantPrometheusLabels: prometheus.Labels{},
wantErr: false,
wantSetCalled: false,
wantDeleteCalled: false,
},
}
for _, tt := range tests {
mockGauge := &mockGauge{}
mockGauge.On("Set", mock.Anything).Return()
mockPodMetric := &mockGuageVec{}
mockPodMetric.On("With", mock.Anything).Return(mockGauge)
mockPodMetric.On("Delete", mock.Anything).Return(true)

t.Run(tt.name, func(t *testing.T) {
o := &PrometheusSink{
cfg: &PrometheusConfig{
EventsMetricsNamePrefix: "test_prefix_",
ReasonFilter: map[string][]string{tt.configKind: {tt.configReason}},
},
kinds: []string{tt.configKind},
metricsByKind: map[string]PrometheusGaugeVec{tt.configKind: mockPodMetric},
}
if err := o.Send(context.TODO(), tt.ev); (err != nil) != tt.wantErr {
t.Errorf("PrometheusSink.Send() error = %v, wantErr %v", err, tt.wantErr)
}

if tt.wantSetCalled {
mockPodMetric.AssertCalled(t, "With", tt.wantPrometheusLabels)
mockGauge.AssertCalled(t, "Set", float64(1))
} else {
mockPodMetric.AssertNotCalled(t, "With")
mockGauge.AssertNotCalled(t, "Set")
}

if tt.wantDeleteCalled {
mockPodMetric.AssertCalled(t, "Delete", tt.wantPrometheusLabels)
} else {
mockPodMetric.AssertNotCalled(t, "Delete")
}
})
}
}
5 changes: 5 additions & 0 deletions pkg/sinks/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ReceiverConfig struct {
BigQuery *BigQueryConfig `yaml:"bigquery"`
EventBridge *EventBridgeConfig `yaml:"eventbridge"`
Pipe *PipeConfig `yaml:"pipe"`
Prometheus *PrometheusConfig `yaml:"prometheus"`
}

func (r *ReceiverConfig) Validate() error {
Expand Down Expand Up @@ -122,5 +123,9 @@ func (r *ReceiverConfig) GetSink() (Sink, error) {
return NewLoki(r.Loki)
}

if r.Prometheus != nil {
return NewPrometheusSink(r.Prometheus)
}

return nil, errors.New("unknown sink")
}