Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix buffered iterator #9976

Merged
merged 2 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {

// TODO: This is pretty similar to how we read the line size, and the metadata name and value sizes
// Maybe we can extract it to a separate function and reuse it?
lastAttempt = 0
var labelsWidth, nLabels int
for labelsWidth == 0 { // Read until we have enough bytes for the labels.
n, err := si.reader.Read(si.readBuf[si.readBufValid:])
Expand Down Expand Up @@ -1288,7 +1289,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {

// If not enough space for the labels, create a new buffer slice and put the old one back in the pool.
metaLabelsBufLen := nLabels * 2
if metaLabelsBufLen > cap(si.metaLabelsBuf) {
if si.metaLabelsBuf == nil || metaLabelsBufLen > cap(si.metaLabelsBuf) {
if si.metaLabelsBuf != nil {
for i := range si.metaLabelsBuf {
if si.metaLabelsBuf[i] != nil {
Expand All @@ -1309,6 +1310,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
// Read all the label-value pairs, into the buffer slice.
for i := 0; i < metaLabelsBufLen; i++ {
// Read the length of the label.
lastAttempt = 0
var labelWidth, labelSize int
for labelWidth == 0 { // Read until we have enough bytes for the name.
n, err := si.reader.Read(si.readBuf[si.readBufValid:])
Expand Down Expand Up @@ -1395,6 +1397,7 @@ func (si *bufferedIterator) close() {
for i := range si.metaLabelsBuf {
if si.metaLabelsBuf[i] != nil {
BytesBufferPool.Put(si.metaLabelsBuf[i])
si.metaLabelsBuf[i] = nil
}
}
LabelsPool.Put(si.metaLabelsBuf)
Expand Down
52 changes: 52 additions & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,3 +1542,55 @@ func TestMemChunk_SpaceFor(t *testing.T) {
})
}
}

func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
for _, enc := range testEncoding {
enc := enc
t.Run(enc.String(), func(t *testing.T) {
chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize)
require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{
{Name: "traceID", Value: "123"},
{Name: "user", Value: "a"},
})))
require.NoError(t, chk.Append(logprotoEntryWithMetadata(2, "lineB", []logproto.LabelAdapter{
{Name: "traceID", Value: "456"},
{Name: "user", Value: "b"},
})))
require.NoError(t, chk.cut())
require.NoError(t, chk.Append(logprotoEntryWithMetadata(3, "lineC", []logproto.LabelAdapter{
{Name: "traceID", Value: "789"},
{Name: "user", Value: "c"},
})))
require.NoError(t, chk.Append(logprotoEntryWithMetadata(4, "lineD", []logproto.LabelAdapter{
{Name: "traceID", Value: "123"},
{Name: "user", Value: "d"},
})))

expectedLines := []string{"lineA", "lineB", "lineC", "lineD"}
expectedStreams := []string{
labels.FromStrings("traceID", "123", "user", "a").String(),
labels.FromStrings("traceID", "456", "user", "b").String(),
labels.FromStrings("traceID", "789", "user", "c").String(),
labels.FromStrings("traceID", "123", "user", "d").String(),
}

// We will run the test twice so the iterator will be created twice.
// This is to ensure that the iterator is correctly closed.
for i := 0; i < 2; i++ {
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)

var lines []string
var streams []string
for it.Next() {
require.NoError(t, it.Error())
e := it.Entry()
lines = append(lines, e.Line)
streams = append(streams, logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels).String())
}
assert.ElementsMatch(t, expectedLines, lines)
assert.ElementsMatch(t, expectedStreams, streams)
}
})
}
}