Skip to content

Commit

Permalink
Log raw events and errors containing events to a separate file
Browse files Browse the repository at this point in the history
This commit introduces a new logger that can be configured through
`logging.events` that can be used to log any message that contains the
whole event or could contain the whole event.

At the moment it is used by the elasticsearch output to log indexing
errors containing the whole event and errors returned by Elasticsearch
that can potentially contain the whole event.
  • Loading branch information
belimawr committed Dec 20, 2023
1 parent b59a8f4 commit 3185361
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 44 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,5 @@ replace (

// Exclude this version because the version has an invalid checksum.
exclude github.com/docker/distribution v2.8.0+incompatible

replace github.com/elastic/elastic-agent-libs => github.com/belimawr/elastic-agent-libs v0.2.9-0.20231220154111-efc1fba83b4b
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ github.com/awslabs/goformation/v4 v4.1.0 h1:JRxIW0IjhYpYDrIZOTJGMu2azXKI+OK5dP56
github.com/awslabs/goformation/v4 v4.1.0/go.mod h1:MBDN7u1lMNDoehbFuO4uPvgwPeolTMA2TzX1yO6KlxI=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 h1:lxW5Q6K2IisyF5tlr6Ts0W4POGWQZco05MJjFmoeIHs=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
github.com/belimawr/elastic-agent-libs v0.2.9-0.20231220154111-efc1fba83b4b h1:GEwwH9rTwJzcHAcdCFkfta0AHbADcTNpxZhL51ASLpo=
github.com/belimawr/elastic-agent-libs v0.2.9-0.20231220154111-efc1fba83b4b/go.mod h1:EbRwBMsWoU4IHGKJlTrxbxC03hkihS9W4h+UgraLdDM=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/immutable v0.2.1/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/benbjohnson/tmpl v1.0.0/go.mod h1:igT620JFIi44B6awvU9IsDhR77IXWtFigTLil/RPdps=
Expand Down Expand Up @@ -656,8 +658,6 @@ github.com/elastic/elastic-agent-autodiscover v0.6.5 h1:5DeMpuNc8c/tN6HN0A4A2uOF
github.com/elastic/elastic-agent-autodiscover v0.6.5/go.mod h1:chulyCAyZb/njMHgzkhC/yWnt8v/Y6eCRUhmFVnsA5o=
github.com/elastic/elastic-agent-client/v7 v7.6.0 h1:FEn6FjzynW4TIQo5G096Tr7xYK/P5LY9cSS6wRbXZTc=
github.com/elastic/elastic-agent-client/v7 v7.6.0/go.mod h1:GlUKrbVd/O1CRAZonpBeN3J0RlVqP6VGcrBjFWca+aM=
github.com/elastic/elastic-agent-libs v0.7.2 h1:yT0hF0UAxJCdQqhHh6SFpgYrcpB10oFzPj8IaytPS2o=
github.com/elastic/elastic-agent-libs v0.7.2/go.mod h1:pVBEElQJUO9mr4WStWNXuQGsJn54lcjAoYAHmsvBLBc=
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 h1:sb+25XJn/JcC9/VL8HX4r4QXSUq4uTNzGS2kxOE7u1U=
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3/go.mod h1:rWarFM7qYxJKsi9WcV6ONcFjH/NA3niDNpTxO+8/GVI=
github.com/elastic/elastic-agent-system-metrics v0.9.1 h1:r0ofKHgPpl+W09ie7tzGcCDC0d4NZbQUv37rSgHf4FM=
Expand Down
43 changes: 36 additions & 7 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type beatConfig struct {
BufferConfig *config.C `config:"http.buffer"`
Path paths.Path `config:"path"`
Logging *config.C `config:"logging"`
EventLogging *config.C `config:"logging.events"`
MetricLogging *config.C `config:"logging.metrics"`
Keystore *config.C `config:"keystore"`
Instrumentation instrumentation.Config `config:"instrumentation"`
Expand Down Expand Up @@ -378,7 +379,26 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
outputFactory := b.makeOutputFactory(b.Config.Output)

// Get the default/current logging configuration
// we need some defaults to be populates otherwise Unpack will
// fail
eventsLoggerCfg := logp.DefaultConfig(configure.GetEnvironment())

// merge eventsLoggerCfg with b.Config.Logging, so logging.events.* only
// overwrites logging.*
if err := b.Config.EventLogging.Unpack(&eventsLoggerCfg); err != nil {
return nil, fmt.Errorf("error initialising events logger: %w", err)
}

// Ensure the default filename is set
if eventsLoggerCfg.Files.Name == "" {
eventsLoggerCfg.Files.Name = b.Info.Beat
// Append the name so the files do not overwrite themselves.
eventsLoggerCfg.Files.Name = eventsLoggerCfg.Files.Name + "-events-data"
}

outputFactory := b.makeOutputFactory(b.Config.Output, eventsLoggerCfg)
settings := pipeline.Settings{
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
Expand All @@ -388,7 +408,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader(), eventsLoggerCfg))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
Expand Down Expand Up @@ -784,6 +804,14 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error unpacking config data: %w", err)
}

if b.Config.EventLogging == nil {
b.Config.EventLogging = config.NewConfig()
}
b.Config.EventLogging.Merge(b.Config.Logging)

Check failure on line 810 in libbeat/cmd/instance/beat.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `b.Config.EventLogging.Merge` is not checked (errcheck)

Check failure on line 810 in libbeat/cmd/instance/beat.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `b.Config.EventLogging.Merge` is not checked (errcheck)
if _, err := b.Config.EventLogging.Remove("events", -1); err != nil {
return fmt.Errorf("cannot merge logging and logging.events configuration: %w", err)
}

if err := promoteOutputQueueSettings(&b.Config); err != nil {
return fmt.Errorf("could not promote output queue settings: %w", err)
}
Expand Down Expand Up @@ -1091,7 +1119,7 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
}
}

func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable {
func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader, eventsLoggerCfg logp.Config) reload.Reloadable {
return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error {
if update == nil {
return nil
Expand All @@ -1113,15 +1141,16 @@ func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Re
}
}

return outReloader.Reload(update, b.createOutput)
return outReloader.Reload(update, eventsLoggerCfg, b.createOutput)
})
}

func (b *Beat) makeOutputFactory(
cfg config.Namespace,
eventLoggerCfg logp.Config,
) func(outputs.Observer) (string, outputs.Group, error) {
return func(outStats outputs.Observer) (string, outputs.Group, error) {
out, err := b.createOutput(outStats, cfg)
out, err := b.createOutput(outStats, cfg, eventLoggerCfg)
return cfg.Name(), out, err
}
}
Expand Down Expand Up @@ -1217,7 +1246,7 @@ func (b *Beat) reloadOutputOnCertChange(cfg config.Namespace) error {
return nil
}

func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outputs.Group, error) {
func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace, eventsLoggerCfg logp.Config) (outputs.Group, error) {
if !cfg.IsSet() {
return outputs.Group{}, nil
}
Expand All @@ -1226,7 +1255,7 @@ func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outpu
return outputs.Group{}, fmt.Errorf("could not setup output certificates reloader: %w", err)
}

return outputs.Load(b.IdxSupporter, b.Info, stats, cfg.Name(), cfg.Config())
return outputs.Load(b.IdxSupporter, b.Info, stats, cfg.Name(), cfg.Config(), eventsLoggerCfg)
}

func (b *Beat) registerClusterUUIDFetching() {
Expand Down
4 changes: 3 additions & 1 deletion libbeat/cmd/test/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/testing"
)

Expand All @@ -41,7 +42,8 @@ func GenTestOutputCmd(settings instance.Settings) *cobra.Command {
}

im, _ := idxmgmt.DefaultSupport(nil, b.Info, nil)
output, err := outputs.Load(im, b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config())
// we use an empty config for the events logger because this is just a output test
output, err := outputs.Load(im, b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config(), logp.Config{})
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing output: %s\n", err)
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func makeConsole(
beat beat.Info,
observer outputs.Observer,
cfg *config.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
config := defaultConfig
err := cfg.Unpack(&config)
Expand Down
22 changes: 14 additions & 8 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type Client struct {
observer outputs.Observer
NonIndexableAction string

log *logp.Logger
log *logp.Logger
eventsLogger *logp.Logger
}

// ClientSettings contains the settings for a client.
Expand All @@ -81,6 +82,8 @@ const (

// NewClient instantiates a new client.
func NewClient(
logger *logp.Logger,
eventsLogger *logp.Logger,
s ClientSettings,
onConnect *callbacksRegistry,
) (*Client, error) {
Expand Down Expand Up @@ -140,7 +143,8 @@ func NewClient(
observer: s.Observer,
NonIndexableAction: s.NonIndexableAction,

log: logp.NewLogger("elasticsearch"),
log: logger,
eventsLogger: eventsLogger,
}

return client, nil
Expand Down Expand Up @@ -174,6 +178,8 @@ func (client *Client) Clone() *Client {
client.conn.Transport.Proxy.Disable = client.conn.Transport.Proxy.URL == nil

c, _ := NewClient(
client.log,
client.eventsLogger,
ClientSettings{
ConnectionSettings: connection,
Index: client.index,
Expand Down Expand Up @@ -427,12 +433,12 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field)
if result {
stats.nonIndexable++
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status)
client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look for events-data log file to view the event and cause.", status)
client.eventsLogger.Errorf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
// poison pill - this will clog the pipeline if the underlying failure is non transient.
} else if client.NonIndexableAction == dead_letter_index {
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status)
client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look for events-data log file to view the event and cause.", status)
client.eventsLogger.Warnf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
if data[i].Content.Meta == nil {
data[i].Content.Meta = mapstr.M{
dead_letter_marker_field: true,
Expand All @@ -447,8 +453,8 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
}
} else { // drop
stats.nonIndexable++
client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status)
client.log.Debugf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg)
client.log.Warnf("Cannot index event (status=%v): dropping event! Look for events-data log file to view the event and cause.", status)
client.eventsLogger.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg)
continue
}
}
Expand Down
51 changes: 30 additions & 21 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"go.uber.org/zap"
)

func init() {
Expand All @@ -38,8 +39,13 @@ func makeES(
beat beat.Info,
observer outputs.Observer,
cfg *config.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
log := logp.NewLogger(logSelector)
eventsLogger := logp.NewLogger(logSelector)
// Set a new Output so it writes to a different file than `log`
eventsLogger = log.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg)))

if !cfg.HasField("bulk_max_size") {
if err := cfg.SetInt("bulk_max_size", -1, defaultBulkSize); err != nil {
return outputs.Fail(err)
Expand Down Expand Up @@ -110,27 +116,30 @@ func makeES(
}

var client outputs.NetworkClient
client, err = NewClient(ClientSettings{
ConnectionSettings: eslegclient.ConnectionSettings{
URL: esURL,
Beatname: beat.Beat,
Kerberos: esConfig.Kerberos,
Username: esConfig.Username,
Password: esConfig.Password,
APIKey: esConfig.APIKey,
Parameters: params,
Headers: esConfig.Headers,
CompressionLevel: esConfig.CompressionLevel,
Observer: observer,
EscapeHTML: esConfig.EscapeHTML,
Transport: esConfig.Transport,
IdleConnTimeout: esConfig.Transport.IdleConnTimeout,
},
Index: index,
Pipeline: pipeline,
Observer: observer,
NonIndexableAction: policy.action(),
}, &connectCallbackRegistry)
client, err = NewClient(
log,
eventsLogger,
ClientSettings{
ConnectionSettings: eslegclient.ConnectionSettings{
URL: esURL,
Beatname: beat.Beat,
Kerberos: esConfig.Kerberos,
Username: esConfig.Username,
Password: esConfig.Password,
APIKey: esConfig.APIKey,
Parameters: params,
Headers: esConfig.Headers,
CompressionLevel: esConfig.CompressionLevel,
Observer: observer,
EscapeHTML: esConfig.EscapeHTML,
Transport: esConfig.Transport,
IdleConnTimeout: esConfig.Transport.IdleConnTimeout,
},
Index: index,
Pipeline: pipeline,
Observer: observer,
NonIndexableAction: policy.action(),
}, &connectCallbackRegistry)
if err != nil {
return outputs.Fail(err)
}
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func makeFileout(
beat beat.Info,
observer outputs.Observer,
cfg *c.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
foConfig := defaultConfig()
if err := cfg.Unpack(&foConfig); err != nil {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func makeKafka(
beat beat.Info,
observer outputs.Observer,
cfg *config.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
log := logp.NewLogger(logSelector)
log.Debug("initialize kafka output")
Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)
Expand All @@ -40,6 +41,7 @@ func makeLogstash(
beat beat.Info,
observer outputs.Observer,
cfg *conf.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
lsConfig, err := readConfig(cfg, beat)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions libbeat/outputs/output_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

var outputReg = map[string]Factory{}
Expand All @@ -32,7 +33,8 @@ type Factory func(
im IndexManager,
beat beat.Info,
stats Observer,
cfg *config.C) (Group, error)
cfg *config.C,
eventsLogger logp.Config) (Group, error)

// IndexManager provides additional index related services to the outputs.
type IndexManager interface {
Expand Down Expand Up @@ -81,6 +83,7 @@ func Load(
stats Observer,
name string,
config *config.C,
eventsLoggerCfg logp.Config,
) (Group, error) {
factory := FindFactory(name)
if factory == nil {
Expand All @@ -90,5 +93,5 @@ func Load(
if stats == nil {
stats = NewNilObserver()
}
return factory(im, info, stats, config)
return factory(im, info, stats, config, eventsLoggerCfg)
}
2 changes: 2 additions & 0 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)
Expand All @@ -51,6 +52,7 @@ func makeRedis(
beat beat.Info,
observer outputs.Observer,
cfg *config.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {

if !cfg.HasField("index") {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func makeShipper(
beat beat.Info,
observer outputs.Observer,
cfg *conf.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {

config := defaultConfig()
Expand Down
Loading

0 comments on commit 3185361

Please sign in to comment.