From c9e768d3e06ed557bd55157964a7503bc23f7deb Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 10 Apr 2024 13:04:52 -0400 Subject: [PATCH] [libbeat] Implement early event encoding for the Elasticsearch output (#38572) Add early-encoding support to the queue and the Elasticsearch output. Early event encoding lets outputs provide helpers that can perform output serialization on an event while it is still in the queue. This early encoding is done in the memory queue's producers, and in the disk queue's reader loop. Benchmarks while writing to Elasticsearch showed significant improvements (reported numbers were measured on Filebeat with a filestream input). Memory reduction in each preset relative to past versions: | Preset | `main` | 8.13.2 | 8.12.2 | | ---------- | ------ | ------ | ------ | | balanced | 21% | 31% | 41% | | throughput | 43% | 47% | 56% | | scale | 23% | 35% | 46% | | latency | 24% | 24% | 41% | CPU reduction for each preset relative to `main` (earlier versions had negligible difference): | Preset | CPU reduction | | ---------- | ------------- | | balanced | 7% | | throughput | 19% | | scale | 7% | | latency | 9% | --- CHANGELOG.next.asciidoc | 1 + libbeat/esleg/eslegclient/enc.go | 11 ++ libbeat/outputs/console/console.go | 2 +- libbeat/outputs/elasticsearch/client.go | 96 +++++----- .../elasticsearch/client_integration_test.go | 72 +++----- libbeat/outputs/elasticsearch/client_test.go | 168 +++++++++--------- .../elasticsearch/dead_letter_index.go | 5 +- .../outputs/elasticsearch/elasticsearch.go | 15 +- .../elasticsearch/elasticsearch_test.go | 14 +- .../outputs/elasticsearch/event_encoder.go | 138 ++++++++++++++ .../elasticsearch/event_encoder_test.go | 142 +++++++++++++++ libbeat/outputs/fileout/file.go | 2 +- libbeat/outputs/kafka/kafka.go | 2 +- libbeat/outputs/logstash/logstash.go | 2 +- .../logstash/logstash_integration_test.go | 36 +++- libbeat/outputs/output_reg.go | 16 ++ libbeat/outputs/redis/redis.go | 2 +- libbeat/outputs/util.go | 17 +- libbeat/publisher/event.go | 6 + libbeat/publisher/pipeline/client_test.go | 4 +- libbeat/publisher/pipeline/controller.go | 8 +- libbeat/publisher/pipeline/controller_test.go | 2 +- libbeat/publisher/pipeline/pipeline.go | 2 +- libbeat/publisher/pipeline/pipeline_test.go | 10 +- libbeat/publisher/pipeline/stress/out.go | 2 +- libbeat/publisher/pipeline/ttl_batch_test.go | 3 +- .../queue/diskqueue/benchmark_test.go | 4 +- libbeat/publisher/queue/diskqueue/consumer.go | 2 +- libbeat/publisher/queue/diskqueue/frames.go | 4 +- libbeat/publisher/queue/diskqueue/producer.go | 6 +- libbeat/publisher/queue/diskqueue/queue.go | 11 +- .../publisher/queue/diskqueue/queue_test.go | 4 +- .../publisher/queue/diskqueue/reader_loop.go | 22 ++- libbeat/publisher/queue/memqueue/broker.go | 25 ++- .../publisher/queue/memqueue/internal_api.go | 6 +- libbeat/publisher/queue/memqueue/produce.go | 28 ++- .../publisher/queue/memqueue/queue_test.go | 22 +-- .../publisher/queue/memqueue/runloop_test.go | 8 +- libbeat/publisher/queue/queue.go | 55 ++++-- 39 files changed, 678 insertions(+), 297 deletions(-) create mode 100644 libbeat/outputs/elasticsearch/event_encoder.go create mode 100644 libbeat/outputs/elasticsearch/event_encoder_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 56bf83702fb8..37fc00bd9262 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -141,6 +141,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Raise up logging level to warning when attempting 
to configure beats with unknown fields from autodiscovered events/environments
- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36843[36843]
- Update to Go 1.21.9. {pull}38727[38727]
+- Enable early event encoding in the Elasticsearch output, improving CPU and memory use {pull}38572[38572]

 *Auditbeat*

diff --git a/libbeat/esleg/eslegclient/enc.go b/libbeat/esleg/eslegclient/enc.go
index 644a2b7d8cc5..27e409f9172e 100644
--- a/libbeat/esleg/eslegclient/enc.go
+++ b/libbeat/esleg/eslegclient/enc.go
@@ -109,6 +109,13 @@ func (b *jsonEncoder) Marshal(obj interface{}) error {
 	return b.AddRaw(obj)
 }

+// RawEncoding is used to wrap objects that have already been json-encoded,
+// so the encoder knows to append them directly instead of treating them
+// like a string.
+type RawEncoding struct {
+	Encoding []byte
+}
+
 func (b *jsonEncoder) AddRaw(obj interface{}) error {
 	var err error
 	switch v := obj.(type) {
@@ -116,6 +123,8 @@ func (b *jsonEncoder) AddRaw(obj interface{}) error {
 		err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
 	case *beat.Event:
 		err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
+	case RawEncoding:
+		_, err = b.buf.Write(v.Encoding)
 	default:
 		err = b.folder.Fold(obj)
 	}
@@ -199,6 +208,8 @@ func (g *gzipEncoder) AddRaw(obj interface{}) error {
 		err = g.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
 	case *beat.Event:
 		err = g.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
+	case RawEncoding:
+		_, err = g.gzip.Write(v.Encoding)
 	default:
 		err = g.folder.Fold(obj)
 	}
diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go
index b81bf3363486..f723bf818c91 100644
--- a/libbeat/outputs/console/console.go
+++ b/libbeat/outputs/console/console.go
@@ -85,7 +85,7 @@ func makeConsole(
 		}
 	}

-	return outputs.Success(config.Queue, config.BatchSize, 0, c)
+	return outputs.Success(config.Queue, config.BatchSize, 0, nil, c)
 }

 func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go
index 936bbea8ca4a..504aac710af3 100644
--- a/libbeat/outputs/elasticsearch/client.go
+++ b/libbeat/outputs/elasticsearch/client.go
@@ -299,60 +299,57 @@ func (client *Client) bulkEncodePublishRequest(version version.V, data []publish
 	okEvents := data[:0]
 	bulkItems := []interface{}{}
 	for i := range data {
-		event := &data[i].Content
+		if data[i].EncodedEvent == nil {
+			client.log.Error("Elasticsearch output received unencoded publisher.Event")
+			continue
+		}
+		event := data[i].EncodedEvent.(*encodedEvent)
+		if event.err != nil {
+			// This means there was an error when encoding the event and it isn't
+			// ingestable, so report the error and continue.
+ client.log.Error(event.err) + continue + } meta, err := client.createEventBulkMeta(version, event) if err != nil { client.log.Errorf("Failed to encode event meta data: %+v", err) continue } - if opType := events.GetOpType(*event); opType == events.OpTypeDelete { + if event.opType == events.OpTypeDelete { // We don't include the event source in a bulk DELETE bulkItems = append(bulkItems, meta) } else { - bulkItems = append(bulkItems, meta, event) + // Wrap the encoded event in a RawEncoding so the Elasticsearch client + // knows not to re-encode it + bulkItems = append(bulkItems, meta, eslegclient.RawEncoding{Encoding: event.encoding}) } okEvents = append(okEvents, data[i]) } return okEvents, bulkItems } -func (client *Client) createEventBulkMeta(version version.V, event *beat.Event) (interface{}, error) { +func (client *Client) createEventBulkMeta(version version.V, event *encodedEvent) (interface{}, error) { eventType := "" if version.Major < 7 { eventType = defaultEventType } - pipeline, err := client.getPipeline(event) - if err != nil { - err := fmt.Errorf("failed to select pipeline: %w", err) - return nil, err - } - - index, err := client.getIndex(event) - if err != nil { - err := fmt.Errorf("failed to select event index: %w", err) - return nil, err - } - - id, _ := events.GetMetaStringValue(*event, events.FieldMetaID) - opType := events.GetOpType(*event) - meta := eslegclient.BulkMeta{ - Index: index, + Index: event.index, DocType: eventType, - Pipeline: pipeline, - ID: id, + Pipeline: event.pipeline, + ID: event.id, } - if opType == events.OpTypeDelete { - if id != "" { + if event.opType == events.OpTypeDelete { + if event.id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil } else { return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.OpTypeDelete) } } - if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { - if opType == events.OpTypeIndex { + if event.id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { + if event.opType == events.OpTypeIndex { return eslegclient.BulkIndexAction{Index: meta}, nil } return eslegclient.BulkCreateAction{Create: meta}, nil @@ -360,17 +357,7 @@ func (client *Client) createEventBulkMeta(version version.V, event *beat.Event) return eslegclient.BulkIndexAction{Index: meta}, nil } -func (client *Client) getIndex(event *beat.Event) (string, error) { - // If this event has been dead-lettered, override its index - if event.Meta != nil { - if deadLetter, _ := event.Meta.HasKey(dead_letter_marker_field); deadLetter { - return client.deadLetterIndex, nil - } - } - return client.indexSelector.Select(event) -} - -func (client *Client) getPipeline(event *beat.Event) (string, error) { +func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, error) { if event.Meta != nil { pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline) if errors.Is(err, mapstr.ErrKeyNotFound) { @@ -383,8 +370,8 @@ func (client *Client) getPipeline(event *beat.Event) (string, error) { return strings.ToLower(pipeline), nil } - if client.pipelineSelector != nil { - return client.pipelineSelector.Select(event) + if defaultSelector != nil { + return defaultSelector.Select(event) } return "", nil } @@ -427,8 +414,8 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat stats.tooMany++ } else { // hard failure, apply policy action - result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field) - if result { + encodedEvent := 
data[i].EncodedEvent.(*encodedEvent) + if encodedEvent.deadLetter { stats.nonIndexable++ client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status) client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg) @@ -436,18 +423,7 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat } else if client.deadLetterIndex != "" { client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status) client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) - if data[i].Content.Meta == nil { - data[i].Content.Meta = mapstr.M{ - dead_letter_marker_field: true, - } - } else { - data[i].Content.Meta[dead_letter_marker_field] = true - } - data[i].Content.Fields = mapstr.M{ - "message": data[i].Content.Fields.String(), - "error.type": status, - "error.message": string(msg), - } + client.setDeadLetter(encodedEvent, status, string(msg)) } else { // drop stats.nonIndexable++ client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status) @@ -465,6 +441,20 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat return failed, stats } +func (client *Client) setDeadLetter( + encodedEvent *encodedEvent, errType int, errMsg string, +) { + encodedEvent.deadLetter = true + encodedEvent.index = client.deadLetterIndex + deadLetterReencoding := mapstr.M{ + "@timestamp": encodedEvent.timestamp, + "message": string(encodedEvent.encoding), + "error.type": errType, + "error.message": errMsg, + } + encodedEvent.encoding = []byte(deadLetterReencoding.String()) +} + func (client *Client) Connect() error { return client.conn.Connect() } diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 7a8a06beccaf..56567931ee4e 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -22,11 +22,7 @@ package elasticsearch import ( "context" "fmt" - "io/ioutil" "math/rand" - "net/http" - "net/http/httptest" - "net/url" "testing" "time" @@ -85,15 +81,15 @@ func testPublishEvent(t *testing.T, index string, cfg map[string]interface{}) { output, client := connectTestEsWithStats(t, cfg, index) // drop old index preparing test - client.conn.Delete(index, "", "", nil) + _, _, _ = client.conn.Delete(index, "", "", nil) - batch := outest.NewBatch(beat.Event{ + batch := encodeBatch(client, outest.NewBatch(beat.Event{ Timestamp: time.Now(), Fields: mapstr.M{ "type": "libbeat", "message": "Test message from libbeat", }, - }) + })) err := output.Publish(context.Background(), batch) if err != nil { @@ -131,7 +127,7 @@ func TestClientPublishEventWithPipeline(t *testing.T) { "index": index, "pipeline": "%{[pipeline]}", }) - client.conn.Delete(index, "", "", nil) + _, _, _ = client.conn.Delete(index, "", "", nil) // Check version if client.conn.GetVersion().Major < 5 { @@ -139,7 +135,8 @@ func TestClientPublishEventWithPipeline(t *testing.T) { } publish := func(event beat.Event) { - err := output.Publish(context.Background(), outest.NewBatch(event)) + batch := encodeBatch(client, outest.NewBatch(event)) + err := output.Publish(context.Background(), batch) if err != nil { t.Fatal(err) } @@ -167,7 +164,7 @@ func TestClientPublishEventWithPipeline(t *testing.T) { }, 
} - client.conn.DeletePipeline(pipeline, nil) + _, _, _ = client.conn.DeletePipeline(pipeline, nil) _, resp, err := client.conn.CreatePipeline(pipeline, nil, pipelineBody) if err != nil { t.Fatal(err) @@ -217,10 +214,10 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { }, }, }) - client.conn.Delete(index, "", "", nil) - client.conn.Delete(deadletterIndex, "", "", nil) + _, _, _ = client.conn.Delete(index, "", "", nil) + _, _, _ = client.conn.Delete(deadletterIndex, "", "", nil) - err := output.Publish(context.Background(), outest.NewBatch(beat.Event{ + batch := encodeBatch(client, outest.NewBatch(beat.Event{ Timestamp: time.Now(), Fields: mapstr.M{ "type": "libbeat", @@ -228,18 +225,19 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { "testfield": 0, }, })) + err := output.Publish(context.Background(), batch) if err != nil { t.Fatal(err) } - batch := outest.NewBatch(beat.Event{ + batch = encodeBatch(client, outest.NewBatch(beat.Event{ Timestamp: time.Now(), Fields: mapstr.M{ "type": "libbeat", "message": "Test message 2", "testfield": "foo0", }, - }) + })) err = output.Publish(context.Background(), batch) if err == nil { t.Fatal("Expecting mapping conflict") @@ -277,14 +275,15 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) { "index": index, "pipeline": "%{[pipeline]}", }) - client.conn.Delete(index, "", "", nil) + _, _, _ = client.conn.Delete(index, "", "", nil) if client.conn.GetVersion().Major < 5 { t.Skip("Skipping tests as pipeline not available in <5.x releases") } publish := func(events ...beat.Event) { - err := output.Publish(context.Background(), outest.NewBatch(events...)) + batch := encodeBatch(client, outest.NewBatch(events...)) + err := output.Publish(context.Background(), batch) if err != nil { t.Fatal(err) } @@ -312,7 +311,7 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) { }, } - client.conn.DeletePipeline(pipeline, nil) + _, _, _ = client.conn.DeletePipeline(pipeline, nil) _, resp, err := client.conn.CreatePipeline(pipeline, nil, pipelineBody) if err != nil { t.Fatal(err) @@ -354,14 +353,14 @@ func TestClientPublishTracer(t *testing.T) { "index": index, }) - client.conn.Delete(index, "", "", nil) + _, _, _ = client.conn.Delete(index, "", "", nil) - batch := outest.NewBatch(beat.Event{ + batch := encodeBatch(client, outest.NewBatch(beat.Event{ Timestamp: time.Now(), Fields: mapstr.M{ "message": "Hello world", }, - }) + })) tx, spans, _ := apmtest.WithTransaction(func(ctx context.Context) { err := output.Publish(ctx, batch) @@ -434,7 +433,7 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu client := randomClient(output).(clientWrap).Client().(*Client) // Load version number - client.Connect() + _ = client.Connect() return client, client } @@ -475,32 +474,3 @@ func randomClient(grp outputs.Group) outputs.NetworkClient { client := grp.Clients[rand.Intn(L)] return client.(outputs.NetworkClient) } - -// startTestProxy starts a proxy that redirects all connections to the specified URL -func startTestProxy(t *testing.T, redirectURL string) *httptest.Server { - t.Helper() - - realURL, err := url.Parse(redirectURL) - require.NoError(t, err) - - proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - req := r.Clone(context.Background()) - req.RequestURI = "" - req.URL.Scheme = realURL.Scheme - req.URL.Host = realURL.Host - - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err) - defer resp.Body.Close() - - body, err := 
ioutil.ReadAll(resp.Body) - require.NoError(t, err) - - for _, header := range []string{"Content-Encoding", "Content-Type"} { - w.Header().Set(header, resp.Header.Get(header)) - } - w.WriteHeader(resp.StatusCode) - w.Write(body) - })) - return proxy -} diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 12493e28d028..28033ff3cb24 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -20,7 +20,9 @@ package elasticsearch import ( + "bytes" "context" + "encoding/json" "fmt" "io" "net/http" @@ -50,12 +52,6 @@ import ( libversion "github.com/elastic/elastic-agent-libs/version" ) -type testIndexSelector struct{} - -func (testIndexSelector) Select(event *beat.Event) (string, error) { - return "test", nil -} - type batchMock struct { events []publisher.Event ack bool @@ -117,20 +113,20 @@ func TestPublish(t *testing.T) { client := makePublishTestClient(t, esMock.URL) // Try publishing a batch that can be split - batch := &batchMock{ + batch := encodeBatch(client, &batchMock{ events: []publisher.Event{event1}, canSplit: true, - } + }) err := client.Publish(ctx, batch) assert.NoError(t, err, "Publish should split the batch without error") assert.True(t, batch.didSplit, "batch should be split") // Try publishing a batch that cannot be split - batch = &batchMock{ + batch = encodeBatch(client, &batchMock{ events: []publisher.Event{event1}, canSplit: false, - } + }) err = client.Publish(ctx, batch) assert.NoError(t, err, "Publish should drop the batch without error") @@ -145,9 +141,9 @@ func TestPublish(t *testing.T) { defer esMock.Close() client := makePublishTestClient(t, esMock.URL) - batch := &batchMock{ + batch := encodeBatch(client, &batchMock{ events: []publisher.Event{event1, event2}, - } + }) err := client.Publish(ctx, batch) @@ -171,7 +167,7 @@ func TestPublish(t *testing.T) { // test results directly without atomics/mutexes. done := false retryCount := 0 - batch := pipeline.NewBatchForTesting( + batch := encodeBatch(client, pipeline.NewBatchForTesting( []publisher.Event{event1, event2, event3}, func(b publisher.Batch) { // The retry function sends the batch back through Publish. @@ -179,11 +175,13 @@ func TestPublish(t *testing.T) { // first and then back to Publish when an output worker was // available. retryCount++ + // We shouldn't need to re-encode the events since that was done + // before the initial Publish call err := client.Publish(ctx, b) assert.NoError(t, err, "Publish should return without error") }, func() { done = true }, - ) + )) err := client.Publish(ctx, batch) assert.NoError(t, err, "Publish should return without error") @@ -220,7 +218,7 @@ func TestPublish(t *testing.T) { // test results directly without atomics/mutexes. done := false retryCount := 0 - batch := pipeline.NewBatchForTesting( + batch := encodeBatch(client, pipeline.NewBatchForTesting( []publisher.Event{event1, event2, event3}, func(b publisher.Batch) { // The retry function sends the batch back through Publish. 
@@ -232,7 +230,7 @@ func TestPublish(t *testing.T) { assert.NoError(t, err, "Publish should return without error") }, func() { done = true }, - ) + )) err := client.Publish(ctx, batch) assert.NoError(t, err, "Publish should return without error") @@ -308,24 +306,25 @@ func TestCollectPublishFailDeadLetterQueue(t *testing.T) { ) assert.NoError(t, err) + parseError := `{ + "root_cause" : [ + { + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'" + } + ], + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'", + "caused_by" : { + "type" : "illegal_argument_exception", + "reason" : "For input string: \"bar1\"" + } + }` response := []byte(` { "items": [ {"create": {"status": 200}}, {"create": { - "error" : { - "root_cause" : [ - { - "type" : "mapper_parsing_exception", - "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'" - } - ], - "type" : "mapper_parsing_exception", - "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'", - "caused_by" : { - "type" : "illegal_argument_exception", - "reason" : "For input string: \"bar1\"" - } - }, + "error" : ` + parseError + `, "status" : 400 } }, @@ -334,24 +333,18 @@ func TestCollectPublishFailDeadLetterQueue(t *testing.T) { `) event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}} + event2 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 2}}} eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}} - events := []publisher.Event{event, eventFail, event} + events := encodeEvents(client, []publisher.Event{event, eventFail, event2}) res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) if len(res) == 1 { - expected := publisher.Event{ - Content: beat.Event{ - Fields: mapstr.M{ - "message": "{\"bar\":\"bar1\"}", - "error.type": 400, - "error.message": "{\n\t\t\t\"root_cause\" : [\n\t\t\t {\n\t\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'\"\n\t\t\t }\n\t\t\t],\n\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. 
Preview of field's value: 'bar1'\",\n\t\t\t\"caused_by\" : {\n\t\t\t \"type\" : \"illegal_argument_exception\",\n\t\t\t \"reason\" : \"For input string: \\\"bar1\\\"\"\n\t\t\t}\n\t\t }", - }, - Meta: mapstr.M{ - dead_letter_marker_field: true, - }, - }, - } + expected := encodeEvent(client, eventFail) + encodedEvent := expected.EncodedEvent.(*encodedEvent) + // Mark the encoded event with the expected error + client.setDeadLetter(encodedEvent, 400, parseError) + assert.Equal(t, expected, res[0]) } assert.Equal(t, bulkResultStats{acked: 2, fails: 1, nonIndexable: 0}, stats) @@ -394,7 +387,7 @@ func TestCollectPublishFailDrop(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}} eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}} - events := []publisher.Event{event, eventFail, event} + events := encodeEvents(client, []publisher.Event{event, eventFail, event}) res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 0, len(res)) @@ -419,7 +412,7 @@ func TestCollectPublishFailAll(t *testing.T) { `) event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}} - events := []publisher.Event{event, event, event} + events := encodeEvents(client, []publisher.Event{event, event, event}) res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 3, len(res)) @@ -468,7 +461,7 @@ func TestCollectPipelinePublishFail(t *testing.T) { }`) event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}} - events := []publisher.Event{event} + events := encodeEvents(client, []publisher.Event{event}) res, _ := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) @@ -494,7 +487,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { `) event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}} - events := []publisher.Event{event, event, event} + events := encodeEvents(client, []publisher.Event{event, event, event}) for i := 0; i < b.N; i++ { res, _ := client.bulkCollectPublishFails(response, events) @@ -523,7 +516,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}} eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}} - events := []publisher.Event{event, eventFail, event} + events := encodeEvents(client, []publisher.Event{event, eventFail, event}) for i := 0; i < b.N; i++ { res, _ := client.bulkCollectPublishFails(response, events) @@ -551,7 +544,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) { `) event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}} - events := []publisher.Event{event, event, event} + events := encodeEvents(client, []publisher.Event{event, event, event}) for i := 0; i < b.N; i++ { res, _ := client.bulkCollectPublishFails(response, events) @@ -608,7 +601,7 @@ func TestClientWithHeaders(t *testing.T) { "message": "Test message from libbeat", }} - batch := outest.NewBatch(event, event, event) + batch := encodeBatch(client, outest.NewBatch(event, event, event)) err = client.Publish(context.Background(), batch) assert.NoError(t, err) assert.Equal(t, 2, requestCount) @@ -650,16 +643,6 @@ func TestBulkEncodeEvents(t *testing.T) { index, pipeline, err := buildSelectors(im, info, cfg) require.NoError(t, err) - events := make([]publisher.Event, len(test.events)) - for i, fields := range test.events { - events[i] = publisher.Event{ - Content: beat.Event{ - Timestamp: time.Now(), - Fields: 
fields, - }, - } - } - client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), @@ -670,6 +653,17 @@ func TestBulkEncodeEvents(t *testing.T) { ) assert.NoError(t, err) + events := make([]publisher.Event, len(test.events)) + for i, fields := range test.events { + events[i] = publisher.Event{ + Content: beat.Event{ + Timestamp: time.Now(), + Fields: fields, + }, + } + } + encodeEvents(client, events) + encoded, bulkItems := client.bulkEncodePublishRequest(*libversion.MustNew(test.version), events) assert.Equal(t, len(events), len(encoded), "all events should have been encoded") assert.Equal(t, 2*len(events), len(bulkItems), "incomplete bulk") @@ -717,6 +711,15 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { index, pipeline, err := buildSelectors(im, info, cfg) require.NoError(t, err) + client, _ := NewClient( + clientSettings{ + observer: outputs.NewNilObserver(), + indexSelector: index, + pipelineSelector: pipeline, + }, + nil, + ) + events := make([]publisher.Event, len(cases)) for i, fields := range cases { meta := mapstr.M{ @@ -735,15 +738,7 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { }, } } - - client, _ := NewClient( - clientSettings{ - observer: outputs.NewNilObserver(), - indexSelector: index, - pipelineSelector: pipeline, - }, - nil, - ) + encodeEvents(client, events) encoded, bulkItems := client.bulkEncodePublishRequest(*libversion.MustNew(version.GetDefaultVersion()), events) require.Equal(t, len(events)-1, len(encoded), "all events should have been encoded") @@ -841,7 +836,7 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { client := makePublishTestClient(t, esMock.URL, nil) // Try publishing a batch that can be split - events := []publisher.Event{event1} + events := encodeEvents(client, []publisher.Event{event1}) evt, err := client.publishEvents(ctx, events) require.NoError(t, err) require.Equal(t, len(recParams), len(expectedFilteringParams)) @@ -872,7 +867,7 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { client := makePublishTestClient(t, esMock.URL, configParams) // Try publishing a batch that can be split - events := []publisher.Event{event1} + events := encodeEvents(client, []publisher.Event{event1}) evt, err := client.publishEvents(ctx, events) require.NoError(t, err) require.Equal(t, len(recParams), len(expectedFilteringParams)+len(configParams)) @@ -914,29 +909,36 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { client := makePublishTestClient(t, esMock.URL, nil) // Try publishing a batch that can be split - events := []publisher.Event{event1} + events := encodeEvents(client, []publisher.Event{event1}) _, err := client.publishEvents(ctx, events) require.NoError(t, err) require.Equal(t, len(recParams), 1) }) } -func TestGetIndex(t *testing.T) { +func TestSetDeadLetter(t *testing.T) { dead_letter_index := "dead_index" client := &Client{ deadLetterIndex: dead_letter_index, indexSelector: testIndexSelector{}, } - event := &beat.Event{ - Meta: make(map[string]interface{}), + e := &encodedEvent{ + index: "original_index", + } + errType := 123 + errStr := "test error string" + client.setDeadLetter(e, errType, errStr) + + assert.True(t, e.deadLetter, "setDeadLetter should set the event's deadLetter flag") + assert.Equal(t, dead_letter_index, e.index, "setDeadLetter should overwrite the event's original index") + + var errFields struct { + ErrType int `json:"error.type"` + ErrMessage string `json:"error.message"` } - index, err := client.getIndex(event) - require.NoError(t, err, "getIndex call must succeed") - 
assert.Equal(t, "test", index, "Event with no dead letter marker should use the client's index selector") - - event.Meta[dead_letter_marker_field] = true - index, err = client.getIndex(event) - require.NoError(t, err, "getIndex call must succeed") - assert.Equal(t, dead_letter_index, index, "Event with dead letter marker should use the client's dead letter index") + err := json.NewDecoder(bytes.NewReader(e.encoding)).Decode(&errFields) + require.NoError(t, err, "json decoding of encoded event should succeed") + assert.Equal(t, errType, errFields.ErrType, "encoded error.type should match value in setDeadLetter") + assert.Equal(t, errStr, errFields.ErrMessage, "encoded error.message should match value in setDeadLetter") } diff --git a/libbeat/outputs/elasticsearch/dead_letter_index.go b/libbeat/outputs/elasticsearch/dead_letter_index.go index d27882056919..435418314781 100644 --- a/libbeat/outputs/elasticsearch/dead_letter_index.go +++ b/libbeat/outputs/elasticsearch/dead_letter_index.go @@ -25,9 +25,8 @@ import ( ) const ( - dead_letter_marker_field = "deadlettered" - drop = "drop" - dead_letter_index = "dead_letter_index" + drop = "drop" + dead_letter_index = "dead_letter_index" ) func deadLetterIndexForConfig(config *config.C) (string, error) { diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 616e08e08f67..9bc8498afe45 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -35,7 +35,7 @@ const logSelector = "elasticsearch" func makeES( im outputs.IndexManager, - beat beat.Info, + beatInfo beat.Info, observer outputs.Observer, cfg *config.C, ) (outputs.Group, error) { @@ -46,7 +46,7 @@ func makeES( } } - index, pipeline, err := buildSelectors(im, beat, cfg) + indexSelector, pipelineSelector, err := buildSelectors(im, beatInfo, cfg) if err != nil { return outputs.Fail(err) } @@ -94,6 +94,9 @@ func makeES( params = nil } + encoderFactory := newEventEncoderFactory( + esConfig.EscapeHTML, indexSelector, pipelineSelector) + clients := make([]outputs.NetworkClient, len(hosts)) for i, host := range hosts { esURL, err := common.MakeURL(esConfig.Protocol, esConfig.Path, host, 9200) @@ -106,7 +109,7 @@ func makeES( client, err = NewClient(clientSettings{ connection: eslegclient.ConnectionSettings{ URL: esURL, - Beatname: beat.Beat, + Beatname: beatInfo.Beat, Kerberos: esConfig.Kerberos, Username: esConfig.Username, Password: esConfig.Password, @@ -119,8 +122,8 @@ func makeES( Transport: esConfig.Transport, IdleConnTimeout: esConfig.Transport.IdleConnTimeout, }, - indexSelector: index, - pipelineSelector: pipeline, + indexSelector: indexSelector, + pipelineSelector: pipelineSelector, observer: observer, deadLetterIndex: deadLetterIndex, }, &connectCallbackRegistry) @@ -132,7 +135,7 @@ func makeES( clients[i] = client } - return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, clients) + return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, encoderFactory, clients) } func buildSelectors( diff --git a/libbeat/outputs/elasticsearch/elasticsearch_test.go b/libbeat/outputs/elasticsearch/elasticsearch_test.go index 7950bdd63234..25902801cbb8 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch_test.go +++ b/libbeat/outputs/elasticsearch/elasticsearch_test.go @@ -20,7 +20,7 @@ package elasticsearch import ( "testing" - "github.com/stretchr/testify/assert" + 
"github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" @@ -120,21 +120,13 @@ func TestPipelineSelection(t *testing.T) { test := _test t.Run(name, func(t *testing.T) { selector, err := buildPipelineSelector(config.MustNewConfigFrom(test.cfg)) - assert.NoError(t, err) - - client, err := NewClient( - clientSettings{ - pipelineSelector: &selector, - }, - nil, - ) - assert.NoError(t, err) + require.NoError(t, err) if err != nil { t.Fatalf("Failed to parse configuration: %v", err) } - got, err := client.getPipeline(&test.event) + got, err := getPipeline(&test.event, &selector) if err != nil { t.Fatalf("Failed to create pipeline name: %v", err) } diff --git a/libbeat/outputs/elasticsearch/event_encoder.go b/libbeat/outputs/elasticsearch/event_encoder.go new file mode 100644 index 000000000000..0441695d53c7 --- /dev/null +++ b/libbeat/outputs/elasticsearch/event_encoder.go @@ -0,0 +1,138 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch + +import ( + "bytes" + "fmt" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/outputs/outil" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" +) + +type eventEncoder struct { + buf *bytes.Buffer + enc eslegclient.BodyEncoder + pipelineSelector *outil.Selector + indexSelector outputs.IndexSelector +} + +type encodedEvent struct { + // If err is set, the event couldn't be encoded, and other fields should + // not be relied on. + err error + + // If deadLetter is true, this event produced an ingestion error on a + // previous attempt, and is now being retried as a bare event with all + // contents included as a raw string in the "message" field. + deadLetter bool + + // timestamp is the timestamp from the source beat.Event. It's only used + // when reencoding for the dead letter index, so it isn't strictly needed + // but it avoids deserializing the encoded event to recover one field if + // there's an ingestion error. 
+ timestamp time.Time + + id string + opType events.OpType + pipeline string + index string + encoding []byte +} + +func newEventEncoderFactory( + escapeHTML bool, + indexSelector outputs.IndexSelector, + pipelineSelector *outil.Selector, +) queue.EncoderFactory { + return func() queue.Encoder { + return newEventEncoder(escapeHTML, indexSelector, pipelineSelector) + } +} + +func newEventEncoder(escapeHTML bool, + indexSelector outputs.IndexSelector, + pipelineSelector *outil.Selector, +) queue.Encoder { + buf := bytes.NewBuffer(nil) + enc := eslegclient.NewJSONEncoder(buf, escapeHTML) + return &eventEncoder{ + buf: buf, + enc: enc, + pipelineSelector: pipelineSelector, + indexSelector: indexSelector, + } +} + +func (pe *eventEncoder) EncodeEntry(entry queue.Entry) (queue.Entry, int) { + e, ok := entry.(publisher.Event) + if !ok { + // Currently all queue entries are publisher.Events but let's be cautious. + return entry, 0 + } + + encodedEvent := pe.encodeRawEvent(&e.Content) + e.EncodedEvent = encodedEvent + e.Content = beat.Event{} + return e, len(encodedEvent.encoding) +} + +// Note: we can't early-encode the bulk metadata that goes with an event, +// because it depends on the upstream Elasticsearch version and thus requires +// a live client connection. However, benchmarks show that even for a known +// version, encoding the bulk metadata and the event together gives slightly +// worse performance, so there's no reason to try optimizing around this +// dependency. +func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent { + opType := events.GetOpType(*e) + pipeline, err := getPipeline(e, pe.pipelineSelector) + if err != nil { + return &encodedEvent{err: fmt.Errorf("failed to select event pipeline: %w", err)} + } + var index string + if pe.indexSelector != nil { + index, err = pe.indexSelector.Select(e) + if err != nil { + return &encodedEvent{err: fmt.Errorf("failed to select event index: %w", err)} + } + } + + id, _ := events.GetMetaStringValue(*e, events.FieldMetaID) + + err = pe.enc.Marshal(e) + if err != nil { + return &encodedEvent{err: fmt.Errorf("failed to encode event for output: %w", err)} + } + bufBytes := pe.buf.Bytes() + bytes := make([]byte, len(bufBytes)) + copy(bytes, bufBytes) + return &encodedEvent{ + id: id, + timestamp: e.Timestamp, + opType: opType, + pipeline: pipeline, + index: index, + encoding: bytes, + } +} diff --git a/libbeat/outputs/elasticsearch/event_encoder_test.go b/libbeat/outputs/elasticsearch/event_encoder_test.go new file mode 100644 index 000000000000..a3aef08ca23c --- /dev/null +++ b/libbeat/outputs/elasticsearch/event_encoder_test.go @@ -0,0 +1,142 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package elasticsearch + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +type testIndexSelector struct{} + +func (testIndexSelector) Select(event *beat.Event) (string, error) { + return "test", nil +} + +func TestEncodeEntry(t *testing.T) { + indexSelector := testIndexSelector{} + + encoder := newEventEncoder(true, indexSelector, nil) + + timestamp := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC) + pubEvent := publisher.Event{ + Content: beat.Event{ + Timestamp: timestamp, + Fields: mapstr.M{ + "test_field": "test_value", + "number_field": 5, + "nested": mapstr.M{ + "nested_field": "nested_value", + }, + }, + Meta: mapstr.M{ + events.FieldMetaOpType: "create", + events.FieldMetaPipeline: "TEST_PIPELINE", + events.FieldMetaID: "test_id", + }, + }, + } + + encoded, encodedSize := encoder.EncodeEntry(pubEvent) + encPubEvent, ok := encoded.(publisher.Event) + + // Check the resulting publisher.Event + require.True(t, ok, "EncodeEntry must return a publisher.Event") + require.NotNil(t, encPubEvent.EncodedEvent, "EncodeEntry must set EncodedEvent") + assert.Nil(t, encPubEvent.Content.Fields, "EncodeEntry should clear event.Content") + + // Check the inner encodedEvent + encBeatEvent, ok := encPubEvent.EncodedEvent.(*encodedEvent) + require.True(t, ok, "EncodeEntry should set EncodedEvent to a *encodedEvent") + require.Equal(t, len(encBeatEvent.encoding), encodedSize, "Reported size should match encoded buffer") + + // Check event metadata + assert.Equal(t, "test_id", encBeatEvent.id, "Event id should match original metadata") + assert.Equal(t, "test", encBeatEvent.index, "Event should have the index set by its selector") + assert.Equal(t, "test_pipeline", encBeatEvent.pipeline, "Event pipeline should match original metadata") + assert.Equal(t, timestamp, encBeatEvent.timestamp, "encodedEvent.timestamp should match the original event") + assert.Equal(t, events.OpTypeCreate, encBeatEvent.opType, "encoded opType should match the original metadata") + assert.False(t, encBeatEvent.deadLetter, "encoded event shouldn't have deadLetter flag set") + + // Check encoded fields + var eventContent struct { + Timestamp time.Time `json:"@timestamp"` + TestField string `json:"test_field"` + NumberField int `json:"number_field"` + Nested struct { + NestedField string `json:"nested_field"` + } `json:"nested"` + } + err := json.Unmarshal(encBeatEvent.encoding, &eventContent) + require.NoError(t, err, "encoding should contain valid json") + assert.Equal(t, timestamp, eventContent.Timestamp, "Encoded timestamp should match original") + assert.Equal(t, "test_value", eventContent.TestField, "Encoded field should match original") + assert.Equal(t, 5, eventContent.NumberField, "Encoded field should match original") + assert.Equal(t, "nested_value", eventContent.Nested.NestedField, "Encoded field should match original") +} + +// encodeBatch encodes a publisher.Batch so it can be provided to +// Client.Publish and other helpers. +// This modifies the batch in place, but also returns its input batch +// to allow for easy chaining while creating test batches. 
+func encodeBatch[B publisher.Batch](client *Client, batch B) B { + encodeEvents(client, batch.Events()) + return batch +} + +// A test helper to encode an event array for an Elasticsearch client. +// This isn't particularly efficient since it creates a new encoder object +// for every set of events, but it's much easier and the difference is +// negligible for any non-benchmark tests. +// This modifies the slice in place, but also returns its input slice +// to allow for easy chaining while creating test events. +func encodeEvents(client *Client, events []publisher.Event) []publisher.Event { + encoder := newEventEncoder( + client.conn.EscapeHTML, + client.indexSelector, + client.pipelineSelector, + ) + for i := range events { + // Skip encoding if there's already encoded data present + if events[i].EncodedEvent == nil { + encoded, _ := encoder.EncodeEntry(events[i]) + event := encoded.(publisher.Event) + events[i] = event + } + } + return events +} + +func encodeEvent(client *Client, event publisher.Event) publisher.Event { + encoder := newEventEncoder( + client.conn.EscapeHTML, + client.indexSelector, + client.pipelineSelector, + ) + encoded, _ := encoder.EncodeEntry(event) + return encoded.(publisher.Event) +} diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 34c57f29791f..d14bd99d69ad 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -66,7 +66,7 @@ func makeFileout( return outputs.Fail(err) } - return outputs.Success(foConfig.Queue, -1, 0, fo) + return outputs.Success(foConfig.Queue, -1, 0, nil, fo) } func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error { diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index d004bd16ba32..cb23823a95a3 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -84,7 +84,7 @@ func makeKafka( if kConfig.MaxRetries < 0 { retry = -1 } - return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, client) + return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, nil, client) } // buildTopicSelector builds the topic selector for standalone Beat and when diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 072ec049f6fb..c4c51ae54373 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -85,5 +85,5 @@ func makeLogstash( clients[i] = client } - return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, clients) + return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, nil, clients) } diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 2cfbcd03974c..442145835dfd 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -38,6 +38,8 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/outputs/outest" "github.com/elastic/beats/v7/libbeat/outputs/outil" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/transport/httpcommon" @@ -61,6 +63,7 @@ type esConnection struct { type testOutputer struct { outputs.NetworkClient *esConnection + encoder 
queue.Encoder } type esSource interface { @@ -161,7 +164,7 @@ func newTestLogstashOutput(t *testing.T, test string, tls bool) *testOutputer { index := testLogstashIndex(test) connection := esConnect(t, index) - return &testOutputer{output, connection} + return &testOutputer{output, connection, nil} } func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { @@ -201,6 +204,9 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { es := &testOutputer{} es.NetworkClient = grp.Clients[0].(outputs.NetworkClient) es.esConnection = connection + // The Elasticsearch output requires events to be encoded + // before calling Publish, so create an event encoder. + es.encoder = grp.EncoderFactory() es.Connect() return es @@ -552,12 +558,13 @@ func checkEvent(t *testing.T, ls, es map[string]interface{}) { } func (t *testOutputer) PublishEvent(event beat.Event) { - t.Publish(context.Background(), outest.NewBatch(event)) + batch := encodeBatch(t.encoder, outest.NewBatch(event)) + t.Publish(context.Background(), batch) } func (t *testOutputer) BulkPublish(events []beat.Event) bool { ok := false - batch := outest.NewBatch(events...) + batch := encodeBatch(t.encoder, outest.NewBatch(events...)) var wg sync.WaitGroup wg.Add(1) @@ -570,3 +577,26 @@ func (t *testOutputer) BulkPublish(events []beat.Event) bool { wg.Wait() return ok } + +// encodeBatch encodes a publisher.Batch so it can be provided to +// Client.Publish and other helpers. +// This modifies the batch in place, but also returns its input batch +// to allow for easy chaining while creating test batches. +func encodeBatch[B publisher.Batch](encoder queue.Encoder, batch B) B { + if encoder != nil { + encodeEvents(encoder, batch.Events()) + } + return batch +} + +func encodeEvents(encoder queue.Encoder, events []publisher.Event) []publisher.Event { + for i := range events { + // Skip encoding if there's already encoded data present + if events[i].EncodedEvent == nil { + encoded, _ := encoder.EncodeEntry(events[i]) + event := encoded.(publisher.Event) + events[i] = event + } + } + return events +} diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go index 3d2675c2ce2e..fdd8e22a6634 100644 --- a/libbeat/outputs/output_reg.go +++ b/libbeat/outputs/output_reg.go @@ -59,6 +59,22 @@ type Group struct { BatchSize int Retry int QueueFactory queue.QueueFactory + + // If the output supports early encoding (where events are converted to their + // output-serialized form before entering the queue) it should provide an + // encoder factory here. Events will be processed using the resulting encoders + // before being returned from the queue. This can provide significant cpu and + // memory savings for outputs that support it. + // - Each encoder will be accessed from only one goroutine at a time. + // - Encoders should add the event's output-serialized form, along with any + // metadata needed to handle a Publish call, to the EncodedEvent field of + // the underlying publisher.Event. + // - Encoders should clear the Content field of the underlying publisher.Event + // so memory can be reclaimed for the unencoded version. + // - If there is a fatal error in encoding, provide a non-nil EncodedEvent + // and clear Content anyway. Metadata about the error should be saved in + // EncodedEvent and reported when Publish is called. + EncoderFactory queue.EncoderFactory } // RegisterType registers a new output type. 
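To make the EncoderFactory contract above concrete, here is a minimal sketch of an output-side early encoder. It assumes only the queue.Encoder / queue.EncoderFactory shapes introduced in this patch; the names rawJSONEncoder, rawJSONEvent, and newRawJSONEncoderFactory are invented for illustration and appear nowhere in the patch.

package sketch

import (
	"encoding/json"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// rawJSONEvent carries the output-serialized form of an event, plus any
// fatal encoding error to be reported when Publish is eventually called.
type rawJSONEvent struct {
	encoding []byte
	err      error
}

type rawJSONEncoder struct{}

// newRawJSONEncoderFactory returns a factory rather than a shared encoder,
// so each queue worker can own one; that is why the contract only promises
// single-goroutine access.
func newRawJSONEncoderFactory() queue.EncoderFactory {
	return func() queue.Encoder { return &rawJSONEncoder{} }
}

func (e *rawJSONEncoder) EncodeEntry(entry queue.Entry) (queue.Entry, int) {
	event, ok := entry.(publisher.Event)
	if !ok {
		// Currently all queue entries are publisher.Events; pass anything
		// else through unmodified.
		return entry, 0
	}
	encoding, err := json.Marshal(event.Content.Fields)
	// Per the contract: set EncodedEvent even on error, and clear Content
	// so the memory held by the unencoded event can be reclaimed.
	event.EncodedEvent = &rawJSONEvent{encoding: encoding, err: err}
	event.Content = beat.Event{}
	return event, len(encoding)
}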
diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 9814d6abee7b..d0cba1e70618 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -165,7 +165,7 @@ func makeRedis( clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max) } - return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, clients) + return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, nil, clients) } func buildKeySelector(cfg *config.C) (outil.Selector, error) { diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go index cab6b99aebef..8b3d96fcaa5f 100644 --- a/libbeat/outputs/util.go +++ b/libbeat/outputs/util.go @@ -35,7 +35,7 @@ func Fail(err error) (Group, error) { return Group{}, err } // instances. The first argument is expected to contain a queue // config.Namespace. The queue config is passed to assign the queue // factory when elastic-agent reloads the output. -func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) { +func Success(cfg config.Namespace, batchSize, retry int, encoderFactory queue.EncoderFactory, clients ...Client) (Group, error) { var q queue.QueueFactory if cfg.IsSet() && cfg.Config().Enabled() { switch cfg.Name() { @@ -59,10 +59,11 @@ func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Gro } } return Group{ - Clients: clients, - BatchSize: batchSize, - Retry: retry, - QueueFactory: q, + Clients: clients, + BatchSize: batchSize, + Retry: retry, + QueueFactory: q, + EncoderFactory: encoderFactory, }, nil } @@ -79,12 +80,12 @@ func NetworkClients(netclients []NetworkClient) []Client { // The first argument is expected to contain a queue config.Namespace. // The queue config is passed to assign the queue factory when // elastic-agent reloads the output. -func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) { +func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) { if !loadbalance { - return Success(cfg, batchSize, retry, NewFailoverClient(netclients)) + return Success(cfg, batchSize, retry, encoderFactory, NewFailoverClient(netclients)) } clients := NetworkClients(netclients) - return Success(cfg, batchSize, retry, clients...) + return Success(cfg, batchSize, retry, encoderFactory, clients...) } diff --git a/libbeat/publisher/event.go b/libbeat/publisher/event.go index 83dbb22f7771..77ab6716f99f 100644 --- a/libbeat/publisher/event.go +++ b/libbeat/publisher/event.go @@ -68,6 +68,12 @@ type Event struct { Content beat.Event Flags EventFlags Cache EventCache + + // If the output provides an early encoder for incoming events, + // it should store the encoded form in EncodedEvent and clear Content + // to free the unencoded data. The updated event will be provided to + // output workers when calling Publish. + EncodedEvent interface{} } // EventFlags provides additional flags/option types for used with the outputs. 
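On the consuming side, an output's Publish path reads EncodedEvent back out of each publisher.Event. A sketch loosely following the Elasticsearch client's bulkEncodePublishRequest above, continuing the invented package from the previous sketch (send stands in for a real transport call and is an assumption, not an API from this patch):

package sketch

import (
	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/elastic-agent-libs/logp"
)

func publishEncoded(log *logp.Logger, send func([]byte), data []publisher.Event) {
	for i := range data {
		encoded, ok := data[i].EncodedEvent.(*rawJSONEvent)
		if !ok {
			// The queue should only hand this output events that its own
			// encoder produced.
			log.Error("output received an unencoded publisher.Event")
			continue
		}
		if encoded.err != nil {
			// The event couldn't be serialized when it entered the queue;
			// report the stored error instead of sending it.
			log.Error(encoded.err)
			continue
		}
		send(encoded.encoding)
	}
}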
diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index 015d8f70c9df..4ed45d25628b 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -131,7 +131,7 @@ func TestClient(t *testing.T) { Events: 5, MaxGetRequest: 1, FlushTimeout: time.Millisecond, - }, 5) + }, 5, nil) // model a processor that we're going to make produce errors after p := &testProcessor{} @@ -243,7 +243,7 @@ func TestClientWaitClose(t *testing.T) { } logp.TestingSetup() - q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0) + q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0, nil) pipeline := makePipeline(Settings{}, q) defer pipeline.Close() diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 1c480c01bce2..bb75c9619c57 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -267,11 +267,11 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { factory = c.queueFactory } - queue, err := factory(logger, c.onACK, c.inputQueueSize) + queue, err := factory(logger, c.onACK, c.inputQueueSize, outGrp.EncoderFactory) if err != nil { logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration") s, _ := memqueue.SettingsForUserConfig(nil) - queue = memqueue.NewQueue(logger, c.onACK, s, c.inputQueueSize) + queue = memqueue.NewQueue(logger, c.onACK, s, c.inputQueueSize, outGrp.EncoderFactory) } c.queue = queue @@ -295,11 +295,11 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { // a producer for a nonexistent queue. type emptyProducer struct{} -func (emptyProducer) Publish(_ interface{}) (queue.EntryID, bool) { +func (emptyProducer) Publish(_ queue.Entry) (queue.EntryID, bool) { return 0, false } -func (emptyProducer) TryPublish(_ interface{}) (queue.EntryID, bool) { +func (emptyProducer) TryPublish(_ queue.Entry) (queue.EntryID, bool) { return 0, false } diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 7384e5f71287..6834af2c7f37 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -189,7 +189,7 @@ func TestOutputQueueFactoryTakesPrecedence(t *testing.T) { func TestFailedQueueFactoryRevertsToDefault(t *testing.T) { defaultSettings, _ := memqueue.SettingsForUserConfig(nil) - failedFactory := func(_ *logp.Logger, _ func(int), _ int) (queue.Queue, error) { + failedFactory := func(_ *logp.Logger, _ func(int), _ int, _ queue.EncoderFactory) (queue.Queue, error) { return nil, fmt.Errorf("This queue creation intentionally failed") } controller := outputController{ diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index cf03163750ee..37bf437395c2 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -255,7 +255,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { producerCfg := queue.ProducerConfig{} if client.eventWaitGroup != nil || cfg.ClientListener != nil { - producerCfg.OnDrop = func(event interface{}) { + producerCfg.OnDrop = func(event queue.Entry) { publisherEvent, _ := event.(publisher.Event) if cfg.ClientListener != nil { cfg.ClientListener.DroppedOnPublish(publisherEvent.Content) diff --git a/libbeat/publisher/pipeline/pipeline_test.go 
b/libbeat/publisher/pipeline/pipeline_test.go index 1278f5196ab3..5a236acc9c0a 100644 --- a/libbeat/publisher/pipeline/pipeline_test.go +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -32,7 +32,7 @@ type testQueue struct { } type testProducer struct { - publish func(try bool, event interface{}) (queue.EntryID, bool) + publish func(try bool, event queue.Entry) (queue.EntryID, bool) cancel func() int } @@ -72,14 +72,14 @@ func (q *testQueue) Get(sz int) (queue.Batch, error) { return nil, nil } -func (p *testProducer) Publish(event interface{}) (queue.EntryID, bool) { +func (p *testProducer) Publish(event queue.Entry) (queue.EntryID, bool) { if p.publish != nil { return p.publish(false, event) } return 0, false } -func (p *testProducer) TryPublish(event interface{}) (queue.EntryID, bool) { +func (p *testProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { if p.publish != nil { return p.publish(true, event) } @@ -118,7 +118,7 @@ func makeTestQueue() queue.Queue { var producer *testProducer p := blockingProducer(cfg) producer = &testProducer{ - publish: func(try bool, event interface{}) (queue.EntryID, bool) { + publish: func(try bool, event queue.Entry) (queue.EntryID, bool) { if try { return p.TryPublish(event) } @@ -150,7 +150,7 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer { waiting := atomic.MakeInt(0) return &testProducer{ - publish: func(_ bool, _ interface{}) (queue.EntryID, bool) { + publish: func(_ bool, _ queue.Entry) (queue.EntryID, bool) { waiting.Inc() <-sig return 0, false diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index d1014b8d782b..03ea06d3be86 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -67,7 +67,7 @@ func makeTestOutput(_ outputs.IndexManager, beat beat.Info, observer outputs.Obs clients[i] = client } - return outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, clients...) + return outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, nil, clients...) 
diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go
index 5e277d5042c4..4c5207acbb07 100644
--- a/libbeat/publisher/pipeline/ttl_batch_test.go
+++ b/libbeat/publisher/pipeline/ttl_batch_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/stretchr/testify/require"

 	"github.com/elastic/beats/v7/libbeat/publisher"
+	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 )

 func TestBatchSplitRetry(t *testing.T) {
@@ -128,7 +129,7 @@ func (b *mockQueueBatch) Count() int {
 func (b *mockQueueBatch) Done() {
 }

-func (b *mockQueueBatch) Entry(i int) interface{} {
+func (b *mockQueueBatch) Entry(i int) queue.Entry {
 	return fmt.Sprintf("event %v", i)
 }
diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go
index 8bd2a23276c9..1ac91e57ce1d 100644
--- a/libbeat/publisher/queue/diskqueue/benchmark_test.go
+++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go
@@ -100,7 +100,7 @@ func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue
 	}
 	s.UseCompression = compress
 	s.UseProtobuf = protobuf
-	q, err := NewQueue(logp.L(), nil, s)
+	q, err := NewQueue(logp.L(), nil, s, nil)
 	if err != nil {
 		panic(err)
 	}
@@ -118,7 +118,7 @@ func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue
 func publishEvents(p queue.Producer, num int, protobuf bool) {
 	for i := 0; i < num; i++ {
-		var e interface{}
+		var e queue.Entry
 		if protobuf {
 			e = makeMessagesEvent()
 		} else {
diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go
index 3515d0d2820b..0ebdcef5ad3a 100644
--- a/libbeat/publisher/queue/diskqueue/consumer.go
+++ b/libbeat/publisher/queue/diskqueue/consumer.go
@@ -86,7 +86,7 @@ func (batch *diskQueueBatch) Count() int {
 	return len(batch.frames)
 }

-func (batch *diskQueueBatch) Entry(i int) interface{} {
+func (batch *diskQueueBatch) Entry(i int) queue.Entry {
 	return batch.frames[i].event
 }
diff --git a/libbeat/publisher/queue/diskqueue/frames.go b/libbeat/publisher/queue/diskqueue/frames.go
index f0bd7d3b0b61..2043c5b649b8 100644
--- a/libbeat/publisher/queue/diskqueue/frames.go
+++ b/libbeat/publisher/queue/diskqueue/frames.go
@@ -17,6 +17,8 @@

 package diskqueue

+import "github.com/elastic/beats/v7/libbeat/publisher/queue"
+
 // Every data frame read from the queue is assigned a unique sequential
 // integer, which is used to keep track of which frames have been
 // acknowledged.
@@ -52,7 +54,7 @@ type readFrame struct {
 	id frameID

 	// The event decoded from the data frame.
-	event interface{}
+	event queue.Entry

 	// How much space this frame occupied on disk (before deserialization),
 	// including the frame header / footer.
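Batch.Entry now returns queue.Entry rather than interface{}; consumers still type-assert to the concrete entry type. A small consumer-side sketch (assumed, not from the patch; imports for queue and publisher elided):

    func drainBatch(batch queue.Batch) []publisher.Event {
    	events := make([]publisher.Event, 0, batch.Count())
    	for i := 0; i < batch.Count(); i++ {
    		// Entries are publisher.Event today, unless early encoding has
    		// already replaced them with the output's encoded form.
    		if ev, ok := batch.Entry(i).(publisher.Event); ok {
    			events = append(events, ev)
    		}
    	}
    	batch.Done() // let the queue acknowledge and discard the entries
    	return events
    }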
diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go
index 7471c2b47014..69725c62ccc1 100644
--- a/libbeat/publisher/queue/diskqueue/producer.go
+++ b/libbeat/publisher/queue/diskqueue/producer.go
@@ -49,16 +49,16 @@ type producerWriteRequest struct {
 // diskQueueProducer implementation of the queue.Producer interface
 //

-func (producer *diskQueueProducer) Publish(event interface{}) (queue.EntryID, bool) {
+func (producer *diskQueueProducer) Publish(event queue.Entry) (queue.EntryID, bool) {
 	return 0, producer.publish(event, true)
 }

-func (producer *diskQueueProducer) TryPublish(event interface{}) (queue.EntryID, bool) {
+func (producer *diskQueueProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
 	return 0, producer.publish(event, false)
 }

 func (producer *diskQueueProducer) publish(
-	event interface{}, shouldBlock bool,
+	event queue.Entry, shouldBlock bool,
 ) bool {
 	if producer.cancelled {
 		return false
diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go
index 74fff3fea647..5c04f9a03850 100644
--- a/libbeat/publisher/queue/diskqueue/queue.go
+++ b/libbeat/publisher/queue/diskqueue/queue.go
@@ -110,8 +110,9 @@ func FactoryForSettings(settings Settings) queue.QueueFactory {
 		logger *logp.Logger,
 		ackCallback func(eventCount int),
 		inputQueueSize int,
+		encoderFactory queue.EncoderFactory,
 	) (queue.Queue, error) {
-		return NewQueue(logger, ackCallback, settings)
+		return NewQueue(logger, ackCallback, settings, encoderFactory)
 	}
 }
@@ -121,6 +122,7 @@ func NewQueue(
 	logger *logp.Logger,
 	writeToDiskCallback func(eventCount int),
 	settings Settings,
+	encoderFactory queue.EncoderFactory,
 ) (*diskQueue, error) {
 	logger = logger.Named("diskqueue")
 	logger.Debugf(
@@ -212,6 +214,11 @@ func NewQueue(
 	activeFrameCount -= int(nextReadPosition.frameIndex)
 	logger.Infof("Found %d existing events on queue start", activeFrameCount)

+	var encoder queue.Encoder
+	if encoderFactory != nil {
+		encoder = encoderFactory()
+	}
+
 	queue := &diskQueue{
 		logger:   logger,
 		settings: settings,
@@ -225,7 +232,7 @@ func NewQueue(
 		acks: newDiskQueueACKs(logger, nextReadPosition, positionFile),

-		readerLoop:  newReaderLoop(settings),
+		readerLoop:  newReaderLoop(settings, encoder),
 		writerLoop:  newWriterLoop(logger, writeToDiskCallback, settings),
 		deleterLoop: newDeleterLoop(settings),
diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go
index c0b780ffb38e..f6a4c406ed32 100644
--- a/libbeat/publisher/queue/diskqueue/queue_test.go
+++ b/libbeat/publisher/queue/diskqueue/queue_test.go
@@ -89,7 +89,7 @@ func TestMetrics(t *testing.T) {
 	// lower max segment size so we can get multiple segments
 	settings.MaxSegmentSize = 100

-	testQueue, err := NewQueue(logp.L(), nil, settings)
+	testQueue, err := NewQueue(logp.L(), nil, settings, nil)
 	require.NoError(t, err)
 	defer testQueue.Close()
@@ -124,7 +124,7 @@ func makeTestQueue() queuetest.QueueFactory {
 		}
 		settings := DefaultSettings()
 		settings.Path = dir
-		queue, _ := NewQueue(logp.L(), nil, settings)
+		queue, _ := NewQueue(logp.L(), nil, settings, nil)
 		return testQueue{
 			diskQueue: queue,
 			teardown: func() {
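Unlike the memory queue, the disk queue takes the factory at construction time and builds a single encoder for its reader loop. Constructing one directly might look like this sketch, where the path and the encoderFactory value are placeholders:

    func openDiskQueue(encoderFactory queue.EncoderFactory) (queue.Queue, error) {
    	settings := diskqueue.DefaultSettings()
    	settings.Path = "/var/lib/examplebeat/diskqueue" // hypothetical path
    	q, err := diskqueue.NewQueue(logp.L(), nil, settings, encoderFactory)
    	if err != nil {
    		return nil, err
    	}
    	return q, nil
    }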
"github.com/elastic/beats/v7/libbeat/publisher/queue" ) // startPosition and endPosition are absolute byte offsets into the segment @@ -67,16 +69,22 @@ type readerLoop struct { // The helper object to deserialize binary blobs from the queue into // publisher.Event objects that can be returned in a readFrame. decoder *eventDecoder + + // If set, this encoding helper is called on events after loading + // them from disk, to convert them to their final output serialization + // format. + outputEncoder queue.Encoder } -func newReaderLoop(settings Settings) *readerLoop { +func newReaderLoop(settings Settings, outputEncoder queue.Encoder) *readerLoop { return &readerLoop{ settings: settings, - requestChan: make(chan readerLoopRequest, 1), - responseChan: make(chan readerLoopResponse), - output: make(chan *readFrame, settings.ReadAheadLimit), - decoder: newEventDecoder(), + requestChan: make(chan readerLoopRequest, 1), + responseChan: make(chan readerLoopResponse), + output: make(chan *readFrame, settings.ReadAheadLimit), + decoder: newEventDecoder(), + outputEncoder: outputEncoder, } } @@ -124,6 +132,10 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon frame.segment = request.segment frame.id = nextFrameID nextFrameID++ + // If an output encoder is configured, apply it now + if rl.outputEncoder != nil { + frame.event, _ = rl.outputEncoder.EncodeEntry(frame.event) + } // We've read the frame, try sending it to the output channel. select { case rl.output <- frame: diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 1455745961c7..23569f02150a 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -54,6 +54,9 @@ type broker struct { // wait group for queue workers (runLoop and ackLoop) wg sync.WaitGroup + // The factory used to create an event encoder when creating a producer + encoderFactory queue.EncoderFactory + /////////////////////////// // api channels @@ -113,7 +116,7 @@ type Settings struct { } type queueEntry struct { - event interface{} + event queue.Entry id queue.EntryID producer *ackProducer @@ -147,8 +150,9 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { logger *logp.Logger, ackCallback func(eventCount int), inputQueueSize int, + encoderFactory queue.EncoderFactory, ) (queue.Queue, error) { - return NewQueue(logger, ackCallback, settings, inputQueueSize), nil + return NewQueue(logger, ackCallback, settings, inputQueueSize, encoderFactory), nil } } @@ -160,8 +164,9 @@ func NewQueue( ackCallback func(eventCount int), settings Settings, inputQueueSize int, + encoderFactory queue.EncoderFactory, ) *broker { - b := newQueue(logger, ackCallback, settings, inputQueueSize) + b := newQueue(logger, ackCallback, settings, inputQueueSize, encoderFactory) // Start the queue workers b.wg.Add(2) @@ -186,6 +191,7 @@ func newQueue( ackCallback func(eventCount int), settings Settings, inputQueueSize int, + encoderFactory queue.EncoderFactory, ) *broker { chanSize := AdjustInputQueueSize(inputQueueSize, settings.Events) @@ -213,6 +219,8 @@ func newQueue( buf: make([]queueEntry, settings.Events), + encoderFactory: encoderFactory, + // broker API channels pushChan: make(chan pushRequest, chanSize), getChan: make(chan getRequest), @@ -249,7 +257,14 @@ func (b *broker) BufferConfig() queue.BufferConfig { } func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer { - return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel) + // If we 
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index 1455745961c7..23569f02150a 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -54,6 +54,9 @@ type broker struct {
 	// wait group for queue workers (runLoop and ackLoop)
 	wg sync.WaitGroup

+	// The factory used to create an event encoder when creating a producer
+	encoderFactory queue.EncoderFactory
+
 	///////////////////////////
 	// api channels

@@ -113,7 +116,7 @@ type Settings struct {
 }

 type queueEntry struct {
-	event interface{}
+	event queue.Entry
 	id    queue.EntryID

 	producer *ackProducer
@@ -147,8 +150,9 @@ func FactoryForSettings(settings Settings) queue.QueueFactory {
 		logger *logp.Logger,
 		ackCallback func(eventCount int),
 		inputQueueSize int,
+		encoderFactory queue.EncoderFactory,
 	) (queue.Queue, error) {
-		return NewQueue(logger, ackCallback, settings, inputQueueSize), nil
+		return NewQueue(logger, ackCallback, settings, inputQueueSize, encoderFactory), nil
 	}
 }
@@ -160,8 +164,9 @@ func NewQueue(
 	ackCallback func(eventCount int),
 	settings Settings,
 	inputQueueSize int,
+	encoderFactory queue.EncoderFactory,
 ) *broker {
-	b := newQueue(logger, ackCallback, settings, inputQueueSize)
+	b := newQueue(logger, ackCallback, settings, inputQueueSize, encoderFactory)

 	// Start the queue workers
 	b.wg.Add(2)
@@ -186,6 +191,7 @@ func newQueue(
 	ackCallback func(eventCount int),
 	settings Settings,
 	inputQueueSize int,
+	encoderFactory queue.EncoderFactory,
 ) *broker {
 	chanSize := AdjustInputQueueSize(inputQueueSize, settings.Events)
@@ -213,6 +219,8 @@ func newQueue(
 		buf: make([]queueEntry, settings.Events),

+		encoderFactory: encoderFactory,
+
 		// broker API channels
 		pushChan: make(chan pushRequest, chanSize),
 		getChan:  make(chan getRequest),
@@ -249,7 +257,14 @@ func (b *broker) BufferConfig() queue.BufferConfig {
 }

 func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer {
-	return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel)
+	// If we were given an encoder factory, so producers can encode
+	// events for output before they enter the queue, then create an
+	// encoder for the new producer.
+	var encoder queue.Encoder
+	if b.encoderFactory != nil {
+		encoder = b.encoderFactory()
+	}
+	return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel, encoder)
 }

 func (b *broker) Get(count int) (queue.Batch, error) {
@@ -398,7 +413,7 @@ func (b *batch) rawEntry(i int) *queueEntry {
 }

 // Return the event referenced by the i-th element of this batch
-func (b *batch) Entry(i int) interface{} {
+func (b *batch) Entry(i int) queue.Entry {
 	return b.rawEntry(i).event
 }
diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go
index ae93a5df0d52..95b5e0eba90f 100644
--- a/libbeat/publisher/queue/memqueue/internal_api.go
+++ b/libbeat/publisher/queue/memqueue/internal_api.go
@@ -22,7 +22,11 @@ import "github.com/elastic/beats/v7/libbeat/publisher/queue"

 // producer -> broker API
 type pushRequest struct {
-	event interface{}
+	event queue.Entry
+
+	// The event's encoded size in bytes if the configured output supports
+	// early encoding, 0 otherwise.
+	eventSize int

 	// The producer that generated this event, or nil if this producer does
 	// not require ack callbacks.
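Because the broker instantiates one encoder per producer, pipeline clients never contend on shared encoder state. A usage sketch (the encoderFactory and event values are placeholders passed in by the caller):

    func publishPair(encoderFactory queue.EncoderFactory, eventA, eventB queue.Entry) {
    	q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1024}, 0, encoderFactory)
    	defer q.Close()
    	p1 := q.Producer(queue.ProducerConfig{})
    	p2 := q.Producer(queue.ProducerConfig{}) // gets its own encoder instance
    	p1.Publish(eventA) // encoded by p1's encoder before entering the queue
    	p2.Publish(eventB) // encoded independently by p2's encoder
    }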
diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go
index 5ddea468e4c6..55f15a8cc869 100644
--- a/libbeat/publisher/queue/memqueue/produce.go
+++ b/libbeat/publisher/queue/memqueue/produce.go
@@ -40,6 +40,7 @@ type openState struct {
 	done      chan struct{}
 	queueDone <-chan struct{}
 	events    chan pushRequest
+	encoder   queue.Encoder
 }

 // producerID stores the order of events within a single producer, so multiple
@@ -50,19 +51,20 @@ type producerID uint64

 type produceState struct {
 	cb        ackHandler
-	dropCB    func(interface{})
+	dropCB    func(queue.Entry)
 	cancelled bool
 	lastACK   producerID
 }

 type ackHandler func(count int)

-func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer {
+func newProducer(b *broker, cb ackHandler, dropCB func(queue.Entry), dropOnCancel bool, encoder queue.Encoder) queue.Producer {
 	openState := openState{
 		log:       b.logger,
 		done:      make(chan struct{}),
 		queueDone: b.ctx.Done(),
 		events:    b.pushChan,
+		encoder:   encoder,
 	}

 	if cb != nil {
@@ -74,18 +76,18 @@
 	return &forgetfulProducer{broker: b, openState: openState}
 }

-func (p *forgetfulProducer) makePushRequest(event interface{}) pushRequest {
+func (p *forgetfulProducer) makePushRequest(event queue.Entry) pushRequest {
 	resp := make(chan queue.EntryID, 1)
 	return pushRequest{
 		event: event,
 		resp:  resp}
 }

-func (p *forgetfulProducer) Publish(event interface{}) (queue.EntryID, bool) {
+func (p *forgetfulProducer) Publish(event queue.Entry) (queue.EntryID, bool) {
 	return p.openState.publish(p.makePushRequest(event))
 }

-func (p *forgetfulProducer) TryPublish(event interface{}) (queue.EntryID, bool) {
+func (p *forgetfulProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
 	return p.openState.tryPublish(p.makePushRequest(event))
 }

@@ -94,7 +96,7 @@ func (p *forgetfulProducer) Cancel() int {
 	return 0
 }

-func (p *ackProducer) makePushRequest(event interface{}) pushRequest {
+func (p *ackProducer) makePushRequest(event queue.Entry) pushRequest {
 	resp := make(chan queue.EntryID, 1)
 	return pushRequest{
 		event: event,
@@ -105,7 +107,7 @@ func (p *ackProducer) makePushRequest(event interface{}) pushRequest {
 		resp:  resp}
 }

-func (p *ackProducer) Publish(event interface{}) (queue.EntryID, bool) {
+func (p *ackProducer) Publish(event queue.Entry) (queue.EntryID, bool) {
 	id, published := p.openState.publish(p.makePushRequest(event))
 	if published {
 		p.producedCount++
@@ -113,7 +115,7 @@ func (p *ackProducer) Publish(event interface{}) (queue.EntryID, bool) {
 	return id, published
 }

-func (p *ackProducer) TryPublish(event interface{}) (queue.EntryID, bool) {
+func (p *ackProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
 	id, published := p.openState.tryPublish(p.makePushRequest(event))
 	if published {
 		p.producedCount++
@@ -143,6 +145,11 @@ func (st *openState) Close() {
 }

 func (st *openState) publish(req pushRequest) (queue.EntryID, bool) {
+	// If we were given an encoder callback for incoming events, apply it before
+	// sending the entry to the queue.
+	if st.encoder != nil {
+		req.event, req.eventSize = st.encoder.EncodeEntry(req.event)
+	}
 	select {
 	case st.events <- req:
 		// The events channel is buffered, which means we may successfully
@@ -166,6 +173,11 @@
 }

 func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) {
+	// If we were given an encoder callback for incoming events, apply it before
+	// sending the entry to the queue.
+	if st.encoder != nil {
+		req.event, req.eventSize = st.encoder.EncodeEntry(req.event)
+	}
 	select {
 	case st.events <- req:
 		// The events channel is buffered, which means we may successfully
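The encode-on-publish path added above is easy to exercise. A hedged test sketch (not in the patch), written against the memqueue package internals:

    type countingEncoder struct{ calls *int }

    func (e countingEncoder) EncodeEntry(entry queue.Entry) (queue.Entry, int) {
    	*e.calls++
    	return entry, 0 // pass-through; a real encoder returns its serialized form
    }

    func TestProducerEncodesOnPublish(t *testing.T) {
    	calls := 0
    	q := NewQueue(nil, nil, Settings{Events: 4}, 0,
    		func() queue.Encoder { return countingEncoder{calls: &calls} })
    	defer q.Close()

    	p := q.Producer(queue.ProducerConfig{})
    	if _, ok := p.Publish("raw entry"); !ok {
    		t.Fatal("expected publish to succeed")
    	}
    	if calls != 1 {
    		t.Fatalf("expected one EncodeEntry call, got %d", calls)
    	}
    }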
diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go
index 637e7ccd4fbb..df2d16d0dec7 100644
--- a/libbeat/publisher/queue/memqueue/queue_test.go
+++ b/libbeat/publisher/queue/memqueue/queue_test.go
@@ -92,12 +92,12 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {
 			Events:        2, // Queue size
 			MaxGetRequest: 1, // make sure the queue won't buffer events
 			FlushTimeout:  time.Millisecond,
-		}, 0)
+		}, 0, nil)

 	p := q.Producer(queue.ProducerConfig{
 		// We do not read from the queue, so the callbacks are never called
 		ACK:          func(count int) {},
-		OnDrop:       func(e interface{}) {},
+		OnDrop:       func(e queue.Entry) {},
 		DropOnCancel: false,
 	})
@@ -164,13 +164,13 @@ func TestProducerClosePreservesEventCount(t *testing.T) {
 			Events:        3, // Queue size
 			MaxGetRequest: 2,
 			FlushTimeout:  10 * time.Millisecond,
-		}, 1)
+		}, 1, nil)

 	p := q.Producer(queue.ProducerConfig{
 		ACK: func(count int) {
 			activeEvents.Add(-int64(count))
 		},
-		OnDrop: func(e interface{}) {
+		OnDrop: func(e queue.Entry) {
 			//activeEvents.Add(-1)
 		},
 		DropOnCancel: false,
@@ -263,7 +263,7 @@ func TestQueueMetricsBuffer(t *testing.T) {
 }

 func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) {
-	testQueue := NewQueue(nil, nil, settings, 0)
+	testQueue := NewQueue(nil, nil, settings, 0, nil)
 	defer testQueue.Close()

 	// Send events to queue
@@ -307,7 +307,7 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu
 			Events:        sz,
 			MaxGetRequest: minEvents,
 			FlushTimeout:  flushTimeout,
-		}, 0)
+		}, 0, nil)
 	}
 }
@@ -418,22 +418,22 @@ func TestEntryIDs(t *testing.T) {
 	}

 	t.Run("acking in forward order with directEventLoop reports the right event IDs", func(t *testing.T) {
-		testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0)
+		testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0, nil)
 		testForward(testQueue)
 	})

 	t.Run("acking in reverse order with directEventLoop reports the right event IDs", func(t *testing.T) {
-		testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0)
+		testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0, nil)
 		testBackward(testQueue)
 	})

 	t.Run("acking in forward order with bufferedEventLoop reports the right event IDs", func(t *testing.T) {
-		testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0)
+		testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0, nil)
 		testForward(testQueue)
 	})

 	t.Run("acking in reverse order with bufferedEventLoop reports the right event IDs", func(t *testing.T) {
-		testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0)
+		testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0, nil)
 		testBackward(testQueue)
 	})
 }
@@ -447,7 +447,7 @@ func TestBatchFreeEntries(t *testing.T) {
 	// 4. Make sure only events 6-10 are nil
 	// 5. Call FreeEntries on the first batch
 	// 6. Make sure all events are nil
-	testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0)
+	testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil)
 	producer := testQueue.Producer(queue.ProducerConfig{})
 	for i := 0; i < queueSize; i++ {
 		_, ok := producer.Publish(i)
diff --git a/libbeat/publisher/queue/memqueue/runloop_test.go b/libbeat/publisher/queue/memqueue/runloop_test.go
index 9b3a467647a4..d25537265ea3 100644
--- a/libbeat/publisher/queue/memqueue/runloop_test.go
+++ b/libbeat/publisher/queue/memqueue/runloop_test.go
@@ -42,9 +42,9 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
 			MaxGetRequest: 500,
 			FlushTimeout:  10 * time.Second,
 		},
-		10)
+		10, nil)

-	producer := newProducer(broker, nil, nil, false)
+	producer := newProducer(broker, nil, nil, false, nil)
 	rl := broker.runLoop
 	for i := 0; i < 100; i++ {
 		// Pair each publish call with an iteration of the run loop so we
@@ -81,9 +81,9 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) {
 			MaxGetRequest: 500,
 			FlushTimeout:  10 * time.Second,
 		},
-		10)
+		10, nil)

-	producer := newProducer(broker, nil, nil, false)
+	producer := newProducer(broker, nil, nil, false, nil)
 	rl := broker.runLoop
 	for i := 0; i < 100; i++ {
 		// Pair each publish call with an iteration of the run loop so we
diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go
index 101a32901177..e691c2888f66 100644
--- a/libbeat/publisher/queue/queue.go
+++ b/libbeat/publisher/queue/queue.go
@@ -25,6 +25,12 @@ import (
 	"github.com/elastic/elastic-agent-libs/opt"
 )

+// Entry is a placeholder type for the objects contained by the queue, which
+// can be anything (but right now is always a publisher.Event). We could just
+// use interface{} everywhere, but this makes the API's intentions clearer
+// and reduces accidental type mismatches.
+type Entry interface{}
+
 // Metrics is a set of basic, user-friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user.
 type Metrics struct {
 	//EventCount is the total events currently in the queue
@@ -74,7 +80,14 @@
 	Metrics() (Metrics, error)
 }

-type QueueFactory func(logger *logp.Logger, ack func(eventCount int), inputQueueSize int) (Queue, error)
+// If encoderFactory is provided, then the resulting queue must use it to
+// encode queued events before returning them.
+type QueueFactory func(
+	logger *logp.Logger,
+	ack func(eventCount int),
+	inputQueueSize int,
+	encoderFactory EncoderFactory,
+) (Queue, error)

 // BufferConfig returns the pipelines buffering settings,
 // for the pipeline to use.
@@ -98,7 +111,7 @@ type ProducerConfig struct {
 	// the queue. Currently this can only happen when a Publish call is sent
 	// to the memory queue's request channel but the producer is cancelled
 	// before it reaches the queue buffer.
-	OnDrop func(interface{})
+	OnDrop func(Entry)

 	// DropOnCancel is a hint to the queue to drop events if the producer disconnects
 	// via Cancel.
@@ -110,35 +123,49 @@
 type EntryID uint64

 // Producer is an interface to be used by the pipelines client to forward
 // events to a queue.
 type Producer interface {
-	// Publish adds an event to the queue, blocking if necessary, and returns
+	// Publish adds an entry to the queue, blocking if necessary, and returns
 	// the new entry's id and true on success.
-	Publish(event interface{}) (EntryID, bool)
+	Publish(entry Entry) (EntryID, bool)

-	// TryPublish adds an event to the queue if doing so will not block the
+	// TryPublish adds an entry to the queue if doing so will not block the
 	// caller, otherwise it immediately returns. The reasons a publish attempt
 	// might block are defined by the specific queue implementation and its
 	// configuration. If the event was successfully added, returns true with
 	// the event's assigned ID, and false otherwise.
-	TryPublish(event interface{}) (EntryID, bool)
+	TryPublish(entry Entry) (EntryID, bool)

 	// Cancel closes this Producer endpoint. If the producer is configured to
-	// drop its events on Cancel, the number of dropped events is returned.
+	// drop its entries on Cancel, the number of dropped entries is returned.
 	// Note: A queue may still send ACK signals even after Cancel is called on
 	// the originating Producer. The pipeline client must accept and
 	// discard these ACKs.
 	Cancel() int
 }

-// Batch of events to be returned to Consumers. The `Done` method will tell the
-// queue that the batch has been consumed and its events can be discarded.
+// Batch of entries (usually publisher.Event) to be returned to Consumers.
+// The `Done` method will tell the queue that the batch has been consumed and
+// its entries can be acknowledged and discarded.
 type Batch interface {
 	Count() int
-	Entry(i int) interface{}
-	// Release the internal references to the contained events.
+	Entry(i int) Entry
+	// Release the internal references to the contained events, if
+	// supported (the disk queue does not yet implement it).
 	// Count() and Entry() cannot be used after this call.
-	// This is only guaranteed to release references when using the
-	// proxy queue, where it is used to avoid keeping multiple copies
-	// of events that have already been queued by the shipper.
 	FreeEntries()
 	Done()
 }
+// Outputs can provide an EncoderFactory to enable early encoding, in which
+// case the queue will run the given encoder on events before they reach
+// consumers.
+// Encoders are provided as factories so each worker goroutine can have its
+// own encoder instance.
+type EncoderFactory func() Encoder
+
+type Encoder interface {
+	// Return the encoded form of the entry that the output workers can use,
+	// and the in-memory size of the encoded buffer.
+	// EncodeEntry should return a valid Entry when given one, even if the
+	// encoding fails. In that case, the returned Entry should contain the
+	// metadata needed to report the error when the entry is consumed.
+	EncodeEntry(Entry) (Entry, int)
+}
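To make the contract concrete, here is a minimal sketch of an Encoder/EncoderFactory pair, loosely modeled on what an output could register. The encodedEvent container and JSON serialization are illustrative assumptions, not the Elasticsearch encoder added elsewhere in this patch:

    package example

    import (
    	"encoding/json"

    	"github.com/elastic/beats/v7/libbeat/publisher/queue"
    )

    // encodedEvent is a hypothetical container for a pre-serialized event.
    // A real implementation would also keep the metadata needed to report
    // errors and handle retries when the entry is consumed.
    type encodedEvent struct {
    	encoding []byte
    	err      error // set if encoding failed; reported at consume time
    }

    type jsonEncoder struct{}

    // EncodeEntry serializes the entry to JSON. Per the interface contract,
    // it always returns a usable Entry: on failure it returns an encodedEvent
    // carrying the error instead of silently dropping the entry.
    func (e *jsonEncoder) EncodeEntry(entry queue.Entry) (queue.Entry, int) {
    	data, err := json.Marshal(entry)
    	if err != nil {
    		return &encodedEvent{err: err}, 0
    	}
    	return &encodedEvent{encoding: data}, len(data)
    }

    // NewJSONEncoderFactory returns a queue.EncoderFactory; each call to the
    // factory yields a fresh encoder, so worker goroutines never share state.
    func NewJSONEncoderFactory() queue.EncoderFactory {
    	return func() queue.Encoder { return &jsonEncoder{} }
    }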