diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 99e8981bb9d4..72ba95785ab2 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -258,13 +258,15 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { t.Fatal(err) } + assert.Eventually(t, func() bool { + return s3Input.metrics.s3ObjectsRequestedTotal.Get() == 8 + }, 5*time.Second, 100*time.Millisecond) assert.EqualValues(t, s3Input.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. assert.EqualValues(t, s3Input.metrics.sqsMessagesInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 8) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), uint64(0x13)) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end