Skip to content

Commit

Permalink
[WIP] Test case for truncating the file while Filebeat is running
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Mar 19, 2024
1 parent 4a8bc16 commit f1a1d69
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 6 deletions.
88 changes: 82 additions & 6 deletions filebeat/tests/integration/filestream_truncation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ filebeat.inputs:
- type: filestream
id: id
enabled: true
prospector.scanner.check_interval: 30s
paths:
- %s
output:
Expand All @@ -46,7 +47,65 @@ logging:
enabled: false
`

func TestFilestreamFileTruncation(t *testing.T) {
func TestFilestreamLiveFileTruncation(t *testing.T) {
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)

tempDir := filebeat.TempDir()
logFile := path.Join(tempDir, "log.log")
registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json")
filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir))

// genLogCtx, cancel := context.WithCancel(context.Background())
// t.Cleanup(cancel)
// integration.GenerateLogFile(genLogCtx, t, logFile, false)

// 1. Create a log file and let Filebeat harvest all contents
writeLogFile(t, logFile, 200, false)
filebeat.Start()
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")

// 2. Truncate the file and wait Filebeat to close the file
// time.Sleep(10 * time.Second)
t.Log("sleeping done, truncating file")
// time.Sleep(1 * time.Second)
if err := os.Truncate(logFile, 0); err != nil {
t.Fatalf("could not truncate log file: %s", err)
}

// 3. Ensure Filebeat detected the file truncation
filebeat.WaitForLogs("File was truncated as offset (10000) > size (0)", 20*time.Second, "file was not truncated")
filebeat.WaitForLogs("File was truncated, nothing to read", 20*time.Second, "reader loop did not stop")
filebeat.WaitForLogs("Stopped harvester for file", 20*time.Second, "harvester did not stop")
filebeat.WaitForLogs("Closing reader of filestream", 20*time.Second, "reader did not close")

// 4. Now we need to stop Filebeat before the next scan cycle
filebeat.Stop()

// Assert we offset in the registry
// TODO (Tiago): decide whether Filebeat can exit without
// updating the offset. Currently it does and it can be considered
// one of the root causes for the issue
assertLastOffset(t, registryLogFile, 10_000)

// TODO: ensure data was read

// Open for appending because the file has already been truncated
writeLogFile(t, logFile, 10, true)
// 5. Start Filebeat again.
filebeat.Start()
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")

assertLastOffset(t, registryLogFile, 500)

}

func TestFilestreamOfflineFileTruncation(t *testing.T) {
filebeat := integration.NewBeat(
t,
"filebeat",
Expand Down Expand Up @@ -87,6 +146,7 @@ func TestFilestreamFileTruncation(t *testing.T) {
}

func assertLastOffset(t *testing.T, path string, offset int) {
t.Helper()
entries := readFilestreamRegistryLog(t, path)
lastEntry := entries[len(entries)-1]
if lastEntry.Offset != offset {
Expand Down Expand Up @@ -117,9 +177,25 @@ func writeLogFile(t *testing.T, path string, count int, append bool) {
} else {
file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)

Check failure on line 178 in filebeat/tests/integration/filestream_truncation_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

ineffectual assignment to err (ineffassign)
}
defer assertFileSize(t, path, int64(count*50))
defer file.Close()
defer file.Sync()
defer func() {
assertFileSize(t, path, int64(count*50))
if t.Failed() {
t.Log("Waiting a few seconds")
time.Sleep(time.Second * 2)
t.Log("asserting file size again")
assertFileSize(t, path, int64(count*50))
}
}()
defer func() {
if err := file.Close(); err != nil {
t.Fatalf("could not close file: %s", err)
}
}()
defer func() {
if err := file.Sync(); err != nil {
t.Fatalf("could not sync file: %s", err)
}
}()

now := time.Now().Format(time.RFC3339Nano)
for i := 0; i < count; i++ {
Expand All @@ -133,11 +209,11 @@ func assertFileSize(t *testing.T, path string, size int64) {
t.Helper()
fi, err := os.Stat(path)
if err != nil {
t.Fatalf("could not call Stat on '%s': %s", path, err)
t.Errorf("could not call Stat on '%s': %s", path, err)
}

if fi.Size() != size {
t.Fatalf("[%s] expecting size %d, got: %d", path, size, fi.Size())
t.Errorf("[%s] expecting size %d, got: %d", path, size, fi.Size())
}
}

Expand Down
53 changes: 53 additions & 0 deletions libbeat/tests/integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,3 +686,56 @@ func readLastNBytes(filename string, numBytes int64) ([]byte, error) {
}
return io.ReadAll(f)
}

// GenerateLogFile generates a log file by appending the current
// time to it every second.
// TODO (Tiago): Find a better name
func GenerateLogFile(ctx context.Context, t *testing.T, fullPath string, append bool) {
var f *os.File
var err error
if !append {
f, err = os.Create(fullPath)
} else {
f, err = os.OpenFile(fullPath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
}
if err != nil {
t.Fatalf("could not create file '%s': %s", fullPath, err)
}

go func() {
t.Helper()
ticker := time.NewTicker(time.Second)
t.Cleanup(ticker.Stop)

done := make(chan struct{})
t.Cleanup(func() { close(done) })

defer func() {
if err := f.Close(); err != nil {
t.Errorf("could not close log file '%s': %s", fullPath, err)
}
}()

for {
select {
case <-ctx.Done():
return
case <-done:
return
case now := <-ticker.C:
fmt.Println(now.Format(time.RFC3339))

Check failure on line 726 in libbeat/tests/integration/framework.go

View workflow job for this annotation

GitHub Actions / lint (linux)

use of `fmt.Println` forbidden by pattern `fmt.Print.*` (forbidigo)
_, err := fmt.Fprintln(f, now.Format(time.RFC3339))
if err != nil {
// The Go compiler does not allow me to call t.Fatalf from a non-test
// goroutine, so just log it instead
t.Errorf("could not write data to log file '%s': %s", fullPath, err)
return
}
// make sure log lines are synced as quickly as possible
if err := f.Sync(); err != nil {
t.Errorf("could not sync file '%s': %s", fullPath, err)
}
}
}
}()
}

0 comments on commit f1a1d69

Please sign in to comment.