From 1d04cd5e317dc6696f61ffa62119f201129f33fa Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 24 Jul 2023 10:31:11 +0200 Subject: [PATCH] Metadata to labels result and filtering support (#9702) **What this PR does / why we need it**: In #9700, we support encoding and decoding metadata for each entry into the chunks. This PR adds support for returning metadata labels for matching entries in a query to the returned LabelResults. It also supports filtering out logs by metadata labels. **Special notes for your reviewer**: **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani --- pkg/chunkenc/memchunk.go | 46 ++--- pkg/chunkenc/memchunk_test.go | 211 +++++++++++++++++++---- pkg/chunkenc/unordered.go | 4 +- pkg/chunkenc/unordered_test.go | 4 +- pkg/logql/log/labels.go | 7 + pkg/logql/log/metrics_extraction.go | 34 ++-- pkg/logql/log/metrics_extraction_test.go | 188 +++++++++++++++++--- pkg/logql/log/pipeline.go | 47 ++--- pkg/logql/log/pipeline_test.go | 103 +++++++++-- 9 files changed, 507 insertions(+), 137 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index ae5b8f20c3d7d..f3f6b8f075fc0 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -1024,7 +1024,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, return } stats.AddHeadChunkBytes(int64(len(e.s))) - newLine, parsedLbs, matches := pipeline.ProcessString(e.t, e.s) + newLine, parsedLbs, matches := pipeline.ProcessString(e.t, e.s, e.nonIndexedLabels...) if !matches { return } @@ -1077,7 +1077,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra for _, e := range hb.entries { stats.AddHeadChunkBytes(int64(len(e.s))) - value, parsedLabels, ok := extractor.ProcessString(e.t, e.s) + value, parsedLabels, ok := extractor.ProcessString(e.t, e.s, e.nonIndexedLabels...) if !ok { continue } @@ -1144,8 +1144,8 @@ type bufferedIterator struct { currLine []byte // the current line, this is the same as the buffer but sliced the line size. currTs int64 - metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels. - currMetadataLabels [][]byte // The current labels. + metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels. + currMetadataLabels labels.Labels // The current labels. closed bool } @@ -1177,15 +1177,29 @@ func (si *bufferedIterator) Next() bool { } } - ts, line, metaLabels, ok := si.moveNext() + ts, line, nonIndexedLabelsBuff, ok := si.moveNext() if !ok { si.Close() return false } + var nonIndexedLabels labels.Labels + if len(nonIndexedLabelsBuff) > 0 { + if len(nonIndexedLabelsBuff)%2 != 0 { + si.err = fmt.Errorf("expected even number of metadata labels, got %d", len(nonIndexedLabelsBuff)) + return false + } + + nonIndexedLabels = make(labels.Labels, len(nonIndexedLabelsBuff)/2) + for i := 0; i < len(nonIndexedLabelsBuff); i += 2 { + nonIndexedLabels[i/2].Name = string(nonIndexedLabelsBuff[i]) + nonIndexedLabels[i/2].Value = string(nonIndexedLabelsBuff[i+1]) + } + } + si.currTs = ts si.currLine = line - si.currMetadataLabels = metaLabels + si.currMetadataLabels = nonIndexedLabels return true } @@ -1452,28 +1466,14 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe func (e *entryBufferedIterator) Next() bool { for e.bufferedIterator.Next() { - if len(e.currMetadataLabels)%2 != 0 { - e.err = fmt.Errorf("expected even number of metadata labels, got %d", len(e.currMetadataLabels)) - return false - } - - var nonIndexedLabels []logproto.LabelAdapter - if len(e.currMetadataLabels) > 0 { - nonIndexedLabels = make([]logproto.LabelAdapter, len(e.currMetadataLabels)/2) - for i := 0; i < len(e.currMetadataLabels); i += 2 { - nonIndexedLabels[i/2].Name = string(e.currMetadataLabels[i]) - nonIndexedLabels[i/2].Value = string(e.currMetadataLabels[i+1]) - } - } - - newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine) + newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currMetadataLabels...) if !matches { continue } e.stats.AddPostFilterLines(1) e.currLabels = lbs - e.cur.NonIndexedLabels = nonIndexedLabels + e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currMetadataLabels) e.cur.Timestamp = time.Unix(0, e.currTs) e.cur.Line = string(newLine) return true @@ -1500,7 +1500,7 @@ type sampleBufferedIterator struct { func (e *sampleBufferedIterator) Next() bool { for e.bufferedIterator.Next() { - val, labels, ok := e.extractor.Process(e.currTs, e.currLine) + val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currMetadataLabels...) if !ok { continue } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 67bf4758a04f6..1f3d162c7c7d3 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -141,10 +141,6 @@ func TestBlock(t *testing.T) { { ts: 8, str: "hello, worl\nd8!", - lbs: []logproto.LabelAdapter{ - {Name: "a", Value: "a2"}, - {Name: "b", Value: "b"}, - }, }, { ts: 8, @@ -158,6 +154,14 @@ func TestBlock(t *testing.T) { ts: 9, str: "", }, + { + ts: 10, + str: "hello, world10!", + lbs: []logproto.LabelAdapter{ + {Name: "a", Value: "a2"}, + {Name: "b", Value: "b"}, + }, + }, } for _, c := range cases { @@ -187,7 +191,6 @@ func TestBlock(t *testing.T) { require.NoError(t, it.Close()) require.Equal(t, len(cases), idx) - // TODO: Test labels and metadata labels here. sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) idx = 0 for sampleIt.Next() { @@ -770,10 +773,10 @@ func BenchmarkWrite(b *testing.B) { type nomatchPipeline struct{} func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult } -func (nomatchPipeline) Process(_ int64, line []byte) ([]byte, log.LabelsResult, bool) { +func (nomatchPipeline) Process(_ int64, line []byte, _ ...labels.Label) ([]byte, log.LabelsResult, bool) { return line, nil, false } -func (nomatchPipeline) ProcessString(_ int64, line string) (string, log.LabelsResult, bool) { +func (nomatchPipeline) ProcessString(_ int64, line string, _ ...labels.Label) (string, log.LabelsResult, bool) { return line, nil, false } @@ -1547,6 +1550,9 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) { for _, enc := range testEncoding { enc := enc t.Run(enc.String(), func(t *testing.T) { + streamLabels := labels.Labels{ + {Name: "job", Value: "fake"}, + } chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize) require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{ {Name: "traceID", Value: "123"}, @@ -1566,14 +1572,6 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) { {Name: "user", Value: "d"}, }))) - expectedLines := []string{"lineA", "lineB", "lineC", "lineD"} - expectedStreams := []string{ - labels.FromStrings("traceID", "123", "user", "a").String(), - labels.FromStrings("traceID", "456", "user", "b").String(), - labels.FromStrings("traceID", "789", "user", "c").String(), - labels.FromStrings("traceID", "123", "user", "d").String(), - } - // The expected bytes is the sum of bytes decompressed and bytes read from the head chunk. // First we add the bytes read from the store (aka decompressed). That's // nonIndexedLabelsBytes = n. lines * (n. labels + (2 * n. labels) * (label length + label)) @@ -1588,28 +1586,173 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) { // Finally, the expected total bytes is the line bytes + non-indexed labels bytes expectedBytes := lineBytes + expectedNonIndexedLabelsBytes - // We will run the test twice so the iterator will be created twice. - // This is to ensure that the iterator is correctly closed. - for i := 0; i < 2; i++ { - sts, ctx := stats.NewContext(context.Background()) + for _, tc := range []struct { + name string + query string + expectedLines []string + expectedStreams []string + }{ + { + name: "no-filter", + query: `{job="fake"}`, + expectedLines: []string{"lineA", "lineB", "lineC", "lineD"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "traceID", "123", "user", "a").String(), + labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(), + labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(), + labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(), + }, + }, + { + name: "filter", + query: `{job="fake"} | traceID="789"`, + expectedLines: []string{"lineC"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(), + }, + }, + { + name: "filter-regex-or", + query: `{job="fake"} | traceID=~"456|789"`, + expectedLines: []string{"lineB", "lineC"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(), + labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(), + }, + }, + { + name: "filter-regex-contains", + query: `{job="fake"} | traceID=~".*5.*"`, + expectedLines: []string{"lineB"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(), + }, + }, + { + name: "filter-regex-complex", + query: `{job="fake"} | traceID=~"^[0-9]2.*"`, + expectedLines: []string{"lineA", "lineD"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "traceID", "123", "user", "a").String(), + labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(), + }, + }, + { + name: "multiple-filters", + query: `{job="fake"} | traceID="123" | user="d"`, + expectedLines: []string{"lineD"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(), + }, + }, + { + name: "metadata-and-keep", + query: `{job="fake"} | keep job, user`, + expectedLines: []string{"lineA", "lineB", "lineC", "lineD"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "user", "a").String(), + labels.FromStrings("job", "fake", "user", "b").String(), + labels.FromStrings("job", "fake", "user", "c").String(), + labels.FromStrings("job", "fake", "user", "d").String(), + }, + }, + { + name: "metadata-and-keep-filter", + query: `{job="fake"} | keep job, user="b"`, + expectedLines: []string{"lineA", "lineB", "lineC", "lineD"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake").String(), + labels.FromStrings("job", "fake", "user", "b").String(), + labels.FromStrings("job", "fake").String(), + labels.FromStrings("job", "fake").String(), + }, + }, + { + name: "metadata-and-drop", + query: `{job="fake"} | drop traceID`, + expectedLines: []string{"lineA", "lineB", "lineC", "lineD"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "user", "a").String(), + labels.FromStrings("job", "fake", "user", "b").String(), + labels.FromStrings("job", "fake", "user", "c").String(), + labels.FromStrings("job", "fake", "user", "d").String(), + }, + }, + { + name: "metadata-and-drop-filter", + query: `{job="fake"} | drop traceID="123"`, + expectedLines: []string{"lineA", "lineB", "lineC", "lineD"}, + expectedStreams: []string{ + labels.FromStrings("job", "fake", "user", "a").String(), + labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(), + labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(), + labels.FromStrings("job", "fake", "user", "d").String(), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Run("log", func(t *testing.T) { + expr, err := syntax.ParseLogSelector(tc.query, true) + require.NoError(t, err) + + pipeline, err := expr.Pipeline() + require.NoError(t, err) + + // We will run the test twice so the iterator will be created twice. + // This is to ensure that the iterator is correctly closed. + for i := 0; i < 2; i++ { + sts, ctx := stats.NewContext(context.Background()) + it, err := chk.Iterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, pipeline.ForStream(streamLabels)) + require.NoError(t, err) + + var lines []string + var streams []string + for it.Next() { + require.NoError(t, it.Error()) + e := it.Entry() + lines = append(lines, e.Line) + streams = append(streams, it.Labels()) + } + assert.ElementsMatch(t, tc.expectedLines, lines) + assert.ElementsMatch(t, tc.expectedStreams, streams) + + resultStats := sts.Result(0, 0, len(lines)) + require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed) + require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed) + } + }) - it, err := chk.Iterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) - require.NoError(t, err) + t.Run("metric", func(t *testing.T) { + query := fmt.Sprintf(`count_over_time(%s [1d])`, tc.query) + expr, err := syntax.ParseSampleExpr(query) + require.NoError(t, err) - var lines []string - var streams []string - for it.Next() { - require.NoError(t, it.Error()) - e := it.Entry() - lines = append(lines, e.Line) - streams = append(streams, logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels).String()) - } - assert.ElementsMatch(t, expectedLines, lines) - assert.ElementsMatch(t, expectedStreams, streams) + extractor, err := expr.Extractor() + require.NoError(t, err) - resultStats := sts.Result(0, 0, len(lines)) - require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed) - require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed) + // We will run the test twice so the iterator will be created twice. + // This is to ensure that the iterator is correctly closed. + for i := 0; i < 2; i++ { + sts, ctx := stats.NewContext(context.Background()) + it := chk.SampleIterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), extractor.ForStream(streamLabels)) + + var sumValues int + var streams []string + for it.Next() { + require.NoError(t, it.Error()) + e := it.Sample() + sumValues += int(e.Value) + streams = append(streams, it.Labels()) + } + require.Equal(t, len(tc.expectedLines), sumValues) + assert.ElementsMatch(t, tc.expectedStreams, streams) + + resultStats := sts.Result(0, 0, 0) + require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed) + require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed) + } + }) + }) } }) } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 4156f63ecc7a0..f65ec5843d9e4 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -263,7 +263,7 @@ func (hb *unorderedHeadBlock) Iterator( mint, maxt, func(statsCtx *stats.Context, ts int64, line string, nonIndexedLabels labels.Labels) error { - newLine, parsedLbs, matches := pipeline.ProcessString(ts, line) + newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, nonIndexedLabels...) if !matches { return nil } @@ -313,7 +313,7 @@ func (hb *unorderedHeadBlock) SampleIterator( mint, maxt, func(statsCtx *stats.Context, ts int64, line string, metaLabels labels.Labels) error { - value, parsedLabels, ok := extractor.ProcessString(ts, line) + value, parsedLabels, ok := extractor.ProcessString(ts, line, metaLabels...) if !ok { return nil } diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 1efda8ad52cb3..8a891761ca1c5 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -127,10 +127,10 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { { desc: "ts collision forward", input: []entry{ - {0, "a", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil}, + {0, "a", labels.Labels{{Name: "a", Value: "b"}}}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil}, }, exp: []entry{ - {0, "a", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil}, + {0, "a", labels.Labels{{Name: "a", Value: "b"}}}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil}, }, }, { diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 155271c7d317a..1f72051b340f7 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -244,6 +244,13 @@ func (b *LabelsBuilder) Set(n, v string) *LabelsBuilder { return b } +func (b *LabelsBuilder) Add(labels ...labels.Label) *LabelsBuilder { + for _, l := range labels { + b.Set(l.Name, l.Value) + } + return b +} + // Labels returns the labels from the builder. If no modifications // were made, the original labels are returned. func (b *LabelsBuilder) labels() labels.Labels { diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index b5e82c59778c4..4ba20f90676f3 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -34,8 +34,8 @@ type SampleExtractor interface { // A StreamSampleExtractor never mutate the received line. type StreamSampleExtractor interface { BaseLabels() LabelsResult - Process(ts int64, line []byte) (float64, LabelsResult, bool) - ProcessString(ts int64, line string) (float64, LabelsResult, bool) + Process(ts int64, line []byte, metadataLabels ...labels.Label) (float64, LabelsResult, bool) + ProcessString(ts int64, line string, metadataLabels ...labels.Label) (float64, LabelsResult, bool) } type lineSampleExtractor struct { @@ -80,12 +80,15 @@ type streamLineSampleExtractor struct { builder *LabelsBuilder } -func (l *streamLineSampleExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { +func (l *streamLineSampleExtractor) Process(ts int64, line []byte, metadataLabels ...labels.Label) (float64, LabelsResult, bool) { + l.builder.Reset() + l.builder.Add(metadataLabels...) + // short circuit. if l.Stage == NoopStage { return l.LineExtractor(line), l.builder.GroupedLabels(), true } - l.builder.Reset() + line, ok := l.Stage.Process(ts, line, l.builder) if !ok { return 0, nil, false @@ -93,9 +96,9 @@ func (l *streamLineSampleExtractor) Process(ts int64, line []byte) (float64, Lab return l.LineExtractor(line), l.builder.GroupedLabels(), true } -func (l *streamLineSampleExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { +func (l *streamLineSampleExtractor) ProcessString(ts int64, line string, metadataLabels ...labels.Label) (float64, LabelsResult, bool) { // unsafe get bytes since we have the guarantee that the line won't be mutated. - return l.Process(ts, unsafeGetBytes(line)) + return l.Process(ts, unsafeGetBytes(line), metadataLabels...) } func (l *streamLineSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } @@ -168,9 +171,10 @@ func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtra return res } -func (l *streamLabelSampleExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { +func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, metadataLabels ...labels.Label) (float64, LabelsResult, bool) { // Apply the pipeline first. l.builder.Reset() + l.builder.Add(metadataLabels...) line, ok := l.preStage.Process(ts, line, l.builder) if !ok { return 0, nil, false @@ -198,9 +202,9 @@ func (l *streamLabelSampleExtractor) Process(ts int64, line []byte) (float64, La return v, l.builder.GroupedLabels(), true } -func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { +func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string, metadataLabels ...labels.Label) (float64, LabelsResult, bool) { // unsafe get bytes since we have the guarantee that the line won't be mutated. - return l.Process(ts, unsafeGetBytes(line)) + return l.Process(ts, unsafeGetBytes(line), metadataLabels...) } func (l *streamLabelSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } @@ -247,14 +251,14 @@ func (sp *filteringStreamExtractor) BaseLabels() LabelsResult { return sp.extractor.BaseLabels() } -func (sp *filteringStreamExtractor) Process(ts int64, line []byte) (float64, LabelsResult, bool) { +func (sp *filteringStreamExtractor) Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) (float64, LabelsResult, bool) { for _, filter := range sp.filters { if ts < filter.start || ts > filter.end { continue } - _, _, matches := filter.pipeline.Process(ts, line) - if matches { //When the filter matches, don't run the next step + _, _, matches := filter.pipeline.Process(ts, line, nonIndexedLabels...) + if matches { // When the filter matches, don't run the next step return 0, nil, false } } @@ -262,14 +266,14 @@ func (sp *filteringStreamExtractor) Process(ts int64, line []byte) (float64, Lab return sp.extractor.Process(ts, line) } -func (sp *filteringStreamExtractor) ProcessString(ts int64, line string) (float64, LabelsResult, bool) { +func (sp *filteringStreamExtractor) ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (float64, LabelsResult, bool) { for _, filter := range sp.filters { if ts < filter.start || ts > filter.end { continue } - _, _, matches := filter.pipeline.ProcessString(ts, line) - if matches { //When the filter matches, don't run the next step + _, _, matches := filter.pipeline.ProcessString(ts, line, nonIndexedLabels...) + if matches { // When the filter matches, don't run the next step return 0, nil, false } } diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index deeddf59da24f..778809a0e54c8 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -11,13 +11,14 @@ import ( func Test_labelSampleExtractor_Extract(t *testing.T) { tests := []struct { - name string - ex SampleExtractor - in labels.Labels - want float64 - wantLbs labels.Labels - wantOk bool - line string + name string + ex SampleExtractor + in labels.Labels + nonIndexedLabels labels.Labels + want float64 + wantLbs labels.Labels + wantOk bool + line string }{ { name: "convert float", @@ -69,6 +70,50 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { ), wantOk: true, }, + { + name: "convert float with non-indexed labels", + ex: mustSampleExtractor(LabelExtractorWithStages( + "foo", ConvertFloat, nil, false, false, nil, NoopStage, + )), + in: labels.EmptyLabels(), + nonIndexedLabels: labels.FromStrings("foo", "15.0"), + want: 15, + wantLbs: labels.EmptyLabels(), + wantOk: true, + }, + { + name: "convert float as vector with non-indexed labels with no grouping", + ex: mustSampleExtractor(LabelExtractorWithStages( + "foo", ConvertFloat, nil, false, true, nil, NoopStage, + )), + in: labels.FromStrings("bar", "buzz"), + nonIndexedLabels: labels.FromStrings("foo", "15.0", "buzz", "blip"), + want: 15, + wantLbs: labels.EmptyLabels(), + wantOk: true, + }, + { + name: "convert float with non-indexed labels and grouping", + ex: mustSampleExtractor(LabelExtractorWithStages( + "foo", ConvertFloat, []string{"bar", "buzz"}, false, false, nil, NoopStage, + )), + in: labels.FromStrings("bar", "buzz", "namespace", "dev"), + nonIndexedLabels: labels.FromStrings("foo", "15.0", "buzz", "blip"), + want: 15, + wantLbs: labels.FromStrings("bar", "buzz", "buzz", "blip"), + wantOk: true, + }, + { + name: "convert float with non-indexed labels and grouping without", + ex: mustSampleExtractor(LabelExtractorWithStages( + "foo", ConvertFloat, []string{"bar", "buzz"}, true, false, nil, NoopStage, + )), + in: labels.FromStrings("bar", "buzz", "namespace", "dev"), + nonIndexedLabels: labels.FromStrings("foo", "15.0", "buzz", "blip"), + want: 15, + wantLbs: labels.FromStrings("namespace", "dev"), + wantOk: true, + }, { name: "convert duration with", ex: mustSampleExtractor(LabelExtractorWithStages( @@ -85,6 +130,22 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { ), wantOk: true, }, + { + name: "convert duration with non-indexed labels", + ex: mustSampleExtractor(LabelExtractorWithStages( + "foo", ConvertDuration, []string{"bar", "buzz"}, false, false, nil, NoopStage, + )), + in: labels.FromStrings( + "bar", "foo", + "namespace", "dev", + ), + nonIndexedLabels: labels.FromStrings("foo", "500ms", "buzz", "blip"), + want: 0.5, + wantLbs: labels.FromStrings("bar", "foo", + "buzz", "blip", + ), + wantOk: true, + }, { name: "convert bytes", ex: mustSampleExtractor(LabelExtractorWithStages( @@ -101,6 +162,22 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { ), wantOk: true, }, + { + name: "convert bytes with non-indexed labels", + ex: mustSampleExtractor(LabelExtractorWithStages( + "foo", ConvertBytes, []string{"bar", "buzz"}, false, false, nil, NoopStage, + )), + in: labels.FromStrings( + "bar", "foo", + "namespace", "dev", + ), + nonIndexedLabels: labels.FromStrings("foo", "13 MiB", "buzz", "blip"), + want: 13 * 1024 * 1024, + wantLbs: labels.FromStrings("bar", "foo", + "buzz", "blip", + ), + wantOk: true, + }, { name: "not convertable", ex: mustSampleExtractor(LabelExtractorWithStages( @@ -116,6 +193,20 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { ), wantOk: true, }, + { + name: "not convertable with non-indexed labels", + ex: mustSampleExtractor(LabelExtractorWithStages( + "foo", ConvertFloat, []string{"bar", "buzz"}, false, false, nil, NoopStage, + )), + in: labels.FromStrings("bar", "foo"), + nonIndexedLabels: labels.FromStrings("foo", "not_a_number"), + wantLbs: labels.FromStrings("__error__", "SampleExtractionErr", + "__error_details__", "strconv.ParseFloat: parsing \"not_a_number\": invalid syntax", + "bar", "foo", + "foo", "not_a_number", + ), + wantOk: true, + }, { name: "dynamic label, convert duration", ex: mustSampleExtractor(LabelExtractorWithStages( @@ -144,12 +235,12 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - outval, outlbs, ok := tt.ex.ForStream(tt.in).Process(0, []byte(tt.line)) + outval, outlbs, ok := tt.ex.ForStream(tt.in).Process(0, []byte(tt.line), tt.nonIndexedLabels...) require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) require.Equal(t, tt.wantLbs, outlbs.Labels()) - outval, outlbs, ok = tt.ex.ForStream(tt.in).ProcessString(0, tt.line) + outval, outlbs, ok = tt.ex.ForStream(tt.in).ProcessString(0, tt.line, tt.nonIndexedLabels...) require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) require.Equal(t, tt.wantLbs, outlbs.Labels()) @@ -270,34 +361,77 @@ func TestNewLineSampleExtractor(t *testing.T) { require.False(t, ok) } +func TestNewLineSampleExtractorWithNonIndexedMetadata(t *testing.T) { + lbs := labels.FromStrings("foo", "bar") + nonIndexedLabels := labels.FromStrings("user", "bob") + expectedLabelsResults := append(lbs, nonIndexedLabels...) + se, err := NewLineSampleExtractor(CountExtractor, []Stage{ + NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")), + NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "user", "bob")), + }, nil, false, false) + require.NoError(t, err) + + sse := se.ForStream(lbs) + f, l, ok := sse.Process(0, []byte(`foo`), nonIndexedLabels...) + require.True(t, ok) + require.Equal(t, 1., f) + assertLabelResult(t, expectedLabelsResults, l) + + f, l, ok = sse.ProcessString(0, `foo`, nonIndexedLabels...) + require.True(t, ok) + require.Equal(t, 1., f) + assertLabelResult(t, expectedLabelsResults, l) + + se, err = NewLineSampleExtractor(BytesExtractor, []Stage{ + NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")), + NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "user", "bob")), + mustFilter(NewFilter("foo", labels.MatchEqual)).ToStage(), + }, []string{"foo"}, false, false) + require.NoError(t, err) + + sse = se.ForStream(lbs) + f, l, ok = sse.Process(0, []byte(`foo`), nonIndexedLabels...) + require.True(t, ok) + require.Equal(t, 3., f) + assertLabelResult(t, labels.FromStrings("foo", "bar"), l) + + sse = se.ForStream(lbs) + _, _, ok = sse.Process(0, []byte(`nope`)) + require.False(t, ok) +} + func TestFilteringSampleExtractor(t *testing.T) { se := NewFilteringSampleExtractor([]PipelineFilter{ - newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), "e"), - newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), "e"), + newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), nil, "e"), + newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), nil, "e"), + newPipelineFilter(3, 5, labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), "e"), }, newStubExtractor()) tt := []struct { - name string - ts int64 - line string - labels labels.Labels - ok bool + name string + ts int64 + line string + labels labels.Labels + nonIndexedLabels labels.Labels + ok bool }{ - {"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), true}, - {"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), true}, - {"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), true}, - {"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), true}, - {"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), true}, - {"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), false}, - {"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), false}, + {"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), nil, true}, + {"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), nil, true}, + {"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), nil, true}, + {"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), nil, true}, + {"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), nil, true}, + {"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), nil, false}, + {"it doesn't match all non-indexed labels", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "alice"), true}, + {"it matches all non-indexed labels", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), false}, + {"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), nil, false}, } for _, test := range tt { t.Run(test.name, func(t *testing.T) { - _, _, ok := se.ForStream(test.labels).Process(test.ts, []byte(test.line)) + _, _, ok := se.ForStream(test.labels).Process(test.ts, []byte(test.line), test.nonIndexedLabels...) require.Equal(t, test.ok, ok) - _, _, ok = se.ForStream(test.labels).ProcessString(test.ts, test.line) + _, _, ok = se.ForStream(test.labels).ProcessString(test.ts, test.line, test.nonIndexedLabels...) require.Equal(t, test.ok, ok) }) } @@ -325,10 +459,10 @@ func (p *stubStreamExtractor) BaseLabels() LabelsResult { return nil } -func (p *stubStreamExtractor) Process(_ int64, _ []byte) (float64, LabelsResult, bool) { +func (p *stubStreamExtractor) Process(_ int64, _ []byte, _ ...labels.Label) (float64, LabelsResult, bool) { return 0, nil, true } -func (p *stubStreamExtractor) ProcessString(_ int64, _ string) (float64, LabelsResult, bool) { +func (p *stubStreamExtractor) ProcessString(_ int64, _ string, _ ...labels.Label) (float64, LabelsResult, bool) { return 0, nil, true } diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index c4cf78d1d9d43..c589c94a48469 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -23,8 +23,8 @@ type StreamPipeline interface { BaseLabels() LabelsResult // Process processes a log line and returns the transformed line and the labels. // The buffer returned for the log line can be reused on subsequent calls to Process and therefore must be copied. - Process(ts int64, line []byte) (resultLine []byte, resultLabels LabelsResult, matches bool) - ProcessString(ts int64, line string) (resultLine string, resultLabels LabelsResult, matches bool) + Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) (resultLine []byte, resultLabels LabelsResult, matches bool) + ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (resultLine string, resultLabels LabelsResult, matches bool) } // Stage is a single step of a Pipeline. @@ -38,17 +38,19 @@ type Stage interface { // NewNoopPipeline creates a pipelines that does not process anything and returns log streams as is. func NewNoopPipeline() Pipeline { return &noopPipeline{ - cache: map[uint64]*noopStreamPipeline{}, + cache: map[uint64]*noopStreamPipeline{}, + baseBuilder: NewBaseLabelsBuilder(), } } type noopPipeline struct { - cache map[uint64]*noopStreamPipeline - mu sync.RWMutex + cache map[uint64]*noopStreamPipeline + baseBuilder *BaseLabelsBuilder + mu sync.RWMutex } func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline { - h := labels.Hash() + h := n.baseBuilder.Hash(labels) n.mu.RLock() if cached, ok := n.cache[h]; ok { @@ -57,7 +59,7 @@ func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline { } n.mu.RUnlock() - sp := &noopStreamPipeline{LabelsResult: NewLabelsResult(labels, h)} + sp := &noopStreamPipeline{n.baseBuilder.ForLabels(labels, h)} n.mu.Lock() defer n.mu.Unlock() @@ -82,18 +84,21 @@ func IsNoopPipeline(p Pipeline) bool { } type noopStreamPipeline struct { - LabelsResult + builder *LabelsBuilder } -func (n noopStreamPipeline) Process(_ int64, line []byte) ([]byte, LabelsResult, bool) { - return line, n.LabelsResult, true +func (n noopStreamPipeline) Process(_ int64, line []byte, nonIndexedLabels ...labels.Label) ([]byte, LabelsResult, bool) { + n.builder.Reset() + n.builder.Add(nonIndexedLabels...) + return line, n.builder.LabelsResult(), true } -func (n noopStreamPipeline) ProcessString(_ int64, line string) (string, LabelsResult, bool) { - return line, n.LabelsResult, true +func (n noopStreamPipeline) ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (string, LabelsResult, bool) { + _, lr, ok := n.Process(ts, unsafeGetBytes(line), nonIndexedLabels...) + return line, lr, ok } -func (n noopStreamPipeline) BaseLabels() LabelsResult { return n.LabelsResult } +func (n noopStreamPipeline) BaseLabels() LabelsResult { return n.builder.currentResult } type noopStage struct{} @@ -196,9 +201,11 @@ func (p *pipeline) Reset() { } } -func (p *streamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, bool) { +func (p *streamPipeline) Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) ([]byte, LabelsResult, bool) { var ok bool p.builder.Reset() + p.builder.Add(nonIndexedLabels...) + for _, s := range p.stages { line, ok = s.Process(ts, line, p.builder) if !ok { @@ -208,9 +215,9 @@ func (p *streamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, b return line, p.builder.LabelsResult(), true } -func (p *streamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { +func (p *streamPipeline) ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (string, LabelsResult, bool) { // Stages only read from the line. - lb, lr, ok := p.Process(ts, unsafeGetBytes(line)) + lb, lr, ok := p.Process(ts, unsafeGetBytes(line), nonIndexedLabels...) // but the returned line needs to be copied. return string(lb), lr, ok } @@ -289,13 +296,13 @@ func (sp *filteringStreamPipeline) BaseLabels() LabelsResult { return sp.pipeline.BaseLabels() } -func (sp *filteringStreamPipeline) Process(ts int64, line []byte) ([]byte, LabelsResult, bool) { +func (sp *filteringStreamPipeline) Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) ([]byte, LabelsResult, bool) { for _, filter := range sp.filters { if ts < filter.start || ts > filter.end { continue } - _, _, matches := filter.pipeline.Process(ts, line) + _, _, matches := filter.pipeline.Process(ts, line, nonIndexedLabels...) if matches { // When the filter matches, don't run the next step return nil, nil, false } @@ -304,13 +311,13 @@ func (sp *filteringStreamPipeline) Process(ts int64, line []byte) ([]byte, Label return sp.pipeline.Process(ts, line) } -func (sp *filteringStreamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { +func (sp *filteringStreamPipeline) ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (string, LabelsResult, bool) { for _, filter := range sp.filters { if ts < filter.start || ts > filter.end { continue } - _, _, matches := filter.pipeline.ProcessString(ts, line) + _, _, matches := filter.pipeline.ProcessString(ts, line, nonIndexedLabels...) if matches { // When the filter matches, don't run the next step return "", nil, false } diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index d38f24892f8f4..cbc6641343cf3 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -24,6 +24,21 @@ func TestNoopPipeline(t *testing.T) { require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) require.Equal(t, true, matches) + nonIndexedLabels := labels.Labels{ + {Name: "y", Value: "1"}, + {Name: "z", Value: "2"}, + } + expectedLabelsResults := append(lbs, nonIndexedLabels...) + l, lbr, matches = pipeline.ForStream(lbs).Process(0, []byte(""), nonIndexedLabels...) + require.Equal(t, []byte(""), l) + require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr) + require.Equal(t, true, matches) + + ls, lbr, matches = pipeline.ForStream(lbs).ProcessString(0, "", nonIndexedLabels...) + require.Equal(t, "", ls) + require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr) + require.Equal(t, true, matches) + pipeline.Reset() require.Len(t, pipeline.cache, 0) } @@ -65,35 +80,89 @@ func TestPipeline(t *testing.T) { require.Len(t, p.baseBuilder.add, 0) } +func TestPipelineWithNonIndexedLabels(t *testing.T) { + lbs := labels.FromStrings("foo", "bar") + nonIndexedLabels := labels.FromStrings("user", "bob") + expectedLabelsResults := append(lbs, nonIndexedLabels...) + p := NewPipeline([]Stage{ + NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")), + NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "user", "bob")), + newMustLineFormatter("lbs {{.foo}} {{.user}}"), + }).(*pipeline) + + l, lbr, matches := p.ForStream(lbs).Process(0, []byte("line"), nonIndexedLabels...) + require.Equal(t, []byte("lbs bar bob"), l) + require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr) + require.Equal(t, true, matches) + + ls, lbr, matches := p.ForStream(lbs).ProcessString(0, "line", nonIndexedLabels...) + require.Equal(t, "lbs bar bob", ls) + require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr) + require.Equal(t, true, matches) + + l, lbr, matches = p.ForStream(lbs).Process(0, []byte("line")) + require.Equal(t, []byte(nil), l) + require.Equal(t, nil, lbr) + require.Equal(t, false, matches) + + ls, lbr, matches = p.ForStream(lbs).ProcessString(0, "line") + require.Equal(t, "", ls) + require.Equal(t, nil, lbr) + require.Equal(t, false, matches) + + l, lbr, matches = p.ForStream(labels.EmptyLabels()).Process(0, []byte("line"), nonIndexedLabels...) + require.Equal(t, []byte(nil), l) + require.Equal(t, nil, lbr) + require.Equal(t, false, matches) + + ls, lbr, matches = p.ForStream(labels.EmptyLabels()).ProcessString(0, "line", nonIndexedLabels...) + require.Equal(t, "", ls) + require.Equal(t, nil, lbr) + require.Equal(t, false, matches) + + // Reset caches + p.baseBuilder.del = []string{"foo", "bar"} + p.baseBuilder.add = labels.FromStrings("baz", "blip") + + p.Reset() + require.Len(t, p.streamPipelines, 0) + require.Len(t, p.baseBuilder.del, 0) + require.Len(t, p.baseBuilder.add, 0) +} + func TestFilteringPipeline(t *testing.T) { tt := []struct { name string ts int64 line string inputStreamLabels labels.Labels + nonIndexedLabels labels.Labels ok bool }{ - {"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), true}, - {"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), true}, - {"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), true}, - {"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), true}, - {"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), true}, - {"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), false}, - {"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), false}, + {"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), nil, true}, + {"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), nil, true}, + {"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), nil, true}, + {"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), nil, true}, + {"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), nil, true}, + {"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), nil, false}, + {"it doesn't match all non-indexed labels", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "alice"), true}, + {"it matches all non-indexed labels", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), false}, + {"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), nil, false}, } for _, test := range tt { downstream := newStubPipeline() p := NewFilteringPipeline([]PipelineFilter{ - newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), "e"), - newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), "e"), + newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), nil, "e"), + newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), nil, "e"), + newPipelineFilter(3, 5, labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), "e"), }, downstream) t.Run(test.name, func(t *testing.T) { - _, _, matches := p.ForStream(test.inputStreamLabels).Process(test.ts, []byte(test.line)) + _, _, matches := p.ForStream(test.inputStreamLabels).Process(test.ts, []byte(test.line), test.nonIndexedLabels...) require.Equal(t, test.ok, matches) - _, _, matches = p.ForStream(test.inputStreamLabels).ProcessString(test.ts, test.line) + _, _, matches = p.ForStream(test.inputStreamLabels).ProcessString(test.ts, test.line, test.nonIndexedLabels...) require.Equal(t, test.ok, matches) p.Reset() @@ -103,13 +172,19 @@ func TestFilteringPipeline(t *testing.T) { } //nolint:unparam -func newPipelineFilter(start, end int64, lbls labels.Labels, filter string) PipelineFilter { +func newPipelineFilter(start, end int64, lbls, nonIndexedLbls labels.Labels, filter string) PipelineFilter { var stages []Stage var matchers []*labels.Matcher lbls.Range(func(l labels.Label) { m := labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value) matchers = append(matchers, m) }) + + nonIndexedLbls.Range(func(l labels.Label) { + s := NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value)) + stages = append(stages, s) + }) + stages = append(stages, mustFilter(NewFilter(filter, labels.MatchEqual)).ToStage()) return PipelineFilter{start, end, matchers, NewPipeline(stages)} @@ -142,11 +217,11 @@ func (p *stubStreamPipeline) BaseLabels() LabelsResult { return nil } -func (p *stubStreamPipeline) Process(_ int64, _ []byte) ([]byte, LabelsResult, bool) { +func (p *stubStreamPipeline) Process(_ int64, _ []byte, _ ...labels.Label) ([]byte, LabelsResult, bool) { return nil, nil, true } -func (p *stubStreamPipeline) ProcessString(_ int64, _ string) (string, LabelsResult, bool) { +func (p *stubStreamPipeline) ProcessString(_ int64, _ string, _ ...labels.Label) (string, LabelsResult, bool) { return "", nil, true }