[libbeat] Implement early event encoding for the Elasticsearch output (#38572)

Add early-encoding support to the queue and the Elasticsearch output.

Early event encoding lets outputs provide helpers that can perform output serialization on an event while it is still in the queue. This early encoding is done in the memory queue's producers, and in the disk queue's reader loop. Benchmarks while writing to Elasticsearch showed significant improvements (reported numbers were measured on Filebeat with a filestream input).
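The mechanism in miniature (a self-contained sketch with hypothetical names, not the exact interfaces this PR adds): the output registers an encoder, the queue invokes it while the event is still buffered, and the output later ships the pre-serialized bytes without revisiting the original event.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventEncoder stands in for the encoder helper an output can supply; in
// this PR the memory queue's producers and the disk queue's reader loop
// would call it before events reach the output workers.
type eventEncoder interface {
	encode(fields map[string]interface{}) ([]byte, error)
}

type jsonEventEncoder struct{}

func (jsonEventEncoder) encode(fields map[string]interface{}) ([]byte, error) {
	return json.Marshal(fields)
}

func main() {
	var enc eventEncoder = jsonEventEncoder{}
	// Serialization happens once, at queue time; the structured fields can
	// then be released, which is where the memory savings come from.
	raw, err := enc.encode(map[string]interface{}{"message": "hello"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // the output later writes these bytes as-is
}
```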

Memory reduction in each preset, relative to `main` before this change and to the 8.13.2 and 8.12.2 releases:

| Preset     | `main` | 8.13.2 | 8.12.2 |
| ---------- | ------ | ------ | ------ |
| balanced   | 21%    | 31%    | 41%    |
| throughput | 43%    | 47%    | 56%    |
| scale      | 23%    | 35%    | 46%    |
| latency    | 24%    | 24%    | 41%    |

CPU reduction for each preset relative to `main` (earlier releases showed no meaningful difference from `main`):

| Preset     | CPU reduction |
| ---------- | ------------- |
| balanced   | 7%            |
| throughput | 19%           |
| scale      | 7%            |
| latency    | 9%            |
faec authored Apr 10, 2024
1 parent f619305 commit c9e768d
Showing 39 changed files with 678 additions and 297 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -141,6 +141,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Raise up logging level to warning when attempting to configure beats with unknown fields from autodiscovered events/environments
- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36843[36843]
- Update to Go 1.21.9. {pull}38727[38727]
- Enable early event encoding in the Elasticsearch output, improving CPU and memory use. {pull}38572[38572]

*Auditbeat*

11 changes: 11 additions & 0 deletions libbeat/esleg/eslegclient/enc.go
@@ -109,13 +109,22 @@ func (b *jsonEncoder) Marshal(obj interface{}) error {
return b.AddRaw(obj)
}

+ // RawEncoding is used to wrap objects that have already been json-encoded,
+ // so the encoder knows to append them directly instead of treating them
+ // like a string.
+ type RawEncoding struct {
+ Encoding []byte
+ }

func (b *jsonEncoder) AddRaw(obj interface{}) error {
var err error
switch v := obj.(type) {
case beat.Event:
err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
case *beat.Event:
err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
+ case RawEncoding:
+ _, err = b.buf.Write(v.Encoding)
default:
err = b.folder.Fold(obj)
}
@@ -199,6 +208,8 @@ func (g *gzipEncoder) AddRaw(obj interface{}) error {
err = g.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
case *beat.Event:
err = g.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
+ case RawEncoding:
+ _, err = g.gzip.Write(v.Encoding)
default:
err = g.folder.Fold(obj)
}
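For illustration, here is how already-serialized bytes flow through the new wrapper (a sketch, not code from this commit; the anonymous interface stands in for the package's encoder types, both of which now handle the `RawEncoding` case):

```go
// appendPreEncoded assumes enc is an eslegclient encoder with an AddRaw
// method (jsonEncoder and gzipEncoder both qualify) and that body already
// holds valid JSON. Wrapping the bytes in RawEncoding makes the encoder
// write them verbatim instead of folding them like an ordinary value.
func appendPreEncoded(enc interface{ AddRaw(interface{}) error }, body []byte) error {
	return enc.AddRaw(eslegclient.RawEncoding{Encoding: body})
}
```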
2 changes: 1 addition & 1 deletion libbeat/outputs/console/console.go
@@ -85,7 +85,7 @@ func makeConsole(
}
}

- return outputs.Success(config.Queue, config.BatchSize, 0, c)
+ return outputs.Success(config.Queue, config.BatchSize, 0, nil, c)
}

func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
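(The new fourth argument to `outputs.Success` is the early-encoder factory added by this PR; the console output passes `nil`, presumably because it serializes events itself through its codec and does not opt in to queue-time encoding.)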
96 changes: 43 additions & 53 deletions libbeat/outputs/elasticsearch/client.go
@@ -299,78 +299,65 @@ func (client *Client) bulkEncodePublishRequest(version version.V, data []publish
okEvents := data[:0]
bulkItems := []interface{}{}
for i := range data {
- event := &data[i].Content
+ if data[i].EncodedEvent == nil {
+ client.log.Error("Elasticsearch output received unencoded publisher.Event")
+ continue
+ }
+ event := data[i].EncodedEvent.(*encodedEvent)
+ if event.err != nil {
+ // This means there was an error when encoding the event and it isn't
+ // ingestable, so report the error and continue.
+ client.log.Error(event.err)
+ continue
+ }
meta, err := client.createEventBulkMeta(version, event)
if err != nil {
client.log.Errorf("Failed to encode event meta data: %+v", err)
continue
}
- if opType := events.GetOpType(*event); opType == events.OpTypeDelete {
+ if event.opType == events.OpTypeDelete {
// We don't include the event source in a bulk DELETE
bulkItems = append(bulkItems, meta)
} else {
- bulkItems = append(bulkItems, meta, event)
+ // Wrap the encoded event in a RawEncoding so the Elasticsearch client
+ // knows not to re-encode it
+ bulkItems = append(bulkItems, meta, eslegclient.RawEncoding{Encoding: event.encoding})
}
okEvents = append(okEvents, data[i])
}
return okEvents, bulkItems
}
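The `encodedEvent` type is defined elsewhere in this PR; reconstructed from the fields used in this file, its shape is roughly the following (field order and exact types are assumptions):

```go
// Sketch reconstructed from usage above, not the verbatim declaration.
// All routing metadata is resolved once at encode time, so the hot path
// here only reads precomputed fields.
type encodedEvent struct {
	id         string        // document _id, may be empty
	opType     events.OpType // index, create, or delete
	pipeline   string        // resolved ingest pipeline
	index      string        // resolved destination index
	timestamp  time.Time     // original event timestamp
	deadLetter bool          // set when rerouted to the dead letter index
	err        error         // non-nil if encoding failed in the queue
	encoding   []byte        // pre-serialized document source
}
```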

- func (client *Client) createEventBulkMeta(version version.V, event *beat.Event) (interface{}, error) {
+ func (client *Client) createEventBulkMeta(version version.V, event *encodedEvent) (interface{}, error) {
eventType := ""
if version.Major < 7 {
eventType = defaultEventType
}

- pipeline, err := client.getPipeline(event)
- if err != nil {
- err := fmt.Errorf("failed to select pipeline: %w", err)
- return nil, err
- }
-
- index, err := client.getIndex(event)
- if err != nil {
- err := fmt.Errorf("failed to select event index: %w", err)
- return nil, err
- }
-
- id, _ := events.GetMetaStringValue(*event, events.FieldMetaID)
- opType := events.GetOpType(*event)
-
meta := eslegclient.BulkMeta{
- Index: index,
+ Index: event.index,
DocType: eventType,
- Pipeline: pipeline,
- ID: id,
+ Pipeline: event.pipeline,
+ ID: event.id,
}

- if opType == events.OpTypeDelete {
- if id != "" {
+ if event.opType == events.OpTypeDelete {
+ if event.id != "" {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
} else {
return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.OpTypeDelete)
}
}
if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
if opType == events.OpTypeIndex {
if event.id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
if event.opType == events.OpTypeIndex {
return eslegclient.BulkIndexAction{Index: meta}, nil
}
return eslegclient.BulkCreateAction{Create: meta}, nil
}
return eslegclient.BulkIndexAction{Index: meta}, nil
}
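For orientation, the three return shapes map onto the Elasticsearch bulk API's `delete`, `index`, and `create` action lines. A hedged sketch of building one by hand (index name and values are hypothetical):

```go
// Illustrative only: the action object serialized ahead of each document in
// the bulk body. The three action types wrap the same BulkMeta under
// different JSON keys.
meta := eslegclient.BulkMeta{
	Index: "filebeat-test", // hypothetical destination index
	ID:    "",              // empty _id lets Elasticsearch generate one
}
action := eslegclient.BulkCreateAction{Create: meta}
_ = action
```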

- func (client *Client) getIndex(event *beat.Event) (string, error) {
- // If this event has been dead-lettered, override its index
- if event.Meta != nil {
- if deadLetter, _ := event.Meta.HasKey(dead_letter_marker_field); deadLetter {
- return client.deadLetterIndex, nil
- }
- }
- return client.indexSelector.Select(event)
- }
-
- func (client *Client) getPipeline(event *beat.Event) (string, error) {
+ func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, error) {
if event.Meta != nil {
pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
if errors.Is(err, mapstr.ErrKeyNotFound) {
@@ -383,8 +370,8 @@ func (client *Client) getPipeline(event *beat.Event) (string, error) {
return strings.ToLower(pipeline), nil
}

- if client.pipelineSelector != nil {
- return client.pipelineSelector.Select(event)
+ if defaultSelector != nil {
+ return defaultSelector.Select(event)
}
return "", nil
}
@@ -427,27 +414,16 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
stats.tooMany++
} else {
// hard failure, apply policy action
- result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field)
- if result {
+ encodedEvent := data[i].EncodedEvent.(*encodedEvent)
+ if encodedEvent.deadLetter {
stats.nonIndexable++
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status)
client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
// poison pill - this will clog the pipeline if the underlying failure is non transient.
} else if client.deadLetterIndex != "" {
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status)
client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
- if data[i].Content.Meta == nil {
- data[i].Content.Meta = mapstr.M{
- dead_letter_marker_field: true,
- }
- } else {
- data[i].Content.Meta[dead_letter_marker_field] = true
- }
- data[i].Content.Fields = mapstr.M{
- "message": data[i].Content.Fields.String(),
- "error.type": status,
- "error.message": string(msg),
- }
+ client.setDeadLetter(encodedEvent, status, string(msg))
} else { // drop
stats.nonIndexable++
client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status)
@@ -465,6 +441,20 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
return failed, stats
}

+ func (client *Client) setDeadLetter(
+ encodedEvent *encodedEvent, errType int, errMsg string,
+ ) {
+ encodedEvent.deadLetter = true
+ encodedEvent.index = client.deadLetterIndex
+ deadLetterReencoding := mapstr.M{
+ "@timestamp": encodedEvent.timestamp,
+ "message": string(encodedEvent.encoding),
+ "error.type": errType,
+ "error.message": errMsg,
+ }
+ encodedEvent.encoding = []byte(deadLetterReencoding.String())
+ }
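Note what the re-encode does: the original pre-encoded document survives only as an opaque string under `message`, so the dead letter index cannot hit the same mapping conflict that rejected the document. A self-contained sketch of the resulting body (plain map/json standing in for `mapstr.M`):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	original := []byte(`{"@timestamp":"2024-04-10T00:00:00Z","testfield":"foo0"}`)
	// Mirrors the mapstr.M built in setDeadLetter; the status code and error
	// message here are illustrative values.
	deadLetterDoc := map[string]interface{}{
		"@timestamp":    "2024-04-10T00:00:00Z",
		"message":       string(original), // failed document as a string
		"error.type":    400,
		"error.message": "mapper_parsing_exception (illustrative)",
	}
	body, _ := json.Marshal(deadLetterDoc)
	fmt.Println(string(body))
}
```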

func (client *Client) Connect() error {
return client.conn.Connect()
}
72 changes: 21 additions & 51 deletions libbeat/outputs/elasticsearch/client_integration_test.go
@@ -22,11 +22,7 @@ package elasticsearch
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

@@ -85,15 +81,15 @@ func testPublishEvent(t *testing.T, index string, cfg map[string]interface{}) {
output, client := connectTestEsWithStats(t, cfg, index)

// drop old index preparing test
- client.conn.Delete(index, "", "", nil)
+ _, _, _ = client.conn.Delete(index, "", "", nil)

- batch := outest.NewBatch(beat.Event{
+ batch := encodeBatch(client, outest.NewBatch(beat.Event{
Timestamp: time.Now(),
Fields: mapstr.M{
"type": "libbeat",
"message": "Test message from libbeat",
},
- })
+ }))

err := output.Publish(context.Background(), batch)
if err != nil {
@@ -131,15 +127,16 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
"index": index,
"pipeline": "%{[pipeline]}",
})
- client.conn.Delete(index, "", "", nil)
+ _, _, _ = client.conn.Delete(index, "", "", nil)

// Check version
if client.conn.GetVersion().Major < 5 {
t.Skip("Skipping tests as pipeline not available in <5.x releases")
}

publish := func(event beat.Event) {
- err := output.Publish(context.Background(), outest.NewBatch(event))
+ batch := encodeBatch(client, outest.NewBatch(event))
+ err := output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
}
@@ -167,7 +164,7 @@
},
}

- client.conn.DeletePipeline(pipeline, nil)
+ _, _, _ = client.conn.DeletePipeline(pipeline, nil)
_, resp, err := client.conn.CreatePipeline(pipeline, nil, pipelineBody)
if err != nil {
t.Fatal(err)
@@ -217,29 +214,30 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) {
},
},
})
- client.conn.Delete(index, "", "", nil)
- client.conn.Delete(deadletterIndex, "", "", nil)
+ _, _, _ = client.conn.Delete(index, "", "", nil)
+ _, _, _ = client.conn.Delete(deadletterIndex, "", "", nil)

- err := output.Publish(context.Background(), outest.NewBatch(beat.Event{
+ batch := encodeBatch(client, outest.NewBatch(beat.Event{
Timestamp: time.Now(),
Fields: mapstr.M{
"type": "libbeat",
"message": "Test message 1",
"testfield": 0,
},
}))
+ err := output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
}

- batch := outest.NewBatch(beat.Event{
+ batch = encodeBatch(client, outest.NewBatch(beat.Event{
Timestamp: time.Now(),
Fields: mapstr.M{
"type": "libbeat",
"message": "Test message 2",
"testfield": "foo0",
},
- })
+ }))
err = output.Publish(context.Background(), batch)
if err == nil {
t.Fatal("Expecting mapping conflict")
@@ -277,14 +275,15 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) {
"index": index,
"pipeline": "%{[pipeline]}",
})
- client.conn.Delete(index, "", "", nil)
+ _, _, _ = client.conn.Delete(index, "", "", nil)

if client.conn.GetVersion().Major < 5 {
t.Skip("Skipping tests as pipeline not available in <5.x releases")
}

publish := func(events ...beat.Event) {
- err := output.Publish(context.Background(), outest.NewBatch(events...))
+ batch := encodeBatch(client, outest.NewBatch(events...))
+ err := output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
}
@@ -312,7 +311,7 @@
},
}

- client.conn.DeletePipeline(pipeline, nil)
+ _, _, _ = client.conn.DeletePipeline(pipeline, nil)
_, resp, err := client.conn.CreatePipeline(pipeline, nil, pipelineBody)
if err != nil {
t.Fatal(err)
@@ -354,14 +353,14 @@ func TestClientPublishTracer(t *testing.T) {
"index": index,
})

- client.conn.Delete(index, "", "", nil)
+ _, _, _ = client.conn.Delete(index, "", "", nil)

- batch := outest.NewBatch(beat.Event{
+ batch := encodeBatch(client, outest.NewBatch(beat.Event{
Timestamp: time.Now(),
Fields: mapstr.M{
"message": "Hello world",
},
- })
+ }))

tx, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
err := output.Publish(ctx, batch)
@@ -434,7 +433,7 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu
client := randomClient(output).(clientWrap).Client().(*Client)

// Load version number
- client.Connect()
+ _ = client.Connect()

return client, client
}
@@ -475,32 +474,3 @@ func randomClient(grp outputs.Group) outputs.NetworkClient {
client := grp.Clients[rand.Intn(L)]
return client.(outputs.NetworkClient)
}
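The tests above wrap each batch in `encodeBatch`, a helper added elsewhere in this PR: since the queue normally performs early encoding before the output ever sees a batch, tests that bypass the queue must encode manually. A hedged sketch of the idea (the real helper's signature may differ):

```go
// encodeEventsSketch is illustrative, not the PR's actual helper: it stands
// in for the queue by pre-encoding each event and storing the result in the
// publisher.Event's new EncodedEvent field before the batch is published.
func encodeEventsSketch(events []publisher.Event, encode func(beat.Event) interface{}) {
	for i := range events {
		events[i].EncodedEvent = encode(events[i].Content)
	}
}
```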

- // startTestProxy starts a proxy that redirects all connections to the specified URL
- func startTestProxy(t *testing.T, redirectURL string) *httptest.Server {
- t.Helper()
-
- realURL, err := url.Parse(redirectURL)
- require.NoError(t, err)
-
- proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- req := r.Clone(context.Background())
- req.RequestURI = ""
- req.URL.Scheme = realURL.Scheme
- req.URL.Host = realURL.Host
-
- resp, err := http.DefaultClient.Do(req)
- require.NoError(t, err)
- defer resp.Body.Close()
-
- body, err := ioutil.ReadAll(resp.Body)
- require.NoError(t, err)
-
- for _, header := range []string{"Content-Encoding", "Content-Type"} {
- w.Header().Set(header, resp.Header.Get(header))
- }
- w.WriteHeader(resp.StatusCode)
- w.Write(body)
- }))
- return proxy
- }