Skip to content

Commit

Permalink
otel: add test for document equivalence between agent filestream input and filebeat receiver (#6681)
Browse files Browse the repository at this point in the history

* otel: add test for agent hybrid

* add missing fingerprint config options to index small files

* ensure new lines are written to the input file

* disable compression level so we inspect the requests

* add queue timeout to normal beats and separate path.home for fbreceiver

* add path.home for normal filebeat

* fix agent not starting

* print elastic-agent output if test fails

* add note for metadata field

* fail if ignored field is not present in both maps

* add host metadata processor, remove 'host.*' from ignored fields

* Use global processor

* more strict rules for ignored fields to avoid false positives

* use add_fields to add missing fields, use a single index

* add add_cloud_metadata processor

* avoid checking cmd.Wait error since it is bogus

(cherry picked from commit 41882ac)
  • Loading branch information
mauri870 authored and mergify[bot] committed Feb 14, 2025
1 parent 42809af commit bb70746
Show file tree
Hide file tree
Showing 2 changed files with 271 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func (f *Fixture) ExecStatus(ctx context.Context, opts ...process.CmdOption) (Ag
status := AgentStatusOutput{}
if uerr := json.Unmarshal(out, &status); uerr != nil {
return AgentStatusOutput{},
fmt.Errorf("could not unmarshal agent status output: %w", errors.Join(uerr, err))
fmt.Errorf("could not unmarshal agent status output: %w:\n%s", errors.Join(uerr, err), out)
} else if status.IsZero() {
return status, fmt.Errorf("agent status output is empty: %w", err)
}
Expand Down
270 changes: 270 additions & 0 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package integration
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"net/url"
Expand All @@ -21,9 +22,11 @@ import (
"text/template"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/testing/estools"
"github.com/elastic/elastic-agent/pkg/control/v2/client"
aTesting "github.com/elastic/elastic-agent/pkg/testing"
Expand Down Expand Up @@ -1101,6 +1104,273 @@ service:
require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error())
}

// TestHybridAgentE2E starts a single agent whose configuration reads the same
// log file twice: once through a regular filestream input shipped by the
// elasticsearch output, and once through the filebeat receiver shipped by the
// OTel elasticsearch exporter. Both documents are then fetched from
// Elasticsearch and compared; only the fields listed in ignoredFields are
// allowed to differ.
func TestHybridAgentE2E(t *testing.T) {
	// This test is a hybrid agent test that ingests a single log with
	// filebeat and fbreceiver. It then compares the final documents in
	// Elasticsearch to ensure they have no meaningful differences.
	info := define.Require(t, define.Requirements{
		Group: Default,
		Local: true,
		OS: []define.OS{
			{Type: define.Windows},
			{Type: define.Linux},
			{Type: define.Darwin},
		},
		Stack: &define.Stack{},
	})
	tmpDir := t.TempDir()
	numEvents := 1
	// Every line is ingested twice: once by filebeat and once by fbreceiver.
	expectedDocs := numEvents * 2
	// Both pipelines write to the same data stream so a single query returns
	// both documents.
	fbIndex := "logs-generic-default"
	fbReceiverIndex := "logs-generic-default"

	inputFile, err := os.CreateTemp(tmpDir, "input-*.log")
	require.NoError(t, err, "failed to create input log file")
	inputFilePath := inputFile.Name()
	for i := 0; i < numEvents; i++ {
		_, err = inputFile.Write([]byte(fmt.Sprintf("Line %d", i)))
		require.NoErrorf(t, err, "failed to write line %d to temp file", i)
		// Each event must be newline-terminated in the input file.
		_, err = inputFile.Write([]byte("\n"))
		require.NoErrorf(t, err, "failed to write newline to input file")
		time.Sleep(100 * time.Millisecond)
	}
	err = inputFile.Close()
	require.NoError(t, err, "failed to close data input file")

	// On failure, dump the input file to help debugging.
	t.Cleanup(func() {
		if t.Failed() {
			contents, err := os.ReadFile(inputFilePath)
			if err != nil {
				t.Logf("no data file to import at %s", inputFilePath)
				return
			}
			t.Logf("contents of input file: %s\n", string(contents))
		}
	})

	// configOptions holds the values substituted into configTemplate.
	type configOptions struct {
		InputPath       string
		HomeDir         string
		ESEndpoint      string
		ESApiKey        string
		BeatsESApiKey   string
		FBReceiverIndex string
	}
	esEndpoint, err := getESHost()
	require.NoError(t, err, "error getting elasticsearch endpoint")
	esApiKey, err := createESApiKey(info.ESClient)
	require.NoError(t, err, "error creating API key")
	require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)

	// NOTE(review): the nesting of the YAML below must match the agent and
	// collector configuration schema; verify its indentation against a
	// known-good configuration.
	configTemplate := `agent.logging.level: info
agent.logging.to_stderr: true
inputs:
- id: filestream-filebeat
type: filestream
paths:
- {{.InputPath}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
use_output: default
queue.mem.flush.timeout: 0s
path.home: {{.HomeDir}}/filebeat
outputs:
default:
type: elasticsearch
hosts: [{{.ESEndpoint}}]
api_key: {{.BeatsESApiKey}}
compression_level: 0
receivers:
filebeatreceiver:
filebeat:
inputs:
- type: filestream
id: filestream-fbreceiver
enabled: true
paths:
- {{.InputPath}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_fields:
fields:
dataset: generic
namespace: default
type: logs
target: data_stream
- add_fields:
fields:
dataset: generic
target: event
output:
otelconsumer:
logging:
level: info
selectors:
- '*'
path.home: {{.HomeDir}}/fbreceiver
queue.mem.flush.timeout: 0s
exporters:
debug:
use_internal_logger: false
verbosity: detailed
elasticsearch/log:
endpoints:
- {{.ESEndpoint}}
compression: none
api_key: {{.ESApiKey}}
logs_index: {{.FBReceiverIndex}}
batcher:
enabled: true
flush_timeout: 1s
mapping:
mode: bodymap
service:
pipelines:
logs:
receivers:
- filebeatreceiver
exporters:
- elasticsearch/log
- debug
`

	// The beats elasticsearch output is given the base64-decoded form of the
	// key, while the OTel exporter is given the encoded form.
	beatsApiKey, err := base64.StdEncoding.DecodeString(esApiKey.Encoded)
	require.NoError(t, err, "error decoding api key")

	var configBuffer bytes.Buffer
	require.NoError(t,
		template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
			configOptions{
				InputPath:       inputFilePath,
				HomeDir:         tmpDir,
				ESEndpoint:      esEndpoint,
				ESApiKey:        esApiKey.Encoded,
				BeatsESApiKey:   string(beatsApiKey),
				FBReceiverIndex: fbReceiverIndex,
			}))
	configContents := configBuffer.Bytes()
	t.Cleanup(func() {
		if t.Failed() {
			t.Logf("Contents of agent config file:\n%s\n", string(configContents))
		}
	})

	// Now we can actually create the fixture and run it
	fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
	require.NoError(t, err)

	ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
	defer cancel()

	err = fixture.Prepare(ctx)
	require.NoError(t, err)
	err = fixture.Configure(ctx, configContents)
	require.NoError(t, err)

	cmd, err := fixture.PrepareAgentCommand(ctx, nil)
	require.NoError(t, err)
	cmd.WaitDelay = 1 * time.Second

	// Capture stdout and stderr together so they can be printed on failure.
	var output strings.Builder
	cmd.Stderr = &output
	cmd.Stdout = &output

	err = cmd.Start()
	require.NoError(t, err)

	t.Cleanup(func() {
		if t.Failed() {
			t.Log("Elastic-Agent output:")
			t.Log(output.String())
		}
	})

	// The condition runs on a goroutine managed by require.Eventually, so it
	// keeps its error local instead of writing to the test's err variable.
	require.Eventually(t, func() bool {
		if healthErr := fixture.IsHealthy(ctx); healthErr != nil {
			t.Logf("waiting for agent healthy: %s", healthErr.Error())
			return false
		}
		return true
	}, 1*time.Minute, 1*time.Second)

	var docs estools.Documents
	// A pointer is passed as the message argument so that the value printed
	// on failure is the last observed hit count, not the count at call time.
	actualHits := &struct {
		Hits int
	}{}
	require.Eventually(t,
		func() bool {
			findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer findCancel()

			found, findErr := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbIndex+"*", map[string]interface{}{
				"log.file.path": inputFilePath,
			})
			if findErr != nil {
				// Do not call require/FailNow here: this function does not
				// run on the test goroutine. Log and let the next tick retry.
				t.Logf("waiting for documents in elasticsearch: %s", findErr.Error())
				return false
			}
			docs = found

			actualHits.Hits = docs.Hits.Total.Value

			return actualHits.Hits == expectedDocs // filebeat + fbreceiver
		},
		1*time.Minute, 1*time.Second,
		"Expected %d logs in elasticsearch, got: %v", expectedDocs, actualHits)

	// Guard the indexing below so a short result page fails with a clear
	// message instead of an index-out-of-range panic.
	require.GreaterOrEqual(t, len(docs.Hits.Hits), 2, "expected at least two documents to compare")
	doc1 := docs.Hits.Hits[0].Source
	doc2 := docs.Hits.Hits[1].Source
	ignoredFields := []string{
		// Expected to change between filebeat and fbreceiver
		"@timestamp",
		"agent.ephemeral_id",
		"agent.id",

		// Missing from fbreceiver doc
		"elastic_agent.id",
		"elastic_agent.snapshot",
		"elastic_agent.version",

		// TODO: fbreceiver adds metadata fields that are internal in filebeat.
		// Remove this once https://github.com/elastic/beats/pull/42412
		// is available in agent.
		"@metadata.beat",
		"@metadata.type",
		"@metadata.version",
	}

	assertMapsEqual(t, doc1, doc2, ignoredFields, "expected documents to be equal")
	cancel()
	// The agent is being torn down right after its context was cancelled, so
	// the error returned by Wait is deliberately not checked.
	_ = cmd.Wait()
}

// assertMapsEqual fails the test if m1 and m2 differ in any field other than
// those listed in ignoredFields. Both maps are flattened first, so entries in
// ignoredFields are dotted keys such as "agent.id".
//
// The ignore list is validated strictly to avoid masking real differences:
// an ignored field that is absent from both maps, or that is present in both
// with equal values, is reported as a failure asking for its removal.
//
// msg is used as the failure message for every assertion made here.
func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) {
	t.Helper()

	flatM1 := m1.Flatten()
	flatM2 := m2.Flatten()
	for _, f := range ignoredFields {
		// Only presence matters here; a lookup error is treated as "absent".
		hasKeyM1, _ := flatM1.HasKey(f)
		hasKeyM2, _ := flatM2.HasKey(f)

		if !hasKeyM1 && !hasKeyM2 {
			assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f)
		}

		// If the ignored field exists and is equal in both maps then it shouldn't be ignored
		if hasKeyM1 && hasKeyM2 {
			valM1, _ := flatM1.GetValue(f)
			valM2, _ := flatM2.GetValue(f)
			// cmp.Equal instead of ==: comparing interface values with ==
			// panics when the dynamic type is a slice or a map.
			if cmp.Equal(valM1, valM2) {
				assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f)
			}
		}

		// The field may legitimately be missing from one of the maps, so the
		// result of Delete is not checked.
		flatM1.Delete(f)
		flatM2.Delete(f)
	}
	require.Equal(t, "", cmp.Diff(flatM1, flatM2), msg)
}

func TestFBOtelRestartE2E(t *testing.T) {
// This test ensures that filebeatreceiver is able to deliver logs even
// in advent of a collector restart.
Expand Down

0 comments on commit bb70746

Please sign in to comment.