From 219af1678f4456b9d17e1e32368db093cd52f748 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 6 Dec 2024 14:11:11 -0300 Subject: [PATCH 01/16] otel: add test for agent hybrid --- pkg/testing/fixture.go | 2 +- testing/integration/otel_test.go | 197 +++++++++++++++++++++++++++++++ 2 files changed, 198 insertions(+), 1 deletion(-) diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 89968451747..794d82c67a3 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -726,7 +726,7 @@ func (f *Fixture) ExecStatus(ctx context.Context, opts ...process.CmdOption) (Ag status := AgentStatusOutput{} if uerr := json.Unmarshal(out, &status); uerr != nil { return AgentStatusOutput{}, - fmt.Errorf("could not unmarshal agent status output: %w", errors.Join(uerr, err)) + fmt.Errorf("could not unmarshal agent status output: %w:\n%s", errors.Join(uerr, err), out) } else if status.IsZero() { return status, fmt.Errorf("agent status output is empty: %w", err) } diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index e42178a2005..45823a635e7 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -9,6 +9,7 @@ package integration import ( "bytes" "context" + "encoding/base64" "errors" "fmt" "net/url" @@ -20,6 +21,7 @@ import ( "text/template" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1099,3 +1101,198 @@ service: fixtureWg.Wait() require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) } + +func TestHybridAgentE2E(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + numEvents := 1 + fbIndex := "logs-generic-default" + fbReceiverIndex := "logs-fbreceiver-default" + + inputFile, err := os.CreateTemp(tmpDir, "input-*.log") + require.NoError(t, err, "failed to create input log file") + inputFilePath := inputFile.Name() + for i := 0; i < numEvents; i++ { + _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d\n", i))) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + } + err = inputFile.Close() + require.NoError(t, err, "failed to close data temp file") + + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(inputFilePath) + if err != nil { + t.Logf("no data file to import at %s", inputFilePath) + return + } + t.Logf("contents of input file: %s\n", string(contents)) + } + }) + + // Create the otel configuration file + type otelConfigOptions struct { + InputPath string + HomeDir string + ESEndpoint string + ESApiKey string + BeatsESApiKey string + FBReceiverIndex string + } + esEndpoint, err := getESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + + // TODO: add some processors here + configTemplate := `inputs: + - id: filestream-filebeat + type: filestream + paths: + - {{.InputPath}} + use_output: default +outputs: + default: + type: elasticsearch + hosts: [{{.ESEndpoint}}] + api_key: {{.BeatsESApiKey}} +receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputPath}} + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + path.home: {{.HomeDir}} + queue.mem.flush.timeout: 0s +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - {{.ESEndpoint}} + api_key: {{.ESApiKey}} + logs_index: {{.FBReceiverIndex}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch/log + - debug +` + + beatsApiKey, err := base64.StdEncoding.DecodeString(esApiKey.Encoded) + require.NoError(t, err, "error decoding api key") + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + otelConfigOptions{ + InputPath: inputFilePath, + HomeDir: tmpDir, + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + BeatsESApiKey: string(beatsApiKey), + FBReceiverIndex: fbReceiverIndex, + })) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Contents of agent config file:\n%s\n", string(configContents)) + // TODO(mauri870): for debugging, remove this line + require.NoError(t, os.WriteFile("/tmp/agent.yml", configContents, 0o600)) + } + }) + // Now we can actually create the fixture and run it + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx) + require.NoError(t, err) + + var fixtureWg sync.WaitGroup + fixtureWg.Add(1) + go func() { + defer fixtureWg.Done() + err = fixture.Run(ctx, aTesting.State{ + Configure: string(configContents), + Reached: func(state *client.AgentState) bool { + // keep running (context cancel will stop it) + return false + }, + }) + }() + + // // TODO(mauri870): uncomment this when the PR is ready + // require.Eventually(t, func() bool { + // err = fixture.IsHealthy(ctx) + // if err != nil { + // t.Logf("waiting for agent healthy: %s", err.Error()) + // return false + // } + // return true + // }, 10*time.Minute, 1*time.Second) + + var fbDocs, fbReceiverDocs estools.Documents + actualHits := &struct { + FB int + FBReceiver int + }{} + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + fbDocs, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbIndex+"*", map[string]interface{}{ + "log.file.path": inputFilePath, + }) + require.NoError(t, err) + + fbReceiverDocs, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbReceiverIndex+"*", map[string]interface{}{ + "log.file.path": inputFilePath, + }) + require.NoError(t, err) + + actualHits.FB = fbDocs.Hits.Total.Value + actualHits.FBReceiver = fbReceiverDocs.Hits.Total.Value + + return actualHits.FB == numEvents && actualHits.FBReceiver == numEvents + }, + 1*time.Minute, 1*time.Second, + "Expected %d logs in elasticsearch, got: %v", numEvents, actualHits) + + // TODO(mauri870): will likely have to ignore some fields from the doc. + require.Equal(t, "", cmp.Diff(fbDocs.Hits.Hits, fbReceiverDocs.Hits.Hits), "filebeat and fbreceiver docs are not equal") + + cancel() + fixtureWg.Wait() + require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) +} From 74e9b515da3bfebbb5bd33798915860894f7807a Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 28 Jan 2025 10:04:19 -0300 Subject: [PATCH 02/16] add missing fingerprint config options to index small files --- testing/integration/otel_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 4ad66f434bc..0ccbac733e3 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1154,12 +1154,13 @@ func TestHybridAgentE2E(t *testing.T) { require.NoError(t, err, "error creating API key") require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) - // TODO: add some processors here configTemplate := `inputs: - id: filestream-filebeat type: filestream paths: - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ use_output: default outputs: default: @@ -1175,6 +1176,8 @@ receivers: enabled: true paths: - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ output: otelconsumer: logging: From 309640b3dcf2a50d77b16e34c7730ad462bc8895 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 28 Jan 2025 11:41:40 -0300 Subject: [PATCH 03/16] ensure new lines are written to the input file --- testing/integration/otel_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 0ccbac733e3..da6b58dac73 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1122,11 +1122,14 @@ func TestHybridAgentE2E(t *testing.T) { require.NoError(t, err, "failed to create input log file") inputFilePath := inputFile.Name() for i := 0; i < numEvents; i++ { - _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d\n", i))) + _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d", i))) require.NoErrorf(t, err, "failed to write line %d to temp file", i) + _, err = inputFile.Write([]byte("\n")) + require.NoErrorf(t, err, "failed to write newline to input file") + time.Sleep(100 * time.Millisecond) } err = inputFile.Close() - require.NoError(t, err, "failed to close data temp file") + require.NoError(t, err, "failed to close data input file") t.Cleanup(func() { if t.Failed() { From 83e989665cf15cf1afaf2c8d9acae25d243eb8de Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 28 Jan 2025 11:42:28 -0300 Subject: [PATCH 04/16] disable compression level so we inspect the requests --- testing/integration/otel_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index da6b58dac73..778c93d8394 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1170,6 +1170,7 @@ outputs: type: elasticsearch hosts: [{{.ESEndpoint}}] api_key: {{.BeatsESApiKey}} + compression_level: 0 receivers: filebeatreceiver: filebeat: @@ -1196,6 +1197,7 @@ exporters: elasticsearch/log: endpoints: - {{.ESEndpoint}} + compression: none api_key: {{.ESApiKey}} logs_index: {{.FBReceiverIndex}} batcher: From 7051ed8342d039127a55436dd2041f686571a2a4 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 28 Jan 2025 13:41:38 -0300 Subject: [PATCH 05/16] add queue timeout to normal beats and separate path.home for fbreceiver --- testing/integration/otel_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 778c93d8394..d13f89ab6a2 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1165,6 +1165,7 @@ func TestHybridAgentE2E(t *testing.T) { prospector.scanner.fingerprint.enabled: false file_identity.native: ~ use_output: default + queue.mem.flush.timeout: 0s outputs: default: type: elasticsearch @@ -1188,7 +1189,7 @@ receivers: level: info selectors: - '*' - path.home: {{.HomeDir}} + path.home: {{.HomeDir}}/fbreceiver queue.mem.flush.timeout: 0s exporters: debug: From f86787cd976f2cf61eeaeec41b8172bc043c6424 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 29 Jan 2025 09:21:27 -0300 Subject: [PATCH 06/16] add path.home for normal filebeat --- testing/integration/otel_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index d13f89ab6a2..2111d5c0eb8 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1166,6 +1166,7 @@ func TestHybridAgentE2E(t *testing.T) { file_identity.native: ~ use_output: default queue.mem.flush.timeout: 0s + path.home: {{.HomeDir}}/filebeat outputs: default: type: elasticsearch @@ -1234,10 +1235,11 @@ service: t.Cleanup(func() { if t.Failed() { t.Logf("Contents of agent config file:\n%s\n", string(configContents)) - // TODO(mauri870): for debugging, remove this line - require.NoError(t, os.WriteFile("/tmp/agent.yml", configContents, 0o600)) } }) + // TODO(mauri870): for debugging, remove this line + require.NoError(t, os.WriteFile("/tmp/agent.yml", configContents, 0o600)) + // Now we can actually create the fixture and run it fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) require.NoError(t, err) From f130377c17ff7255b6f10dbb6bffa01b6775ba11 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 31 Jan 2025 10:09:00 -0300 Subject: [PATCH 07/16] fix agent not starting --- testing/integration/otel_test.go | 107 ++++++++++++++++++++++--------- 1 file changed, 77 insertions(+), 30 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 2111d5c0eb8..f8930a32a08 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" "github.com/elastic/elastic-agent/pkg/control/v2/client" aTesting "github.com/elastic/elastic-agent/pkg/testing" @@ -1103,6 +1104,9 @@ service: } func TestHybridAgentE2E(t *testing.T) { + // This test is a hybrid agent test that ingests a single log with + // filebeat and fbreceiver. It then compares the final documents in + // Elasticsearch to ensure they have no meaningful differences. info := define.Require(t, define.Requirements{ Group: Default, Local: true, @@ -1142,8 +1146,7 @@ func TestHybridAgentE2E(t *testing.T) { } }) - // Create the otel configuration file - type otelConfigOptions struct { + type configOptions struct { InputPath string HomeDir string ESEndpoint string @@ -1223,7 +1226,7 @@ service: var configBuffer bytes.Buffer require.NoError(t, template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, - otelConfigOptions{ + configOptions{ InputPath: inputFilePath, HomeDir: tmpDir, ESEndpoint: esEndpoint, @@ -1237,8 +1240,6 @@ service: t.Logf("Contents of agent config file:\n%s\n", string(configContents)) } }) - // TODO(mauri870): for debugging, remove this line - require.NoError(t, os.WriteFile("/tmp/agent.yml", configContents, 0o600)) // Now we can actually create the fixture and run it fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) @@ -1246,31 +1247,32 @@ service: ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) defer cancel() + err = fixture.Prepare(ctx) require.NoError(t, err) + err = fixture.Configure(ctx, configContents) + require.NoError(t, err) - var fixtureWg sync.WaitGroup - fixtureWg.Add(1) - go func() { - defer fixtureWg.Done() - err = fixture.Run(ctx, aTesting.State{ - Configure: string(configContents), - Reached: func(state *client.AgentState) bool { - // keep running (context cancel will stop it) - return false - }, - }) - }() + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + require.NoError(t, err) + cmd.WaitDelay = 1 * time.Second - // // TODO(mauri870): uncomment this when the PR is ready - // require.Eventually(t, func() bool { - // err = fixture.IsHealthy(ctx) - // if err != nil { - // t.Logf("waiting for agent healthy: %s", err.Error()) - // return false - // } - // return true - // }, 10*time.Minute, 1*time.Second) + var output strings.Builder + cmd.Stderr = &output + cmd.Stdout = &output + + err = cmd.Start() + require.NoError(t, err) + cmd.WaitDelay = 1 * time.Second + + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 1*time.Minute, 1*time.Second) var fbDocs, fbReceiverDocs estools.Documents actualHits := &struct { @@ -1300,10 +1302,55 @@ service: 1*time.Minute, 1*time.Second, "Expected %d logs in elasticsearch, got: %v", numEvents, actualHits) - // TODO(mauri870): will likely have to ignore some fields from the doc. - require.Equal(t, "", cmp.Diff(fbDocs.Hits.Hits, fbReceiverDocs.Hits.Hits), "filebeat and fbreceiver docs are not equal") + fbDoc := fbDocs.Hits.Hits[0].Source + fbReceiverDoc := fbReceiverDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between filebeat and fbreceiver + "@timestamp", + "agent.ephemeral_id", + "agent.id", + + // Missing from fbreceiver doc + "data_stream.dataset", + "data_stream.namespace", + "data_stream.type", + "elastic_agent.id", + "elastic_agent.snapshot", + "elastic_agent.version", + "event.dataset", + "host.architecture", + "host.containerized", + "host.hostname", + "host.id", + "host.ip", + "host.mac", + "host.os.build", + "host.os.family", + "host.os.kernel", + "host.os.name", + "host.os.platform", + "host.os.type", + "host.os.version", + + // fbreceiver adds metadata fields that are internal in filebeat + "@metadata.beat", + "@metadata.type", + "@metadata.version", + } + + require.Equal(t, "", diffMapstrM(fbDoc, fbReceiverDoc, ignoredFields), "filebeat and fbreceiver docs are not equal") cancel() - fixtureWg.Wait() - require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) + err = cmd.Wait() + require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || strings.Contains(err.Error(), "signal: killed"), "Retrieved unexpected error: %s", err.Error()) +} + +func diffMapstrM(m1, m2 mapstr.M, ignoredFields []string) string { + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + for _, f := range ignoredFields { + flatM1.Delete(f) + flatM2.Delete(f) + } + return cmp.Diff(flatM1, flatM2) } From 6e7e882a192516094cae5ae0a3777ec59c52f9da Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 31 Jan 2025 10:16:43 -0300 Subject: [PATCH 08/16] print elastic-agent output if test fails --- testing/integration/otel_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index f8930a32a08..f67cb48bc5c 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1160,7 +1160,9 @@ func TestHybridAgentE2E(t *testing.T) { require.NoError(t, err, "error creating API key") require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) - configTemplate := `inputs: + configTemplate := `agent.logging.level: info +agent.logging.to_stderr: true +inputs: - id: filestream-filebeat type: filestream paths: @@ -1263,7 +1265,13 @@ service: err = cmd.Start() require.NoError(t, err) - cmd.WaitDelay = 1 * time.Second + + t.Cleanup(func() { + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) require.Eventually(t, func() bool { err = fixture.IsHealthy(ctx) From db4caef972394cccec7b6b48f7d8f08864c19e22 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 31 Jan 2025 10:49:42 -0300 Subject: [PATCH 09/16] add note for metadata field --- testing/integration/otel_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index f67cb48bc5c..7d4b29690b5 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1340,7 +1340,9 @@ service: "host.os.type", "host.os.version", - // fbreceiver adds metadata fields that are internal in filebeat + // TODO: fbreceiver adds metadata fields that are internal in filebeat. + // Remove this once https://github.com/elastic/beats/pull/42412 + // is available in agent. "@metadata.beat", "@metadata.type", "@metadata.version", From 3a9c9ac35dc346bdd8bb716baab3e98fb1f718b1 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 5 Feb 2025 09:44:59 -0300 Subject: [PATCH 10/16] fail if ignored field is not present in both maps --- testing/integration/otel_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 7d4b29690b5..5547fd5cc61 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1348,19 +1348,26 @@ service: "@metadata.version", } - require.Equal(t, "", diffMapstrM(fbDoc, fbReceiverDoc, ignoredFields), "filebeat and fbreceiver docs are not equal") - + assertMapsEqual(t, fbDoc, fbReceiverDoc, ignoredFields) cancel() err = cmd.Wait() require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || strings.Contains(err.Error(), "signal: killed"), "Retrieved unexpected error: %s", err.Error()) } -func diffMapstrM(m1, m2 mapstr.M, ignoredFields []string) string { +func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string) { + t.Helper() + flatM1 := m1.Flatten() flatM2 := m2.Flatten() for _, f := range ignoredFields { + hasKeyM1, _ := flatM1.HasKey(f) + hasKeyM2, _ := flatM2.HasKey(f) + if !hasKeyM1 && !hasKeyM2 { + assert.Failf(t, "ignored field %s not found in both maps", f) + } + flatM1.Delete(f) flatM2.Delete(f) } - return cmp.Diff(flatM1, flatM2) + require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") } From 3985e844c6377424467f066c605bff016fa959ce Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 5 Feb 2025 17:06:35 -0300 Subject: [PATCH 11/16] add host metadata processor, remove 'host.*' from ignored fields --- testing/integration/otel_test.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 5547fd5cc61..4ffb0891d9e 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1189,6 +1189,11 @@ receivers: - {{.InputPath}} prospector.scanner.fingerprint.enabled: false file_identity.native: ~ + processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ output: otelconsumer: logging: @@ -1326,19 +1331,6 @@ service: "elastic_agent.snapshot", "elastic_agent.version", "event.dataset", - "host.architecture", - "host.containerized", - "host.hostname", - "host.id", - "host.ip", - "host.mac", - "host.os.build", - "host.os.family", - "host.os.kernel", - "host.os.name", - "host.os.platform", - "host.os.type", - "host.os.version", // TODO: fbreceiver adds metadata fields that are internal in filebeat. // Remove this once https://github.com/elastic/beats/pull/42412 From 59253facae09f45350d3d79f4ff29bfb0c7b8b17 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 5 Feb 2025 17:16:15 -0300 Subject: [PATCH 12/16] Use global processor --- testing/integration/otel_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 4ffb0891d9e..4145eac1269 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1189,11 +1189,11 @@ receivers: - {{.InputPath}} prospector.scanner.fingerprint.enabled: false file_identity.native: ~ - processors: - - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ + processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ output: otelconsumer: logging: From c0f641573c15330c63a2d79928d7329caa1c6dfc Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 5 Feb 2025 17:23:15 -0300 Subject: [PATCH 13/16] more strict rules for ignored fields to avoid false positives --- testing/integration/otel_test.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 4145eac1269..f2e1beecfc8 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1340,13 +1340,13 @@ service: "@metadata.version", } - assertMapsEqual(t, fbDoc, fbReceiverDoc, ignoredFields) + assertMapsEqual(t, fbDoc, fbReceiverDoc, ignoredFields, "expected documents to be equal") cancel() err = cmd.Wait() require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || strings.Contains(err.Error(), "signal: killed"), "Retrieved unexpected error: %s", err.Error()) } -func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string) { +func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { t.Helper() flatM1 := m1.Flatten() @@ -1354,8 +1354,19 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string) { for _, f := range ignoredFields { hasKeyM1, _ := flatM1.HasKey(f) hasKeyM2, _ := flatM2.HasKey(f) + + // Check if the ignored field exists in either map if !hasKeyM1 && !hasKeyM2 { - assert.Failf(t, "ignored field %s not found in both maps", f) + require.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) + } + + // If the ignored field exists and is equal in both maps then it shouldn't be ignored + if hasKeyM1 && hasKeyM2 { + valM1, _ := flatM1.GetValue(f) + valM2, _ := flatM2.GetValue(f) + if valM1 == valM2 { + require.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f) + } } flatM1.Delete(f) From 9359f0cd004d4104c7e08abe30e69ab6a9d41e30 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 6 Feb 2025 08:44:18 -0300 Subject: [PATCH 14/16] use add_fields to add missing fields, use a single index --- testing/integration/otel_test.go | 47 ++++++++++++++------------------ 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index f2e1beecfc8..3e56be10885 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1120,7 +1120,7 @@ func TestHybridAgentE2E(t *testing.T) { tmpDir := t.TempDir() numEvents := 1 fbIndex := "logs-generic-default" - fbReceiverIndex := "logs-fbreceiver-default" + fbReceiverIndex := "logs-generic-default" inputFile, err := os.CreateTemp(tmpDir, "input-*.log") require.NoError(t, err, "failed to create input log file") @@ -1191,9 +1191,16 @@ receivers: file_identity.native: ~ processors: - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ + - add_fields: + fields: + dataset: generic + namespace: default + type: logs + target: data_stream + - add_fields: + fields: + dataset: generic + target: event output: otelconsumer: logging: @@ -1287,36 +1294,29 @@ service: return true }, 1*time.Minute, 1*time.Second) - var fbDocs, fbReceiverDocs estools.Documents + var docs estools.Documents actualHits := &struct { - FB int - FBReceiver int + Hits int }{} require.Eventually(t, func() bool { findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) defer findCancel() - fbDocs, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbIndex+"*", map[string]interface{}{ + docs, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbIndex+"*", map[string]interface{}{ "log.file.path": inputFilePath, }) require.NoError(t, err) - fbReceiverDocs, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbReceiverIndex+"*", map[string]interface{}{ - "log.file.path": inputFilePath, - }) - require.NoError(t, err) - - actualHits.FB = fbDocs.Hits.Total.Value - actualHits.FBReceiver = fbReceiverDocs.Hits.Total.Value + actualHits.Hits = docs.Hits.Total.Value - return actualHits.FB == numEvents && actualHits.FBReceiver == numEvents + return actualHits.Hits == numEvents*2 // filebeat + fbreceiver }, 1*time.Minute, 1*time.Second, "Expected %d logs in elasticsearch, got: %v", numEvents, actualHits) - fbDoc := fbDocs.Hits.Hits[0].Source - fbReceiverDoc := fbReceiverDocs.Hits.Hits[0].Source + doc1 := docs.Hits.Hits[0].Source + doc2 := docs.Hits.Hits[1].Source ignoredFields := []string{ // Expected to change between filebeat and fbreceiver "@timestamp", @@ -1324,13 +1324,9 @@ service: "agent.id", // Missing from fbreceiver doc - "data_stream.dataset", - "data_stream.namespace", - "data_stream.type", "elastic_agent.id", "elastic_agent.snapshot", "elastic_agent.version", - "event.dataset", // TODO: fbreceiver adds metadata fields that are internal in filebeat. // Remove this once https://github.com/elastic/beats/pull/42412 @@ -1340,7 +1336,7 @@ service: "@metadata.version", } - assertMapsEqual(t, fbDoc, fbReceiverDoc, ignoredFields, "expected documents to be equal") + assertMapsEqual(t, doc1, doc2, ignoredFields, "expected documents to be equal") cancel() err = cmd.Wait() require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || strings.Contains(err.Error(), "signal: killed"), "Retrieved unexpected error: %s", err.Error()) @@ -1355,9 +1351,8 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg hasKeyM1, _ := flatM1.HasKey(f) hasKeyM2, _ := flatM2.HasKey(f) - // Check if the ignored field exists in either map if !hasKeyM1 && !hasKeyM2 { - require.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) + assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) } // If the ignored field exists and is equal in both maps then it shouldn't be ignored @@ -1365,7 +1360,7 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg valM1, _ := flatM1.GetValue(f) valM2, _ := flatM2.GetValue(f) if valM1 == valM2 { - require.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f) + assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f) } } From 1238a43bed9de13be9371005b272acd35212b2c9 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 7 Feb 2025 12:27:28 -0300 Subject: [PATCH 15/16] add add_cloud_metadata processor --- testing/integration/otel_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 3e56be10885..d661e2c3df1 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1191,6 +1191,7 @@ receivers: file_identity.native: ~ processors: - add_host_metadata: ~ + - add_cloud_metadata: ~ - add_fields: fields: dataset: generic From 763318bdb1da804a505ce3df8a92942e3733a380 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 7 Feb 2025 17:01:33 -0300 Subject: [PATCH 16/16] avoid checking cmd.Wait error since it is bogus --- testing/integration/otel_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index d661e2c3df1..add783852b8 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1339,8 +1339,7 @@ service: assertMapsEqual(t, doc1, doc2, ignoredFields, "expected documents to be equal") cancel() - err = cmd.Wait() - require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || strings.Contains(err.Error(), "signal: killed"), "Retrieved unexpected error: %s", err.Error()) + cmd.Wait() } func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) {