diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index c0826330fa32..268540a5676c 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -44,7 +44,7 @@ func makeES( log := logp.NewLogger(logSelector) eventsLogger := logp.NewLogger(logSelector) // Set a new Output so it writes to a different file than `log` - eventsLogger = log.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg))) + eventsLogger = eventsLogger.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg))) if !cfg.HasField("bulk_max_size") { if err := cfg.SetInt("bulk_max_size", -1, defaultBulkSize); err != nil { diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 45bef4c6fe8d..9c83676d12b9 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -29,6 +29,7 @@ import ( c "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" "github.com/elastic/elastic-agent-libs/logp" + "go.uber.org/zap" ) func init() { @@ -36,12 +37,13 @@ func init() { } type fileOutput struct { - log *logp.Logger - filePath string - beat beat.Info - observer outputs.Observer - rotator *file.Rotator - codec codec.Codec + log *logp.Logger + eventsLogger *logp.Logger + filePath string + beat beat.Info + observer outputs.Observer + rotator *file.Rotator + codec codec.Codec } // makeFileout instantiates a new file output instance. @@ -60,10 +62,15 @@ func makeFileout( // disable bulk support in publisher pipeline _ = cfg.SetInt("bulk_max_size", -1, -1) + logSelector := "file" + eventsLogger := logp.NewLogger(logSelector) + // Set a new Output so it writes to a different file than `log` + eventsLogger = eventsLogger.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg))) fo := &fileOutput{ - log: logp.NewLogger("file"), - beat: beat, - observer: observer, + log: logp.NewLogger(logSelector), + eventsLogger: eventsLogger, + beat: beat, + observer: observer, } if err := fo.init(beat, foConfig); err != nil { return outputs.Fail(err) @@ -130,7 +137,8 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error { } else { out.log.Warnf("Failed to serialize the event: %+v", err) } - out.log.Debugf("Failed event: %v", event) + out.log.Debug("Event logged to events-data log file") + out.eventsLogger.Debugf("Failed event: %v", event) dropped++ continue diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 24bbc61145d4..6acebdfab9a0 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -28,6 +28,7 @@ import ( "github.com/Shopify/sarama" "github.com/eapache/go-resiliency/breaker" + "go.uber.org/zap" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/outputs" @@ -40,16 +41,17 @@ import ( ) type client struct { - log *logp.Logger - observer outputs.Observer - hosts []string - topic outil.Selector - key *fmtstr.EventFormatString - index string - codec codec.Codec - config sarama.Config - mux sync.Mutex - done chan struct{} + log *logp.Logger + eventsLogger *logp.Logger + observer outputs.Observer + hosts []string + topic outil.Selector + key *fmtstr.EventFormatString + index string + codec codec.Codec + config sarama.Config + mux sync.Mutex + done chan struct{} producer sarama.AsyncProducer @@ -81,17 +83,23 @@ func newKafkaClient( headers []header, writer codec.Codec, cfg *sarama.Config, + eventsLoggerCfg logp.Config, ) (*client, error) { + eventsLogger := logp.NewLogger(logSelector) + // Set a new Output so it writes to a different file than `log` + eventsLogger = eventsLogger.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg))) + c := &client{ - log: logp.NewLogger(logSelector), - observer: observer, - hosts: hosts, - topic: topic, - key: key, - index: strings.ToLower(index), - codec: writer, - config: *cfg, - done: make(chan struct{}), + log: logp.NewLogger(logSelector), + eventsLogger: eventsLogger, + observer: observer, + hosts: hosts, + topic: topic, + key: key, + index: strings.ToLower(index), + codec: writer, + config: *cfg, + done: make(chan struct{}), } if len(headers) != 0 { @@ -228,7 +236,8 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { serializedEvent, err := c.codec.Encode(c.index, event) if err != nil { if c.log.IsDebug() { - c.log.Debugf("failed event: %v", event) + c.eventsLogger.Debugf("failed event: %v", event) + c.log.Debug("failed event logged to events logger file") } return nil, err } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index ad69f30477e9..524a51bafe2d 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -73,7 +73,7 @@ func makeKafka( return outputs.Fail(err) } - client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg) + client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg, eventsLoggerCfg) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index 5165d894f654..740609b95460 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -26,6 +26,7 @@ import ( "time" "github.com/gomodule/redigo/redis" + "go.uber.org/zap" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" @@ -47,7 +48,8 @@ type publishFn func( ) ([]publisher.Event, error) type client struct { - log *logp.Logger + log *logp.Logger + eventsLogger *logp.Logger *transport.Client observer outputs.Observer index string @@ -74,18 +76,25 @@ func newClient( pass string, db int, key outil.Selector, dt redisDataType, index string, codec codec.Codec, + eventsLoggerCfg logp.Config, ) *client { + logSelector := "redis" + eventsLogger := logp.NewLogger(logSelector) + // Set a new Output so it writes to a different file than `log` + eventsLogger = eventsLogger.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg))) + return &client{ - log: logp.NewLogger("redis"), - Client: tc, - observer: observer, - timeout: timeout, - password: pass, - index: strings.ToLower(index), - db: db, - dataType: dt, - key: key, - codec: codec, + log: logp.NewLogger(logSelector), + eventsLogger: eventsLogger, + Client: tc, + observer: observer, + timeout: timeout, + password: pass, + index: strings.ToLower(index), + db: db, + dataType: dt, + key: key, + codec: codec, } } @@ -227,7 +236,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { args := make([]interface{}, 1, len(data)+1) args[0] = dest - okEvents, args := serializeEvents(c.log, args, 1, data, c.index, c.codec) + okEvents, args := serializeEvents(c.log, c.eventsLogger, args, 1, data, c.index, c.codec) c.observer.Dropped(len(data) - len(okEvents)) if (len(args) - 1) == 0 { return nil, nil @@ -250,7 +259,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF return func(key outil.Selector, data []publisher.Event) ([]publisher.Event, error) { var okEvents []publisher.Event serialized := make([]interface{}, 0, len(data)) - okEvents, serialized = serializeEvents(c.log, serialized, 0, data, c.index, c.codec) + okEvents, serialized = serializeEvents(c.log, c.eventsLogger, serialized, 0, data, c.index, c.codec) c.observer.Dropped(len(data) - len(okEvents)) if len(serialized) == 0 { return nil, nil @@ -305,6 +314,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF func serializeEvents( log *logp.Logger, + eventsLogger *logp.Logger, to []interface{}, i int, data []publisher.Event, @@ -316,8 +326,8 @@ func serializeEvents( for _, d := range data { serializedEvent, err := codec.Encode(index, &d.Content) if err != nil { - log.Errorf("Encoding event failed with error: %+v", err) - log.Debugf("Failed event: %v", d.Content) + log.Errorf("Encoding event failed with error: %+v. Look for events-data log file to view the event", err) + eventsLogger.Debugf("Failed event: %v", d.Content) goto failLoop } @@ -334,8 +344,8 @@ failLoop: for _, d := range rest { serializedEvent, err := codec.Encode(index, &d.Content) if err != nil { - log.Errorf("Encoding event failed with error: %+v", err) - log.Debugf("Failed event: %v", d.Content) + log.Errorf("Encoding event failed with error: %+v. Look for events-data log file to view the event", err) + eventsLogger.Debugf("Failed event: %v", d.Content) i++ continue } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 5656cd951717..5f902620a2ab 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -163,7 +163,7 @@ func makeRedis( } client := newClient(conn, observer, rConfig.Timeout, - pass, rConfig.Db, key, dataType, rConfig.Index, enc) + pass, rConfig.Db, key, dataType, rConfig.Index, enc, eventsLoggerCfg) clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max) }