diff --git a/pkg/logs/internal/decoder/decoder.go b/pkg/logs/internal/decoder/decoder.go index b3d354b64411f7..18e939edd299f0 100644 --- a/pkg/logs/internal/decoder/decoder.go +++ b/pkg/logs/internal/decoder/decoder.go @@ -14,6 +14,7 @@ import ( pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/internal/framer" "github.com/DataDog/datadog-agent/pkg/logs/internal/parsers" + "github.com/DataDog/datadog-agent/pkg/logs/internal/parsers/noop" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/sources" status "github.com/DataDog/datadog-agent/pkg/logs/status/utils" @@ -79,14 +80,44 @@ func syncSourceInfo(source *sources.ReplaceableSource, lh *MultiLineHandler) { } } +// NewNoopDecoder initializes a decoder with all dependent components in passthrough mode. +func NewNoopDecoder() *Decoder { + inputChan := make(chan *message.Message) + outputChan := make(chan *message.Message) + detectedPattern := &DetectedPattern{} + + lineHandler := NewNoopLineHandler(outputChan) + framer, lineParser := buildFramerAndParser(lineHandler, noop.New(), framer.NoFraming) + + return New(inputChan, outputChan, framer, lineParser, lineHandler, detectedPattern) +} + // NewDecoderWithFraming initialize a decoder with given endline strategy. func NewDecoderWithFraming(source *sources.ReplaceableSource, parser parsers.Parser, framing framer.Framing, multiLinePattern *regexp.Regexp, tailerInfo *status.InfoRegistry) *Decoder { inputChan := make(chan *message.Message) outputChan := make(chan *message.Message) - maxContentSize := config.MaxMessageSizeBytes(pkgconfigsetup.Datadog()) detectedPattern := &DetectedPattern{} + lineHandler := buildLineHandler(source, multiLinePattern, tailerInfo, outputChan, detectedPattern) + framer, lineParser := buildFramerAndParser(lineHandler, parser, framing) + + return New(inputChan, outputChan, framer, lineParser, lineHandler, detectedPattern) +} + +func buildFramerAndParser(lineHandler LineHandler, parser parsers.Parser, framing framer.Framing) (*framer.Framer, LineParser) { + maxMessageSize := config.MaxMessageSizeBytes(pkgconfigsetup.Datadog()) + var lineParser LineParser + if parser.SupportsPartialLine() { + lineParser = NewMultiLineParser(lineHandler, config.AggregationTimeout(pkgconfigsetup.Datadog()), parser, maxMessageSize) + } else { + lineParser = NewSingleLineParser(lineHandler, parser) + } + return framer.NewFramer(lineParser.process, framing, maxMessageSize), lineParser +} + +func buildLineHandler(source *sources.ReplaceableSource, multiLinePattern *regexp.Regexp, tailerInfo *status.InfoRegistry, outputChan chan *message.Message, detectedPattern *DetectedPattern) LineHandler { outputFn := func(m *message.Message) { outputChan <- m } + maxContentSize := config.MaxMessageSizeBytes(pkgconfigsetup.Datadog()) // construct the lineHandler var lineHandler LineHandler @@ -122,18 +153,7 @@ func NewDecoderWithFraming(source *sources.ReplaceableSource, parser parsers.Par } } - // construct the lineParser, wrapping the parser - var lineParser LineParser - if parser.SupportsPartialLine() { - lineParser = NewMultiLineParser(lineHandler, config.AggregationTimeout(pkgconfigsetup.Datadog()), parser, maxContentSize) - } else { - lineParser = NewSingleLineParser(lineHandler, parser) - } - - // construct the framer - framer := framer.NewFramer(lineParser.process, framing, maxContentSize) - - return New(inputChan, outputChan, framer, lineParser, lineHandler, detectedPattern) + return lineHandler } func buildLegacyAutoMultilineHandlerFromConfig(outputFn func(*message.Message), maxContentSize int, source *sources.ReplaceableSource, detectedPattern *DetectedPattern, tailerInfo *status.InfoRegistry) *LegacyAutoMultilineHandler { diff --git a/pkg/logs/internal/decoder/noop_line_handler.go b/pkg/logs/internal/decoder/noop_line_handler.go new file mode 100644 index 00000000000000..2c9cecec055a6e --- /dev/null +++ b/pkg/logs/internal/decoder/noop_line_handler.go @@ -0,0 +1,38 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +package decoder + +import ( + "time" + + "github.com/DataDog/datadog-agent/pkg/logs/message" +) + +// NoopLineHandler provides a passthrough functionality for flows that don't need a functional line handler +type NoopLineHandler struct { + outputChan chan *message.Message +} + +// NewNoopLineHandler returns a new NoopLineHandler +func NewNoopLineHandler(outputChan chan *message.Message) *NoopLineHandler { + return &NoopLineHandler{outputChan: outputChan} +} + +// process handles a new line (message) +func (noop *NoopLineHandler) process(msg *message.Message) { + noop.outputChan <- msg +} + +// flushChan returns a channel which will deliver a message when `flush` should be called. +func (noop *NoopLineHandler) flushChan() <-chan time.Time { + return nil +} + +// flush flushes partially-processed data. It should be called either when flushChan has +// a message, or when the decoder is stopped. +func (noop *NoopLineHandler) flush() { + +} diff --git a/pkg/logs/tailers/journald/tailer.go b/pkg/logs/tailers/journald/tailer.go index a8280944305c55..ec1bae31ce358b 100644 --- a/pkg/logs/tailers/journald/tailer.go +++ b/pkg/logs/tailers/journald/tailer.go @@ -19,13 +19,10 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/logs/agent/config" "github.com/DataDog/datadog-agent/pkg/logs/internal/decoder" - "github.com/DataDog/datadog-agent/pkg/logs/internal/framer" - "github.com/DataDog/datadog-agent/pkg/logs/internal/parsers/noop" "github.com/DataDog/datadog-agent/pkg/logs/internal/tag" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/processor" "github.com/DataDog/datadog-agent/pkg/logs/sources" - status "github.com/DataDog/datadog-agent/pkg/logs/status/utils" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -69,7 +66,7 @@ func NewTailer(source *sources.LogSource, outputChan chan *message.Message, jour } return &Tailer{ - decoder: decoder.NewDecoderWithFraming(sources.NewReplaceableSource(source), noop.New(), framer.NoFraming, nil, status.NewInfoRegistry()), + decoder: decoder.NewNoopDecoder(), source: source, outputChan: outputChan, journal: journal, diff --git a/pkg/logs/tailers/windowsevent/tailer.go b/pkg/logs/tailers/windowsevent/tailer.go index 5db7a658a7f73f..0c296e04a3310a 100644 --- a/pkg/logs/tailers/windowsevent/tailer.go +++ b/pkg/logs/tailers/windowsevent/tailer.go @@ -18,12 +18,9 @@ import ( "github.com/cenkalti/backoff" "github.com/DataDog/datadog-agent/pkg/logs/internal/decoder" - "github.com/DataDog/datadog-agent/pkg/logs/internal/framer" - "github.com/DataDog/datadog-agent/pkg/logs/internal/parsers/noop" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/processor" "github.com/DataDog/datadog-agent/pkg/logs/sources" - status "github.com/DataDog/datadog-agent/pkg/logs/status/utils" "github.com/DataDog/datadog-agent/pkg/logs/util/windowsevent" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -72,7 +69,7 @@ func NewTailer(evtapi evtapi.API, source *sources.LogSource, config *Config, out evtapi: evtapi, source: source, config: config, - decoder: decoder.NewDecoderWithFraming(sources.NewReplaceableSource(source), noop.New(), framer.NoFraming, nil, status.NewInfoRegistry()), + decoder: decoder.NewNoopDecoder(), outputChan: outputChan, } }