Skip to content

Commit

Permalink
Fix ES and use events logger in all outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Dec 22, 2023
1 parent f01e703 commit 7efeff1
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 50 deletions.
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func makeES(
log := logp.NewLogger(logSelector)
eventsLogger := logp.NewLogger(logSelector)
// Set a new Output so it writes to a different file than `log`
eventsLogger = log.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg)))
eventsLogger = eventsLogger.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg)))

if !cfg.HasField("bulk_max_size") {
if err := cfg.SetInt("bulk_max_size", -1, defaultBulkSize); err != nil {
Expand Down
28 changes: 18 additions & 10 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@ import (
c "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/file"
"github.com/elastic/elastic-agent-libs/logp"
"go.uber.org/zap"
)

func init() {
outputs.RegisterType("file", makeFileout)
}

type fileOutput struct {
log *logp.Logger
filePath string
beat beat.Info
observer outputs.Observer
rotator *file.Rotator
codec codec.Codec
log *logp.Logger
eventsLogger *logp.Logger
filePath string
beat beat.Info
observer outputs.Observer
rotator *file.Rotator
codec codec.Codec
}

// makeFileout instantiates a new file output instance.
Expand All @@ -60,10 +62,15 @@ func makeFileout(
// disable bulk support in publisher pipeline
_ = cfg.SetInt("bulk_max_size", -1, -1)

logSelector := "file"
eventsLogger := logp.NewLogger(logSelector)
// Set a new Output so it writes to a different file than `log`
eventsLogger = eventsLogger.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg)))
fo := &fileOutput{
log: logp.NewLogger("file"),
beat: beat,
observer: observer,
log: logp.NewLogger(logSelector),
eventsLogger: eventsLogger,
beat: beat,
observer: observer,
}
if err := fo.init(beat, foConfig); err != nil {
return outputs.Fail(err)
Expand Down Expand Up @@ -130,7 +137,8 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
} else {
out.log.Warnf("Failed to serialize the event: %+v", err)
}
out.log.Debugf("Failed event: %v", event)
out.log.Debug("Event logged to events-data log file")
out.eventsLogger.Debugf("Failed event: %v", event)

dropped++
continue
Expand Down
49 changes: 29 additions & 20 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/Shopify/sarama"
"github.com/eapache/go-resiliency/breaker"
"go.uber.org/zap"

"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand All @@ -40,16 +41,17 @@ import (
)

type client struct {
log *logp.Logger
observer outputs.Observer
hosts []string
topic outil.Selector
key *fmtstr.EventFormatString
index string
codec codec.Codec
config sarama.Config
mux sync.Mutex
done chan struct{}
log *logp.Logger
eventsLogger *logp.Logger
observer outputs.Observer
hosts []string
topic outil.Selector
key *fmtstr.EventFormatString
index string
codec codec.Codec
config sarama.Config
mux sync.Mutex
done chan struct{}

producer sarama.AsyncProducer

Expand Down Expand Up @@ -81,17 +83,23 @@ func newKafkaClient(
headers []header,
writer codec.Codec,
cfg *sarama.Config,
eventsLoggerCfg logp.Config,
) (*client, error) {
eventsLogger := logp.NewLogger(logSelector)
// Set a new Output so it writes to a different file than `log`
eventsLogger = eventsLogger.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg)))

c := &client{
log: logp.NewLogger(logSelector),
observer: observer,
hosts: hosts,
topic: topic,
key: key,
index: strings.ToLower(index),
codec: writer,
config: *cfg,
done: make(chan struct{}),
log: logp.NewLogger(logSelector),
eventsLogger: eventsLogger,
observer: observer,
hosts: hosts,
topic: topic,
key: key,
index: strings.ToLower(index),
codec: writer,
config: *cfg,
done: make(chan struct{}),
}

if len(headers) != 0 {
Expand Down Expand Up @@ -228,7 +236,8 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) {
serializedEvent, err := c.codec.Encode(c.index, event)
if err != nil {
if c.log.IsDebug() {
c.log.Debugf("failed event: %v", event)
c.eventsLogger.Debugf("failed event: %v", event)
c.log.Debug("failed event logged to events logger file")
}
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func makeKafka(
return outputs.Fail(err)
}

client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg)
client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg, eventsLoggerCfg)
if err != nil {
return outputs.Fail(err)
}
Expand Down
44 changes: 27 additions & 17 deletions libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/gomodule/redigo/redis"
"go.uber.org/zap"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand All @@ -47,7 +48,8 @@ type publishFn func(
) ([]publisher.Event, error)

type client struct {
log *logp.Logger
log *logp.Logger
eventsLogger *logp.Logger
*transport.Client
observer outputs.Observer
index string
Expand All @@ -74,18 +76,25 @@ func newClient(
pass string,
db int, key outil.Selector, dt redisDataType,
index string, codec codec.Codec,
eventsLoggerCfg logp.Config,
) *client {
logSelector := "redis"
eventsLogger := logp.NewLogger(logSelector)
// Set a new Output so it writes to a different file than `log`
eventsLogger = eventsLogger.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg)))

return &client{
log: logp.NewLogger("redis"),
Client: tc,
observer: observer,
timeout: timeout,
password: pass,
index: strings.ToLower(index),
db: db,
dataType: dt,
key: key,
codec: codec,
log: logp.NewLogger(logSelector),
eventsLogger: eventsLogger,
Client: tc,
observer: observer,
timeout: timeout,
password: pass,
index: strings.ToLower(index),
db: db,
dataType: dt,
key: key,
codec: codec,
}
}

Expand Down Expand Up @@ -227,7 +236,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn {
args := make([]interface{}, 1, len(data)+1)
args[0] = dest

okEvents, args := serializeEvents(c.log, args, 1, data, c.index, c.codec)
okEvents, args := serializeEvents(c.log, c.eventsLogger, args, 1, data, c.index, c.codec)
c.observer.Dropped(len(data) - len(okEvents))
if (len(args) - 1) == 0 {
return nil, nil
Expand All @@ -250,7 +259,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF
return func(key outil.Selector, data []publisher.Event) ([]publisher.Event, error) {
var okEvents []publisher.Event
serialized := make([]interface{}, 0, len(data))
okEvents, serialized = serializeEvents(c.log, serialized, 0, data, c.index, c.codec)
okEvents, serialized = serializeEvents(c.log, c.eventsLogger, serialized, 0, data, c.index, c.codec)
c.observer.Dropped(len(data) - len(okEvents))
if len(serialized) == 0 {
return nil, nil
Expand Down Expand Up @@ -305,6 +314,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF

func serializeEvents(
log *logp.Logger,
eventsLogger *logp.Logger,
to []interface{},
i int,
data []publisher.Event,
Expand All @@ -316,8 +326,8 @@ func serializeEvents(
for _, d := range data {
serializedEvent, err := codec.Encode(index, &d.Content)
if err != nil {
log.Errorf("Encoding event failed with error: %+v", err)
log.Debugf("Failed event: %v", d.Content)
log.Errorf("Encoding event failed with error: %+v. Look for events-data log file to view the event", err)
eventsLogger.Debugf("Failed event: %v", d.Content)
goto failLoop
}

Expand All @@ -334,8 +344,8 @@ failLoop:
for _, d := range rest {
serializedEvent, err := codec.Encode(index, &d.Content)
if err != nil {
log.Errorf("Encoding event failed with error: %+v", err)
log.Debugf("Failed event: %v", d.Content)
log.Errorf("Encoding event failed with error: %+v. Look for events-data log file to view the event", err)
eventsLogger.Debugf("Failed event: %v", d.Content)
i++
continue
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func makeRedis(
}

client := newClient(conn, observer, rConfig.Timeout,
pass, rConfig.Db, key, dataType, rConfig.Index, enc)
pass, rConfig.Db, key, dataType, rConfig.Index, enc, eventsLoggerCfg)
clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max)
}

Expand Down

0 comments on commit 7efeff1

Please sign in to comment.