Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otel: fix flakiness and various issues in TestFBOtelRestartE2E #6819

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,15 +1352,6 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg
assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f)
}

// If the ignored field exists and is equal in both maps then it shouldn't be ignored
if hasKeyM1 && hasKeyM2 {
valM1, _ := flatM1.GetValue(f)
valM2, _ := flatM2.GetValue(f)
if valM1 == valM2 {
assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f)
}
}

flatM1.Delete(f)
flatM2.Delete(f)
}
Expand All @@ -1374,7 +1365,7 @@ func TestFBOtelRestartE2E(t *testing.T) {
// It starts a filebeat receiver, waits for some logs and then stops it.
// It then restarts the collector for the remaining of the test.
// At the end it asserts that the unique number of logs in ES is equal to the number of
// lines in the input file. It is likely that there are duplicates due to the restart.
// lines in the input file.
info := define.Require(t, define.Requirements{
Group: Default,
Local: true,
Expand Down Expand Up @@ -1404,7 +1395,8 @@ func TestFBOtelRestartE2E(t *testing.T) {
esApiKey, err := createESApiKey(info.ESClient)
require.NoError(t, err, "error creating API key")
require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
index := "logs-integration-default"
// Use a unique index to avoid conflicts with other parallel runners
index := strings.ToLower("logs-generic-default-" + randStr(8))
otelConfigTemplate := `receivers:
filebeatreceiver:
filebeat:
Expand Down Expand Up @@ -1442,6 +1434,8 @@ exporters:
flush_timeout: 1s
mapping:
mode: bodymap
logs_dynamic_id:
enabled: true
service:
pipelines:
logs:
Expand Down Expand Up @@ -1492,14 +1486,14 @@ service:
}

_, err = inputFile.Write([]byte(fmt.Sprintf(`{"id": "%d", "message": "%d"}`, i, i)))
require.NoErrorf(t, err, "failed to write line %d to temp file", i)
assert.NoErrorf(t, err, "failed to write line %d to temp file", i)
_, err = inputFile.Write([]byte("\n"))
require.NoErrorf(t, err, "failed to write newline to temp file")
assert.NoErrorf(t, err, "failed to write newline to temp file")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert.NoErrorf(t, err, "failed to write newline to temp file")
assert.NoError(t, err, "failed to write newline to temp file")

inputLinesCounter.Add(1)
time.Sleep(100 * time.Millisecond)
}
err = inputFile.Close()
require.NoError(t, err, "failed to close input file")
assert.NoError(t, err, "failed to close input file")
}()

t.Cleanup(func() {
Expand All @@ -1519,7 +1513,7 @@ service:
go func() {
err = fixture.RunOtelWithClient(fCtx)
cancel()
require.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err)
assert.True(t, err == nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since we are not testing a boolean we can use https://pkg.go.dev/github.com/stretchr/testify/assert#Conditionf with a small function where we can also do some additional logging if necessary

close(stoppedCh)
}()

Expand Down Expand Up @@ -1580,9 +1574,8 @@ service:
require.True(t, found, "expected message field in document %q", hit.Source)
msg, ok := message.(string)
require.True(t, ok, "expected message field to be a string, got %T", message)
if _, found := uniqueIngestedLogs[msg]; found {
t.Logf("log line %q was ingested more than once", message)
}
_, found = uniqueIngestedLogs[msg]
require.False(t, found, "found duplicated log message %q", msg)
Comment on lines +1577 to +1578
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason why we can't use assert.NotContains here?

Suggested change
_, found = uniqueIngestedLogs[msg]
require.False(t, found, "found duplicated log message %q", msg)
require.NotContainsf(uniqueIngestedLogs, msg, "found duplicated log message %q", msg)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not at all, I just didn't knew about this function. TIL.

uniqueIngestedLogs[msg] = struct{}{}
}
actualHits.UniqueHits = len(uniqueIngestedLogs)
Expand Down