diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go
index a257745124b..55b8d2e7fc6 100644
--- a/filebeat/input/filestream/input_test.go
+++ b/filebeat/input/filestream/input_test.go
@@ -36,18 +36,70 @@ import (
 	"github.com/elastic/elastic-agent-libs/logp"
 )
 
+func BenchmarkFilestream(b *testing.B) {
+	logp.TestingSetup(logp.ToDiscardOutput())
+	lineCount := 10000
+	filename := generateFile(b, lineCount)
+
+	b.ResetTimer()
+
+	b.Run("filestream default throughput", func(b *testing.B) {
+		cfg := `
+type: filestream
+prospector.scanner.check_interval: 1s
+paths:
+  - ` + filename + `
+`
+		for i := 0; i < b.N; i++ {
+			runFilestreamBenchmark(b, fmt.Sprintf("default-benchmark-%d", i), cfg, lineCount)
+		}
+	})
+
+	b.Run("filestream fingerprint throughput", func(b *testing.B) {
+		cfg := `
+type: filestream
+prospector.scanner:
+  fingerprint.enabled: true
+  check_interval: 1s
+file_identity.fingerprint: ~
+paths:
+  - ` + filename + `
+`
+		for i := 0; i < b.N; i++ {
+			runFilestreamBenchmark(b, fmt.Sprintf("fp-benchmark-%d", i), cfg, lineCount)
+		}
+	})
+}
+
 // runFilestreamBenchmark runs the entire filestream input with the in-memory registry and the test pipeline.
 // `testID` must be unique for each test run
 // `cfg` must be a valid YAML string containing valid filestream configuration
 // `expEventCount` is an expected amount of produced events
-func runFilestreamBenchmark(t testing.TB, testID string, cfg string, expEventCount int) {
+func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCount int) {
+	// we don't include initialization in the benchmark time
+	b.StopTimer()
+	runner := createFilestreamTestRunner(b, testID, cfg, expEventCount)
+	// this is where the benchmark actually starts
+	b.StartTimer()
+	events := runner(b)
+	require.Len(b, events, expEventCount)
+}
+
+// createFilestreamTestRunner can be used for both benchmarks and regular tests to run a filestream input
+// with the given configuration and event limit.
+// `testID` must be unique for each test run
+// `cfg` must be a valid YAML string containing valid filestream configuration
+// `eventLimit` is an amount of produced events after which the filestream will shutdown
+//
+// returns a runner function that returns produced events.
+func createFilestreamTestRunner(b testing.TB, testID string, cfg string, eventLimit int) func(t testing.TB) []beat.Event {
 	logger := logp.L()
 	c, err := conf.NewConfigWithYAML([]byte(cfg), cfg)
-	require.NoError(t, err)
+	require.NoError(b, err)
 
-	p := Plugin(logger, createTestStore(t))
+	p := Plugin(logger, createTestStore(b))
 	input, err := p.Manager.Create(c)
-	require.NoError(t, err)
+	require.NoError(b, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	context := v2.Context{
@@ -56,17 +108,22 @@ func runFilestreamBenchmark(t testing.TB, testID string, cfg string, expEventCou
 		Cancelation: ctx,
 	}
 
-	connector, eventsDone := newTestPipeline(expEventCount)
+	events := make([]beat.Event, 0, eventLimit)
+	connector, eventsDone := newTestPipeline(eventLimit, &events)
 	done := make(chan struct{})
-	go func() {
-		err := input.Run(context, connector)
-		assert.NoError(t, err)
-		done <- struct{}{}
-	}()
-
-	<-eventsDone
-	cancel()
-	<-done // for more stable results we should wait until the full shutdown
+	return func(t testing.TB) []beat.Event {
+		go func() {
+			err := input.Run(context, connector)
+			assert.NoError(b, err)
+			close(done)
+		}()
+
+		<-eventsDone
+		cancel()
+		<-done // for more stable results we should wait until the full shutdown
+
+		return events
+	}
 }
 
 func generateFile(t testing.TB, lineCount int) string {
@@ -84,39 +141,6 @@ func generateFile(t testing.TB, lineCount int) string {
 	return filename
 }
 
-func BenchmarkFilestream(b *testing.B) {
-	logp.TestingSetup(logp.ToDiscardOutput())
-	lineCount := 10000
-	filename := generateFile(b, lineCount)
-
-	b.Run("filestream default throughput", func(b *testing.B) {
-		cfg := `
-type: filestream
-prospector.scanner.check_interval: 1s
-paths:
-  - ` + filename + `
-`
-		for i := 0; i < b.N; i++ {
-			runFilestreamBenchmark(b, fmt.Sprintf("default-benchmark-%d", i), cfg, lineCount)
-		}
-	})
-
-	b.Run("filestream fingerprint throughput", func(b *testing.B) {
-		cfg := `
-type: filestream
-prospector.scanner:
-  fingerprint.enabled: true
-  check_interval: 1s
-file_identity.fingerprint: ~
-paths:
-  - ` + filename + `
-`
-		for i := 0; i < b.N; i++ {
-			runFilestreamBenchmark(b, fmt.Sprintf("fp-benchmark-%d", i), cfg, lineCount)
-		}
-	})
-}
-
 func createTestStore(t testing.TB) loginp.StateStore {
 	return &testStore{registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend())}
 }
@@ -137,14 +161,15 @@ func (s *testStore) CleanupInterval() time.Duration {
 	return time.Second
 }
 
-func newTestPipeline(eventLimit int) (pc beat.PipelineConnector, done <-chan struct{}) {
+func newTestPipeline(eventLimit int, out *[]beat.Event) (pc beat.PipelineConnector, done <-chan struct{}) {
 	ch := make(chan struct{})
-	return &testPipeline{limit: eventLimit, done: ch}, ch
+	return &testPipeline{limit: eventLimit, done: ch, out: out}, ch
 }
 
 type testPipeline struct {
 	done  chan struct{}
 	limit int
+	out   *[]beat.Event
 }
 
 func (p *testPipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
@@ -160,8 +185,12 @@ type testClient struct {
 
 func (c *testClient) Publish(event beat.Event) {
 	c.testPipeline.limit--
+	if c.testPipeline.limit < 0 {
+		return
+	}
+	*c.testPipeline.out = append(*c.testPipeline.out, event)
 	if c.testPipeline.limit == 0 {
-		c.testPipeline.done <- struct{}{}
+		close(c.testPipeline.done)
 	}
}