Skip to content

Commit

Permalink
Make # of log pipelines configurable and default to GOMAXPROCS (#31190)
Browse files Browse the repository at this point in the history
Co-authored-by: Srdjan Grubor <[email protected]>
  • Loading branch information
gh123man and sgnn7 authored Dec 10, 2024
1 parent a7ba911 commit bcfd6ab
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 10 deletions.
2 changes: 1 addition & 1 deletion comp/logs/agent/agentimpl/agent_core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta
diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver(nil, a.hostname)

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)
pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)

// setup the launchers
lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, auditor, a.tracker)
Expand Down
2 changes: 1 addition & 1 deletion comp/logs/agent/agentimpl/agent_serverless_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (a *logAgent) SetupPipeline(
destinationsCtx := client.NewDestinationsContext()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewServerlessProvider(config.NumberOfPipelines, a.auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)
pipelineProvider := pipeline.NewServerlessProvider(a.config.GetInt("logs_config.pipelines"), a.auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)

lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, a.auditor, a.tracker)
lnchrs.AddLauncher(channel.NewLauncher())
Expand Down
5 changes: 0 additions & 5 deletions comp/logs/agent/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

package config

// Pipeline constraints
const (
NumberOfPipelines = 4
)

const (
// DateFormat is the default date format.
DateFormat = "2006-01-02T15:04:05.000000000Z"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (a *Agent) SetupPipeline(
destinationsCtx := client.NewDestinationsContext()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)
pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)

a.auditor = auditor
a.destinationsCtx = destinationsCtx
Expand Down
2 changes: 1 addition & 1 deletion pkg/compliance/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewLogReporter(hostname string, sourceName, sourceType string, endpoints *c
auditor.Start()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, dstcontext, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog())
pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, dstcontext, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog())
pipelineProvider.Start()

logSource := sources.NewLogSource(
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,10 @@ func logsagent(config pkgconfigmodel.Setup) {
// Add a tag to logs that are truncated by the agent
config.BindEnvAndSetDefault("logs_config.tag_truncated_logs", false)

// Number of logs pipeline instances. Defaults to number of logical CPU cores as defined by GOMAXPROCS or 4, whichever is lower.
logsPipelines := min(4, runtime.GOMAXPROCS(0))
config.BindEnvAndSetDefault("logs_config.pipelines", logsPipelines)

// If true, the agent looks for container logs in the location used by podman, rather
// than docker. This is a temporary configuration parameter to support podman logs until
// a more substantial refactor of autodiscovery is made to determine this automatically.
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newReporter(hostname string, stopper startstop.Stopper, sourceName, sourceT
stopper.Add(auditor)

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(logsconfig.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, context, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog())
pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, context, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog())
pipelineProvider.Start()
stopper.Add(pipelineProvider)

Expand Down

0 comments on commit bcfd6ab

Please sign in to comment.