Skip to content

Commit

Permalink
Fix truncation issues for auto multiline V2
Browse files Browse the repository at this point in the history
  • Loading branch information
gh123man committed Jan 9, 2025
1 parent 3f997d2 commit 0099b3e
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 28 deletions.
51 changes: 33 additions & 18 deletions pkg/logs/internal/decoder/auto_multiline_detection/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
type bucket struct {
tagTruncatedLogs bool
tagMultiLineLogs bool
maxContentSize int

message *message.Message
originalDataLen int
buffer *bytes.Buffer
lineCount int
truncated bool
shouldTruncate bool
}

func (b *bucket) add(msg *message.Message) {
Expand All @@ -42,24 +43,36 @@ func (b *bucket) isEmpty() bool {
return b.originalDataLen == 0
}

func (b *bucket) truncate() {
b.buffer.Write(message.TruncatedFlag)
b.truncated = true
func (b *bucket) reset() {
b.buffer.Reset()
b.message = nil
b.lineCount = 0
b.originalDataLen = 0
}

func (b *bucket) flush() *message.Message {
defer func() {
b.buffer.Reset()
b.message = nil
b.lineCount = 0
b.originalDataLen = 0
b.truncated = false
}()
defer b.reset()

lastWasTruncated := b.shouldTruncate
b.shouldTruncate = b.buffer.Len() >= b.maxContentSize

data := bytes.TrimSpace(b.buffer.Bytes())
content := make([]byte, len(data))
copy(content, data)

if lastWasTruncated {
// the previous line has been truncated because it was too long,
// the new line is just a remainder,
// adding the truncated flag at the beginning of the content
content = append(message.TruncatedFlag, content...)
}

if b.shouldTruncate {
// the line is too long, it needs to be cut off and send,
// adding the truncated flag the end of the content
content = append(content, message.TruncatedFlag...)
}

msg := message.NewRawMessage(content, b.message.Status, b.originalDataLen, b.message.ParsingExtra.Timestamp)
tlmTags := []string{"false", "single_line"}

Expand All @@ -71,7 +84,7 @@ func (b *bucket) flush() *message.Message {
}
}

if b.truncated {
if lastWasTruncated || b.shouldTruncate {
msg.ParsingExtra.IsTruncated = true
tlmTags[0] = "true"
if b.tagTruncatedLogs {
Expand Down Expand Up @@ -103,7 +116,7 @@ func NewAggregator(outputFn func(m *message.Message), maxContentSize int, flushT

return &Aggregator{
outputFn: outputFn,
bucket: &bucket{buffer: bytes.NewBuffer(nil), tagTruncatedLogs: tagTruncatedLogs, tagMultiLineLogs: tagMultiLineLogs},
bucket: &bucket{buffer: bytes.NewBuffer(nil), tagTruncatedLogs: tagTruncatedLogs, tagMultiLineLogs: tagMultiLineLogs, maxContentSize: maxContentSize, lineCount: 0, shouldTruncate: false},
maxContentSize: maxContentSize,
flushTimeout: flushTimeout,
multiLineMatchInfo: multiLineMatchInfo,
Expand All @@ -120,13 +133,16 @@ func (a *Aggregator) Aggregate(msg *message.Message, label Label) {
// If `noAggregate` - flush the bucket immediately and then flush the next message.
if label == noAggregate {
a.Flush()
a.outputFn(msg)
a.bucket.shouldTruncate = false // noAggregate messages should never be truncated at the beginning (Could break JSON formatted messages)
a.bucket.add(msg)
a.Flush()
return
}

// If `aggregate` and the bucket is empty - flush the next message.
if label == aggregate && a.bucket.isEmpty() {
a.outputFn(msg)
a.bucket.add(msg)
a.Flush()
return
}

Expand All @@ -138,10 +154,8 @@ func (a *Aggregator) Aggregate(msg *message.Message, label Label) {

// At this point we either have `startGroup` with an empty bucket or `aggregate` with a non-empty bucket
// so we add the message to the bucket or flush if the bucket will overflow the max content size.
if msg.RawDataLen+a.bucket.buffer.Len() > a.maxContentSize && !a.bucket.isEmpty() {
a.bucket.truncate() // Truncate the end of the current bucket
if msg.RawDataLen+a.bucket.buffer.Len() > a.maxContentSize {
a.Flush()
a.bucket.truncate() // Truncate the start of the next bucket
}

if !a.bucket.isEmpty() {
Expand Down Expand Up @@ -184,6 +198,7 @@ func (a *Aggregator) FlushChan() <-chan time.Time {
// Flush flushes the aggregator.
func (a *Aggregator) Flush() {
if a.bucket.isEmpty() {
a.bucket.reset()
return
}
a.outputFn(a.bucket.flush())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,7 @@ func TestTagTruncatedLogs(t *testing.T) {
msg = <-outputChan
assert.True(t, msg.ParsingExtra.IsTruncated)
assert.Equal(t, msg.ParsingExtra.Tags, []string{message.TruncatedReasonTag("auto_multiline")})
assertMessageContent(t, msg, "...TRUNCATED...12345...TRUNCATED...")

msg = <-outputChan
assert.True(t, msg.ParsingExtra.IsTruncated)
assert.Equal(t, msg.ParsingExtra.Tags, []string{message.TruncatedReasonTag("auto_multiline")})
assertMessageContent(t, msg, "...TRUNCATED...6789")
assertMessageContent(t, msg, "...TRUNCATED...12345\\n6789...TRUNCATED...")

msg = <-outputChan
assert.False(t, msg.ParsingExtra.IsTruncated)
Expand Down Expand Up @@ -187,14 +182,58 @@ func TestTagMultiLineLogs(t *testing.T) {
assertMessageContent(t, msg, "2")
}

func TestStartGruopIsNotTruncatedWithoutAggreagation(t *testing.T) {
func TestSingleLineTooLongTruncation(t *testing.T) {
outputChan, outputFn := makeHandler()
ag := NewAggregator(outputFn, 5, time.Duration(1*time.Second), false, true, status.NewInfoRegistry())

// Multi line log where each message is too large except the last one
ag.Aggregate(newMessage("123456"), startGroup)
ag.Aggregate(newMessage("123456"), aggregate)
ag.Aggregate(newMessage("123456"), aggregate)
ag.Aggregate(newMessage("123"), aggregate)
// Force a flush
ag.Aggregate(newMessage(""), startGroup)

msg := <-outputChan
assertMessageContent(t, msg, "123456")
assertMessageContent(t, msg, "123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "...TRUNCATED...123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "...TRUNCATED...123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "...TRUNCATED...123")

// Single line logs where each message is too large except the last
ag.Aggregate(newMessage("123456"), startGroup)
ag.Aggregate(newMessage("123456"), startGroup)
ag.Aggregate(newMessage("123456"), startGroup)
ag.Aggregate(newMessage("123"), startGroup)
// Force a flush
ag.Aggregate(newMessage(""), startGroup)

msg = <-outputChan
assertMessageContent(t, msg, "123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "...TRUNCATED...123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "...TRUNCATED...123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "...TRUNCATED...123")

// No aggregate logs should never be truncated from the previous message (Could break a JSON payload)
ag.Aggregate(newMessage("123456"), startGroup)
ag.Aggregate(newMessage("123456"), noAggregate)
ag.Aggregate(newMessage("123456"), startGroup)
ag.Aggregate(newMessage("123"), startGroup)
// Force a flush
ag.Aggregate(newMessage(""), startGroup)

msg = <-outputChan
assertMessageContent(t, msg, "123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "...TRUNCATED...123456...TRUNCATED...")
msg = <-outputChan
assertMessageContent(t, msg, "...TRUNCATED...123")
}
4 changes: 3 additions & 1 deletion pkg/logs/internal/decoder/single_line_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ func (h *SingleLineHandler) process(msg *message.Message) {
// the new line is just a remainder,
// adding the truncated flag at the beginning of the content
content = append(message.TruncatedFlag, content...)
addTruncatedTag(msg)
}

// how should we detect logs which are too long before rendering them?
if h.shouldTruncate {
// the line is too long, it needs to be cut off and send,
// adding the truncated flag the end of the content
content = append(content, message.TruncatedFlag...)
}

if lastWasTruncated || h.shouldTruncate {
addTruncatedTag(msg)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/internal/decoder/single_line_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestSingleLineHandlerProcess(t *testing.T) {
string(message.TruncatedFlag) + "aaaaaaaaaaaaaaaaaaaa" + string(message.TruncatedFlag),
string(message.TruncatedFlag) + "wait, how many a's?",
},
expTags: [][]string{{truncateTag}, {truncateTag, truncateTag}, {truncateTag}},
expTags: [][]string{{truncateTag}, {truncateTag}, {truncateTag}},
tagTruncatedLogs: true,
},
{
Expand Down

0 comments on commit 0099b3e

Please sign in to comment.