Skip to content

Commit

Permalink
Clean up non-indexable policy handling in the ES output (elastic#38591)
Browse files Browse the repository at this point in the history
Simplify non-indexable policy handling:
- Replace the lookup table of generic factories and custom index selectors with a simple `deadLetterIndex` string, since the only valid non-indexable policies are to use a dead letter index or to drop the event if none is provided.
- Make `elasticsearch.ClientSettings` and its fields internal rather than public, and clarify some field names. (Aside from better encapsulation, this makes it easier to see that this cleanup doesn't break anything else.)

This is a preparation for the pending early-encoding change. The non-indexable policy code needs to be rewritten to support early encoding, so I split most of the cleanups out into this standalone PR to be kinder to code reviewers. This PR doesn't change any user-visible behavior.
  • Loading branch information
faec authored and zeynepyz committed Apr 7, 2024
1 parent 36fb809 commit f6ce8cc
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 276 deletions.
91 changes: 45 additions & 46 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,28 @@ var (
type Client struct {
conn eslegclient.Connection

index outputs.IndexSelector
pipeline *outil.Selector
indexSelector outputs.IndexSelector
pipelineSelector *outil.Selector

observer outputs.Observer
NonIndexableAction string
observer outputs.Observer

// If deadLetterIndex is set, events with bulk-ingest errors will be
// forwarded to this index. Otherwise, they will be dropped.
deadLetterIndex string

log *logp.Logger
}

// ClientSettings contains the settings for a client.
type ClientSettings struct {
eslegclient.ConnectionSettings
Index outputs.IndexSelector
Pipeline *outil.Selector
Observer outputs.Observer
NonIndexableAction string
// clientSettings contains the settings for a client.
type clientSettings struct {
connection eslegclient.ConnectionSettings
indexSelector outputs.IndexSelector
pipelineSelector *outil.Selector
observer outputs.Observer

// If deadLetterIndex is set, events with bulk-ingest errors will be
// forwarded to this index. Otherwise, they will be dropped.
deadLetterIndex string
}

type bulkResultStats struct {
Expand All @@ -81,29 +87,15 @@ const (

// NewClient instantiates a new client.
func NewClient(
s ClientSettings,
s clientSettings,
onConnect *callbacksRegistry,
) (*Client, error) {
pipeline := s.Pipeline
pipeline := s.pipelineSelector
if pipeline != nil && pipeline.IsEmpty() {
pipeline = nil
}

conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: s.URL,
Beatname: s.Beatname,
Username: s.Username,
Password: s.Password,
APIKey: s.APIKey,
Headers: s.Headers,
Kerberos: s.Kerberos,
Observer: s.Observer,
Parameters: s.Parameters,
CompressionLevel: s.CompressionLevel,
EscapeHTML: s.EscapeHTML,
Transport: s.Transport,
IdleConnTimeout: s.IdleConnTimeout,
})
conn, err := eslegclient.NewConnection(s.connection)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -134,11 +126,11 @@ func NewClient(
}

client := &Client{
conn: *conn,
index: s.Index,
pipeline: pipeline,
observer: s.Observer,
NonIndexableAction: s.NonIndexableAction,
conn: *conn,
indexSelector: s.indexSelector,
pipelineSelector: pipeline,
observer: s.observer,
deadLetterIndex: s.deadLetterIndex,

log: logp.NewLogger("elasticsearch"),
}
Expand Down Expand Up @@ -174,11 +166,11 @@ func (client *Client) Clone() *Client {
client.conn.Transport.Proxy.Disable = client.conn.Transport.Proxy.URL == nil

c, _ := NewClient(
ClientSettings{
ConnectionSettings: connection,
Index: client.index,
Pipeline: client.pipeline,
NonIndexableAction: client.NonIndexableAction,
clientSettings{
connection: connection,
indexSelector: client.indexSelector,
pipelineSelector: client.pipelineSelector,
deadLetterIndex: client.deadLetterIndex,
},
nil, // XXX: do not pass connection callback?
)
Expand Down Expand Up @@ -296,10 +288,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
}

if failed > 0 {
if sendErr == nil {
sendErr = eslegclient.ErrTempBulkFailure
}
return failedEvents, sendErr
return failedEvents, eslegclient.ErrTempBulkFailure
}
return nil, nil
}
Expand Down Expand Up @@ -339,7 +328,7 @@ func (client *Client) createEventBulkMeta(version version.V, event *beat.Event)
return nil, err
}

index, err := client.index.Select(event)
index, err := client.getIndex(event)
if err != nil {
err := fmt.Errorf("failed to select event index: %w", err)
return nil, err
Expand Down Expand Up @@ -371,6 +360,16 @@ func (client *Client) createEventBulkMeta(version version.V, event *beat.Event)
return eslegclient.BulkIndexAction{Index: meta}, nil
}

func (client *Client) getIndex(event *beat.Event) (string, error) {
// If this event has been dead-lettered, override its index
if event.Meta != nil {
if deadLetter, _ := event.Meta.HasKey(dead_letter_marker_field); deadLetter {
return client.deadLetterIndex, nil
}
}
return client.indexSelector.Select(event)
}

func (client *Client) getPipeline(event *beat.Event) (string, error) {
if event.Meta != nil {
pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
Expand All @@ -384,8 +383,8 @@ func (client *Client) getPipeline(event *beat.Event) (string, error) {
return strings.ToLower(pipeline), nil
}

if client.pipeline != nil {
return client.pipeline.Select(event)
if client.pipelineSelector != nil {
return client.pipelineSelector.Select(event)
}
return "", nil
}
Expand Down Expand Up @@ -434,7 +433,7 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status)
client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
// poison pill - this will clog the pipeline if the underlying failure is non transient.
} else if client.NonIndexableAction == dead_letter_index {
} else if client.deadLetterIndex != "" {
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status)
client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
if data[i].Content.Meta == nil {
Expand Down
16 changes: 9 additions & 7 deletions libbeat/outputs/elasticsearch/client_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,17 @@ func TestProxyDisableOverridesProxySettings(t *testing.T) {
func execClient(t *testing.T, env ...string) {
// The child process always runs only the TestClientPing test, which pings
// the server at TEST_SERVER_URL and then terminates.
cmd := exec.Command(os.Args[0], "-test.run=TestClientPing")
executable, err := os.Executable()
require.NoError(t, err, "couldn't get current executable")
cmd := exec.Command(executable, "-test.run=TestClientPing")
cmd.Env = append(append(os.Environ(),
"TEST_START_CLIENT=1"),
env...)
cmdOutput := new(bytes.Buffer)
cmd.Stderr = cmdOutput
cmd.Stdout = cmdOutput

err := cmd.Run()
err = cmd.Run()
if err != nil {
t.Error("Error executing client:\n" + cmdOutput.String())
}
Expand All @@ -185,8 +187,8 @@ func doClientPing(t *testing.T) {
proxy := os.Getenv("TEST_PROXY_URL")
// if TEST_PROXY_DISABLE is nonempty, set ClientSettings.ProxyDisable.
proxyDisable := os.Getenv("TEST_PROXY_DISABLE")
clientSettings := ClientSettings{
ConnectionSettings: eslegclient.ConnectionSettings{
clientSettings := clientSettings{
connection: eslegclient.ConnectionSettings{
URL: serverURL,
Headers: map[string]string{headerTestField: headerTestValue},
Transport: httpcommon.HTTPTransportSettings{
Expand All @@ -195,22 +197,22 @@ func doClientPing(t *testing.T) {
},
},
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
indexSelector: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}
if proxy != "" {
u, err := url.Parse(proxy)
require.NoError(t, err)
proxyURL := httpcommon.ProxyURI(*u)

clientSettings.Transport.Proxy.URL = &proxyURL
clientSettings.connection.Transport.Proxy.URL = &proxyURL
}
client, err := NewClient(clientSettings, nil)
require.NoError(t, err)

// This ping won't succeed; we aren't testing end-to-end communication
// (which would require a lot more setup work), we just want to make sure
// the client is pointed at the right server or proxy.
client.Connect()
_ = client.Connect()
}

// serverState contains the state of the http listeners for proxy tests,
Expand Down
Loading

0 comments on commit f6ce8cc

Please sign in to comment.