Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include entry metadata bytes in metrics tracking ingestion stats #9808

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete

contentType := r.Header.Get(contentType)
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
req logproto.PushRequest
entriesSize int64
nonIndexedLabelsSize int64
streamLabelsSize int64
totalEntries int64
req logproto.PushRequest
)

contentType, _ /* params */, err := mime.ParseMediaType(contentType)
Expand Down Expand Up @@ -132,9 +133,15 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
}
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line))))
bytesReceivedStats.Inc(int64(len(e.Line)))
var entryLabelsSize int64
for _, l := range e.NonIndexedLabels {
entryLabelsSize += int64(len(l.Name) + len(l.Value))
}
entrySize := int64(len(e.Line)) + entryLabelsSize
entriesSize += entrySize
nonIndexedLabelsSize += entryLabelsSize
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(entrySize))
bytesReceivedStats.Inc(entrySize)
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
Expand All @@ -157,6 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"nonIndexedLabelsSize", humanize.Bytes(uint64(nonIndexedLabelsSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
"mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(),
)
Expand Down
90 changes: 67 additions & 23 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"bytes"
"compress/flate"
"compress/gzip"
"fmt"
"log"
"net/http/httptest"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util_log "github.com/grafana/loki/pkg/util/log"
)
Expand Down Expand Up @@ -41,12 +44,15 @@ func deflateString(source string) string {
}

func TestParseRequest(t *testing.T) {
tests := []struct {
var previousBytesReceived, previousLinesReceived int
for index, test := range []struct {
path string
body string
contentType string
contentEncoding string
valid bool
expectedBytes int
expectedLines int
}{
{
path: `/loki/api/v1/push`,
Expand All @@ -61,17 +67,21 @@ func TestParseRequest(t *testing.T) {
valid: false,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
valid: true,
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
contentEncoding: ``,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
Expand All @@ -86,13 +96,17 @@ func TestParseRequest(t *testing.T) {
contentType: `application/json`,
contentEncoding: `gzip`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`),
contentType: `application/json`,
contentEncoding: `deflate`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
Expand All @@ -107,13 +121,17 @@ func TestParseRequest(t *testing.T) {
contentType: `application/json; charset=utf-8`,
contentEncoding: `gzip`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`),
contentType: `application/json; charset=utf-8`,
contentEncoding: `deflate`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
Expand Down Expand Up @@ -157,24 +175,50 @@ func TestParseRequest(t *testing.T) {
contentEncoding: `deflate`,
valid: false,
},
}
{
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz", {"a": "a", "b": "b"} ] ] }]}`),
contentType: `application/json; charset=utf-8`,
contentEncoding: `deflate`,
valid: true,
expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"),
expectedLines: 1,
},
} {
t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) {
bytesIngested.Reset()
linesIngested.Reset()

request := httptest.NewRequest("POST", test.path, strings.NewReader(test.body))
if len(test.contentType) > 0 {
request.Header.Add("Content-Type", test.contentType)
}
if len(test.contentEncoding) > 0 {
request.Header.Add("Content-Encoding", test.contentEncoding)
}

data, err := ParseRequest(util_log.Logger, "fake", request, nil)

bytesReceived := int(bytesReceivedStats.Value()["total"].(int64)) - previousBytesReceived
previousBytesReceived += bytesReceived
linesReceived := int(linesReceivedStats.Value()["total"].(int64)) - previousLinesReceived
previousLinesReceived += linesReceived

// Testing input array
for index, test := range tests {
request := httptest.NewRequest("POST", test.path, strings.NewReader(test.body))
if len(test.contentType) > 0 {
request.Header.Add("Content-Type", test.contentType)
}
if len(test.contentEncoding) > 0 {
request.Header.Add("Content-Encoding", test.contentEncoding)
}
data, err := ParseRequest(util_log.Logger, "", request, nil)
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)
} else {
assert.NotNil(t, err, "Should give error for %d", index)
assert.Nil(t, data, "Should not give data for %d", index)
}
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)
require.Equal(t, test.expectedBytes, bytesReceived)
require.Equal(t, test.expectedLines, linesReceived)
require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
} else {
assert.NotNil(t, err, "Should give error for %d", index)
assert.Nil(t, data, "Should not give data for %d", index)
require.Equal(t, 0, bytesReceived)
require.Equal(t, 0, linesReceived)
require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
}
})
}
}