From 3185361ad2f139273878c05a3e4eb164536dba4c Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 19 Dec 2023 12:12:10 +0100 Subject: [PATCH] Log raw events and errors containing events to a separate file This commit introduces a new logger, configured through `logging.events`, that is used to log any message that contains, or could contain, the whole event. At the moment it is used by the elasticsearch output to log indexing errors containing the whole event and errors returned by Elasticsearch that can potentially contain the whole event. --- go.mod | 2 + go.sum | 4 +- libbeat/cmd/instance/beat.go | 43 +++++++++++++--- libbeat/cmd/test/output.go | 4 +- libbeat/outputs/console/console.go | 1 + libbeat/outputs/elasticsearch/client.go | 22 +++++--- .../outputs/elasticsearch/elasticsearch.go | 51 +++++++++++-------- libbeat/outputs/fileout/file.go | 1 + libbeat/outputs/kafka/kafka.go | 1 + libbeat/outputs/logstash/logstash.go | 2 + libbeat/outputs/output_reg.go | 7 ++- libbeat/outputs/redis/redis.go | 2 + libbeat/outputs/shipper/shipper.go | 1 + libbeat/publisher/pipeline/controller.go | 5 +- libbeat/publisher/pipeline/pipeline.go | 3 +- 15 files changed, 105 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index 6732fbc60061..5b03d2b1c546 100644 --- a/go.mod +++ b/go.mod @@ -420,3 +420,5 @@ replace ( // Exclude this version because the version has an invalid checksum. 
exclude github.com/docker/distribution v2.8.0+incompatible + +replace github.com/elastic/elastic-agent-libs => github.com/belimawr/elastic-agent-libs v0.2.9-0.20231220154111-efc1fba83b4b diff --git a/go.sum b/go.sum index 5b4d63c39652..0aad25138bc1 100644 --- a/go.sum +++ b/go.sum @@ -368,6 +368,8 @@ github.com/awslabs/goformation/v4 v4.1.0 h1:JRxIW0IjhYpYDrIZOTJGMu2azXKI+OK5dP56 github.com/awslabs/goformation/v4 v4.1.0/go.mod h1:MBDN7u1lMNDoehbFuO4uPvgwPeolTMA2TzX1yO6KlxI= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 h1:lxW5Q6K2IisyF5tlr6Ts0W4POGWQZco05MJjFmoeIHs= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= +github.com/belimawr/elastic-agent-libs v0.2.9-0.20231220154111-efc1fba83b4b h1:GEwwH9rTwJzcHAcdCFkfta0AHbADcTNpxZhL51ASLpo= +github.com/belimawr/elastic-agent-libs v0.2.9-0.20231220154111-efc1fba83b4b/go.mod h1:EbRwBMsWoU4IHGKJlTrxbxC03hkihS9W4h+UgraLdDM= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/immutable v0.2.1/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= github.com/benbjohnson/tmpl v1.0.0/go.mod h1:igT620JFIi44B6awvU9IsDhR77IXWtFigTLil/RPdps= @@ -656,8 +658,6 @@ github.com/elastic/elastic-agent-autodiscover v0.6.5 h1:5DeMpuNc8c/tN6HN0A4A2uOF github.com/elastic/elastic-agent-autodiscover v0.6.5/go.mod h1:chulyCAyZb/njMHgzkhC/yWnt8v/Y6eCRUhmFVnsA5o= github.com/elastic/elastic-agent-client/v7 v7.6.0 h1:FEn6FjzynW4TIQo5G096Tr7xYK/P5LY9cSS6wRbXZTc= github.com/elastic/elastic-agent-client/v7 v7.6.0/go.mod h1:GlUKrbVd/O1CRAZonpBeN3J0RlVqP6VGcrBjFWca+aM= -github.com/elastic/elastic-agent-libs v0.7.2 h1:yT0hF0UAxJCdQqhHh6SFpgYrcpB10oFzPj8IaytPS2o= -github.com/elastic/elastic-agent-libs v0.7.2/go.mod h1:pVBEElQJUO9mr4WStWNXuQGsJn54lcjAoYAHmsvBLBc= github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 
h1:sb+25XJn/JcC9/VL8HX4r4QXSUq4uTNzGS2kxOE7u1U= github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3/go.mod h1:rWarFM7qYxJKsi9WcV6ONcFjH/NA3niDNpTxO+8/GVI= github.com/elastic/elastic-agent-system-metrics v0.9.1 h1:r0ofKHgPpl+W09ie7tzGcCDC0d4NZbQUv37rSgHf4FM= diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index efe8bd48f79a..cff6d7120333 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -125,6 +125,7 @@ type beatConfig struct { BufferConfig *config.C `config:"http.buffer"` Path paths.Path `config:"path"` Logging *config.C `config:"logging"` + EventLogging *config.C `config:"logging.events"` MetricLogging *config.C `config:"logging.metrics"` Keystore *config.C `config:"keystore"` Instrumentation instrumentation.Config `config:"instrumentation"` @@ -378,7 +379,26 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { Logger: logp.L().Named("publisher"), Tracer: b.Instrumentation.Tracer(), } - outputFactory := b.makeOutputFactory(b.Config.Output) + + // Get the default/current logging configuration + // we need some defaults to be populated otherwise Unpack will + // fail + eventsLoggerCfg := logp.DefaultConfig(configure.GetEnvironment()) + + // merge eventsLoggerCfg with b.Config.Logging, so logging.events.* only + // overwrites logging.* + if err := b.Config.EventLogging.Unpack(&eventsLoggerCfg); err != nil { + return nil, fmt.Errorf("error initialising events logger: %w", err) + } + + // Ensure the default filename is set + if eventsLoggerCfg.Files.Name == "" { + eventsLoggerCfg.Files.Name = b.Info.Beat + // Append the name so the files do not overwrite themselves. 
+ eventsLoggerCfg.Files.Name = eventsLoggerCfg.Files.Name + "-events-data" + } + + outputFactory := b.makeOutputFactory(b.Config.Output, eventsLoggerCfg) settings := pipeline.Settings{ Processors: b.processors, InputQueueSize: b.InputQueueSize, @@ -388,7 +408,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { return nil, fmt.Errorf("error initializing publisher: %w", err) } - reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader(), eventsLoggerCfg)) // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, // but refine publisher to disconnect clients on stop automatically @@ -784,6 +804,14 @@ func (b *Beat) configure(settings Settings) error { return fmt.Errorf("error unpacking config data: %w", err) } + if b.Config.EventLogging == nil { + b.Config.EventLogging = config.NewConfig() + } + b.Config.EventLogging.Merge(b.Config.Logging) + if _, err := b.Config.EventLogging.Remove("events", -1); err != nil { + return fmt.Errorf("cannot merge logging and logging.events configuration: %w", err) + } + if err := promoteOutputQueueSettings(&b.Config); err != nil { return fmt.Errorf("could not promote output queue settings: %w", err) } @@ -1091,7 +1119,7 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { } } -func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable { +func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader, eventsLoggerCfg logp.Config) reload.Reloadable { return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error { if update == nil { return nil @@ -1113,15 +1141,16 @@ func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Re } } - return outReloader.Reload(update, b.createOutput) + return outReloader.Reload(update, eventsLoggerCfg, b.createOutput) }) } func (b *Beat) makeOutputFactory( cfg 
config.Namespace, + eventLoggerCfg logp.Config, ) func(outputs.Observer) (string, outputs.Group, error) { return func(outStats outputs.Observer) (string, outputs.Group, error) { - out, err := b.createOutput(outStats, cfg) + out, err := b.createOutput(outStats, cfg, eventLoggerCfg) return cfg.Name(), out, err } } @@ -1217,7 +1246,7 @@ func (b *Beat) reloadOutputOnCertChange(cfg config.Namespace) error { return nil } -func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outputs.Group, error) { +func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace, eventsLoggerCfg logp.Config) (outputs.Group, error) { if !cfg.IsSet() { return outputs.Group{}, nil } @@ -1226,7 +1255,7 @@ func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outpu return outputs.Group{}, fmt.Errorf("could not setup output certificates reloader: %w", err) } - return outputs.Load(b.IdxSupporter, b.Info, stats, cfg.Name(), cfg.Config()) + return outputs.Load(b.IdxSupporter, b.Info, stats, cfg.Name(), cfg.Config(), eventsLoggerCfg) } func (b *Beat) registerClusterUUIDFetching() { diff --git a/libbeat/cmd/test/output.go b/libbeat/cmd/test/output.go index 3290c283c27f..ac7e3ba535a2 100644 --- a/libbeat/cmd/test/output.go +++ b/libbeat/cmd/test/output.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cmd/instance" "github.com/elastic/beats/v7/libbeat/idxmgmt" "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/testing" ) @@ -41,7 +42,8 @@ func GenTestOutputCmd(settings instance.Settings) *cobra.Command { } im, _ := idxmgmt.DefaultSupport(nil, b.Info, nil) - output, err := outputs.Load(im, b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config()) + // we use an empty config for the events logger because this is just an output test + output, err := outputs.Load(im, b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config(), logp.Config{}) if err != 
nil { fmt.Fprintf(os.Stderr, "Error initializing output: %s\n", err) os.Exit(1) diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index b81bf3363486..a7cc4a69e6ad 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -51,6 +51,7 @@ func makeConsole( beat beat.Info, observer outputs.Observer, cfg *config.C, + eventsLoggerCfg logp.Config, ) (outputs.Group, error) { config := defaultConfig err := cfg.Unpack(&config) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 4996dba887e2..df3aa8658fef 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -55,7 +55,8 @@ type Client struct { observer outputs.Observer NonIndexableAction string - log *logp.Logger + log *logp.Logger + eventsLogger *logp.Logger } // ClientSettings contains the settings for a client. @@ -81,6 +82,8 @@ const ( // NewClient instantiates a new client. func NewClient( + logger *logp.Logger, + eventsLogger *logp.Logger, s ClientSettings, onConnect *callbacksRegistry, ) (*Client, error) { @@ -140,7 +143,8 @@ func NewClient( observer: s.Observer, NonIndexableAction: s.NonIndexableAction, - log: logp.NewLogger("elasticsearch"), + log: logger, + eventsLogger: eventsLogger, } return client, nil @@ -174,6 +178,8 @@ func (client *Client) Clone() *Client { client.conn.Transport.Proxy.Disable = client.conn.Transport.Proxy.URL == nil c, _ := NewClient( + client.log, + client.eventsLogger, ClientSettings{ ConnectionSettings: connection, Index: client.index, @@ -427,12 +433,12 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field) if result { stats.nonIndexable++ - client.log.Errorf("Can't deliver to dead letter index event (status=%v). 
Enable debug logs to view the event and cause.", status) - client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg) + client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look for events-data log file to view the event and cause.", status) + client.eventsLogger.Errorf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg) // poison pill - this will clog the pipeline if the underlying failure is non transient. } else if client.NonIndexableAction == dead_letter_index { - client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status) - client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) + client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look for events-data log file to view the event and cause.", status) + client.eventsLogger.Warnf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) if data[i].Content.Meta == nil { data[i].Content.Meta = mapstr.M{ dead_letter_marker_field: true, @@ -447,8 +453,8 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat } } else { // drop stats.nonIndexable++ - client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status) - client.log.Debugf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) + client.log.Warnf("Cannot index event (status=%v): dropping event! 
Look for events-data log file to view the event and cause.", status) + client.eventsLogger.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) continue } } diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 649168eb11b4..c0826330fa32 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "go.uber.org/zap" ) func init() { @@ -38,8 +39,13 @@ func makeES( beat beat.Info, observer outputs.Observer, cfg *config.C, + eventsLoggerCfg logp.Config, ) (outputs.Group, error) { log := logp.NewLogger(logSelector) + eventsLogger := logp.NewLogger(logSelector) + // Set a new Output so it writes to a different file than `log` + eventsLogger = log.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg))) + if !cfg.HasField("bulk_max_size") { if err := cfg.SetInt("bulk_max_size", -1, defaultBulkSize); err != nil { return outputs.Fail(err) @@ -110,27 +116,30 @@ func makeES( } var client outputs.NetworkClient - client, err = NewClient(ClientSettings{ - ConnectionSettings: eslegclient.ConnectionSettings{ - URL: esURL, - Beatname: beat.Beat, - Kerberos: esConfig.Kerberos, - Username: esConfig.Username, - Password: esConfig.Password, - APIKey: esConfig.APIKey, - Parameters: params, - Headers: esConfig.Headers, - CompressionLevel: esConfig.CompressionLevel, - Observer: observer, - EscapeHTML: esConfig.EscapeHTML, - Transport: esConfig.Transport, - IdleConnTimeout: esConfig.Transport.IdleConnTimeout, - }, - Index: index, - Pipeline: pipeline, - Observer: observer, - NonIndexableAction: policy.action(), - }, &connectCallbackRegistry) + client, err = NewClient( + log, + eventsLogger, + ClientSettings{ + ConnectionSettings: eslegclient.ConnectionSettings{ + URL: 
esURL, + Beatname: beat.Beat, + Kerberos: esConfig.Kerberos, + Username: esConfig.Username, + Password: esConfig.Password, + APIKey: esConfig.APIKey, + Parameters: params, + Headers: esConfig.Headers, + CompressionLevel: esConfig.CompressionLevel, + Observer: observer, + EscapeHTML: esConfig.EscapeHTML, + Transport: esConfig.Transport, + IdleConnTimeout: esConfig.Transport.IdleConnTimeout, + }, + Index: index, + Pipeline: pipeline, + Observer: observer, + NonIndexableAction: policy.action(), + }, &connectCallbackRegistry) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index d12a11b25c3c..45bef4c6fe8d 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -50,6 +50,7 @@ func makeFileout( beat beat.Info, observer outputs.Observer, cfg *c.C, + eventsLoggerCfg logp.Config, ) (outputs.Group, error) { foConfig := defaultConfig() if err := cfg.Unpack(&foConfig); err != nil { diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 0c856ea425db..ad69f30477e9 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -43,6 +43,7 @@ func makeKafka( beat beat.Info, observer outputs.Observer, cfg *config.C, + eventsLoggerCfg logp.Config, ) (outputs.Group, error) { log := logp.NewLogger(logSelector) log.Debug("initialize kafka output") diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 072ec049f6fb..466f6b742f9f 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport" "github.com/elastic/elastic-agent-libs/transport/tlscommon" ) @@ -40,6 +41,7 @@ func makeLogstash( beat beat.Info, 
observer outputs.Observer, cfg *conf.C, + eventsLoggerCfg logp.Config, ) (outputs.Group, error) { lsConfig, err := readConfig(cfg, beat) if err != nil { diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go index 3d2675c2ce2e..213daf0298ad 100644 --- a/libbeat/outputs/output_reg.go +++ b/libbeat/outputs/output_reg.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) var outputReg = map[string]Factory{} @@ -32,7 +33,8 @@ type Factory func( im IndexManager, beat beat.Info, stats Observer, - cfg *config.C) (Group, error) + cfg *config.C, + eventsLogger logp.Config) (Group, error) // IndexManager provides additional index related services to the outputs. type IndexManager interface { @@ -81,6 +83,7 @@ func Load( stats Observer, name string, config *config.C, + eventsLoggerCfg logp.Config, ) (Group, error) { factory := FindFactory(name) if factory == nil { @@ -90,5 +93,5 @@ func Load( if stats == nil { stats = NewNilObserver() } - return factory(im, info, stats, config) + return factory(im, info, stats, config, eventsLoggerCfg) } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 9814d6abee7b..5656cd951717 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport" "github.com/elastic/elastic-agent-libs/transport/tlscommon" ) @@ -51,6 +52,7 @@ func makeRedis( beat beat.Info, observer outputs.Observer, cfg *config.C, + eventsLoggerCfg logp.Config, ) (outputs.Group, error) { if !cfg.HasField("index") { diff --git a/libbeat/outputs/shipper/shipper.go 
b/libbeat/outputs/shipper/shipper.go index fe19a36b31d2..bf0e77691f9e 100644 --- a/libbeat/outputs/shipper/shipper.go +++ b/libbeat/outputs/shipper/shipper.go @@ -92,6 +92,7 @@ func makeShipper( beat beat.Info, observer outputs.Observer, cfg *conf.C, + eventsLoggerCfg logp.Config, ) (outputs.Group, error) { config := defaultConfig() diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 1c480c01bce2..bcaaca438fd7 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -180,7 +180,8 @@ func (c *outputController) Set(outGrp outputs.Group) { // Reload the output func (c *outputController) Reload( cfg *reload.ConfigWithMeta, - outFactory func(outputs.Observer, conf.Namespace) (outputs.Group, error), + eventsLoggerCfg logp.Config, + outFactory func(outputs.Observer, conf.Namespace, logp.Config) (outputs.Group, error), ) error { outCfg := conf.Namespace{} if cfg != nil { @@ -191,7 +192,7 @@ func (c *outputController) Reload( output, err := loadOutput(c.monitors, func(stats outputs.Observer) (string, outputs.Group, error) { name := outCfg.Name() - out, err := outFactory(stats, outCfg) + out, err := outFactory(stats, outCfg, eventsLoggerCfg) return name, out, err }) if err != nil { diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index cf03163750ee..7076b379fe58 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -111,7 +111,8 @@ const ( type OutputReloader interface { Reload( cfg *reload.ConfigWithMeta, - factory func(outputs.Observer, conf.Namespace) (outputs.Group, error), + eventsLoggerCfg logp.Config, + factory func(outputs.Observer, conf.Namespace, logp.Config) (outputs.Group, error), ) error }