Skip to content

Commit

Permalink
Include entry metadata bytes in metrics tracking ingestion stats (#9808)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
In #9694 we added support for adding
metadata labels to each entry in a push payload. In this PR we update
the following metrics to take into account the bytes needed to store
those labels:

-  `loki_distributor_bytes_received_total`
- `distributor_bytes_received`

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)

---------

Co-authored-by: Sandeep Sukhani <[email protected]>
  • Loading branch information
salvacorts and sandeepsukhani authored Jul 17, 2023
1 parent 97d34ea commit 88c17ed
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 30 deletions.
22 changes: 15 additions & 7 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete

contentType := r.Header.Get(contentType)
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
req logproto.PushRequest
entriesSize int64
nonIndexedLabelsSize int64
streamLabelsSize int64
totalEntries int64
req logproto.PushRequest
)

contentType, _ /* params */, err := mime.ParseMediaType(contentType)
Expand Down Expand Up @@ -132,9 +133,15 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
}
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line))))
bytesReceivedStats.Inc(int64(len(e.Line)))
var entryLabelsSize int64
for _, l := range e.NonIndexedLabels {
entryLabelsSize += int64(len(l.Name) + len(l.Value))
}
entrySize := int64(len(e.Line)) + entryLabelsSize
entriesSize += entrySize
nonIndexedLabelsSize += entryLabelsSize
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(entrySize))
bytesReceivedStats.Inc(entrySize)
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
Expand All @@ -157,6 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"nonIndexedLabelsSize", humanize.Bytes(uint64(nonIndexedLabelsSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
"mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(),
)
Expand Down
90 changes: 67 additions & 23 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"bytes"
"compress/flate"
"compress/gzip"
"fmt"
"log"
"net/http/httptest"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util_log "github.com/grafana/loki/pkg/util/log"
)
Expand Down Expand Up @@ -41,12 +44,15 @@ func deflateString(source string) string {
}

func TestParseRequest(t *testing.T) {
tests := []struct {
var previousBytesReceived, previousLinesReceived int
for index, test := range []struct {
path string
body string
contentType string
contentEncoding string
valid bool
expectedBytes int
expectedLines int
}{
{
path: `/loki/api/v1/push`,
Expand All @@ -61,17 +67,21 @@ func TestParseRequest(t *testing.T) {
valid: false,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
valid: true,
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
contentEncoding: ``,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
Expand All @@ -86,13 +96,17 @@ func TestParseRequest(t *testing.T) {
contentType: `application/json`,
contentEncoding: `gzip`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`),
contentType: `application/json`,
contentEncoding: `deflate`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
Expand All @@ -107,13 +121,17 @@ func TestParseRequest(t *testing.T) {
contentType: `application/json; charset=utf-8`,
contentEncoding: `gzip`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`),
contentType: `application/json; charset=utf-8`,
contentEncoding: `deflate`,
valid: true,
expectedBytes: len("fizzbuzz"),
expectedLines: 1,
},
{
path: `/loki/api/v1/push`,
Expand Down Expand Up @@ -157,24 +175,50 @@ func TestParseRequest(t *testing.T) {
contentEncoding: `deflate`,
valid: false,
},
}
{
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz", {"a": "a", "b": "b"} ] ] }]}`),
contentType: `application/json; charset=utf-8`,
contentEncoding: `deflate`,
valid: true,
expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"),
expectedLines: 1,
},
} {
t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) {
bytesIngested.Reset()
linesIngested.Reset()

request := httptest.NewRequest("POST", test.path, strings.NewReader(test.body))
if len(test.contentType) > 0 {
request.Header.Add("Content-Type", test.contentType)
}
if len(test.contentEncoding) > 0 {
request.Header.Add("Content-Encoding", test.contentEncoding)
}

data, err := ParseRequest(util_log.Logger, "fake", request, nil)

bytesReceived := int(bytesReceivedStats.Value()["total"].(int64)) - previousBytesReceived
previousBytesReceived += bytesReceived
linesReceived := int(linesReceivedStats.Value()["total"].(int64)) - previousLinesReceived
previousLinesReceived += linesReceived

// Testing input array
for index, test := range tests {
request := httptest.NewRequest("POST", test.path, strings.NewReader(test.body))
if len(test.contentType) > 0 {
request.Header.Add("Content-Type", test.contentType)
}
if len(test.contentEncoding) > 0 {
request.Header.Add("Content-Encoding", test.contentEncoding)
}
data, err := ParseRequest(util_log.Logger, "", request, nil)
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)
} else {
assert.NotNil(t, err, "Should give error for %d", index)
assert.Nil(t, data, "Should not give data for %d", index)
}
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)
require.Equal(t, test.expectedBytes, bytesReceived)
require.Equal(t, test.expectedLines, linesReceived)
require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
} else {
assert.NotNil(t, err, "Should give error for %d", index)
assert.Nil(t, data, "Should not give data for %d", index)
require.Equal(t, 0, bytesReceived)
require.Equal(t, 0, linesReceived)
require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
}
})
}
}

0 comments on commit 88c17ed

Please sign in to comment.