diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go
index d8d8132fe8f4..d5de27a58763 100644
--- a/filebeat/tests/integration/filestream_truncation_test.go
+++ b/filebeat/tests/integration/filestream_truncation_test.go
@@ -20,6 +20,7 @@ filebeat.inputs:
   - type: filestream
     id: id
     enabled: true
+    prospector.scanner.check_interval: 30s
     paths:
       - %s
 output:
@@ -46,7 +47,65 @@ logging:
     enabled: false
 `
 
-func TestFilestreamFileTruncation(t *testing.T) {
+func TestFilestreamLiveFileTruncation(t *testing.T) {
+	filebeat := integration.NewBeat(
+		t,
+		"filebeat",
+		"../../filebeat.test",
+	)
+
+	tempDir := filebeat.TempDir()
+	logFile := path.Join(tempDir, "log.log")
+	registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json")
+	filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir))
+
+	// genLogCtx, cancel := context.WithCancel(context.Background())
+	// t.Cleanup(cancel)
+	// integration.GenerateLogFile(genLogCtx, t, logFile, false)
+
+	// 1. Create a log file and let Filebeat harvest all contents
+	writeLogFile(t, logFile, 200, false)
+	filebeat.Start()
+	filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
+	filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
+
+	// 2. Truncate the file and wait for Filebeat to close it
+	// time.Sleep(10 * time.Second)
+	t.Log("sleeping done, truncating file")
+	// time.Sleep(1 * time.Second)
+	if err := os.Truncate(logFile, 0); err != nil {
+		t.Fatalf("could not truncate log file: %s", err)
+	}
+
+	// 3. Ensure Filebeat detected the file truncation
+	filebeat.WaitForLogs("File was truncated as offset (10000) > size (0)", 20*time.Second, "file was not truncated")
+	filebeat.WaitForLogs("File was truncated, nothing to read", 20*time.Second, "reader loop did not stop")
+	filebeat.WaitForLogs("Stopped harvester for file", 20*time.Second, "harvester did not stop")
+	filebeat.WaitForLogs("Closing reader of filestream", 20*time.Second, "reader did not close")
+
+	// 4. Now we need to stop Filebeat before the next scan cycle
+	filebeat.Stop()
+
+	// Assert the offset stored in the registry
+	// TODO (Tiago): decide whether Filebeat can exit without
+	// updating the offset. Currently it does, and that can be considered
+	// one of the root causes for the issue
+	assertLastOffset(t, registryLogFile, 10_000)
+
+	// TODO: ensure data was read
+
+	// Open for appending because the file has already been truncated
+	writeLogFile(t, logFile, 10, true)
+	// 5. Start Filebeat again.
+	filebeat.Start()
+	filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
+	filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
+
+	assertLastOffset(t, registryLogFile, 500)
+
+}
+
+func TestFilestreamOfflineFileTruncation(t *testing.T) {
 	filebeat := integration.NewBeat(
 		t,
 		"filebeat",
@@ -87,6 +146,7 @@ func TestFilestreamFileTruncation(t *testing.T) {
 }
 
 func assertLastOffset(t *testing.T, path string, offset int) {
+	t.Helper()
 	entries := readFilestreamRegistryLog(t, path)
 	lastEntry := entries[len(entries)-1]
 	if lastEntry.Offset != offset {
@@ -117,9 +177,25 @@ func writeLogFile(t *testing.T, path string, count int, append bool) {
 	} else {
 		file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
 	}
-	defer assertFileSize(t, path, int64(count*50))
-	defer file.Close()
-	defer file.Sync()
+	defer func() {
+		assertFileSize(t, path, int64(count*50))
+		if t.Failed() {
+			t.Log("Waiting a few seconds")
+			time.Sleep(time.Second * 2)
+			t.Log("asserting file size again")
+			assertFileSize(t, path, int64(count*50))
+		}
+	}()
+	defer func() {
+		if err := file.Close(); err != nil {
+			t.Fatalf("could not close file: %s", err)
+		}
+	}()
+	defer func() {
+		if err := file.Sync(); err != nil {
+			t.Fatalf("could not sync file: %s", err)
+		}
+	}()
 
 	now := time.Now().Format(time.RFC3339Nano)
 	for i := 0; i < count; i++ {
@@ -133,11 +209,11 @@ func assertFileSize(t *testing.T, path string, size int64) {
 	t.Helper()
 	fi, err := os.Stat(path)
 	if err != nil {
-		t.Fatalf("could not call Stat on '%s': %s", path, err)
+		t.Errorf("could not call Stat on '%s': %s", path, err)
 	}
 
 	if fi.Size() != size {
-		t.Fatalf("[%s] expecting size %d, got: %d", path, size, fi.Size())
+		t.Errorf("[%s] expecting size %d, got: %d", path, size, fi.Size())
 	}
 }
 
diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go
index ee36815dbfdb..83d5d9d9ba71 100644
--- a/libbeat/tests/integration/framework.go
+++ b/libbeat/tests/integration/framework.go
@@ -686,3 +686,56 @@ func readLastNBytes(filename string, numBytes int64) ([]byte, error) {
 	}
 	return io.ReadAll(f)
 }
+
+// GenerateLogFile generates a log file by appending the current
+// time to it every second.
+// TODO (Tiago): Find a better name
+func GenerateLogFile(ctx context.Context, t *testing.T, fullPath string, append bool) {
+	var f *os.File
+	var err error
+	if !append {
+		f, err = os.Create(fullPath)
+	} else {
+		f, err = os.OpenFile(fullPath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
+	}
+	if err != nil {
+		t.Fatalf("could not create file '%s': %s", fullPath, err)
+	}
+
+	go func() {
+		t.Helper()
+		ticker := time.NewTicker(time.Second)
+		t.Cleanup(ticker.Stop)
+
+		done := make(chan struct{})
+		t.Cleanup(func() { close(done) })
+
+		defer func() {
+			if err := f.Close(); err != nil {
+				t.Errorf("could not close log file '%s': %s", fullPath, err)
+			}
+		}()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-done:
+				return
+			case now := <-ticker.C:
+				fmt.Println(now.Format(time.RFC3339))
+				_, err := fmt.Fprintln(f, now.Format(time.RFC3339))
+				if err != nil {
+					// t.Fatalf must not be called from a goroutine other than the one
+					// running the test, so just log the error instead
+					t.Errorf("could not write data to log file '%s': %s", fullPath, err)
+					return
+				}
+				// make sure log lines are synced as quickly as possible
+				if err := f.Sync(); err != nil {
+					t.Errorf("could not sync file '%s': %s", fullPath, err)
+				}
+			}
+		}
+	}()
+}
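
For reference, a minimal sketch of how the new GenerateLogFile helper is intended to be driven from a test, mirroring the commented-out call in TestFilestreamLiveFileTruncation above; the variable names here are illustrative only:

	// Cancel the context on test cleanup so the generator goroutine
	// stops ticking and its deferred f.Close() runs.
	genLogCtx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	// false: create (or truncate) the file rather than appending to it
	integration.GenerateLogFile(genLogCtx, t, logFile, false)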