From ce91076dd4cb6016f1f39b7a5126c2fd88a8b18e Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 18 Jul 2023 08:37:47 +0200 Subject: [PATCH] Add non-indexed metadata to chunks (#9700) **What this PR does / why we need it**: In https://github.com/grafana/loki/pull/9694 we support adding metadata labels for each entry in the push payload. In this PR we take that metadata and add it to the entries in the chunk, adding support for serialization and deserialization of those metadata labels. --------- Signed-off-by: Vladyslav Diachenko Co-authored-by: Vladyslav Diachenko --- pkg/chunkenc/memchunk.go | 241 ++++++++++--- pkg/chunkenc/memchunk_test.go | 607 ++++++++++++++++++++++----------- pkg/chunkenc/pool.go | 6 + pkg/chunkenc/unordered.go | 156 +++++++-- pkg/chunkenc/unordered_test.go | 273 ++++++++++----- pkg/chunkenc/util_test.go | 8 + pkg/ingester/checkpoint.go | 2 +- pkg/ingester/stream.go | 2 +- pkg/storage/lazy_chunk_test.go | 1 + 9 files changed, 930 insertions(+), 366 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 8d6bd3698c512..a90e032bdb574 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -31,6 +31,7 @@ const ( chunkFormatV1 chunkFormatV2 chunkFormatV3 + chunkFormatV4 DefaultChunkFormat = chunkFormatV3 // the currently used chunk format @@ -43,7 +44,7 @@ const ( defaultBlockSize = 256 * 1024 ) -var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt} +var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithMetadataHeadBlockFmt} type HeadBlockFmt byte @@ -55,6 +56,8 @@ func (f HeadBlockFmt) String() string { return "ordered" case f == UnorderedHeadBlockFmt: return "unordered" + case f == UnorderedWithMetadataHeadBlockFmt: + return "unordered with metadata" default: return fmt.Sprintf("unknown: %v", byte(f)) } @@ -65,7 +68,7 @@ func (f HeadBlockFmt) NewBlock() HeadBlock { case f < UnorderedHeadBlockFmt: return &headBlock{} default: - return 
newUnorderedHeadBlock() + return newUnorderedHeadBlock(f) } } @@ -77,6 +80,9 @@ const ( _ OrderedHeadBlockFmt UnorderedHeadBlockFmt + UnorderedWithMetadataHeadBlockFmt + + DefaultHeadBlockFmt = UnorderedHeadBlockFmt ) var magicNumber = uint32(0x12EE56A) @@ -111,7 +117,6 @@ type MemChunk struct { // Current in-mem block being appended to. head HeadBlock - // the chunk format default to v2 format byte encoding Encoding headFmt HeadBlockFmt @@ -159,12 +164,12 @@ func (hb *headBlock) Reset() { func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt } -func (hb *headBlock) Append(ts int64, line string) error { +func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error { if !hb.IsEmpty() && hb.maxt > ts { return ErrOutOfOrder } - hb.entries = append(hb.entries, entry{ts, line}) + hb.entries = append(hb.entries, entry{t: ts, s: line}) if hb.mint == 0 || hb.mint > ts { hb.mint = ts } @@ -281,7 +286,7 @@ func (hb *headBlock) LoadBytes(b []byte) error { return errors.Wrap(db.err(), "verifying headblock header") } switch version { - case chunkFormatV1, chunkFormatV2, chunkFormatV3: + case chunkFormatV1, chunkFormatV2, chunkFormatV3, chunkFormatV4: default: return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported", version) } @@ -315,10 +320,10 @@ func (hb *headBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { if version < UnorderedHeadBlockFmt { return hb, nil } - out := newUnorderedHeadBlock() + out := version.NewBlock() for _, e := range hb.entries { - if err := out.Append(e.t, e.s); err != nil { + if err := out.Append(e.t, e.s, e.nonIndexedLabels); err != nil { return nil, err } } @@ -326,18 +331,24 @@ func (hb *headBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { } type entry struct { - t int64 - s string + t int64 + s string + nonIndexedLabels labels.Labels } // NewMemChunk returns a new in-mem chunk. 
func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { + return newMemChunkWithFormat(DefaultChunkFormat, enc, head, blockSize, targetSize) +} + +// newMemChunkWithFormat returns a new in-mem chunk that uses the given chunk format. +func newMemChunkWithFormat(format byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { return &MemChunk{ blockSize: blockSize, // The blockSize in bytes. targetSize: targetSize, // Desired chunk size in compressed bytes blocks: []block{}, - format: DefaultChunkFormat, + format: format, head: head.NewBlock(), encoding: enc, @@ -366,7 +377,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { switch version { case chunkFormatV1: bc.encoding = EncGZIP - case chunkFormatV2, chunkFormatV3: + case chunkFormatV2, chunkFormatV3, chunkFormatV4: // format v2+ has a byte for block encoding. enc := Encoding(db.byte()) if db.err() != nil { @@ -401,7 +412,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { // Read offset and length. 
blk.offset = db.uvarint() - if version == chunkFormatV3 { + if version >= chunkFormatV3 { blk.uncompressedSize = db.uvarint() } l := db.uvarint() @@ -460,7 +471,7 @@ func (c *MemChunk) BytesSize() int { size += binary.MaxVarintLen64 // mint size += binary.MaxVarintLen64 // maxt size += binary.MaxVarintLen32 // offset - if c.format == chunkFormatV3 { + if c.format >= chunkFormatV3 { size += binary.MaxVarintLen32 // uncompressed size } size += binary.MaxVarintLen32 // len(b) @@ -534,7 +545,7 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { eb.putVarint64(b.mint) eb.putVarint64(b.maxt) eb.putUvarint(b.offset) - if c.format == chunkFormatV3 { + if c.format >= chunkFormatV3 { eb.putUvarint(b.uncompressedSize) } eb.putUvarint(len(b.b)) @@ -627,6 +638,9 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool { // This is looking to see if the uncompressed lines will fit which is not // a great check, but it will guarantee we are always under the target size newHBSize := c.head.UncompressedSize() + len(e.Line) + if c.format >= chunkFormatV4 { + newHBSize += metaLabelsLen(logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels)) + } return (c.cutBlockSize + newHBSize) < c.targetSize } // if targetSize is not defined, default to the original behavior of fixed blocks per chunk @@ -674,7 +688,7 @@ func (c *MemChunk) Append(entry *logproto.Entry) error { return ErrOutOfOrder } - if err := c.head.Append(entryTimestamp, entry.Line); err != nil { + if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.NonIndexedLabels)); err != nil { return err } @@ -797,7 +811,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi } lastMax = b.maxt - blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline)) + blockItrs = append(blockItrs, encBlock{c.encoding, c.format, b}.Iterator(ctx, pipeline)) } if !c.head.IsEmpty() { @@ -871,7 +885,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, 
from, through time.Time, ordered = false } lastMax = b.maxt - its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor)) + its = append(its, encBlock{c.encoding, c.format, b}.SampleIterator(ctx, extractor)) } if !c.head.IsEmpty() { @@ -903,7 +917,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block { for _, b := range c.blocks { if maxt >= b.mint && b.maxt >= mint { - blocks = append(blocks, encBlock{c.encoding, b}) + blocks = append(blocks, encBlock{c.encoding, c.format, b}) } } return blocks @@ -955,7 +969,8 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err // then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the // chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former. type encBlock struct { - enc Encoding + enc Encoding + format byte block } @@ -963,14 +978,14 @@ func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) ite if len(b.b) == 0 { return iter.NoopIterator } - return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline) + return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format) } func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator { if len(b.b) == 0 { return iter.NoopIterator } - return newSampleIterator(ctx, getReaderPool(b.enc), b.b, extractor) + return newSampleIterator(ctx, getReaderPool(b.enc), b.b, b.format, extractor) } func (b block) Offset() int { @@ -1024,8 +1039,9 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, streams[labels] = stream } stream.Entries = append(stream.Entries, logproto.Entry{ - Timestamp: time.Unix(0, e.t), - Line: newLine, + Timestamp: time.Unix(0, e.t), + Line: newLine, + NonIndexedLabels: logproto.FromLabelsToLabelAdapters(e.nonIndexedLabels), }) } @@ -1121,14 +1137,18 @@ type bufferedIterator struct { readBuf [20]byte // Enough bytes 
to store two varints. readBufValid int // How many bytes are left in readBuf from previous read. + format byte buf []byte // The buffer for a single entry. currLine []byte // the current line, this is the same as the buffer but sliced the line size. currTs int64 + metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels. + currMetadataLabels [][]byte // The current labels. + closed bool } -func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *bufferedIterator { +func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, format byte) *bufferedIterator { stats := stats.FromContext(ctx) stats.AddCompressedBytes(int64(len(b))) return &bufferedIterator{ @@ -1136,6 +1156,7 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *buffer origBytes: b, reader: nil, // will be initialized later pool: pool, + format: format, } } @@ -1154,7 +1175,7 @@ func (si *bufferedIterator) Next() bool { } } - ts, line, ok := si.moveNext() + ts, line, metaLabels, ok := si.moveNext() if !ok { si.Close() return false @@ -1165,11 +1186,12 @@ func (si *bufferedIterator) Next() bool { si.currTs = ts si.currLine = line + si.currMetadataLabels = metaLabels return true } // moveNext moves the buffer to the next entry -func (si *bufferedIterator) moveNext() (int64, []byte, bool) { +func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) { var ts int64 var tWidth, lWidth, lineSize, lastAttempt int for lWidth == 0 { // Read until both varints have enough bytes. @@ -1178,14 +1200,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) { if err != nil { if err != io.EOF { si.err = err - return 0, nil, false + return 0, nil, nil, false } if si.readBufValid == 0 { // Got EOF and no data in the buffer. - return 0, nil, false + return 0, nil, nil, false } if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time. 
si.err = fmt.Errorf("invalid data in chunk") - return 0, nil, false + return 0, nil, nil, false } } var l uint64 @@ -1197,7 +1219,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) { if lineSize >= maxLineLength { si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength) - return 0, nil, false + return 0, nil, nil, false } // If the buffer is not yet initialize or too small, we get a new one. if si.buf == nil || lineSize > cap(si.buf) { @@ -1208,7 +1230,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) { si.buf = BytesBufferPool.Get(lineSize).([]byte) if lineSize > cap(si.buf) { si.err = fmt.Errorf("could not get a line buffer of size %d, actual %d", lineSize, cap(si.buf)) - return 0, nil, false + return 0, nil, nil, false } } si.buf = si.buf[:lineSize] @@ -1228,10 +1250,124 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) { continue } si.err = err - return 0, nil, false + return 0, nil, nil, false + } + } + + if si.format < chunkFormatV4 { + return ts, si.buf[:lineSize], nil, true + } + + // TODO: This is pretty similar to how we read the line size, and the metadata name and value sizes + // Maybe we can extract it to a separate function and reuse it? + var labelsWidth, nLabels int + for labelsWidth == 0 { // Read until we have enough bytes for the labels. + n, err := si.reader.Read(si.readBuf[si.readBufValid:]) + si.readBufValid += n + if err != nil { + if err != io.EOF { + si.err = err + return 0, nil, nil, false + } + if si.readBufValid == 0 { // Got EOF and no data in the buffer. + return 0, nil, nil, false + } + if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time. + si.err = fmt.Errorf("invalid data in chunk") + return 0, nil, nil, false + } + } + var l uint64 + l, labelsWidth = binary.Uvarint(si.readBuf[:si.readBufValid]) + nLabels = int(l) + lastAttempt = si.readBufValid + } + + // Shift down what is still left in the fixed-size read buffer, if any. 
+ si.readBufValid = copy(si.readBuf[:], si.readBuf[labelsWidth:si.readBufValid]) + + // If not enough space for the labels, create a new buffer slice and put the old one back in the pool. + metaLabelsBufLen := nLabels * 2 + if metaLabelsBufLen > cap(si.metaLabelsBuf) { + if si.metaLabelsBuf != nil { + for i := range si.metaLabelsBuf { + if si.metaLabelsBuf[i] != nil { + BytesBufferPool.Put(si.metaLabelsBuf[i]) + } + } + LabelsPool.Put(si.metaLabelsBuf) + } + si.metaLabelsBuf = LabelsPool.Get(metaLabelsBufLen).([][]byte) + if metaLabelsBufLen > cap(si.metaLabelsBuf) { + si.err = fmt.Errorf("could not get a labels matrix of size %d, actual %d", metaLabelsBufLen, cap(si.metaLabelsBuf)) + return 0, nil, nil, false } } - return ts, si.buf[:lineSize], true + + si.metaLabelsBuf = si.metaLabelsBuf[:nLabels*2] + + // Read all the label-value pairs, into the buffer slice. + for i := 0; i < metaLabelsBufLen; i++ { + // Read the length of the label. + var labelWidth, labelSize int + for labelWidth == 0 { // Read until we have enough bytes for the name. + n, err := si.reader.Read(si.readBuf[si.readBufValid:]) + si.readBufValid += n + if err != nil { + if err != io.EOF { + si.err = err + return 0, nil, nil, false + } + if si.readBufValid == 0 { // Got EOF and no data in the buffer. + return 0, nil, nil, false + } + if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time. + si.err = fmt.Errorf("invalid data in chunk") + return 0, nil, nil, false + } + } + var l uint64 + l, labelWidth = binary.Uvarint(si.readBuf[:si.readBufValid]) + labelSize = int(l) + lastAttempt = si.readBufValid + } + + // If the buffer is not yet initialize or too small, we get a new one. 
+ if si.metaLabelsBuf[i] == nil || labelSize > cap(si.metaLabelsBuf[i]) { + // in case of a replacement we replace back the buffer in the pool + if si.metaLabelsBuf[i] != nil { + BytesBufferPool.Put(si.metaLabelsBuf[i]) + } + si.metaLabelsBuf[i] = BytesBufferPool.Get(labelSize).([]byte) + if labelSize > cap(si.metaLabelsBuf[i]) { + si.err = fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(si.metaLabelsBuf[i])) + return 0, nil, nil, false + } + } + + si.metaLabelsBuf[i] = si.metaLabelsBuf[i][:labelSize] + // Take however many bytes are left in the read buffer. + n := copy(si.metaLabelsBuf[i], si.readBuf[labelWidth:si.readBufValid]) + // Shift down what is still left in the fixed-size read buffer, if any. + si.readBufValid = copy(si.readBuf[:], si.readBuf[labelWidth+n:si.readBufValid]) + + // Then process reading the label. + for n < labelSize { + r, err := si.reader.Read(si.metaLabelsBuf[i][n:labelSize]) + n += r + if err != nil { + // We might get EOF after reading enough bytes to fill the buffer, which is OK. + // EOF and zero bytes read when the buffer isn't full is an error. 
+ if err == io.EOF && r != 0 { + continue + } + si.err = err + return 0, nil, nil, false + } + } + } + + return ts, si.buf[:lineSize], si.metaLabelsBuf[:metaLabelsBufLen], true } func (si *bufferedIterator) Error() error { return si.err } @@ -1254,12 +1390,23 @@ func (si *bufferedIterator) close() { BytesBufferPool.Put(si.buf) si.buf = nil } + + if si.metaLabelsBuf != nil { + for i := range si.metaLabelsBuf { + if si.metaLabelsBuf[i] != nil { + BytesBufferPool.Put(si.metaLabelsBuf[i]) + } + } + LabelsPool.Put(si.metaLabelsBuf) + si.metaLabelsBuf = nil + } + si.origBytes = nil } -func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline) iter.EntryIterator { +func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte) iter.EntryIterator { return &entryBufferedIterator{ - bufferedIterator: newBufferedIterator(ctx, pool, b), + bufferedIterator: newBufferedIterator(ctx, pool, b, format), pipeline: pipeline, } } @@ -1282,21 +1429,37 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe func (e *entryBufferedIterator) Next() bool { for e.bufferedIterator.Next() { + if len(e.currMetadataLabels)%2 != 0 { + e.err = fmt.Errorf("expected even number of metadata labels, got %d", len(e.currMetadataLabels)) + return false + } + + var nonIndexedLabels []logproto.LabelAdapter + if len(e.currMetadataLabels) > 0 { + nonIndexedLabels = make([]logproto.LabelAdapter, len(e.currMetadataLabels)/2) + for i := 0; i < len(e.currMetadataLabels); i += 2 { + nonIndexedLabels[i/2].Name = string(e.currMetadataLabels[i]) + nonIndexedLabels[i/2].Value = string(e.currMetadataLabels[i+1]) + } + } + newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine) if !matches { continue } + + e.currLabels = lbs + e.cur.NonIndexedLabels = nonIndexedLabels e.cur.Timestamp = time.Unix(0, e.currTs) e.cur.Line = string(newLine) - e.currLabels = lbs return true } return false } -func 
newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, extractor log.StreamSampleExtractor) iter.SampleIterator { +func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor) iter.SampleIterator { it := &sampleBufferedIterator{ - bufferedIterator: newBufferedIterator(ctx, pool, b), + bufferedIterator: newBufferedIterator(ctx, pool, b, format), extractor: extractor, } return it diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 0d67a98c1fcd1..868bbfd2d41ea 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/pkg/storage/chunk" ) @@ -50,12 +51,33 @@ var ( } return ex.ForStream(labels.Labels{}) }() + allPossibleFormats = []struct { + headBlockFmt HeadBlockFmt + chunkFormat byte + }{ + { + headBlockFmt: OrderedHeadBlockFmt, + chunkFormat: chunkFormatV2, + }, + { + headBlockFmt: OrderedHeadBlockFmt, + chunkFormat: chunkFormatV3, + }, + { + headBlockFmt: UnorderedHeadBlockFmt, + chunkFormat: chunkFormatV3, + }, + { + headBlockFmt: UnorderedWithMetadataHeadBlockFmt, + chunkFormat: chunkFormatV4, + }, + } ) -const DefaultHeadBlockFmt = OrderedHeadBlockFmt +const DefaultTestHeadBlockFmt = OrderedHeadBlockFmt func TestBlocksInclusive(t *testing.T) { - chk := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize) + chk := NewMemChunk(EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) err := chk.Append(logprotoEntry(1, "1")) require.Nil(t, err) err = chk.cut() @@ -69,112 +91,134 @@ func TestBlocksInclusive(t *testing.T) { func TestBlock(t *testing.T) { for _, enc := range testEncoding { enc := enc - t.Run(enc.String(), func(t *testing.T) { - t.Parallel() - chk := NewMemChunk(enc, DefaultHeadBlockFmt, 
testBlockSize, testTargetSize) - cases := []struct { - ts int64 - str string - cut bool - }{ - { - ts: 1, - str: "hello, world!", - }, - { - ts: 2, - str: "hello, world2!", - }, - { - ts: 3, - str: "hello, world3!", - }, - { - ts: 4, - str: "hello, world4!", - }, - { - ts: 5, - str: "hello, world5!", - }, - { - ts: 6, - str: "hello, world6!", - cut: true, - }, - { - ts: 7, - str: "hello, world7!", - }, - { - ts: 8, - str: "hello, worl\nd8!", - }, - { - ts: 8, - str: "hello, world 8, 2!", - }, - { - ts: 8, - str: "hello, world 8, 3!", - }, - { - ts: 9, - str: "", - }, - } - - for _, c := range cases { - require.NoError(t, chk.Append(logprotoEntry(c.ts, c.str))) - if c.cut { - require.NoError(t, chk.cut()) + for _, format := range allPossibleFormats { + chunkFormat, headBlockFmt := format.chunkFormat, format.headBlockFmt + t.Run(fmt.Sprintf("encoding:%v chunkFormat:%v headBlockFmt:%v", enc, chunkFormat, headBlockFmt), func(t *testing.T) { + t.Parallel() + chk := newMemChunkWithFormat(chunkFormat, enc, headBlockFmt, testBlockSize, testTargetSize) + cases := []struct { + ts int64 + str string + lbs []logproto.LabelAdapter + cut bool + }{ + { + ts: 1, + str: "hello, world!", + }, + { + ts: 2, + str: "hello, world2!", + lbs: []logproto.LabelAdapter{ + {Name: "app", Value: "myapp"}, + }, + }, + { + ts: 3, + str: "hello, world3!", + lbs: []logproto.LabelAdapter{ + {Name: "a", Value: "a"}, + {Name: "b", Value: "b"}, + }, + }, + { + ts: 4, + str: "hello, world4!", + }, + { + ts: 5, + str: "hello, world5!", + }, + { + ts: 6, + str: "hello, world6!", + cut: true, + }, + { + ts: 7, + str: "hello, world7!", + }, + { + ts: 8, + str: "hello, worl\nd8!", + lbs: []logproto.LabelAdapter{ + {Name: "a", Value: "a2"}, + {Name: "b", Value: "b"}, + }, + }, + { + ts: 8, + str: "hello, world 8, 2!", + }, + { + ts: 8, + str: "hello, world 8, 3!", + }, + { + ts: 9, + str: "", + }, } - } - - it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), 
logproto.FORWARD, noopStreamPipeline) - require.NoError(t, err) - - idx := 0 - for it.Next() { - e := it.Entry() - require.Equal(t, cases[idx].ts, e.Timestamp.UnixNano()) - require.Equal(t, cases[idx].str, e.Line) - idx++ - } - - require.NoError(t, it.Error()) - require.NoError(t, it.Close()) - require.Equal(t, len(cases), idx) - - sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) - idx = 0 - for sampleIt.Next() { - s := sampleIt.Sample() - require.Equal(t, cases[idx].ts, s.Timestamp) - require.Equal(t, 1., s.Value) - require.NotEmpty(t, s.Hash) - idx++ - } - require.NoError(t, sampleIt.Error()) - require.NoError(t, sampleIt.Close()) - require.Equal(t, len(cases), idx) + for _, c := range cases { + require.NoError(t, chk.Append(logprotoEntryWithMetadata(c.ts, c.str, c.lbs))) + if c.cut { + require.NoError(t, chk.cut()) + } + } - t.Run("bounded-iteration", func(t *testing.T) { - it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, noopStreamPipeline) + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) - idx := 2 + idx := 0 for it.Next() { e := it.Entry() require.Equal(t, cases[idx].ts, e.Timestamp.UnixNano()) require.Equal(t, cases[idx].str, e.Line) + if chunkFormat < chunkFormatV4 { + require.Empty(t, e.NonIndexedLabels) + } else { + require.Equal(t, push.LabelsAdapter(cases[idx].lbs), e.NonIndexedLabels) + } idx++ } + require.NoError(t, it.Error()) - require.Equal(t, 6, idx) + require.NoError(t, it.Close()) + require.Equal(t, len(cases), idx) + + // TODO: Test labels and metadata labels here. 
+ sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) + idx = 0 + for sampleIt.Next() { + s := sampleIt.Sample() + require.Equal(t, cases[idx].ts, s.Timestamp) + require.Equal(t, 1., s.Value) + require.NotEmpty(t, s.Hash) + idx++ + } + + require.NoError(t, sampleIt.Error()) + require.NoError(t, sampleIt.Close()) + require.Equal(t, len(cases), idx) + + t.Run("bounded-iteration", func(t *testing.T) { + it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, noopStreamPipeline) + require.NoError(t, err) + + idx := 2 + for it.Next() { + e := it.Entry() + require.Equal(t, cases[idx].ts, e.Timestamp.UnixNano()) + require.Equal(t, cases[idx].str, e.Line) + idx++ + } + require.NoError(t, it.Error()) + require.Equal(t, 6, idx) + }) }) - }) + + } } } @@ -184,7 +228,7 @@ func TestCorruptChunk(t *testing.T) { t.Run(enc.String(), func(t *testing.T) { t.Parallel() - chk := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize) + chk := NewMemChunk(enc, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) cases := []struct { data []byte }{ @@ -214,7 +258,7 @@ func TestCorruptChunk(t *testing.T) { func TestReadFormatV1(t *testing.T) { t.Parallel() - c := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) fillChunk(c) // overrides default v2 format c.format = chunkFormatV1 @@ -249,65 +293,66 @@ func TestReadFormatV1(t *testing.T) { // 1) memory populated chunks <-> []byte loaded chunks // 2) []byte loaded chunks <-> []byte loaded chunks func TestRoundtripV2(t *testing.T) { - for _, f := range HeadBlockFmts { + for _, testData := range allPossibleFormats { for _, enc := range testEncoding { - for _, version := range []byte{chunkFormatV2, chunkFormatV3} { - enc := enc - version := version - t.Run(enc.String(), func(t *testing.T) { - t.Parallel() + enc := enc + 
t.Run(testNameWithFormats(enc, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { + t.Parallel() - c := NewMemChunk(enc, f, testBlockSize, testTargetSize) - c.format = version - populated := fillChunk(c) + c := newMemChunkWithFormat(testData.chunkFormat, enc, testData.headBlockFmt, testBlockSize, testTargetSize) + populated := fillChunk(c) - assertLines := func(c *MemChunk) { - require.Equal(t, enc, c.Encoding()) - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) - if err != nil { - t.Fatal(err) - } + assertLines := func(c *MemChunk) { + require.Equal(t, enc, c.Encoding()) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) + if err != nil { + t.Fatal(err) + } - i := int64(0) - var data int64 - for it.Next() { - require.Equal(t, i, it.Entry().Timestamp.UnixNano()) - require.Equal(t, testdata.LogString(i), it.Entry().Line) + i := int64(0) + var data int64 + for it.Next() { + require.Equal(t, i, it.Entry().Timestamp.UnixNano()) + require.Equal(t, testdata.LogString(i), it.Entry().Line) - data += int64(len(it.Entry().Line)) - i++ - } - require.Equal(t, populated, data) + data += int64(len(it.Entry().Line)) + i++ } + require.Equal(t, populated, data) + } - assertLines(c) + assertLines(c) - // test MemChunk -> NewByteChunk loading - b, err := c.Bytes() - if err != nil { - t.Fatal(err) - } + // test MemChunk -> NewByteChunk loading + b, err := c.Bytes() + if err != nil { + t.Fatal(err) + } - r, err := NewByteChunk(b, testBlockSize, testTargetSize) - if err != nil { - t.Fatal(err) - } - assertLines(r) + r, err := NewByteChunk(b, testBlockSize, testTargetSize) + if err != nil { + t.Fatal(err) + } + assertLines(r) - // test NewByteChunk -> NewByteChunk loading - rOut, err := r.Bytes() - require.Nil(t, err) + // test NewByteChunk -> NewByteChunk loading + rOut, err := r.Bytes() + require.Nil(t, err) - 
loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize) - require.Nil(t, err) + loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize) + require.Nil(t, err) - assertLines(loaded) - }) - } + assertLines(loaded) + }) } + } } +func testNameWithFormats(enc Encoding, chunkFormat byte, headBlockFmt HeadBlockFmt) string { + return fmt.Sprintf("encoding:%v chunkFormat:%v headBlockFmt:%v", enc, chunkFormat, headBlockFmt) +} + func TestRoundtripV3(t *testing.T) { for _, f := range HeadBlockFmts { for _, enc := range testEncoding { @@ -333,14 +378,14 @@ func TestRoundtripV3(t *testing.T) { } func TestSerialization(t *testing.T) { - for _, f := range HeadBlockFmts { + for _, testData := range allPossibleFormats { for _, enc := range testEncoding { enc := enc - t.Run(enc.String(), func(t *testing.T) { + t.Run(testNameWithFormats(enc, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { t.Parallel() - chk := NewMemChunk(enc, f, testBlockSize, testTargetSize) - + chk := NewMemChunk(enc, testData.headBlockFmt, testBlockSize, testTargetSize) + chk.format = testData.chunkFormat numSamples := 50000 for i := 0; i < numSamples; i++ { @@ -385,13 +430,13 @@ func TestSerialization(t *testing.T) { } func TestChunkFilling(t *testing.T) { - for _, f := range HeadBlockFmts { + for _, testData := range allPossibleFormats { for _, enc := range testEncoding { enc := enc - t.Run(enc.String(), func(t *testing.T) { + t.Run(testNameWithFormats(enc, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { t.Parallel() - chk := NewMemChunk(enc, f, testBlockSize, 0) + chk := newMemChunkWithFormat(testData.chunkFormat, enc, testData.headBlockFmt, testBlockSize, 0) chk.blockSize = 1024 // We should be able to append only 10KB of logs. 
@@ -431,7 +476,7 @@ func TestChunkFilling(t *testing.T) { func TestGZIPChunkTargetSize(t *testing.T) { t.Parallel() - chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, testBlockSize, testTargetSize) + chk := NewMemChunk(EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) lineSize := 512 entry := &logproto.Entry{ @@ -573,7 +618,7 @@ func TestChunkSize(t *testing.T) { } func TestChunkStats(t *testing.T) { - c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, 0) + c := NewMemChunk(EncSnappy, DefaultTestHeadBlockFmt, testBlockSize, 0) first := time.Now() entry := &logproto.Entry{ Timestamp: first, @@ -690,23 +735,34 @@ func BenchmarkWrite(b *testing.B) { for _, f := range HeadBlockFmts { for _, enc := range testEncoding { - b.Run(fmt.Sprintf("%v-%v", f, enc), func(b *testing.B) { - uncompressedBytes, compressedBytes := 0, 0 - for n := 0; n < b.N; n++ { - c := NewMemChunk(enc, f, testBlockSize, testTargetSize) - // adds until full so we trigger cut which serialize using gzip - for c.SpaceFor(entry) { - _ = c.Append(entry) - entry.Timestamp = time.Unix(0, i) - entry.Line = testdata.LogString(i) - i++ - } - uncompressedBytes += c.UncompressedSize() - compressedBytes += c.CompressedSize() + for _, withNonIndexedLabels := range []bool{false, true} { + name := fmt.Sprintf("%v-%v", f, enc) + if withNonIndexedLabels { + name += "-withNonIndexedLabels" } - b.SetBytes(int64(uncompressedBytes) / int64(b.N)) - b.ReportMetric(float64(compressedBytes)/float64(uncompressedBytes)*100, "%compressed") - }) + b.Run(name, func(b *testing.B) { + uncompressedBytes, compressedBytes := 0, 0 + for n := 0; n < b.N; n++ { + c := NewMemChunk(enc, f, testBlockSize, testTargetSize) + // adds until full so we trigger cut which serialize using gzip + for c.SpaceFor(entry) { + _ = c.Append(entry) + entry.Timestamp = time.Unix(0, i) + entry.Line = testdata.LogString(i) + if withNonIndexedLabels { + entry.NonIndexedLabels = []logproto.LabelAdapter{ + {Name: "foo", Value: 
fmt.Sprint(i)}, + } + } + i++ + } + uncompressedBytes += c.UncompressedSize() + compressedBytes += c.CompressedSize() + } + b.SetBytes(int64(uncompressedBytes) / int64(b.N)) + b.ReportMetric(float64(compressedBytes)/float64(uncompressedBytes)*100, "%compressed") + }) + } } } } @@ -779,7 +835,7 @@ func BenchmarkBackwardIterator(b *testing.B) { for _, bs := range testBlockSizes { b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) { b.ReportAllocs() - c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, bs, testTargetSize) + c := NewMemChunk(EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize) _ = fillChunk(c) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -826,57 +882,71 @@ func TestGenerateDataSize(t *testing.T) { func BenchmarkHeadBlockIterator(b *testing.B) { for _, j := range []int{100000, 50000, 15000, 10000} { - b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) { - h := headBlock{} + for _, withNonIndexedLabels := range []bool{false, true} { + b.Run(fmt.Sprintf("size=%d nonIndexedLabels=%v", j, withNonIndexedLabels), func(b *testing.B) { + h := headBlock{} - for i := 0; i < j; i++ { - if err := h.Append(int64(i), "this is the append string"); err != nil { - b.Fatal(err) + var nonIndexedLabels labels.Labels + if withNonIndexedLabels { + nonIndexedLabels = labels.Labels{{Name: "foo", Value: "foo"}} } - } - b.ResetTimer() + for i := 0; i < j; i++ { + if err := h.Append(int64(i), "this is the append string", nonIndexedLabels); err != nil { + b.Fatal(err) + } + } - for n := 0; n < b.N; n++ { - iter := h.Iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline) + b.ResetTimer() - for iter.Next() { - _ = iter.Entry() + for n := 0; n < b.N; n++ { + iter := h.Iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline) + + for iter.Next() { + _ = iter.Entry() + } } - } - }) + }) + } } } func BenchmarkHeadBlockSampleIterator(b *testing.B) { for _, j := range []int{20000, 10000, 8000, 5000} { - 
b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) { - h := headBlock{} + for _, withNonIndexedLabels := range []bool{false, true} { + b.Run(fmt.Sprintf("size=%d nonIndexedLabels=%v", j, withNonIndexedLabels), func(b *testing.B) { + h := headBlock{} - for i := 0; i < j; i++ { - if err := h.Append(int64(i), "this is the append string"); err != nil { - b.Fatal(err) + var nonIndexedLabels labels.Labels + if withNonIndexedLabels { + nonIndexedLabels = labels.Labels{{Name: "foo", Value: "foo"}} } - } - b.ResetTimer() + for i := 0; i < j; i++ { + if err := h.Append(int64(i), "this is the append string", nonIndexedLabels); err != nil { + b.Fatal(err) + } + } - for n := 0; n < b.N; n++ { - iter := h.SampleIterator(context.Background(), 0, math.MaxInt64, countExtractor) + b.ResetTimer() - for iter.Next() { - _ = iter.Sample() + for n := 0; n < b.N; n++ { + iter := h.SampleIterator(context.Background(), 0, math.MaxInt64, countExtractor) + + for iter.Next() { + _ = iter.Sample() + } + iter.Close() } - iter.Close() - } - }) + }) + } } } func TestMemChunk_IteratorBounds(t *testing.T) { createChunk := func() *MemChunk { t.Helper() - c := NewMemChunk(EncNone, DefaultHeadBlockFmt, 1e6, 1e6) + c := NewMemChunk(EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6) if err := c.Append(&logproto.Entry{ Timestamp: time.Unix(0, 1), @@ -940,7 +1010,7 @@ func TestMemchunkLongLine(t *testing.T) { t.Run(enc.String(), func(t *testing.T) { t.Parallel() - c := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(enc, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) for i := 1; i <= 10; i++ { require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)})) } @@ -958,9 +1028,9 @@ func TestMemchunkLongLine(t *testing.T) { func TestBytesWith(t *testing.T) { t.Parallel() - exp, err := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil) + exp, err := NewMemChunk(EncNone, 
DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil) require.Nil(t, err) - out, err := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) + out, err := NewMemChunk(EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) require.Nil(t, err) require.Equal(t, exp, out) @@ -1119,9 +1189,9 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) { } func Test_HeadIteratorReverse(t *testing.T) { - for _, f := range HeadBlockFmts { - t.Run(f.String(), func(t *testing.T) { - c := NewMemChunk(EncSnappy, f, testBlockSize, testTargetSize) + for _, testData := range allPossibleFormats { + t.Run(testNameWithFormats(EncSnappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { + c := newMemChunkWithFormat(testData.chunkFormat, EncSnappy, testData.headBlockFmt, testBlockSize, testTargetSize) genEntry := func(i int64) *logproto.Entry { return &logproto.Entry{ Timestamp: time.Unix(0, i), @@ -1234,7 +1304,7 @@ func TestMemChunk_Rebound(t *testing.T) { } func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk { - chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, defaultBlockSize, 0) + chk := NewMemChunk(EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) for ; from.Before(through); from = from.Add(time.Second) { err := chk.Append(&logproto.Entry{ Line: from.String(), @@ -1313,7 +1383,7 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) { } func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time) *MemChunk { - chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, defaultBlockSize, 0) + chk := NewMemChunk(EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) t.Logf("from : %v", from.String()) t.Logf("through: %v", through.String()) for from.Before(through) { @@ -1339,3 +1409,136 @@ func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matching return chk } + +func 
TestMemChunk_SpaceFor(t *testing.T) { + for _, tc := range []struct { + desc string + + nBlocks int + targetSize int + headSize int + cutBlockSize int + entry logproto.Entry + + expect bool + expectFunc func(chunkFormat byte, headFmt HeadBlockFmt) bool + }{ + { + desc: "targetSize not defined", + nBlocks: blocksPerChunk - 1, + entry: logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: "a", + }, + expect: true, + }, + { + desc: "targetSize not defined and too many blocks", + nBlocks: blocksPerChunk + 1, + entry: logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: "a", + }, + expect: false, + }, + { + desc: "head too big", + targetSize: 10, + headSize: 100, + cutBlockSize: 0, + entry: logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: "a", + }, + expect: false, + }, + { + desc: "cut blocks too big", + targetSize: 10, + headSize: 0, + cutBlockSize: 100, + entry: logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: "a", + }, + expect: false, + }, + { + desc: "entry fits", + targetSize: 10, + headSize: 0, + cutBlockSize: 0, + entry: logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: strings.Repeat("a", 9), + }, + expect: true, + }, + { + desc: "entry fits with metadata", + targetSize: 10, + headSize: 0, + cutBlockSize: 0, + entry: logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: strings.Repeat("a", 2), + NonIndexedLabels: []logproto.LabelAdapter{ + {Name: "foo", Value: strings.Repeat("a", 2)}, + }, + }, + expect: true, + }, + { + desc: "entry too big", + targetSize: 10, + headSize: 0, + cutBlockSize: 0, + entry: logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: strings.Repeat("a", 100), + }, + expect: false, + }, + { + desc: "entry too big because metadata", + targetSize: 10, + headSize: 0, + cutBlockSize: 0, + entry: logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: strings.Repeat("a", 5), + NonIndexedLabels: []logproto.LabelAdapter{ + {Name: "foo", Value: strings.Repeat("a", 5)}, + }, + }, + + expectFunc: func(chunkFormat byte, _ HeadBlockFmt) 
bool { + // Succeed unless we're using chunk format v4, which should + // take the metadata into account. + return chunkFormat < chunkFormatV4 + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + for _, format := range allPossibleFormats { + t.Run(fmt.Sprintf("chunk_v%d_head_%s", format.chunkFormat, format.headBlockFmt), func(t *testing.T) { + chk := newMemChunkWithFormat(format.chunkFormat, EncNone, format.headBlockFmt, 1024, tc.targetSize) + + chk.blocks = make([]block, tc.nBlocks) + chk.cutBlockSize = tc.cutBlockSize + for i := 0; i < tc.headSize; i++ { + require.NoError(t, chk.head.Append(int64(i), "a", nil)) + } + + expect := tc.expect + if tc.expectFunc != nil { + expect = tc.expectFunc(format.chunkFormat, format.headBlockFmt) + } + + require.Equal(t, expect, chk.SpaceFor(&tc.entry)) + }) + } + + }) + } +} diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index a7c4fbea8f865..1bf7887d6b770 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -48,6 +48,12 @@ var ( // Buckets [0.5KB,1KB,2KB,4KB,8KB] BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) + // LabelsPool is a matrix of bytes buffers used to store label names and values. + // Buckets [8, 16, 32, 64, 128, 256]. + // Since we store label names and values, the number of labels we can store is the half the bucket size. + // So we will be able to store from 0 to 128 labels. 
+ LabelsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([][]byte, 0, size) }) + // SamplesPool pooling array of samples [512,1024,...,16k] SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) }) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 913cdb68986c0..541aa66f6839e 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -34,7 +34,7 @@ type HeadBlock interface { Entries() int UncompressedSize() int Convert(HeadBlockFmt) (HeadBlock, error) - Append(int64, string) error + Append(int64, string, labels.Labels) error Iterator( ctx context.Context, direction logproto.Direction, @@ -52,6 +52,7 @@ type HeadBlock interface { } type unorderedHeadBlock struct { + format HeadBlockFmt // Opted for range tree over skiplist for space reduction. // Inserts: O(log(n)) // Scans: (O(k+log(n))) where k=num_scanned_entries & n=total_entries @@ -62,13 +63,14 @@ type unorderedHeadBlock struct { mint, maxt int64 // upper and lower bounds } -func newUnorderedHeadBlock() *unorderedHeadBlock { +func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt) *unorderedHeadBlock { return &unorderedHeadBlock{ - rt: rangetree.New(1), + format: headBlockFmt, + rt: rangetree.New(1), } } -func (hb *unorderedHeadBlock) Format() HeadBlockFmt { return UnorderedHeadBlockFmt } +func (hb *unorderedHeadBlock) Format() HeadBlockFmt { return hb.format } func (hb *unorderedHeadBlock) IsEmpty() bool { return hb.size == 0 @@ -87,21 +89,30 @@ func (hb *unorderedHeadBlock) UncompressedSize() int { } func (hb *unorderedHeadBlock) Reset() { - x := newUnorderedHeadBlock() + x := newUnorderedHeadBlock(hb.format) *hb = *x } +type nsEntry struct { + line string + metadataLabels labels.Labels +} + // collection of entries belonging to the same nanosecond type nsEntries struct { ts int64 - entries []string + entries []nsEntry } func (e *nsEntries) ValueAtDimension(_ uint64) int64 { return e.ts } -func (hb 
*unorderedHeadBlock) Append(ts int64, line string) error { +func (hb *unorderedHeadBlock) Append(ts int64, line string, metaLabels labels.Labels) error { + if hb.format < UnorderedWithMetadataHeadBlockFmt { + // metaLabels must be ignored for the previous head block formats + metaLabels = nil + } // This is an allocation hack. The rangetree lib does not // support the ability to pass a "mutate" function during an insert // and instead will displace any existing entry at the specified timestamp. @@ -120,14 +131,14 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string) error { // entries at the same time with the same content, iterate through any existing // entries and ignore the line if we already have an entry with the same content for _, et := range displaced[0].(*nsEntries).entries { - if et == line { + if et.line == line { e.entries = displaced[0].(*nsEntries).entries return nil } } - e.entries = append(displaced[0].(*nsEntries).entries, line) + e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, metaLabels}) } else { - e.entries = []string{line} + e.entries = []nsEntry{{line, metaLabels}} } // Update hb metdata @@ -139,12 +150,20 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string) error { hb.maxt = ts } - hb.size += len(line) + hb.size += len(line) + metaLabelsLen(metaLabels) hb.lines++ return nil } +func metaLabelsLen(metaLabels labels.Labels) int { + length := 0 + for _, label := range metaLabels { + length += len(label.Name) + len(label.Value) + } + return length +} + // Implements rangetree.Interval type interval struct { mint, maxt int64 @@ -162,7 +181,7 @@ func (hb *unorderedHeadBlock) forEntries( direction logproto.Direction, mint, maxt int64, - entryFn func(int64, string) error, // returning an error exits early + entryFn func(int64, string, labels.Labels) error, // returning an error exits early ) (err error) { if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) { return @@ -191,9 +210,10 @@ func (hb 
*unorderedHeadBlock) forEntries( } for ; i < len(es.entries) && i >= 0; next() { - line := es.entries[i] + line := es.entries[i].line + metadataLabels := es.entries[i].metadataLabels chunkStats.AddHeadChunkBytes(int64(len(line))) - err = entryFn(es.ts, line) + err = entryFn(es.ts, line, metadataLabels) } } @@ -235,7 +255,7 @@ func (hb *unorderedHeadBlock) Iterator( direction, mint, maxt, - func(ts int64, line string) error { + func(ts int64, line string, nonIndexedLabels labels.Labels) error { newLine, parsedLbs, matches := pipeline.ProcessString(ts, line) if !matches { return nil @@ -253,8 +273,9 @@ func (hb *unorderedHeadBlock) Iterator( } stream.Entries = append(stream.Entries, logproto.Entry{ - Timestamp: time.Unix(0, ts), - Line: newLine, + Timestamp: time.Unix(0, ts), + Line: newLine, + NonIndexedLabels: logproto.FromLabelsToLabelAdapters(nonIndexedLabels), }) return nil }, @@ -284,7 +305,7 @@ func (hb *unorderedHeadBlock) SampleIterator( logproto.FORWARD, mint, maxt, - func(ts int64, line string) error { + func(ts int64, line string, metaLabels labels.Labels) error { value, parsedLabels, ok := extractor.ProcessString(ts, line) if !ok { return nil @@ -307,6 +328,7 @@ func (hb *unorderedHeadBlock) SampleIterator( Timestamp: ts, Value: value, Hash: xxhash.Sum64(unsafeGetBytes(line)), + // TODO: add metadata labels to sample }) return nil }, @@ -346,7 +368,7 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { logproto.FORWARD, 0, math.MaxInt64, - func(ts int64, line string) error { + func(ts int64, line string, metaLabels labels.Labels) error { n := binary.PutVarint(encBuf, ts) inBuf.Write(encBuf[:n]) @@ -354,6 +376,21 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { inBuf.Write(encBuf[:n]) inBuf.WriteString(line) + + if hb.format >= UnorderedWithMetadataHeadBlockFmt { + // Serialize metadata labels + n = binary.PutUvarint(encBuf, uint64(len(metaLabels))) + inBuf.Write(encBuf[:n]) + for _, l := range 
metaLabels { + n = binary.PutUvarint(encBuf, uint64(len(l.Name))) + inBuf.Write(encBuf[:n]) + inBuf.WriteString(l.Name) + + n = binary.PutUvarint(encBuf, uint64(len(l.Value))) + inBuf.Write(encBuf[:n]) + inBuf.WriteString(l.Value) + } + } return nil }, ) @@ -369,7 +406,7 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { } func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { - if version > OrderedHeadBlockFmt { + if hb.format == version { return hb, nil } out := version.NewBlock() @@ -379,8 +416,8 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { logproto.FORWARD, 0, math.MaxInt64, - func(ts int64, line string) error { - return out.Append(ts, line) + func(ts int64, line string, metaLabels labels.Labels) error { + return out.Append(ts, line, metaLabels) }, ) return out, err @@ -392,7 +429,22 @@ func (hb *unorderedHeadBlock) CheckpointSize() int { size += binary.MaxVarintLen32 * 2 // total entries + total size size += binary.MaxVarintLen64 * 2 // mint,maxt size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * hb.lines // ts + len of log line. 
- size += hb.size // uncompressed bytes of lines + if hb.format >= UnorderedWithMetadataHeadBlockFmt { + _ = hb.forEntries( + context.Background(), + logproto.FORWARD, + 0, + math.MaxInt64, + func(ts int64, line string, metaLabels labels.Labels) error { + // len of meta labels + size += binary.MaxVarintLen32 + // len of name and value of each meta label, the size of values is already included into hb.size + size += (binary.MaxVarintLen32 * 2) * len(metaLabels) + return nil + }, + ) + } + size += hb.size // uncompressed bytes of lines return size } @@ -432,7 +484,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error { logproto.FORWARD, 0, math.MaxInt64, - func(ts int64, line string) error { + func(ts int64, line string, metaLabels labels.Labels) error { eb.putVarint64(ts) eb.putUvarint(len(line)) _, err = w.Write(eb.get()) @@ -445,6 +497,35 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error { if err != nil { return errors.Wrap(err, "write headblock entry line") } + + if hb.format >= UnorderedWithMetadataHeadBlockFmt { + // metadata + eb.putUvarint(len(metaLabels)) + _, err = w.Write(eb.get()) + if err != nil { + return errors.Wrap(err, "write headBlock entry meta labels length") + } + eb.reset() + for _, l := range metaLabels { + eb.putUvarint(len(l.Name)) + eb.putUvarint(len(l.Value)) + _, err = w.Write(eb.get()) + if err != nil { + return errors.Wrap(err, "write headBlock entry meta label name and value length") + } + eb.reset() + + _, err = io.WriteString(w, l.Name) + if err != nil { + return errors.Wrap(err, "write headBlock entry meta label name") + } + _, err = io.WriteString(w, l.Value) + if err != nil { + return errors.Wrap(err, "write headBlock entry meta label value") + } + } + } + return nil }, ) @@ -454,7 +535,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error { func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { // ensure it's empty - *hb = *newUnorderedHeadBlock() + *hb = 
*newUnorderedHeadBlock(hb.format) if len(b) < 1 { return nil @@ -467,8 +548,8 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { return errors.Wrap(db.err(), "verifying headblock header") } - if version != UnorderedHeadBlockFmt.Byte() { - return errors.Errorf("incompatible headBlock version (%v), only V4 is currently supported", version) + if version < UnorderedHeadBlockFmt.Byte() { + return errors.Errorf("incompatible headBlock version (%v), only V4 and the next versions are currently supported", version) } n := db.uvarint() @@ -481,7 +562,24 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { ts := db.varint64() lineLn := db.uvarint() line := string(db.bytes(lineLn)) - if err := hb.Append(ts, line); err != nil { + + var metaLabels labels.Labels + if version >= UnorderedWithMetadataHeadBlockFmt.Byte() { + metaLn := db.uvarint() + if metaLn > 0 { + metaLabels = make(labels.Labels, metaLn) + for j := 0; j < metaLn && db.err() == nil; j++ { + nameLn := db.uvarint() + valueLn := db.uvarint() + metaLabels[j] = labels.Label{ + Name: string(db.bytes(nameLn)), + Value: string(db.bytes(valueLn)), + } + } + } + } + + if err := hb.Append(ts, line, metaLabels); err != nil { return err } } @@ -508,7 +606,7 @@ func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) { return nil, errors.Wrap(db.err(), "verifying headblock header") } format := HeadBlockFmt(version) - if format > UnorderedHeadBlockFmt { + if format > UnorderedWithMetadataHeadBlockFmt { return nil, fmt.Errorf("unexpected head block version: %v", format) } diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index d36e02e72a0b9..a79e5c1e4dac0 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -21,8 +21,9 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { var i int for got.Next() { require.Equal(t, logproto.Entry{ - Timestamp: time.Unix(0, exp[i].t), - Line: exp[i].s, + Timestamp: time.Unix(0, exp[i].t), + 
Line: exp[i].s, + NonIndexedLabels: logproto.FromLabelsToLabelAdapters(exp[i].nonIndexedLabels), }, got.Entry()) i++ } @@ -30,9 +31,9 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { } func Test_forEntriesEarlyReturn(t *testing.T) { - hb := newUnorderedHeadBlock() + hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt) for i := 0; i < 10; i++ { - require.Nil(t, hb.Append(int64(i), fmt.Sprint(i))) + require.Nil(t, hb.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "i", Value: fmt.Sprint(i)}})) } // forward @@ -43,7 +44,7 @@ func Test_forEntriesEarlyReturn(t *testing.T) { logproto.FORWARD, 0, math.MaxInt64, - func(ts int64, line string) error { + func(ts int64, _ string, _ labels.Labels) error { forwardCt++ forwardStop = ts if ts == 5 { @@ -64,7 +65,7 @@ func Test_forEntriesEarlyReturn(t *testing.T) { logproto.BACKWARD, 0, math.MaxInt64, - func(ts int64, line string) error { + func(ts int64, _ string, _ labels.Labels) error { backwardCt++ backwardStop = ts if ts == 5 { @@ -87,96 +88,111 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { { desc: "simple forward", input: []entry{ - {0, "a"}, {1, "b"}, {2, "c"}, + {0, "a", nil}, {1, "b", nil}, {2, "c", labels.Labels{{Name: "a", Value: "b"}}}, }, exp: []entry{ - {0, "a"}, {1, "b"}, {2, "c"}, + {0, "a", nil}, {1, "b", nil}, {2, "c", labels.Labels{{Name: "a", Value: "b"}}}, }, }, { desc: "simple backward", input: []entry{ - {0, "a"}, {1, "b"}, {2, "c"}, + {0, "a", nil}, {1, "b", nil}, {2, "c", labels.Labels{{Name: "a", Value: "b"}}}, }, exp: []entry{ - {2, "c"}, {1, "b"}, {0, "a"}, + {2, "c", labels.Labels{{Name: "a", Value: "b"}}}, {1, "b", nil}, {0, "a", nil}, }, dir: logproto.BACKWARD, }, { desc: "unordered forward", input: []entry{ - {1, "b"}, {0, "a"}, {2, "c"}, + {1, "b", nil}, {0, "a", nil}, {2, "c", labels.Labels{{Name: "a", Value: "b"}}}, }, exp: []entry{ - {0, "a"}, {1, "b"}, {2, "c"}, + {0, "a", nil}, {1, "b", nil}, {2, "c", labels.Labels{{Name: "a", Value: "b"}}}, }, }, { desc: 
"unordered backward", input: []entry{ - {1, "b"}, {0, "a"}, {2, "c"}, + {1, "b", nil}, {0, "a", nil}, {2, "c", labels.Labels{{Name: "a", Value: "b"}}}, }, exp: []entry{ - {2, "c"}, {1, "b"}, {0, "a"}, + {2, "c", labels.Labels{{Name: "a", Value: "b"}}}, {1, "b", nil}, {0, "a", nil}, }, dir: logproto.BACKWARD, }, { desc: "ts collision forward", input: []entry{ - {0, "a"}, {0, "b"}, {1, "c"}, + {0, "a", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil}, }, exp: []entry{ - {0, "a"}, {0, "b"}, {1, "c"}, + {0, "a", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil}, }, }, { desc: "ts collision backward", input: []entry{ - {0, "a"}, {0, "b"}, {1, "c"}, + {0, "a", labels.Labels{{Name: "a", Value: "b"}}}, {0, "b", nil}, {1, "c", nil}, }, exp: []entry{ - {1, "c"}, {0, "b"}, {0, "a"}, + {1, "c", nil}, {0, "b", nil}, {0, "a", labels.Labels{{Name: "a", Value: "b"}}}, }, dir: logproto.BACKWARD, }, { desc: "ts remove exact dupe forward", input: []entry{ - {0, "a"}, {0, "b"}, {1, "c"}, {0, "b"}, + {0, "a", nil}, {0, "b", nil}, {1, "c", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, }, exp: []entry{ - {0, "a"}, {0, "b"}, {1, "c"}, + {0, "a", nil}, {0, "b", nil}, {1, "c", nil}, }, dir: logproto.FORWARD, }, { desc: "ts remove exact dupe backward", input: []entry{ - {0, "a"}, {0, "b"}, {1, "c"}, {0, "b"}, + {0, "a", nil}, {0, "b", nil}, {1, "c", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, }, exp: []entry{ - {1, "c"}, {0, "b"}, {0, "a"}, + {1, "c", nil}, {0, "b", nil}, {0, "a", nil}, }, dir: logproto.BACKWARD, }, } { t.Run(tc.desc, func(t *testing.T) { - hb := newUnorderedHeadBlock() - for _, e := range tc.input { - require.Nil(t, hb.Append(e.t, e.s)) - } + for _, format := range []HeadBlockFmt{ + UnorderedHeadBlockFmt, + UnorderedWithMetadataHeadBlockFmt, + } { + t.Run(format.String(), func(t *testing.T) { + hb := newUnorderedHeadBlock(format) + for _, e := range tc.input { + require.Nil(t, hb.Append(e.t, e.s, 
e.nonIndexedLabels)) + } - itr := hb.Iterator( - context.Background(), - tc.dir, - 0, - math.MaxInt64, - noopStreamPipeline, - ) + itr := hb.Iterator( + context.Background(), + tc.dir, + 0, + math.MaxInt64, + noopStreamPipeline, + ) + + expected := make([]entry, len(tc.exp)) + copy(expected, tc.exp) + if format < UnorderedWithMetadataHeadBlockFmt { + for i := range expected { + expected[i].nonIndexedLabels = nil + } + } - iterEq(t, tc.exp, itr) + iterEq(t, expected, itr) + }) + } }) } } @@ -194,10 +210,10 @@ func Test_UnorderedBoundedIter(t *testing.T) { mint: 1, maxt: 4, input: []entry{ - {0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}, {4, "e"}, + {0, "a", nil}, {1, "b", labels.Labels{{Name: "a", Value: "b"}}}, {2, "c", nil}, {3, "d", nil}, {4, "e", nil}, }, exp: []entry{ - {1, "b"}, {2, "c"}, {3, "d"}, + {1, "b", labels.Labels{{Name: "a", Value: "b"}}}, {2, "c", nil}, {3, "d", nil}, }, }, { @@ -205,10 +221,10 @@ func Test_UnorderedBoundedIter(t *testing.T) { mint: 1, maxt: 4, input: []entry{ - {0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}, {4, "e"}, + {0, "a", nil}, {1, "b", labels.Labels{{Name: "a", Value: "b"}}}, {2, "c", nil}, {3, "d", nil}, {4, "e", nil}, }, exp: []entry{ - {3, "d"}, {2, "c"}, {1, "b"}, + {3, "d", nil}, {2, "c", nil}, {1, "b", labels.Labels{{Name: "a", Value: "b"}}}, }, dir: logproto.BACKWARD, }, @@ -217,64 +233,123 @@ func Test_UnorderedBoundedIter(t *testing.T) { mint: 1, maxt: 4, input: []entry{ - {0, "a"}, {2, "c"}, {1, "b"}, {4, "e"}, {3, "d"}, + {0, "a", nil}, {2, "c", nil}, {1, "b", labels.Labels{{Name: "a", Value: "b"}}}, {4, "e", nil}, {3, "d", nil}, }, exp: []entry{ - {1, "b"}, {2, "c"}, {3, "d"}, + {1, "b", labels.Labels{{Name: "a", Value: "b"}}}, {2, "c", nil}, {3, "d", nil}, }, }, } { t.Run(tc.desc, func(t *testing.T) { - hb := newUnorderedHeadBlock() - for _, e := range tc.input { - require.Nil(t, hb.Append(e.t, e.s)) - } + for _, format := range []HeadBlockFmt{ + UnorderedHeadBlockFmt, + UnorderedWithMetadataHeadBlockFmt, + } { + 
t.Run(format.String(), func(t *testing.T) { + hb := newUnorderedHeadBlock(format) + for _, e := range tc.input { + require.Nil(t, hb.Append(e.t, e.s, e.nonIndexedLabels)) + } - itr := hb.Iterator( - context.Background(), - tc.dir, - tc.mint, - tc.maxt, - noopStreamPipeline, - ) + itr := hb.Iterator( + context.Background(), + tc.dir, + tc.mint, + tc.maxt, + noopStreamPipeline, + ) + + expected := make([]entry, len(tc.exp)) + copy(expected, tc.exp) + if format < UnorderedWithMetadataHeadBlockFmt { + for i := range expected { + expected[i].nonIndexedLabels = nil + } + } - iterEq(t, tc.exp, itr) + iterEq(t, expected, itr) + }) + } }) } } func TestHeadBlockInterop(t *testing.T) { - unordered, ordered := newUnorderedHeadBlock(), &headBlock{} + unordered, ordered := newUnorderedHeadBlock(UnorderedHeadBlockFmt), &headBlock{} + unorderedWithMetadata := newUnorderedHeadBlock(UnorderedWithMetadataHeadBlockFmt) for i := 0; i < 100; i++ { - require.Nil(t, unordered.Append(int64(99-i), fmt.Sprint(99-i))) - require.Nil(t, ordered.Append(int64(i), fmt.Sprint(i))) + metaLabels := labels.Labels{{Name: "foo", Value: fmt.Sprint(99 - i)}} + require.Nil(t, unordered.Append(int64(99-i), fmt.Sprint(99-i), metaLabels)) + require.Nil(t, unorderedWithMetadata.Append(int64(99-i), fmt.Sprint(99-i), metaLabels)) + require.Nil(t, ordered.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "foo", Value: fmt.Sprint(i)}})) } // turn to bytes - b1, err := ordered.CheckpointBytes(nil) + orderedCheckpointBytes, err := ordered.CheckpointBytes(nil) require.Nil(t, err) - b2, err := unordered.CheckpointBytes(nil) + unorderedCheckpointBytes, err := unordered.CheckpointBytes(nil) + require.Nil(t, err) + unorderedWithMetadataCheckpointBytes, err := unorderedWithMetadata.CheckpointBytes(nil) require.Nil(t, err) // Ensure we can recover ordered checkpoint into ordered headblock - recovered, err := HeadFromCheckpoint(b1, OrderedHeadBlockFmt) + recovered, err := HeadFromCheckpoint(orderedCheckpointBytes, 
OrderedHeadBlockFmt) require.Nil(t, err) require.Equal(t, ordered, recovered) // Ensure we can recover ordered checkpoint into unordered headblock - recovered, err = HeadFromCheckpoint(b1, UnorderedHeadBlockFmt) + recovered, err = HeadFromCheckpoint(orderedCheckpointBytes, UnorderedHeadBlockFmt) require.Nil(t, err) require.Equal(t, unordered, recovered) + // Ensure we can recover ordered checkpoint into unordered headblock with metadata + recovered, err = HeadFromCheckpoint(orderedCheckpointBytes, UnorderedWithMetadataHeadBlockFmt) + require.NoError(t, err) + require.Equal(t, &unorderedHeadBlock{ + format: UnorderedWithMetadataHeadBlockFmt, + rt: unordered.rt, + lines: unordered.lines, + size: unordered.size, + mint: unordered.mint, + maxt: unordered.maxt, + }, recovered) + // Ensure we can recover unordered checkpoint into ordered headblock - recovered, err = HeadFromCheckpoint(b2, OrderedHeadBlockFmt) + recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, OrderedHeadBlockFmt) require.Nil(t, err) require.Equal(t, ordered, recovered) // Ensure we can recover unordered checkpoint into unordered headblock - recovered, err = HeadFromCheckpoint(b2, UnorderedHeadBlockFmt) + recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, UnorderedHeadBlockFmt) require.Nil(t, err) require.Equal(t, unordered, recovered) + + // Ensure we can recover unordered checkpoint into unordered with metadata headblock + recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, UnorderedWithMetadataHeadBlockFmt) + require.NoError(t, err) + require.Equal(t, &unorderedHeadBlock{ + format: UnorderedWithMetadataHeadBlockFmt, + rt: unordered.rt, + lines: unordered.lines, + size: unordered.size, + mint: unordered.mint, + maxt: unordered.maxt, + }, recovered) + + // Ensure we can recover unordered with metadata checkpoint into ordered headblock + recovered, err = HeadFromCheckpoint(unorderedWithMetadataCheckpointBytes, OrderedHeadBlockFmt) + require.Nil(t, err) + require.Equal(t, 
ordered, recovered) // we compare the data with unordered because unordered head block does not contain metaLabels. + + // Ensure we can recover unordered with metadata checkpoint into unordered headblock + recovered, err = HeadFromCheckpoint(unorderedWithMetadataCheckpointBytes, UnorderedHeadBlockFmt) + require.Nil(t, err) + require.Equal(t, unordered, recovered) // we compare the data with unordered because unordered head block does not contain metaLabels. + + // Ensure we can recover unordered with metadata checkpoint into unordered with metadata headblock + recovered, err = HeadFromCheckpoint(unorderedWithMetadataCheckpointBytes, UnorderedWithMetadataHeadBlockFmt) + require.Nil(t, err) + require.Equal(t, unorderedWithMetadata, recovered) } // ensure backwards compatibility from when chunk format @@ -291,23 +366,23 @@ func BenchmarkHeadBlockWrites(b *testing.B) { // current default block size of 256kb with 75b avg log lines =~ 5.2k lines/block nWrites := (256 << 10) / 50 - headBlockFn := func() func(int64, string) { + headBlockFn := func() func(int64, string, labels.Labels) { hb := &headBlock{} - return func(ts int64, line string) { - _ = hb.Append(ts, line) + return func(ts int64, line string, metaLabels labels.Labels) { + _ = hb.Append(ts, line, metaLabels) } } - unorderedHeadBlockFn := func() func(int64, string) { - hb := newUnorderedHeadBlock() - return func(ts int64, line string) { - _ = hb.Append(ts, line) + unorderedHeadBlockFn := func() func(int64, string, labels.Labels) { + hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt) + return func(ts int64, line string, metaLabels labels.Labels) { + _ = hb.Append(ts, line, metaLabels) } } for _, tc := range []struct { desc string - fn func() func(int64, string) + fn func() func(int64, string, labels.Labels) unorderedWrites bool }{ { @@ -324,33 +399,42 @@ func BenchmarkHeadBlockWrites(b *testing.B) { unorderedWrites: true, }, } { - // build writes before we start benchmarking so random number generation, etc, - // 
isn't included in our timing info - writes := make([]entry, 0, nWrites) - rnd := rand.NewSource(0) - for i := 0; i < nWrites; i++ { - if tc.unorderedWrites { - ts := rnd.Int63() - writes = append(writes, entry{ - t: ts, - s: fmt.Sprint("line:", ts), - }) - } else { + for _, withNonIndexedLabels := range []bool{false, true} { + // build writes before we start benchmarking so random number generation, etc, + // isn't included in our timing info + writes := make([]entry, 0, nWrites) + rnd := rand.NewSource(0) + for i := 0; i < nWrites; i++ { + ts := int64(i) + if tc.unorderedWrites { + ts = rnd.Int63() + } + + var nonIndexedLabels labels.Labels + if withNonIndexedLabels { + nonIndexedLabels = labels.Labels{{Name: "foo", Value: fmt.Sprint(ts)}} + } + writes = append(writes, entry{ - t: int64(i), - s: fmt.Sprint("line:", i), + t: ts, + s: fmt.Sprint("line:", i), + nonIndexedLabels: nonIndexedLabels, }) } - } - b.Run(tc.desc, func(b *testing.B) { - for n := 0; n < b.N; n++ { - writeFn := tc.fn() - for _, w := range writes { - writeFn(w.t, w.s) - } + name := tc.desc + if withNonIndexedLabels { + name += " with non-indexed labels" } - }) + b.Run(name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + writeFn := tc.fn() + for _, w := range writes { + writeFn(w.t, w.s, w.nonIndexedLabels) + } + } + }) + } } } @@ -638,11 +722,12 @@ func Test_HeadIteratorHash(t *testing.T) { } for name, b := range map[string]HeadBlock{ - "unordered": newUnorderedHeadBlock(), - "ordered": &headBlock{}, + "unordered": newUnorderedHeadBlock(UnorderedHeadBlockFmt), + "unordered with metadata": newUnorderedHeadBlock(UnorderedWithMetadataHeadBlockFmt), + "ordered": &headBlock{}, } { t.Run(name, func(t *testing.T) { - require.NoError(t, b.Append(1, "foo")) + require.NoError(t, b.Append(1, "foo", labels.Labels{{Name: "foo", Value: "bar"}})) eit := b.Iterator(context.Background(), logproto.BACKWARD, 0, 2, log.NewNoopPipeline().ForStream(lbs)) for eit.Next() { diff --git a/pkg/chunkenc/util_test.go 
b/pkg/chunkenc/util_test.go index a65bdbcae1050..bd667c89caa8a 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -15,6 +15,14 @@ func logprotoEntry(ts int64, line string) *logproto.Entry { } } +func logprotoEntryWithMetadata(ts int64, line string, nonIndexedLabels []logproto.LabelAdapter) *logproto.Entry { + return &logproto.Entry{ + Timestamp: time.Unix(0, ts), + Line: line, + NonIndexedLabels: nonIndexedLabels, + } +} + func generateData(enc Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) { chunks := []Chunk{} i := int64(0) diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 21ee49c5847db..604d5c93bfd36 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -103,7 +103,7 @@ func fromWireChunks(conf *Config, wireChunks []Chunk) ([]chunkDesc, error) { // Always use Unordered headblocks during replay // to ensure Loki can effectively replay an unordered-friendly // WAL into a new configuration that disables unordered writes. 
- hbType := chunkenc.UnorderedHeadBlockFmt + hbType := chunkenc.DefaultHeadBlockFmt mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, hbType, conf.BlockSize, conf.TargetChunkSize) if err != nil { return nil, err diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index fbdf870da663c..a038a06a5052e 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -594,7 +594,7 @@ func (s *stream) resetCounter() { func headBlockType(unorderedWrites bool) chunkenc.HeadBlockFmt { if unorderedWrites { - return chunkenc.UnorderedHeadBlockFmt + return chunkenc.DefaultHeadBlockFmt } return chunkenc.OrderedHeadBlockFmt } diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 29b81fc0bad4d..3962deaaa3079 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -22,6 +22,7 @@ func TestLazyChunkIterator(t *testing.T) { chunk *LazyChunk expected []logproto.Stream }{ + // TODO: Add tests for metadata labels. { newLazyChunk(logproto.Stream{ Labels: fooLabelsWithName.String(),