Skip to content

Commit

Permalink
AMLII-2249 - fortify log tailer pipeline for journald and windows events
Browse files Browse the repository at this point in the history
Pass the specified tailers through a no-op line handler, as the messages these tailers
produce are not viable candidates for truncation.

Develop and use a new NoopDecoder constructor to make it clear that these entities
do not yet have meaningful work being performed on them in the decoder flow.
  • Loading branch information
ddrthall committed Jan 24, 2025
1 parent 9547dfc commit 73fbb6e
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 21 deletions.
46 changes: 33 additions & 13 deletions pkg/logs/internal/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/logs/internal/framer"
"github.com/DataDog/datadog-agent/pkg/logs/internal/parsers"
"github.com/DataDog/datadog-agent/pkg/logs/internal/parsers/noop"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
status "github.com/DataDog/datadog-agent/pkg/logs/status/utils"
Expand Down Expand Up @@ -79,14 +80,44 @@ func syncSourceInfo(source *sources.ReplaceableSource, lh *MultiLineHandler) {
}
}

// NewNoopDecoder initializes a decoder with all dependent components in passthrough mode.
func NewNoopDecoder() *Decoder {
inputChan := make(chan *message.Message)
outputChan := make(chan *message.Message)
detectedPattern := &DetectedPattern{}

lineHandler := NewNoopLineHandler(outputChan)
framer, lineParser := buildFramerAndParser(lineHandler, noop.New(), framer.NoFraming)

return New(inputChan, outputChan, framer, lineParser, lineHandler, detectedPattern)
}

// NewDecoderWithFraming initialize a decoder with given endline strategy.
func NewDecoderWithFraming(source *sources.ReplaceableSource, parser parsers.Parser, framing framer.Framing, multiLinePattern *regexp.Regexp, tailerInfo *status.InfoRegistry) *Decoder {
inputChan := make(chan *message.Message)
outputChan := make(chan *message.Message)
maxContentSize := config.MaxMessageSizeBytes(pkgconfigsetup.Datadog())
detectedPattern := &DetectedPattern{}

lineHandler := buildLineHandler(source, multiLinePattern, tailerInfo, outputChan, detectedPattern)
framer, lineParser := buildFramerAndParser(lineHandler, parser, framing)

return New(inputChan, outputChan, framer, lineParser, lineHandler, detectedPattern)
}

func buildFramerAndParser(lineHandler LineHandler, parser parsers.Parser, framing framer.Framing) (*framer.Framer, LineParser) {
maxMessageSize := config.MaxMessageSizeBytes(pkgconfigsetup.Datadog())
var lineParser LineParser
if parser.SupportsPartialLine() {
lineParser = NewMultiLineParser(lineHandler, config.AggregationTimeout(pkgconfigsetup.Datadog()), parser, maxMessageSize)
} else {
lineParser = NewSingleLineParser(lineHandler, parser)
}
return framer.NewFramer(lineParser.process, framing, maxMessageSize), lineParser
}

func buildLineHandler(source *sources.ReplaceableSource, multiLinePattern *regexp.Regexp, tailerInfo *status.InfoRegistry, outputChan chan *message.Message, detectedPattern *DetectedPattern) LineHandler {
outputFn := func(m *message.Message) { outputChan <- m }
maxContentSize := config.MaxMessageSizeBytes(pkgconfigsetup.Datadog())

// construct the lineHandler
var lineHandler LineHandler
Expand Down Expand Up @@ -122,18 +153,7 @@ func NewDecoderWithFraming(source *sources.ReplaceableSource, parser parsers.Par
}
}

// construct the lineParser, wrapping the parser
var lineParser LineParser
if parser.SupportsPartialLine() {
lineParser = NewMultiLineParser(lineHandler, config.AggregationTimeout(pkgconfigsetup.Datadog()), parser, maxContentSize)
} else {
lineParser = NewSingleLineParser(lineHandler, parser)
}

// construct the framer
framer := framer.NewFramer(lineParser.process, framing, maxContentSize)

return New(inputChan, outputChan, framer, lineParser, lineHandler, detectedPattern)
return lineHandler
}

func buildLegacyAutoMultilineHandlerFromConfig(outputFn func(*message.Message), maxContentSize int, source *sources.ReplaceableSource, detectedPattern *DetectedPattern, tailerInfo *status.InfoRegistry) *LegacyAutoMultilineHandler {
Expand Down
38 changes: 38 additions & 0 deletions pkg/logs/internal/decoder/noop_line_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package decoder

import (
"time"

"github.com/DataDog/datadog-agent/pkg/logs/message"
)

// NoopLineHandler provides a passthrough functionality for flows that don't need a functional line handler
type NoopLineHandler struct {
outputChan chan *message.Message
}

// NewNoopLineHandler returns a new NoopLineHandler
func NewNoopLineHandler(outputChan chan *message.Message) *NoopLineHandler {
return &NoopLineHandler{outputChan: outputChan}
}

// process handles a new line (message)
func (noop *NoopLineHandler) process(msg *message.Message) {
noop.outputChan <- msg
}

// flushChan returns a channel which will deliver a message when `flush` should be called.
func (noop *NoopLineHandler) flushChan() <-chan time.Time {
return nil
}

// flush flushes partially-processed data. It should be called either when flushChan has
// a message, or when the decoder is stopped.
func (noop *NoopLineHandler) flush() {

}
5 changes: 1 addition & 4 deletions pkg/logs/tailers/journald/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ import (
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
"github.com/DataDog/datadog-agent/pkg/logs/internal/decoder"
"github.com/DataDog/datadog-agent/pkg/logs/internal/framer"
"github.com/DataDog/datadog-agent/pkg/logs/internal/parsers/noop"
"github.com/DataDog/datadog-agent/pkg/logs/internal/tag"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/processor"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
status "github.com/DataDog/datadog-agent/pkg/logs/status/utils"
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand Down Expand Up @@ -69,7 +66,7 @@ func NewTailer(source *sources.LogSource, outputChan chan *message.Message, jour
}

return &Tailer{
decoder: decoder.NewDecoderWithFraming(sources.NewReplaceableSource(source), noop.New(), framer.NoFraming, nil, status.NewInfoRegistry()),
decoder: decoder.NewNoopDecoder(),
source: source,
outputChan: outputChan,
journal: journal,
Expand Down
5 changes: 1 addition & 4 deletions pkg/logs/tailers/windowsevent/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ import (
"github.com/cenkalti/backoff"

"github.com/DataDog/datadog-agent/pkg/logs/internal/decoder"
"github.com/DataDog/datadog-agent/pkg/logs/internal/framer"
"github.com/DataDog/datadog-agent/pkg/logs/internal/parsers/noop"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/processor"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
status "github.com/DataDog/datadog-agent/pkg/logs/status/utils"
"github.com/DataDog/datadog-agent/pkg/logs/util/windowsevent"
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -72,7 +69,7 @@ func NewTailer(evtapi evtapi.API, source *sources.LogSource, config *Config, out
evtapi: evtapi,
source: source,
config: config,
decoder: decoder.NewDecoderWithFraming(sources.NewReplaceableSource(source), noop.New(), framer.NoFraming, nil, status.NewInfoRegistry()),
decoder: decoder.NewNoopDecoder(),
outputChan: outputChan,
}
}
Expand Down

0 comments on commit 73fbb6e

Please sign in to comment.