diff --git a/NOTICE.txt b/NOTICE.txt index 57a645fa6f55..8f32b49c6522 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -29374,6 +29374,29 @@ Contents of probable licence file $GOMODCACHE/gotest.tools/gotestsum@v1.7.0/LICE limitations under the License. +-------------------------------------------------------------------------------- +Dependency : gotest.tools/v3 +Version: v3.5.1 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/gotest.tools/v3@v3.5.1/LICENSE: + +Copyright 2018 gotest.tools authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + -------------------------------------------------------------------------------- Dependency : howett.net/plist Version: v1.0.1 @@ -72086,29 +72109,6 @@ See the License for the specific language governing permissions and limitations under the License. --------------------------------------------------------------------------------- -Dependency : gotest.tools/v3 -Version: v3.5.1 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/gotest.tools/v3@v3.5.1/LICENSE: - -Copyright 2018 gotest.tools authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - - -------------------------------------------------------------------------------- Dependency : k8s.io/klog/v2 Version: v2.130.1 diff --git a/go.mod b/go.mod index ab7306c3078c..b0f2ab46f837 100644 --- a/go.mod +++ b/go.mod @@ -235,6 +235,7 @@ require ( golang.org/x/term v0.27.0 google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 gopkg.in/natefinch/lumberjack.v2 v2.2.1 + gotest.tools/v3 v3.5.1 ) require ( diff --git a/libbeat/otelbeat/beatconverter/beatconverter.go b/libbeat/otelbeat/beatconverter/beatconverter.go index ebc115b7a106..935b9293dd56 100644 --- a/libbeat/otelbeat/beatconverter/beatconverter.go +++ b/libbeat/otelbeat/beatconverter/beatconverter.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/confmap" "github.com/elastic/beats/v7/libbeat/cloudid" + elasticsearchtranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate/outputs/elasticsearch" "github.com/elastic/elastic-agent-libs/config" ) @@ -47,19 +48,20 @@ func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { for _, beatreceiver := range supportedReceivers { var out map[string]any + var beatReceiverConfigKey = "receivers::" + beatreceiver // check if supported beat receiver is configured. Skip translation logic if not - if v := conf.Get("receivers::" + beatreceiver); v == nil { + if v := conf.Get(beatReceiverConfigKey); v == nil { continue } // handle cloud id if set - if conf.IsSet("receivers::" + beatreceiver + "::cloud") { + if conf.IsSet(beatReceiverConfigKey + "::cloud") { if err := handleCloudId(beatreceiver, conf); err != nil { return fmt.Errorf("error handling cloud id %w", err) } } - receiverCfg, _ := conf.Sub("receivers::" + beatreceiver) + receiverCfg, _ := conf.Sub(beatReceiverConfigKey) output, _ := receiverCfg.Sub("output") if len(output.ToStringMap()) > 1 { @@ -70,10 +72,18 @@ func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { switch key { case "elasticsearch": esConfig := config.MustNewConfigFrom(output) - esOTelConfig, err := ToOTelConfig(esConfig) + esOTelConfig, err := elasticsearchtranslate.ToOTelConfig(esConfig) if err != nil { return fmt.Errorf("cannot convert elasticsearch config: %w", err) } + + // when output.queue is set by user or it comes from "preset" config, promote it to global level + if ok := esConfig.HasField("queue"); ok { + if err := promoteOutputQueueSettings(beatreceiver, esConfig, conf); err != nil { + return err + } + } + out = map[string]any{ "service::pipelines::logs::exporters": []string{"elasticsearch"}, "exporters": map[string]any{ @@ -91,14 +101,14 @@ func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { // Replace output.[configured-output] with output.otelconsumer out = map[string]any{ - "receivers::" + beatreceiver + "::output": nil, + beatReceiverConfigKey + "::output": nil, } err := conf.Merge(confmap.NewFromStringMap(out)) if err != nil { return err } out = map[string]any{ - "receivers::" + beatreceiver + "::output::otelconsumer": nil, + beatReceiverConfigKey + "::output::otelconsumer": nil, } err = conf.Merge(confmap.NewFromStringMap(out)) @@ -110,9 +120,9 @@ func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { return nil } -func handleCloudId(beatreceiver string, conf *confmap.Conf) error { +func handleCloudId(beatReceiverConfigKey string, conf *confmap.Conf) error { - receiverCfg, _ := conf.Sub("receivers::" + beatreceiver) + receiverCfg, _ := conf.Sub("receivers::" + beatReceiverConfigKey) beatCfg := config.MustNewConfigFrom(receiverCfg.ToStringMap()) // Handle cloud.id the same way Beats does, this will also handle @@ -128,7 +138,7 @@ func handleCloudId(beatreceiver string, conf *confmap.Conf) error { } out := map[string]any{ - "receivers::" + beatreceiver: beatOutput, + "receivers::" + beatReceiverConfigKey: beatOutput, } err = conf.Merge(confmap.NewFromStringMap(out)) if err != nil { @@ -137,7 +147,26 @@ func handleCloudId(beatreceiver string, conf *confmap.Conf) error { // we set this to nil to ensure cloudid check does not throw error when output is next set to otelconsumer out = map[string]any{ - "receivers::" + beatreceiver + "::cloud": nil, + "receivers::" + beatReceiverConfigKey + "::cloud": nil, + } + err = conf.Merge(confmap.NewFromStringMap(out)) + if err != nil { + return err + } + + return nil +} + +// promoteOutputQueueSettings promotes output.queue settings to global level +func promoteOutputQueueSettings(beatReceiverConfigKey string, outputConfig *config.C, conf *confmap.Conf) error { + + var queueOutput map[string]any + err := outputConfig.Unpack(&queueOutput) + if err != nil { + return err + } + out := map[string]any{ + "receivers::" + beatReceiverConfigKey + "::queue": queueOutput["queue"], } err = conf.Merge(confmap.NewFromStringMap(out)) if err != nil { diff --git a/libbeat/otelbeat/beatconverter/beatconverter_test.go b/libbeat/otelbeat/beatconverter/beatconverter_test.go index 778a686596f7..668b7a5238fc 100644 --- a/libbeat/otelbeat/beatconverter/beatconverter_test.go +++ b/libbeat/otelbeat/beatconverter/beatconverter_test.go @@ -27,7 +27,32 @@ import ( "gopkg.in/yaml.v2" ) -var supportedInput = ` +var esCommonOutput = ` +exporters: + elasticsearch: + api_key: "" + endpoints: + - https://localhost:9200 + idle_conn_timeout: 3s + logs_index: form-otel-exporter + num_workers: 0 + password: changeme + retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 + user: elastic + timeout: 1m30s + batcher: + enabled: true + max_size_items: 1600 +` + +func TestConverter(t *testing.T) { + c := converter{} + t.Run("test converter functionality", func(t *testing.T) { + var supportedInput = ` receivers: filebeatreceiver: filebeat: @@ -40,7 +65,7 @@ receivers: - type: log enabled: true paths: - - /var/log/*.log + - /var/log/*.log output: elasticsearch: hosts: ["https://localhost:9200"] @@ -54,24 +79,7 @@ service: receivers: - "filebeatreceiver" ` - -var expectedOutput = ` -exporters: - elasticsearch: - api_key: "" - endpoints: - - https://localhost:9200 - idle_conn_timeout: 3s - logs_index: form-otel-exporter - num_workers: 0 - password: changeme - retry: - enabled: true - initial_interval: 1s - max_interval: 1m0s - max_retries: 3 - user: elastic - timeout: 1m30s + var expectedOutput = esCommonOutput + ` receivers: filebeatreceiver: filebeat: @@ -96,7 +104,17 @@ service: - filebeatreceiver ` -var unsupportedOutputConfig = ` + input := newFromYamlString(t, supportedInput) + err := c.Convert(context.Background(), input) + require.NoError(t, err, "error converting beats output config") + + expOutput := newFromYamlString(t, expectedOutput) + compareAndAssert(t, expOutput, input) + + }) + + t.Run("test failure if unsupported config is provided", func(t *testing.T) { + var unsupportedOutputConfig = ` receivers: filebeatreceiver: filebeat: @@ -117,34 +135,144 @@ service: - "filebeatreceiver" ` -func TestConverter(t *testing.T) { - c := converter{} - t.Run("test converter functionality", func(t *testing.T) { + input := newFromYamlString(t, unsupportedOutputConfig) + err := c.Convert(context.Background(), input) + require.ErrorContains(t, err, "output type \"kafka\" is unsupported in OTel mode") + + }) + + t.Run("test cloud id conversion", func(t *testing.T) { + var supportedInput = ` +receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + enabled: true + id: filestream-input-id + paths: + - /tmp/flog.log + output: + elasticsearch: + hosts: ["https://localhost:9200"] + username: elastic + password: changeme + index: form-otel-exporter + cloud: + id: ZWxhc3RpYy5jbyRlcy1ob3N0bmFtZSRraWJhbmEtaG9zdG5hbWU= + auth: elastic-cloud:password +service: + pipelines: + logs: + receivers: + - "filebeatreceiver" +` + var expectedOutput = ` +exporters: + elasticsearch: + api_key: "" + endpoints: + - https://es-hostname.elastic.co:443 + idle_conn_timeout: 3s + logs_index: form-otel-exporter + num_workers: 0 + password: password + retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 + user: elastic-cloud + timeout: 1m30s + batcher: + enabled: true + max_size_items: 1600 +receivers: + filebeatreceiver: + filebeat: + inputs: + - enabled: true + id: filestream-input-id + paths: + - /tmp/flog.log + type: filestream + output: + otelconsumer: null + cloud: null + setup: + kibana: + host: https://kibana-hostname.elastic.co:443 +service: + pipelines: + logs: + exporters: + - elasticsearch + receivers: + - filebeatreceiver +` input := newFromYamlString(t, supportedInput) err := c.Convert(context.Background(), input) require.NoError(t, err, "error converting beats output config") expOutput := newFromYamlString(t, expectedOutput) + compareAndAssert(t, expOutput, input) - // convert it to a common type - want, err := yaml.Marshal(expOutput.ToStringMap()) - require.NoError(t, err) - got, err := yaml.Marshal(input.ToStringMap()) - require.NoError(t, err) + }) + + t.Run("test local queue setting is promoted to global level", func(t *testing.T) { + var supportedInput = ` +receivers: + filebeatreceiver: + output: + elasticsearch: + hosts: ["https://localhost:9200"] + username: elastic + password: changeme + index: form-otel-exporter + queue: + mem: + events: 3200 + flush: + min_events: 1600 + timeout: 10s - assert.Equal(t, string(want), string(got)) +service: + pipelines: + logs: + receivers: + - "filebeatreceiver" +` - }) + var expectedOutput = esCommonOutput + ` +receivers: + filebeatreceiver: + queue: + mem: + events: 3200 + flush: + min_events: 1600 + timeout: 10s + output: + otelconsumer: null +service: + pipelines: + logs: + exporters: + - elasticsearch + receivers: + - filebeatreceiver +` - t.Run("test failure if unsupported config is provided", func(t *testing.T) { - input := newFromYamlString(t, unsupportedOutputConfig) + input := newFromYamlString(t, supportedInput) err := c.Convert(context.Background(), input) - require.ErrorContains(t, err, "output type \"kafka\" is unsupported in OTel mode") + require.NoError(t, err, "error converting beats output config") + + expOutput := newFromYamlString(t, expectedOutput) + compareAndAssert(t, expOutput, input) }) - // TODO: Add a test case with cloud id set } func newFromYamlString(t *testing.T, input string) *confmap.Conf { @@ -155,3 +283,14 @@ func newFromYamlString(t *testing.T, input string) *confmap.Conf { return confmap.NewFromStringMap(rawConf) } + +func compareAndAssert(t *testing.T, expectedOutput *confmap.Conf, gotOutput *confmap.Conf) { + t.Helper() + // convert it to a common type + want, err := yaml.Marshal(expectedOutput.ToStringMap()) + require.NoError(t, err) + got, err := yaml.Marshal(gotOutput.ToStringMap()) + require.NoError(t, err) + + assert.Equal(t, string(want), string(got)) +} diff --git a/libbeat/otelbeat/beatconverter/config_otel_test.go b/libbeat/otelbeat/beatconverter/config_otel_test.go deleted file mode 100644 index f1f556fa3902..000000000000 --- a/libbeat/otelbeat/beatconverter/config_otel_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package beatconverter - -import ( - _ "embed" - "fmt" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/confmap/confmaptest" - "gopkg.in/yaml.v2" - - "github.com/elastic/elastic-agent-libs/config" -) - -func TestToOtelConfig(t *testing.T) { - - tests := []struct { - name string - input string - expOutput string - experr bool - }{ - { - name: "basic elasticsearch input", - input: "basic.yml", - expOutput: "basic-op.yml", - experr: false, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - rawConf, err := loadBeatConf(filepath.Join("testdata", test.input)) - require.NoError(t, err) - beatCfg := config.MustNewConfigFrom(rawConf) - - otelCfg, err := ToOTelConfig(beatCfg) - require.NoError(t, err, "could not convert beat config to otel ES config") - - expectedValue, err := confmaptest.LoadConf(filepath.Join("testdata", test.expOutput)) - require.NoError(t, err) - want, err := yaml.Marshal(expectedValue.ToStringMap()) - require.NoError(t, err) - - got, err := yaml.Marshal(otelCfg) - require.NoError(t, err) - - assert.Equal(t, string(want), string(got)) - - }) - } - -} - -func loadBeatConf(fileName string) (map[string]any, error) { - // Clean the path before using it. - content, err := os.ReadFile(filepath.Clean(fileName)) - if err != nil { - return nil, fmt.Errorf("unable to read the file %v: %w", fileName, err) - } - - var rawConf map[string]any - if err = yaml.Unmarshal(content, &rawConf); err != nil { - return nil, err - } - - return rawConf, nil -} diff --git a/libbeat/otelbeat/beatconverter/testdata/basic-op.yml b/libbeat/otelbeat/beatconverter/testdata/basic-op.yml deleted file mode 100644 index 20c986b08fe2..000000000000 --- a/libbeat/otelbeat/beatconverter/testdata/basic-op.yml +++ /dev/null @@ -1,20 +0,0 @@ -api_key: "" -endpoints: - - http://localhost:9200/foo/bar - - http://localhost:9300/foo/bar -idle_conn_timeout: 3s -logs_index: some-index -num_workers: 30 -password: changeme -pipeline: some-ingest-pipeline -proxy_url: https://proxy.url -retry: - enabled: true - initial_interval: 42s - max_interval: 7m0s - max_retries: 3 -timeout: 1m30s -user: elastic -headers: - X-Header-1: foo - X-Bar-Header: bar diff --git a/libbeat/otelbeat/beatconverter/testdata/basic.yml b/libbeat/otelbeat/beatconverter/testdata/basic.yml deleted file mode 100644 index 5423c0b594a5..000000000000 --- a/libbeat/otelbeat/beatconverter/testdata/basic.yml +++ /dev/null @@ -1,22 +0,0 @@ -hosts: - - localhost:9200 - - localhost:9300 -protocol: http -path: /foo/bar -username: elastic -password: changeme -index: "some-index" -pipeline: "some-ingest-pipeline" -proxy_url: "https://proxy.url" -backoff: - init: 42s - max: 420s -workers: 30 -headers: - X-Header-1: foo - X-Bar-Header: bar - - -## elastic.co$es-hostname$kibana-hostname -# cloud.id: ZWxhc3RpYy5jbyRlcy1ob3N0bmFtZSRraWJhbmEtaG9zdG5hbWU= -# cloud.auth: elastic-cloud:password diff --git a/libbeat/otelbeat/beatconverter/config_otel.go b/libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel.go similarity index 84% rename from libbeat/otelbeat/beatconverter/config_otel.go rename to libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel.go index c1bdf0f2d6e7..a1a40d9276fc 100644 --- a/libbeat/otelbeat/beatconverter/config_otel.go +++ b/libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel.go @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. -package beatconverter +package elasticsearchtranslate import ( "fmt" "reflect" + "strings" "github.com/mitchellh/mapstructure" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" @@ -30,13 +31,13 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) // TODO: add following unuspported params to below struct // indices // pipelines // parameters -// preset // setup.ilm.* -> supported but the logic is not in place yet // proxy_disable -> supported but the logic is not in place yet // proxy_headers @@ -47,7 +48,6 @@ type unsupportedConfig struct { AllowOlderVersion bool `config:"allow_older_versions"` EscapeHTML bool `config:"escape_html"` Kerberos *kerberos.Config `config:"kerberos"` - BulkMaxSize int `config:"bulk_max_size"` } type esToOTelOptions struct { @@ -57,6 +57,7 @@ type esToOTelOptions struct { Index string `config:"index"` Pipeline string `config:"pipeline"` ProxyURL string `config:"proxy_url"` + Preset string `config:"preset"` } var defaultOptions = esToOTelOptions{ @@ -65,6 +66,7 @@ var defaultOptions = esToOTelOptions{ Index: "filebeat-9.0.0", // TODO. Default value should be filebeat-%{[agent.version]} Pipeline: "", ProxyURL: "", + Preset: "custom", // default is custom if not set } // ToOTelConfig converts a Beat config into an OTel elasticsearch exporter config @@ -80,6 +82,22 @@ func ToOTelConfig(output *config.C) (map[string]any, error) { return nil, fmt.Errorf("these configuration parameters are not supported %+v", temp) } + // apply preset here + // It is important to apply preset before unpacking the config, as preset can override output fields + preset, err := output.String("preset", -1) + if err == nil { + // Performance preset is present, apply it and log any fields that + // were overridden + overriddenFields, presetConfig, err := elasticsearch.ApplyPreset(preset, output) + if err != nil { + return nil, err + } + logp.Info("Applying performance preset '%v': %v", + preset, config.DebugString(presetConfig, false)) + logp.Warn("Performance preset '%v' overrides user setting for field(s): %s", + preset, strings.Join(overriddenFields, ",")) + } + // unpack and validate ES config if err := output.Unpack(&escfg); err != nil { return nil, fmt.Errorf("failed unpacking config. %w", err) @@ -127,11 +145,11 @@ func ToOTelConfig(output *config.C) (map[string]any, error) { }, - // Batcher is experimental and by not setting it, we are using the exporter's default batching mechanism - // "batcher": map[string]any{ - // "enabled": true, - // "max_size_items": escfg.BulkMaxSize, // bulk_max_size - // }, + // Batcher is experimental + "batcher": map[string]any{ + "enabled": true, + "max_size_items": escfg.BulkMaxSize, // bulk_max_size + }, } setIfNotNil(otelYAMLCfg, "headers", escfg.Headers) // headers @@ -148,7 +166,7 @@ func ToOTelConfig(output *config.C) (map[string]any, error) { // For type safety check func typeSafetyCheck(value map[string]any) error { - // the valued should match `elasticsearchexporter.Config` type. + // the value should match `elasticsearchexporter.Config` type. // it throws an error if non existing key names are set var result elasticsearchexporter.Config d, _ := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ diff --git a/libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel_test.go b/libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel_test.go new file mode 100644 index 000000000000..9b5fbd46f324 --- /dev/null +++ b/libbeat/otelbeat/oteltranslate/outputs/elasticsearch/config_otel_test.go @@ -0,0 +1,217 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearchtranslate + +import ( + _ "embed" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" + "gopkg.in/yaml.v2" + "gotest.tools/v3/assert" + + "github.com/elastic/elastic-agent-libs/config" +) + +func TestToOtelConfig(t *testing.T) { + + t.Run("basic config translation", func(t *testing.T) { + beatCfg := ` +hosts: + - localhost:9200 + - localhost:9300 +protocol: http +path: /foo/bar +username: elastic +password: changeme +index: "some-index" +pipeline: "some-ingest-pipeline" +proxy_url: "https://proxy.url" +backoff: + init: 42s + max: 420s +workers: 30 +headers: + X-Header-1: foo + X-Bar-Header: bar` + + OTelCfg := ` +api_key: "" +endpoints: + - http://localhost:9200/foo/bar + - http://localhost:9300/foo/bar +idle_conn_timeout: 3s +logs_index: some-index +num_workers: 30 +password: changeme +pipeline: some-ingest-pipeline +proxy_url: https://proxy.url +retry: + enabled: true + initial_interval: 42s + max_interval: 7m0s + max_retries: 3 +timeout: 1m30s +user: elastic +headers: + X-Header-1: foo + X-Bar-Header: bar +batcher: + enabled: true + max_size_items: 1600 + ` + input := newFromYamlString(t, beatCfg) + cfg := config.MustNewConfigFrom(input.ToStringMap()) + got, err := ToOTelConfig(cfg) + require.NoError(t, err, "error translating elasticsearch output to OTel ES exporter type") + expOutput := newFromYamlString(t, OTelCfg) + compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) + + }) + + // when preset is configured, we only test worker, bulk_max_size, idle_connection_timeout here + // TODO: Check for compression_level when we add support upstream + t.Run("check preset config translation", func(t *testing.T) { + commonBeatCfg := ` +hosts: + - localhost:9200 +index: "some-index" +username: elastic +password: changeme +preset: %s +` + + commonOTelCfg := ` +api_key: "" +endpoints: + - http://localhost:9200 +retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 +logs_index: some-index +password: changeme +user: elastic +timeout: 1m30s +` + + tests := []struct { + presetName string + output string + }{ + { + presetName: "balanced", + output: commonOTelCfg + ` +idle_conn_timeout: 3s +num_workers: 1 +batcher: + enabled: true + max_size_items: 1600 + `, + }, + { + presetName: "throughput", + output: commonOTelCfg + ` +idle_conn_timeout: 15s +num_workers: 4 +batcher: + enabled: true + max_size_items: 1600 + `, + }, + { + presetName: "scale", + output: ` +api_key: "" +endpoints: + - http://localhost:9200 +retry: + enabled: true + initial_interval: 5s + max_interval: 5m0s + max_retries: 3 +logs_index: some-index +password: changeme +user: elastic +timeout: 1m30s +idle_conn_timeout: 1s +num_workers: 1 +batcher: + enabled: true + max_size_items: 1600 + `, + }, + { + presetName: "latency", + output: commonOTelCfg + ` +idle_conn_timeout: 1m0s +num_workers: 1 +batcher: + enabled: true + max_size_items: 50 + `, + }, + { + presetName: "custom", + output: commonOTelCfg + ` +idle_conn_timeout: 3s +num_workers: 0 +batcher: + enabled: true + max_size_items: 1600 + `, + }, + } + + for _, test := range tests { + t.Run("config translation w/"+test.presetName, func(t *testing.T) { + input := newFromYamlString(t, fmt.Sprintf(commonBeatCfg, test.presetName)) + cfg := config.MustNewConfigFrom(input.ToStringMap()) + got, err := ToOTelConfig(cfg) + require.NoError(t, err, "error translating elasticsearch output to OTel ES exporter type") + expOutput := newFromYamlString(t, test.output) + compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) + }) + } + + }) + +} + +func newFromYamlString(t *testing.T, input string) *confmap.Conf { + t.Helper() + var rawConf map[string]any + err := yaml.Unmarshal([]byte(input), &rawConf) + require.NoError(t, err) + + return confmap.NewFromStringMap(rawConf) +} + +func compareAndAssert(t *testing.T, expectedOutput *confmap.Conf, gotOutput *confmap.Conf) { + t.Helper() + // convert it to a common type + want, err := yaml.Marshal(expectedOutput.ToStringMap()) + require.NoError(t, err) + got, err := yaml.Marshal(gotOutput.ToStringMap()) + require.NoError(t, err) + + assert.Equal(t, string(want), string(got)) +} diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index 0c9be8553e65..e70c3e4b9782 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -74,7 +74,8 @@ var ( Init: 1 * time.Second, Max: 60 * time.Second, }, - Transport: esDefaultTransportSettings(), + BulkMaxSize: defaultBulkSize, + Transport: esDefaultTransportSettings(), } ) diff --git a/libbeat/outputs/elasticsearch/config_presets.go b/libbeat/outputs/elasticsearch/config_presets.go index 1fa01d82fa0e..10200d35a6f9 100644 --- a/libbeat/outputs/elasticsearch/config_presets.go +++ b/libbeat/outputs/elasticsearch/config_presets.go @@ -81,7 +81,7 @@ var presetConfigs = map[string]*config.C{ // config overrides. // Returns a list of the user fields that were overwritten, and the full // preset config that was applied. -func applyPreset(preset string, userConfig *config.C) ([]string, *config.C, error) { +func ApplyPreset(preset string, userConfig *config.C) ([]string, *config.C, error) { presetConfig := presetConfigs[preset] if presetConfig == nil { return nil, nil, fmt.Errorf("unknown preset value %v", preset) diff --git a/libbeat/outputs/elasticsearch/config_presets_test.go b/libbeat/outputs/elasticsearch/config_presets_test.go index 7ae24797b12f..7cf183cdbe51 100644 --- a/libbeat/outputs/elasticsearch/config_presets_test.go +++ b/libbeat/outputs/elasticsearch/config_presets_test.go @@ -38,9 +38,9 @@ func TestApplyPresetNoConflicts(t *testing.T) { "loadbalance": true, }) // Apply the preset and make sure no conflicts are reported. - conflicts, _, err := applyPreset(presetThroughput, cfg) + conflicts, _, err := ApplyPreset(presetThroughput, cfg) require.NoError(t, err, "Valid preset must apply successfully") - assert.Equal(t, 0, len(conflicts), "applyPreset should report no conflicts from non-preset fields") + assert.Equal(t, 0, len(conflicts), "ApplyPreset should report no conflicts from non-preset fields") // Unpack the final config into elasticsearchConfig and verify that both user // and preset fields are set correctly. @@ -49,13 +49,13 @@ func TestApplyPresetNoConflicts(t *testing.T) { require.NoError(t, err, "Config should unpack successfully") // Check basic user params - assert.Equal(t, 5, esConfig.MaxRetries, "Non-preset fields should be unchanged by applyPreset") - assert.Equal(t, true, esConfig.LoadBalance, "Non-preset fields should be unchanged by applyPreset") + assert.Equal(t, 5, esConfig.MaxRetries, "Non-preset fields should be unchanged by ApplyPreset") + assert.Equal(t, true, esConfig.LoadBalance, "Non-preset fields should be unchanged by ApplyPreset") // Check basic preset params - assert.Equal(t, 1600, esConfig.BulkMaxSize, "Preset fields should be set by applyPreset") - assert.Equal(t, 1, esConfig.CompressionLevel, "Preset fields should be set by applyPreset") - assert.Equal(t, 15*time.Second, esConfig.Transport.IdleConnTimeout, "Preset fields should be set by applyPreset") + assert.Equal(t, 1600, esConfig.BulkMaxSize, "Preset fields should be set by ApplyPreset") + assert.Equal(t, 1, esConfig.CompressionLevel, "Preset fields should be set by ApplyPreset") + assert.Equal(t, 15*time.Second, esConfig.Transport.IdleConnTimeout, "Preset fields should be set by ApplyPreset") // Check preset queue params var memQueueConfig struct { @@ -63,9 +63,9 @@ func TestApplyPresetNoConflicts(t *testing.T) { FlushMinEvents int `config:"flush.min_events"` FlushTimeout time.Duration `config:"flush.timeout"` } - require.Equal(t, "mem", esConfig.Queue.Name(), "applyPreset should configure the memory queue") + require.Equal(t, "mem", esConfig.Queue.Name(), "ApplyPreset should configure the memory queue") err = esConfig.Queue.Config().Unpack(&memQueueConfig) - assert.NoError(t, err, "applyPreset should set valid memory queue config") + assert.NoError(t, err, "ApplyPreset should set valid memory queue config") assert.Equal(t, 12800, memQueueConfig.Events, "Queue fields should match preset definition") assert.Equal(t, 1600, memQueueConfig.FlushMinEvents, "Queue fields should match preset definition") @@ -98,7 +98,7 @@ func TestApplyPresetWithConflicts(t *testing.T) { "idle_connection_timeout": 100 * time.Second, }) // Apply the preset and ensure all preset fields are reported as conflicts - conflicts, _, err := applyPreset(presetBalanced, cfg) + conflicts, _, err := ApplyPreset(presetBalanced, cfg) require.NoError(t, err, "Valid preset must apply successfully") expectedConflicts := []string{ "bulk_max_size", @@ -118,9 +118,9 @@ func TestApplyPresetWithConflicts(t *testing.T) { require.NoError(t, err, "Valid config tree must unpack successfully") // Check basic preset params - assert.Equal(t, 1600, esConfig.BulkMaxSize, "Preset fields should be set by applyPreset") - assert.Equal(t, 1, esConfig.CompressionLevel, "Preset fields should be set by applyPreset") - assert.Equal(t, 3*time.Second, esConfig.Transport.IdleConnTimeout, "Preset fields should be set by applyPreset") + assert.Equal(t, 1600, esConfig.BulkMaxSize, "Preset fields should be set by ApplyPreset") + assert.Equal(t, 1, esConfig.CompressionLevel, "Preset fields should be set by ApplyPreset") + assert.Equal(t, 3*time.Second, esConfig.Transport.IdleConnTimeout, "Preset fields should be set by ApplyPreset") // Check preset queue params var memQueueConfig struct { @@ -128,9 +128,9 @@ func TestApplyPresetWithConflicts(t *testing.T) { FlushMinEvents int `config:"flush.min_events"` FlushTimeout time.Duration `config:"flush.timeout"` } - require.Equal(t, "mem", esConfig.Queue.Name(), "applyPreset should configure the memory queue") + require.Equal(t, "mem", esConfig.Queue.Name(), "ApplyPreset should configure the memory queue") err = esConfig.Queue.Config().Unpack(&memQueueConfig) - assert.NoError(t, err, "applyPreset should set valid memory queue config") + assert.NoError(t, err, "ApplyPreset should set valid memory queue config") assert.Equal(t, 3200, memQueueConfig.Events, "Queue fields should match preset definition") assert.Equal(t, 1600, memQueueConfig.FlushMinEvents, "Queue fields should match preset definition") @@ -161,9 +161,9 @@ func TestApplyPresetCustom(t *testing.T) { "idle_connection_timeout": 100 * time.Second, }) // Apply the preset and make sure no conflicts are reported. - conflicts, _, err := applyPreset(presetCustom, cfg) + conflicts, _, err := ApplyPreset(presetCustom, cfg) require.NoError(t, err, "Custom preset must apply successfully") - assert.Equal(t, 0, len(conflicts), "applyPreset should report no conflicts when preset is 'custom'") + assert.Equal(t, 0, len(conflicts), "ApplyPreset should report no conflicts when preset is 'custom'") // Unpack the final config into elasticsearchConfig and verify that both user // and preset fields are set correctly. @@ -172,9 +172,9 @@ func TestApplyPresetCustom(t *testing.T) { require.NoError(t, err, "Config should unpack successfully") // Check basic user params - assert.Equal(t, 100, esConfig.BulkMaxSize, "Preset fields should be set by applyPreset") - assert.Equal(t, 5, esConfig.CompressionLevel, "Preset fields should be set by applyPreset") - assert.Equal(t, 100*time.Second, esConfig.Transport.IdleConnTimeout, "Preset fields should be set by applyPreset") + assert.Equal(t, 100, esConfig.BulkMaxSize, "Preset fields should be set by ApplyPreset") + assert.Equal(t, 5, esConfig.CompressionLevel, "Preset fields should be set by ApplyPreset") + assert.Equal(t, 100*time.Second, esConfig.Transport.IdleConnTimeout, "Preset fields should be set by ApplyPreset") // Check user queue params var memQueueConfig struct { @@ -182,7 +182,7 @@ func TestApplyPresetCustom(t *testing.T) { FlushMinEvents int `config:"flush.min_events"` FlushTimeout time.Duration `config:"flush.timeout"` } - require.Equal(t, "mem", esConfig.Queue.Name(), "applyPreset with custom preset should preserve user queue settings") + require.Equal(t, "mem", esConfig.Queue.Name(), "ApplyPreset with custom preset should preserve user queue settings") err = esConfig.Queue.Config().Unpack(&memQueueConfig) assert.NoError(t, err, "Queue settings should unpack successfully") diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 710c7f7616bd..661aa792a4fa 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -40,23 +40,17 @@ func makeES( cfg *config.C, ) (outputs.Group, error) { log := logp.NewLogger(logSelector) - if !cfg.HasField("bulk_max_size") { - if err := cfg.SetInt("bulk_max_size", -1, defaultBulkSize); err != nil { - return outputs.Fail(err) - } - } - + esConfig := defaultConfig indexSelector, pipelineSelector, err := buildSelectors(im, beatInfo, cfg) if err != nil { return outputs.Fail(err) } - esConfig := defaultConfig preset, err := cfg.String("preset", -1) if err == nil && preset != "" { // Performance preset is present, apply it and log any fields that // were overridden - overriddenFields, presetConfig, err := applyPreset(preset, cfg) + overriddenFields, presetConfig, err := ApplyPreset(preset, cfg) if err != nil { return outputs.Fail(err) }