From 3545ce6980a3bb9cc54fd4bcbd7294e8b891e95c Mon Sep 17 00:00:00 2001 From: Paul Cacheux Date: Tue, 28 Jan 2025 11:06:42 +0100 Subject: [PATCH] [CWS] pass the missing logs compression component to the direct sender --- cmd/system-probe/api/module/loader.go | 10 ++++++---- cmd/system-probe/api/server.go | 5 +++-- cmd/system-probe/subcommands/run/command.go | 16 ++++++++++------ .../servicediscovery/module/impl_linux_test.go | 2 +- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cmd/system-probe/api/module/loader.go b/cmd/system-probe/api/module/loader.go index 5e028b0d538f7..4fa5b4777ee87 100644 --- a/cmd/system-probe/api/module/loader.go +++ b/cmd/system-probe/api/module/loader.go @@ -19,6 +19,7 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/core/telemetry" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" + logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -64,7 +65,7 @@ func withModule(name sysconfigtypes.ModuleName, fn func()) { // * Initialization using the provided Factory; // * Registering the HTTP endpoints of each module; // * Register the gRPC server; -func Register(cfg *sysconfigtypes.Config, httpMux *mux.Router, factories []Factory, wmeta workloadmeta.Component, tagger tagger.Component, telemetry telemetry.Component) error { +func Register(cfg *sysconfigtypes.Config, httpMux *mux.Router, factories []Factory, wmeta workloadmeta.Component, tagger tagger.Component, telemetry telemetry.Component, compression logscompression.Component) error { var enabledModulesFactories []Factory for _, factory := range factories { if !cfg.ModuleIsEnabled(factory.Name) { @@ -83,9 +84,10 @@ func Register(cfg *sysconfigtypes.Config, httpMux *mux.Router, factories []Facto var module Module withModule(factory.Name, func() { deps := FactoryDependencies{ - WMeta: wmeta, - Tagger: tagger, - Telemetry: telemetry, + WMeta: wmeta, + Tagger: tagger, + Telemetry: telemetry, + Compression: compression, } module, err = factory.Fn(cfg, deps) }) diff --git a/cmd/system-probe/api/server.go b/cmd/system-probe/api/server.go index f0fbe81919f30..ec5e5ec870431 100644 --- a/cmd/system-probe/api/server.go +++ b/cmd/system-probe/api/server.go @@ -25,12 +25,13 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/core/telemetry" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" + logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def" "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/util/log" ) // StartServer starts the HTTP and gRPC servers for the system-probe, which registers endpoints from all enabled modules. -func StartServer(cfg *sysconfigtypes.Config, telemetry telemetry.Component, wmeta workloadmeta.Component, tagger tagger.Component, settings settings.Component) error { +func StartServer(cfg *sysconfigtypes.Config, telemetry telemetry.Component, wmeta workloadmeta.Component, tagger tagger.Component, settings settings.Component, compression logscompression.Component) error { conn, err := server.NewListener(cfg.SocketAddress) if err != nil { return err @@ -38,7 +39,7 @@ func StartServer(cfg *sysconfigtypes.Config, telemetry telemetry.Component, wmet mux := gorilla.NewRouter() - err = module.Register(cfg, mux, modules.All, wmeta, tagger, telemetry) + err = module.Register(cfg, mux, modules.All, wmeta, tagger, telemetry, compression) if err != nil { return fmt.Errorf("failed to create system probe: %s", err) } diff --git a/cmd/system-probe/subcommands/run/command.go b/cmd/system-probe/subcommands/run/command.go index f5d2990b5b0ab..5b43235f743fc 100644 --- a/cmd/system-probe/subcommands/run/command.go +++ b/cmd/system-probe/subcommands/run/command.go @@ -53,6 +53,8 @@ import ( compstatsd "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd" "github.com/DataDog/datadog-agent/comp/remote-config/rcclient" "github.com/DataDog/datadog-agent/comp/remote-config/rcclient/rcclientimpl" + logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def" + logscompressionfx "github.com/DataDog/datadog-agent/comp/serializer/logscompression/fx" "github.com/DataDog/datadog-agent/pkg/api/security" "github.com/DataDog/datadog-agent/pkg/config/env" "github.com/DataDog/datadog-agent/pkg/config/model" @@ -143,6 +145,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { } }), settingsimpl.Module(), + logscompressionfx.Module(), ) }, } @@ -152,7 +155,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { } // run starts the main loop. -func run(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, _ pid.Component, _ healthprobe.Component, _ autoexit.Component, settings settings.Component) error { +func run(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, _ pid.Component, _ healthprobe.Component, _ autoexit.Component, settings settings.Component, compression logscompression.Component) error { defer func() { stopSystemProbe() }() @@ -194,7 +197,7 @@ func run(log log.Component, _ config.Component, statsd compstatsd.Component, tel } }() - if err := startSystemProbe(log, statsd, telemetry, sysprobeconfig, rcclient, wmeta, tagger, settings); err != nil { + if err := startSystemProbe(log, statsd, telemetry, sysprobeconfig, rcclient, wmeta, tagger, settings, compression); err != nil { if errors.Is(err, ErrNotEnabled) { // A sleep is necessary to ensure that supervisor registers this process as "STARTED" // If the exit is "too quick", we enter a BACKOFF->FATAL loop even though this is an expected exit @@ -238,9 +241,9 @@ func StartSystemProbeWithDefaults(ctxChan <-chan context.Context) (<-chan error, func runSystemProbe(ctxChan <-chan context.Context, errChan chan error) error { return fxutil.OneShot( - func(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, _ healthprobe.Component, settings settings.Component) error { + func(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, _ healthprobe.Component, settings settings.Component, compression logscompression.Component) error { defer StopSystemProbeWithDefaults() - err := startSystemProbe(log, statsd, telemetry, sysprobeconfig, rcclient, wmeta, tagger, settings) + err := startSystemProbe(log, statsd, telemetry, sysprobeconfig, rcclient, wmeta, tagger, settings, compression) if err != nil { return err } @@ -311,6 +314,7 @@ func runSystemProbe(ctxChan <-chan context.Context, errChan chan error) error { } }), settingsimpl.Module(), + logscompressionfx.Module(), ) } @@ -320,7 +324,7 @@ func StopSystemProbeWithDefaults() { } // startSystemProbe Initializes the system-probe process -func startSystemProbe(log log.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, _ rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, settings settings.Component) error { +func startSystemProbe(log log.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, _ rcclient.Component, wmeta workloadmeta.Component, tagger tagger.Component, settings settings.Component, compression logscompression.Component) error { var err error cfg := sysprobeconfig.SysProbeObject() @@ -380,7 +384,7 @@ func startSystemProbe(log log.Component, statsd compstatsd.Component, telemetry }() } - if err = api.StartServer(cfg, telemetry, wmeta, tagger, settings); err != nil { + if err = api.StartServer(cfg, telemetry, wmeta, tagger, settings, compression); err != nil { return log.Criticalf("error while starting api server, exiting: %v", err) } return nil diff --git a/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go b/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go index 1f388f538cd58..9cbfaae0fc9a9 100644 --- a/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go +++ b/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go @@ -92,7 +92,7 @@ func setupDiscoveryModule(t *testing.T) (string, *proccontainersmocks.MockContai return false }, } - err := module.Register(cfg, mux, []module.Factory{m}, wmeta, tagger, nil) + err := module.Register(cfg, mux, []module.Factory{m}, wmeta, tagger, nil, nil) require.NoError(t, err) srv := httptest.NewServer(mux)