Skip to content

Commit

Permalink
[Backport 7.63.x] [CWS] pass the missing logs compression component t…
Browse files Browse the repository at this point in the history
…o the direct sender (#33466)

Co-authored-by: Paul Cacheux <[email protected]>
  • Loading branch information
agent-platform-auto-pr[bot] and paulcacheux authored Jan 28, 2025
1 parent 01fe91d commit 81ee80e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 13 deletions.
10 changes: 6 additions & 4 deletions cmd/system-probe/api/module/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand Down Expand Up @@ -64,7 +65,7 @@ func withModule(name sysconfigtypes.ModuleName, fn func()) {
// * Initialization using the provided Factory;
// * Registering the HTTP endpoints of each module;
// * Register the gRPC server;
func Register(cfg *sysconfigtypes.Config, httpMux *mux.Router, factories []Factory, wmeta workloadmeta.Component, tagger tagger.Component, telemetry telemetry.Component) error {
func Register(cfg *sysconfigtypes.Config, httpMux *mux.Router, factories []Factory, wmeta workloadmeta.Component, tagger tagger.Component, telemetry telemetry.Component, compression logscompression.Component) error {
var enabledModulesFactories []Factory
for _, factory := range factories {
if !cfg.ModuleIsEnabled(factory.Name) {
Expand All @@ -83,9 +84,10 @@ func Register(cfg *sysconfigtypes.Config, httpMux *mux.Router, factories []Facto
var module Module
withModule(factory.Name, func() {
deps := FactoryDependencies{
WMeta: wmeta,
Tagger: tagger,
Telemetry: telemetry,
WMeta: wmeta,
Tagger: tagger,
Telemetry: telemetry,
Compression: compression,
}
module, err = factory.Fn(cfg, deps)
})
Expand Down
5 changes: 3 additions & 2 deletions cmd/system-probe/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@ import (
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def"
"github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// StartServer starts the HTTP and gRPC servers for the system-probe, which registers endpoints from all enabled modules.
func StartServer(cfg *sysconfigtypes.Config, telemetry telemetry.Component, wmeta workloadmeta.Component, tagger tagger.Component, settings settings.Component) error {
func StartServer(cfg *sysconfigtypes.Config, telemetry telemetry.Component, wmeta workloadmeta.Component, tagger tagger.Component, settings settings.Component, compression logscompression.Component) error {
conn, err := server.NewListener(cfg.SocketAddress)
if err != nil {
return err
}

mux := gorilla.NewRouter()

err = module.Register(cfg, mux, modules.All, wmeta, tagger, telemetry)
err = module.Register(cfg, mux, modules.All, wmeta, tagger, telemetry, compression)
if err != nil {
return fmt.Errorf("failed to create system probe: %s", err)
}
Expand Down
16 changes: 10 additions & 6 deletions cmd/system-probe/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import (
compstatsd "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
"github.com/DataDog/datadog-agent/comp/remote-config/rcclient"
"github.com/DataDog/datadog-agent/comp/remote-config/rcclient/rcclientimpl"
logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def"
logscompressionfx "github.com/DataDog/datadog-agent/comp/serializer/logscompression/fx"
"github.com/DataDog/datadog-agent/pkg/api/security"
"github.com/DataDog/datadog-agent/pkg/config/env"
"github.com/DataDog/datadog-agent/pkg/config/model"
Expand Down Expand Up @@ -143,6 +145,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
}
}),
settingsimpl.Module(),
logscompressionfx.Module(),
)
},
}
Expand All @@ -152,7 +155,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
}

// run starts the main loop.
func run(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, _ pid.Component, _ healthprobe.Component, _ autoexit.Component, settings settings.Component) error {
func run(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, _ pid.Component, _ healthprobe.Component, _ autoexit.Component, settings settings.Component, compression logscompression.Component) error {
defer func() {
stopSystemProbe()
}()
Expand Down Expand Up @@ -194,7 +197,7 @@ func run(log log.Component, _ config.Component, statsd compstatsd.Component, tel
}
}()

if err := startSystemProbe(log, statsd, telemetry, sysprobeconfig, rcclient, wmeta, tagger, settings); err != nil {
if err := startSystemProbe(log, statsd, telemetry, sysprobeconfig, rcclient, wmeta, tagger, settings, compression); err != nil {
if errors.Is(err, ErrNotEnabled) {
// A sleep is necessary to ensure that supervisor registers this process as "STARTED"
// If the exit is "too quick", we enter a BACKOFF->FATAL loop even though this is an expected exit
Expand Down Expand Up @@ -238,9 +241,9 @@ func StartSystemProbeWithDefaults(ctxChan <-chan context.Context) (<-chan error,

func runSystemProbe(ctxChan <-chan context.Context, errChan chan error) error {
return fxutil.OneShot(
func(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, _ healthprobe.Component, settings settings.Component) error {
func(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, _ healthprobe.Component, settings settings.Component, compression logscompression.Component) error {
defer StopSystemProbeWithDefaults()
err := startSystemProbe(log, statsd, telemetry, sysprobeconfig, rcclient, wmeta, tagger, settings)
err := startSystemProbe(log, statsd, telemetry, sysprobeconfig, rcclient, wmeta, tagger, settings, compression)
if err != nil {
return err
}
Expand Down Expand Up @@ -311,6 +314,7 @@ func runSystemProbe(ctxChan <-chan context.Context, errChan chan error) error {
}
}),
settingsimpl.Module(),
logscompressionfx.Module(),
)
}

Expand All @@ -320,7 +324,7 @@ func StopSystemProbeWithDefaults() {
}

// startSystemProbe Initializes the system-probe process
func startSystemProbe(log log.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, _ rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, settings settings.Component) error {
func startSystemProbe(log log.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, _ rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, settings settings.Component, compression logscompression.Component) error {
var err error
cfg := sysprobeconfig.SysProbeObject()

Expand Down Expand Up @@ -380,7 +384,7 @@ func startSystemProbe(log log.Component, statsd compstatsd.Component, telemetry
}()
}

if err = api.StartServer(cfg, telemetry, wmeta, tagger, settings); err != nil {
if err = api.StartServer(cfg, telemetry, wmeta, tagger, settings, compression); err != nil {
return log.Criticalf("error while starting api server, exiting: %v", err)
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func setupDiscoveryModule(t *testing.T) (string, *proccontainersmocks.MockContai
return false
},
}
err := module.Register(cfg, mux, []module.Factory{m}, wmeta, tagger, nil)
err := module.Register(cfg, mux, []module.Factory{m}, wmeta, tagger, nil, nil)
require.NoError(t, err)

srv := httptest.NewServer(mux)
Expand Down

0 comments on commit 81ee80e

Please sign in to comment.