diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 1fd854aeeb..c7081f2758 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1352,15 +1352,6 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) } - // If the ignored field exists and is equal in both maps then it shouldn't be ignored - if hasKeyM1 && hasKeyM2 { - valM1, _ := flatM1.GetValue(f) - valM2, _ := flatM2.GetValue(f) - if valM1 == valM2 { - assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f) - } - } - flatM1.Delete(f) flatM2.Delete(f) } @@ -1374,7 +1365,7 @@ func TestFBOtelRestartE2E(t *testing.T) { // It starts a filebeat receiver, waits for some logs and then stops it. // It then restarts the collector for the remaining of the test. // At the end it asserts that the unique number of logs in ES is equal to the number of - // lines in the input file. It is likely that there are duplicates due to the restart. + // lines in the input file. info := define.Require(t, define.Requirements{ Group: Default, Local: true, @@ -1404,7 +1395,8 @@ func TestFBOtelRestartE2E(t *testing.T) { esApiKey, err := createESApiKey(info.ESClient) require.NoError(t, err, "error creating API key") require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) - index := "logs-integration-default" + // Use a unique index to avoid conflicts with other parallel runners + index := strings.ToLower("logs-generic-default-" + randStr(8)) otelConfigTemplate := `receivers: filebeatreceiver: filebeat: @@ -1442,6 +1434,8 @@ exporters: flush_timeout: 1s mapping: mode: bodymap + logs_dynamic_id: + enabled: true service: pipelines: logs: @@ -1492,14 +1486,14 @@ service: } _, err = inputFile.Write([]byte(fmt.Sprintf(`{"id": "%d", "message": "%d"}`, i, i))) - require.NoErrorf(t, err, "failed to write line %d to temp file", i) + assert.NoErrorf(t, err, "failed to write line %d to temp file", i) _, err = inputFile.Write([]byte("\n")) - require.NoErrorf(t, err, "failed to write newline to temp file") + assert.NoErrorf(t, err, "failed to write newline to temp file") inputLinesCounter.Add(1) time.Sleep(100 * time.Millisecond) } err = inputFile.Close() - require.NoError(t, err, "failed to close input file") + assert.NoError(t, err, "failed to close input file") }() t.Cleanup(func() { @@ -1519,7 +1513,7 @@ service: go func() { err = fixture.RunOtelWithClient(fCtx) cancel() - require.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err) + assert.True(t, err == nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err) close(stoppedCh) }() @@ -1580,9 +1574,8 @@ service: require.True(t, found, "expected message field in document %q", hit.Source) msg, ok := message.(string) require.True(t, ok, "expected message field to be a string, got %T", message) - if _, found := uniqueIngestedLogs[msg]; found { - t.Logf("log line %q was ingested more than once", message) - } + _, found = uniqueIngestedLogs[msg] + require.False(t, found, "found duplicated log message %q", msg) uniqueIngestedLogs[msg] = struct{}{} } actualHits.UniqueHits = len(uniqueIngestedLogs)