Skip to content

Commit

Permalink
feat(meshmetrics): profiles implementation (#9624)
Browse files Browse the repository at this point in the history
* feat(meshmetric): implement metrics profiles

Added interface over metrics mutators

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): add first test

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): add contains match type

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): make check pass

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): rename test case

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): update golden files

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): rewrite policy to yaml

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): add tests for None and All profiles

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): add tests for include

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): add tests for exclude

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): need to figure out why the mutator is not getting inside the metric family loop

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): start rewriting otel mutator

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): otel mutator compiles

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): remove logs

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): adjust example to make e2e pass

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): make format work

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): adjust one more log line

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): remove profiles mutator since its not called for old flow

Signed-off-by: slonka <[email protected]>

* feat(meshmetric): rename function

Signed-off-by: slonka <[email protected]>

---------

Signed-off-by: slonka <[email protected]>
  • Loading branch information
slonka authored Mar 27, 2024
1 parent 084554e commit 175abec
Show file tree
Hide file tree
Showing 43 changed files with 886 additions and 97 deletions.
30 changes: 15 additions & 15 deletions app/kuma-dp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,26 +303,26 @@ func getApplicationsToScrape(kumaSidecarConfiguration *types.KumaSidecarConfigur
if kumaSidecarConfiguration != nil {
for _, item := range kumaSidecarConfiguration.Metrics.Aggregate {
applicationsToScrape = append(applicationsToScrape, metrics.ApplicationToScrape{
Address: item.Address,
Name: item.Name,
Path: item.Path,
Port: item.Port,
IsIPv6: net.IsAddressIPv6(item.Address),
QueryModifier: metrics.RemoveQueryParameters,
OtelMutator: metrics.ParsePrometheusMetrics,
Address: item.Address,
Name: item.Name,
Path: item.Path,
Port: item.Port,
IsIPv6: net.IsAddressIPv6(item.Address),
QueryModifier: metrics.RemoveQueryParameters,
MeshMetricMutator: metrics.AggregatedOtelMutator(),
})
}
}
// by default add envoy configuration
applicationsToScrape = append(applicationsToScrape, metrics.ApplicationToScrape{
Name: "envoy",
Path: "/stats",
Address: "127.0.0.1",
Port: envoyAdminPort,
IsIPv6: false,
QueryModifier: metrics.AddPrometheusFormat,
Mutator: metrics.MergeClusters,
OtelMutator: metrics.MergeClustersForOpenTelemetry,
Name: "envoy",
Path: "/stats",
Address: "127.0.0.1",
Port: envoyAdminPort,
IsIPv6: false,
QueryModifier: metrics.AddPrometheusFormat,
Mutator: metrics.AggregatedMetricsMutator(metrics.MergeClustersForPrometheus),
MeshMetricMutator: metrics.AggregatedOtelMutator(metrics.MergeClustersForOpenTelemetry),
})
return applicationsToScrape
}
Expand Down
30 changes: 15 additions & 15 deletions app/kuma-dp/pkg/dataplane/meshmetrics/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,25 +262,25 @@ func (cf *ConfigFetcher) mapApplicationToApplicationToScrape(applications []xds.
address = application.Address
}
applicationsToScrape = append(applicationsToScrape, metrics.ApplicationToScrape{
Name: pointer.Deref(application.Name),
Address: address,
Path: application.Path,
Port: application.Port,
IsIPv6: utilnet.IsAddressIPv6(address),
QueryModifier: metrics.RemoveQueryParameters,
OtelMutator: metrics.ParsePrometheusMetrics,
Name: pointer.Deref(application.Name),
Address: address,
Path: application.Path,
Port: application.Port,
IsIPv6: utilnet.IsAddressIPv6(address),
QueryModifier: metrics.RemoveQueryParameters,
MeshMetricMutator: metrics.AggregatedOtelMutator(),
})
}

applicationsToScrape = append(applicationsToScrape, metrics.ApplicationToScrape{
Name: "envoy",
Path: "/stats",
Address: cf.envoyAdminAddress,
Port: cf.envoyAdminPort,
IsIPv6: false,
QueryModifier: metrics.AggregatedQueryParametersModifier(metrics.AddPrometheusFormat, metrics.AddSidecarParameters(sidecar)),
Mutator: metrics.MergeClusters,
OtelMutator: metrics.MergeClustersForOpenTelemetry,
Name: "envoy",
Path: "/stats",
Address: cf.envoyAdminAddress,
Port: cf.envoyAdminPort,
IsIPv6: false,
QueryModifier: metrics.AggregatedQueryParametersModifier(metrics.AddPrometheusFormat, metrics.AddSidecarParameters(sidecar)),
Mutator: metrics.AggregatedMetricsMutator(metrics.MergeClustersForPrometheus),
MeshMetricMutator: metrics.AggregatedOtelMutator(metrics.ProfileMutatorGenerator(sidecar), metrics.MergeClustersForOpenTelemetry),
})

return applicationsToScrape
Expand Down
37 changes: 6 additions & 31 deletions app/kuma-dp/pkg/dataplane/metrics/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package metrics

import (
"fmt"
"io"
"regexp"
"sort"
"strings"

"github.com/pkg/errors"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"

"github.com/kumahq/kuma/pkg/xds/bootstrap"
)
Expand All @@ -21,14 +19,8 @@ const (

const MeshTrafficLabelName = "kuma_io_mesh_traffic"

func MergeClusters(in io.Reader, out io.Writer) error {
var parser expfmt.TextParser
metricFamilies, err := parser.TextToMetricFamilies(in)
if err != nil {
return err
}

for _, metricFamily := range metricFamilies {
func MergeClustersForPrometheus(in map[string]*io_prometheus_client.MetricFamily) error {
for _, metricFamily := range in {
switch {
case isClusterMetricFamily(metricFamily):
if err := handleClusterMetric(metricFamily); err != nil {
Expand All @@ -39,42 +31,25 @@ func MergeClusters(in io.Reader, out io.Writer) error {
return err
}
}

if _, err := expfmt.MetricFamilyToText(out, metricFamily); err != nil {
return err
}
if _, err := out.Write([]byte("\n")); err != nil {
return err
}
}

return nil
}

func MergeClustersForOpenTelemetry(in io.Reader) ([]*io_prometheus_client.MetricFamily, error) {
var parser expfmt.TextParser
metricFamilies, err := parser.TextToMetricFamilies(in)
if err != nil {
return nil, err
}

var metrics []*io_prometheus_client.MetricFamily
func MergeClustersForOpenTelemetry(metricFamilies map[string]*io_prometheus_client.MetricFamily) error {
for _, metricFamily := range metricFamilies {
switch {
case isClusterMetricFamily(metricFamily):
if err := handleClusterMetric(metricFamily); err != nil {
return nil, err
return err
}
case isHttpMetricFamily(metricFamily):
if err := handleHttpMetricFamily(metricFamily); err != nil {
return nil, err
return err
}
}

metrics = append(metrics, metricFamily)
}

return metrics, nil
return nil
}

func handleClusterMetric(metricFamily *io_prometheus_client.MetricFamily) error {
Expand Down
2 changes: 1 addition & 1 deletion app/kuma-dp/pkg/dataplane/metrics/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var _ = Describe("Merge", func() {
Expect(err).ToNot(HaveOccurred())

actual := new(bytes.Buffer)
err = MergeClusters(input, actual)
err = AggregatedMetricsMutator(MergeClustersForPrometheus)(input, actual)
Expect(err).ToNot(HaveOccurred())

Expect(toLines(actual)).To(ConsistOf(toLines(expected)))
Expand Down
2 changes: 1 addition & 1 deletion app/kuma-dp/pkg/dataplane/metrics/metrics_format_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/kumahq/kuma/pkg/util/pointer"
)

func FromPrometheusMetrics(appMetrics []*io_prometheus_client.MetricFamily, mesh string, dataplane string, service string) []metricdata.Metrics {
func FromPrometheusMetrics(appMetrics map[string]*io_prometheus_client.MetricFamily, mesh string, dataplane string, service string) []metricdata.Metrics {
extraAttributes := extraAttributesFrom(mesh, dataplane, service)

var openTelemetryMetrics []metricdata.Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var _ = Describe("Metrics format mapper", func() {
Expect(err).ToNot(HaveOccurred())

// when
metrics, err := ParsePrometheusMetrics(input)
metrics, err := AggregatedOtelMutator()(input)
Expect(err).ToNot(HaveOccurred())
openTelemetryMetrics := FromPrometheusMetrics(metrics, "default", "dpp-1", "test-service")

Expand Down
61 changes: 61 additions & 0 deletions app/kuma-dp/pkg/dataplane/metrics/metrics_mutator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package metrics

import (
"io"

io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)

type (
MetricsMutator func(in io.Reader, out io.Writer) error
PrometheusMutator func(in map[string]*io_prometheus_client.MetricFamily) error
MeshMetricMutator func(in io.Reader) (map[string]*io_prometheus_client.MetricFamily, error)
)

func AggregatedOtelMutator(metricsMutators ...PrometheusMutator) MeshMetricMutator {
return func(in io.Reader) (map[string]*io_prometheus_client.MetricFamily, error) {
var parser expfmt.TextParser
metricFamilies, err := parser.TextToMetricFamilies(in)
if err != nil {
return nil, err
}

for _, m := range metricsMutators {
err := m(metricFamilies)
if err != nil {
return nil, err
}
}

return metricFamilies, nil
}
}

func AggregatedMetricsMutator(metricsMutators ...PrometheusMutator) MetricsMutator {
return func(in io.Reader, out io.Writer) error {
var parser expfmt.TextParser
metricFamilies, err := parser.TextToMetricFamilies(in)
if err != nil {
return err
}

for _, m := range metricsMutators {
err := m(metricFamilies)
if err != nil {
return err
}
}

for _, metricFamily := range metricFamilies {
if _, err := expfmt.MetricFamilyToText(out, metricFamily); err != nil {
return err
}
if _, err := out.Write([]byte("\n")); err != nil {
return err
}
}

return nil
}
}
15 changes: 1 addition & 14 deletions app/kuma-dp/pkg/dataplane/metrics/metrics_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ package metrics

import (
"context"
"io"
"net/http"
"net/url"
"sync"

io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"golang.org/x/exp/maps"

"github.com/kumahq/kuma/pkg/core"
)
Expand Down Expand Up @@ -105,7 +101,7 @@ func (ap *AggregatedProducer) fetchStats(ctx context.Context, app ApplicationToS
}
defer resp.Body.Close()

metricsFromApplication, err := app.OtelMutator(resp.Body)
metricsFromApplication, err := app.MeshMetricMutator(resp.Body)
if err != nil {
log.Error(err, "failed to mutate metrics")
return nil
Expand All @@ -126,12 +122,3 @@ func (ap *AggregatedProducer) makeRequest(ctx context.Context, req *http.Request
return ap.httpClientIPv4.Do(req)
}
}

func ParsePrometheusMetrics(in io.Reader) ([]*io_prometheus_client.MetricFamily, error) {
var parser expfmt.TextParser
metricFamilies, err := parser.TextToMetricFamilies(in)
if err != nil {
return nil, err
}
return maps.Values(metricFamilies), nil
}
Loading

0 comments on commit 175abec

Please sign in to comment.