From 88c17ed7abf4d541040183275f3661a4d153d162 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 17 Jul 2023 09:52:38 +0200 Subject: [PATCH] Include entry metadata bytes in metrics tracking ingestion stats (#9808) **What this PR does / why we need it**: In https://github.com/grafana/loki/pull/9694 we added support for adding metadata labels to each entry in a push payload. In this PR we update the following metrics to take into account the bytes needed to store those labels: - `loki_distributor_bytes_received_total` - `distributor_bytes_received` **Which issue(s) this PR fixes**: Fixes # **Special notes for your reviewer**: **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani --- pkg/loghttp/push/push.go | 22 ++++++--- pkg/loghttp/push/push_test.go | 90 ++++++++++++++++++++++++++--------- 2 files changed, 82 insertions(+), 30 deletions(-) diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 889c8d08f647d..5eb5e27c199a4 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -82,10 +82,11 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete contentType := r.Header.Get(contentType) var ( - entriesSize int64 - streamLabelsSize int64 - totalEntries int64 - req logproto.PushRequest + entriesSize int64 + nonIndexedLabelsSize int64 + streamLabelsSize int64 + totalEntries int64 + req logproto.PushRequest ) contentType, _ /* params */, err := mime.ParseMediaType(contentType) @@ -132,9 +133,15 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete } for _, e := range s.Entries { totalEntries++ - entriesSize += int64(len(e.Line)) - bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line)))) - bytesReceivedStats.Inc(int64(len(e.Line))) + var entryLabelsSize int64 + for _, l := range e.NonIndexedLabels { + entryLabelsSize += int64(len(l.Name) + len(l.Value)) + } + entrySize := int64(len(e.Line)) + entryLabelsSize + entriesSize += entrySize + nonIndexedLabelsSize += entryLabelsSize + bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(entrySize)) + bytesReceivedStats.Inc(entrySize) if e.Timestamp.After(mostRecentEntry) { mostRecentEntry = e.Timestamp } @@ -157,6 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete "entries", totalEntries, "streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)), "entriesSize", humanize.Bytes(uint64(entriesSize)), + "nonIndexedLabelsSize", humanize.Bytes(uint64(nonIndexedLabelsSize)), "totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)), "mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(), ) diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 4249e13355834..8aa09e4a5212a 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -4,12 +4,15 @@ import ( "bytes" "compress/flate" "compress/gzip" + "fmt" "log" "net/http/httptest" "strings" "testing" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -41,12 +44,15 @@ func deflateString(source string) string { } func TestParseRequest(t *testing.T) { - tests := []struct { + var previousBytesReceived, previousLinesReceived int + for index, test := range []struct { path string body string contentType string contentEncoding string valid bool + expectedBytes int + expectedLines int }{ { path: `/loki/api/v1/push`, @@ -61,10 +67,12 @@ func TestParseRequest(t *testing.T) { valid: false, }, { - path: `/loki/api/v1/push`, - body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, - contentType: `application/json`, - valid: true, + path: `/loki/api/v1/push`, + body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, + contentType: `application/json`, + valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, }, { path: `/loki/api/v1/push`, @@ -72,6 +80,8 @@ func TestParseRequest(t *testing.T) { contentType: `application/json`, contentEncoding: ``, valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, }, { path: `/loki/api/v1/push`, @@ -86,6 +96,8 @@ func TestParseRequest(t *testing.T) { contentType: `application/json`, contentEncoding: `gzip`, valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, }, { path: `/loki/api/v1/push`, @@ -93,6 +105,8 @@ func TestParseRequest(t *testing.T) { contentType: `application/json`, contentEncoding: `deflate`, valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, }, { path: `/loki/api/v1/push`, @@ -107,6 +121,8 @@ func TestParseRequest(t *testing.T) { contentType: `application/json; charset=utf-8`, contentEncoding: `gzip`, valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, }, { path: `/loki/api/v1/push`, @@ -114,6 +130,8 @@ func TestParseRequest(t *testing.T) { contentType: `application/json; charset=utf-8`, contentEncoding: `deflate`, valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, }, { path: `/loki/api/v1/push`, @@ -157,24 +175,50 @@ func TestParseRequest(t *testing.T) { contentEncoding: `deflate`, valid: false, }, - } + { + path: `/loki/api/v1/push`, + body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz", {"a": "a", "b": "b"} ] ] }]}`), + contentType: `application/json; charset=utf-8`, + contentEncoding: `deflate`, + valid: true, + expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"), + expectedLines: 1, + }, + } { + t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) { + bytesIngested.Reset() + linesIngested.Reset() + + request := httptest.NewRequest("POST", test.path, strings.NewReader(test.body)) + if len(test.contentType) > 0 { + request.Header.Add("Content-Type", test.contentType) + } + if len(test.contentEncoding) > 0 { + request.Header.Add("Content-Encoding", test.contentEncoding) + } + + data, err := ParseRequest(util_log.Logger, "fake", request, nil) + + bytesReceived := int(bytesReceivedStats.Value()["total"].(int64)) - previousBytesReceived + previousBytesReceived += bytesReceived + linesReceived := int(linesReceivedStats.Value()["total"].(int64)) - previousLinesReceived + previousLinesReceived += linesReceived - // Testing input array - for index, test := range tests { - request := httptest.NewRequest("POST", test.path, strings.NewReader(test.body)) - if len(test.contentType) > 0 { - request.Header.Add("Content-Type", test.contentType) - } - if len(test.contentEncoding) > 0 { - request.Header.Add("Content-Encoding", test.contentEncoding) - } - data, err := ParseRequest(util_log.Logger, "", request, nil) - if test.valid { - assert.Nil(t, err, "Should not give error for %d", index) - assert.NotNil(t, data, "Should give data for %d", index) - } else { - assert.NotNil(t, err, "Should give error for %d", index) - assert.Nil(t, data, "Should not give data for %d", index) - } + if test.valid { + assert.Nil(t, err, "Should not give error for %d", index) + assert.NotNil(t, data, "Should give data for %d", index) + require.Equal(t, test.expectedBytes, bytesReceived) + require.Equal(t, test.expectedLines, linesReceived) + require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) + require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake"))) + } else { + assert.NotNil(t, err, "Should give error for %d", index) + assert.Nil(t, data, "Should not give data for %d", index) + require.Equal(t, 0, bytesReceived) + require.Equal(t, 0, linesReceived) + require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) + require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake"))) + } + }) } }