From b456903cdce3bb868a523ace952e67841b8f8d0d Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 19 Jul 2023 12:24:08 +0200 Subject: [PATCH 1/2] Add test --- pkg/chunkenc/memchunk_test.go | 52 +++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 868bbfd2d41ea..dbde132512842 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -1542,3 +1542,55 @@ func TestMemChunk_SpaceFor(t *testing.T) { }) } } + +func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) { + for _, enc := range testEncoding { + enc := enc + t.Run(enc.String(), func(t *testing.T) { + chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize) + require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{ + {Name: "traceID", Value: "123"}, + {Name: "user", Value: "a"}, + }))) + require.NoError(t, chk.Append(logprotoEntryWithMetadata(2, "lineB", []logproto.LabelAdapter{ + {Name: "traceID", Value: "456"}, + {Name: "user", Value: "b"}, + }))) + require.NoError(t, chk.cut()) + require.NoError(t, chk.Append(logprotoEntryWithMetadata(3, "lineC", []logproto.LabelAdapter{ + {Name: "traceID", Value: "789"}, + {Name: "user", Value: "c"}, + }))) + require.NoError(t, chk.Append(logprotoEntryWithMetadata(4, "lineD", []logproto.LabelAdapter{ + {Name: "traceID", Value: "123"}, + {Name: "user", Value: "d"}, + }))) + + expectedLines := []string{"lineA", "lineB", "lineC", "lineD"} + expectedStreams := []string{ + labels.FromStrings("traceID", "123", "user", "a").String(), + labels.FromStrings("traceID", "456", "user", "b").String(), + labels.FromStrings("traceID", "789", "user", "c").String(), + labels.FromStrings("traceID", "123", "user", "d").String(), + } + + // We will run the test twice so the iterator will be created twice. + // This is to ensure that the iterator is correctly closed. + for i := 0; i < 2; i++ { + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) + require.NoError(t, err) + + var lines []string + var streams []string + for it.Next() { + require.NoError(t, it.Error()) + e := it.Entry() + lines = append(lines, e.Line) + streams = append(streams, logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels).String()) + } + assert.ElementsMatch(t, expectedLines, lines) + assert.ElementsMatch(t, expectedStreams, streams) + } + }) + } +} From 4be583bee54de706885661600c50c147a9f5abeb Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 19 Jul 2023 12:31:37 +0200 Subject: [PATCH 2/2] Fix --- pkg/chunkenc/memchunk.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index a90e032bdb574..3f18ec3659415 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -1260,6 +1260,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) { // TODO: This is pretty similar to how we read the line size, and the metadata name and value sizes // Maybe we can extract it to a separate function and reuse it? + lastAttempt = 0 var labelsWidth, nLabels int for labelsWidth == 0 { // Read until we have enough bytes for the labels. n, err := si.reader.Read(si.readBuf[si.readBufValid:]) @@ -1288,7 +1289,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) { // If not enough space for the labels, create a new buffer slice and put the old one back in the pool. metaLabelsBufLen := nLabels * 2 - if metaLabelsBufLen > cap(si.metaLabelsBuf) { + if si.metaLabelsBuf == nil || metaLabelsBufLen > cap(si.metaLabelsBuf) { if si.metaLabelsBuf != nil { for i := range si.metaLabelsBuf { if si.metaLabelsBuf[i] != nil { @@ -1309,6 +1310,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) { // Read all the label-value pairs, into the buffer slice. for i := 0; i < metaLabelsBufLen; i++ { // Read the length of the label. + lastAttempt = 0 var labelWidth, labelSize int for labelWidth == 0 { // Read until we have enough bytes for the name. n, err := si.reader.Read(si.readBuf[si.readBufValid:]) @@ -1395,6 +1397,7 @@ func (si *bufferedIterator) close() { for i := range si.metaLabelsBuf { if si.metaLabelsBuf[i] != nil { BytesBufferPool.Put(si.metaLabelsBuf[i]) + si.metaLabelsBuf[i] = nil } } LabelsPool.Put(si.metaLabelsBuf)