diff --git a/pkg/logs/internal/decoder/auto_multiline_detection/aggregator.go b/pkg/logs/internal/decoder/auto_multiline_detection/aggregator.go index 88bc86160e3555..e44c9b13688140 100644 --- a/pkg/logs/internal/decoder/auto_multiline_detection/aggregator.go +++ b/pkg/logs/internal/decoder/auto_multiline_detection/aggregator.go @@ -25,6 +25,7 @@ type bucket struct { buffer *bytes.Buffer lineCount int shouldTruncate bool + needsTruncation bool } func (b *bucket) add(msg *message.Message) { @@ -48,13 +49,14 @@ func (b *bucket) reset() { b.message = nil b.lineCount = 0 b.originalDataLen = 0 + b.needsTruncation = false } func (b *bucket) flush() *message.Message { defer b.reset() lastWasTruncated := b.shouldTruncate - b.shouldTruncate = b.buffer.Len() >= b.maxContentSize + b.shouldTruncate = b.buffer.Len() >= b.maxContentSize || b.needsTruncation data := bytes.TrimSpace(b.buffer.Bytes()) content := make([]byte, len(data)) @@ -154,7 +156,8 @@ func (a *Aggregator) Aggregate(msg *message.Message, label Label) { // At this point we either have `startGroup` with an empty bucket or `aggregate` with a non-empty bucket // so we add the message to the bucket or flush if the bucket will overflow the max content size. - if msg.RawDataLen+a.bucket.buffer.Len() > a.maxContentSize { + if msg.RawDataLen+a.bucket.buffer.Len() >= a.maxContentSize { + a.bucket.needsTruncation = true a.Flush() } diff --git a/pkg/logs/internal/decoder/auto_multiline_detection/aggregator_test.go b/pkg/logs/internal/decoder/auto_multiline_detection/aggregator_test.go index db664ac4fae5a9..d29ae54fd7f461 100644 --- a/pkg/logs/internal/decoder/auto_multiline_detection/aggregator_test.go +++ b/pkg/logs/internal/decoder/auto_multiline_detection/aggregator_test.go @@ -159,7 +159,7 @@ func TestTagMultiLineLogs(t *testing.T) { ag := NewAggregator(outputFn, 10, time.Duration(1*time.Second), false, true, status.NewInfoRegistry()) ag.Aggregate(newMessage("12345"), startGroup) - ag.Aggregate(newMessage("67890"), aggregate) + ag.Aggregate(newMessage("6789"), aggregate) ag.Aggregate(newMessage("1"), aggregate) // Causes overflow, truncate and flush ag.Aggregate(newMessage("2"), noAggregate) @@ -167,7 +167,7 @@ func TestTagMultiLineLogs(t *testing.T) { assert.True(t, msg.ParsingExtra.IsMultiLine) assert.True(t, msg.ParsingExtra.IsTruncated) assert.Equal(t, msg.ParsingExtra.Tags, []string{message.MultiLineSourceTag("auto_multiline")}) - assertMessageContent(t, msg, "12345\\n67890...TRUNCATED...") + assertMessageContent(t, msg, "12345\\n6789...TRUNCATED...") msg = <-outputChan assert.False(t, msg.ParsingExtra.IsMultiLine)