From ccd7b135df70358f8a02393d9bd8b716428b8048 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 12 Apr 2024 21:49:23 +0200 Subject: [PATCH] Fix clean_inactive for Filestream input (#38632) * Add tests for store GC * Add changelog * Run mage check and update all necessary files * Improve changelog entry * Fix `clean_inactive` for Filestream input The `clean_inactive` parameter was being parsed with the wrong key. It is parsed/used by an anonymous struct on `input-logfile/manager.go`, there it was parsed and used as `CleanTimeout` (`clean_timeout`). This `CleanTimeout` setting has got exactly the same effect as the `clean_inactive` described in our documentation. This commit fixes this bug by renaming `clean_timeout` to `clean_inactive` so the configuration value can have effect. * Add tests for `clean_inactive` and fix documentation * Add changelog * Update test config * Add licence headers and build tags * PR improvements - Rename `CleanTimeout` to `CleanInactive` - Remove commented out code * Fix rebase conflicts * Fix lint errors * Jenkins test this PR --- CHANGELOG.next.asciidoc | 2 + .../input-filestream-file-options.asciidoc | 13 ++- .../internal/input-logfile/clean.go | 2 +- .../internal/input-logfile/manager.go | 6 +- filebeat/input/v2/input-cursor/clean.go | 3 +- filebeat/input/v2/input-cursor/manager.go | 8 +- filebeat/tests/integration/filestream_test.go | 107 ++++++++++++++++++ 7 files changed, 126 insertions(+), 15 deletions(-) create mode 100644 filebeat/tests/integration/filestream_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1b61e70423c6..f03115b6b05f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -102,6 +102,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix indexing failures by re-enabling event normalisation in netflow input. 
{issue}38703[38703] {pull}38780[38780] - Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416] - Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556] +- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] + *Heartbeat* diff --git a/filebeat/docs/inputs/input-filestream-file-options.asciidoc b/filebeat/docs/inputs/input-filestream-file-options.asciidoc index a3be665e28e9..47a8c819d9ea 100644 --- a/filebeat/docs/inputs/input-filestream-file-options.asciidoc +++ b/filebeat/docs/inputs/input-filestream-file-options.asciidoc @@ -440,12 +440,12 @@ WARNING: Only use this option if you understand that data loss is a potential side effect. When this option is enabled, {beatname_uc} removes the state of a file after the -specified period of inactivity has elapsed. The state can only be removed if +specified period of inactivity has elapsed. The state can only be removed if the file is already ignored by {beatname_uc} (the file is older than `ignore_older`). The `clean_inactive` setting must be greater than `ignore_older + prospector.scanner.check_interval` to make sure that no states are removed while a file is still being harvested. Otherwise, the setting could result in {beatname_uc} resending -the full content constantly because `clean_inactive` removes state for files +the full content constantly because `clean_inactive` removes state for files that are still detected by {beatname_uc}. If a file is updated or appears again, the file is read from the beginning. @@ -461,10 +461,11 @@ for `clean_inactive` starts at 0 again. TIP: During testing, you might notice that the registry contains state entries that should be removed based on the `clean_inactive` setting. 
This happens -because {beatname_uc} doesn't remove the entries until it opens the registry -again to read a different file. If you are testing the `clean_inactive` setting, -make sure {beatname_uc} is configured to read from more than one file, or the -file state will never be removed from the registry. +because {beatname_uc} doesn't remove the entries until the registry garbage +collector (GC) runs. The state is removed from memory, and an `op: remove` +entry is added to the registry log file, only when the TTL for the state has +expired, there are no active harvesters for the file, and the registry GC +has run. [float] [id="{beatname_lc}-input-{type}-clean-removed"] diff --git a/filebeat/input/filestream/internal/input-logfile/clean.go b/filebeat/input/filestream/internal/input-logfile/clean.go index c87997a0afa2..36f429f3f861 100644 --- a/filebeat/input/filestream/internal/input-logfile/clean.go +++ b/filebeat/input/filestream/internal/input-logfile/clean.go @@ -36,7 +36,7 @@ type cleaner struct { // run starts a loop that tries to clean entries from the registry. // The cleaner locks the store, such that no new states can be created // during the cleanup phase. Only resources that are finished and whose TTL -// (clean_timeout setting) has expired will be removed. +// (clean_inactive setting) has expired will be removed. 
// // Resources are considered "Finished" if they do not have a current owner (active input), and // if they have no pending updates that still need to be written to the registry file after associated diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index c95efd1c5a1b..ec25ff60b2f0 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -165,9 +165,9 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { settings := struct { ID string `config:"id"` - CleanTimeout time.Duration `config:"clean_timeout"` + CleanInactive time.Duration `config:"clean_inactive"` HarvesterLimit uint64 `config:"harvester_limit"` - }{CleanTimeout: cim.DefaultCleanTimeout} + }{CleanInactive: cim.DefaultCleanTimeout} if err := config.Unpack(&settings); err != nil { return nil, err } @@ -230,7 +230,7 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { prospector: prospector, harvester: harvester, sourceIdentifier: sourceIdentifier, - cleanTimeout: settings.CleanTimeout, + cleanTimeout: settings.CleanInactive, harvesterLimit: settings.HarvesterLimit, }, nil } diff --git a/filebeat/input/v2/input-cursor/clean.go b/filebeat/input/v2/input-cursor/clean.go index 92124f4e3fb9..179110ea6248 100644 --- a/filebeat/input/v2/input-cursor/clean.go +++ b/filebeat/input/v2/input-cursor/clean.go @@ -34,7 +34,7 @@ type cleaner struct { // run starts a loop that tries to clean entries from the registry. // The cleaner locks the store, such that no new states can be created // during the cleanup phase. Only resources that are finished and whos TTL -// (clean_timeout setting) has expired will be removed. +// (clean_inactive setting) has expired will be removed. 
// // Resources are considered "Finished" if they do not have a current owner (active input), and // if they have no pending updates that still need to be written to the registry file after associated @@ -44,6 +44,7 @@ type cleaner struct { // once the last event has been ACKed. func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) { started := time.Now() + //nolint: errcheck // gcStore does not return an error timed.Periodic(canceler, interval, func() error { gcStore(c.log, started, store) return nil diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go index bb0b526a7bfc..74dd8a3bd342 100644 --- a/filebeat/input/v2/input-cursor/manager.go +++ b/filebeat/input/v2/input-cursor/manager.go @@ -153,9 +153,9 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { } settings := struct { - ID string `config:"id"` - CleanTimeout time.Duration `config:"clean_timeout"` - }{ID: "", CleanTimeout: cim.DefaultCleanTimeout} + ID string `config:"id"` + CleanInactive time.Duration `config:"clean_inactive"` + }{ID: "", CleanInactive: cim.DefaultCleanTimeout} if err := config.Unpack(&settings); err != nil { return nil, err } @@ -176,7 +176,7 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { userID: settings.ID, sources: sources, input: inp, - cleanTimeout: settings.CleanTimeout, + cleanTimeout: settings.CleanInactive, }, nil } diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go new file mode 100644 index 000000000000..3ddb04a2c20c --- /dev/null +++ b/filebeat/tests/integration/filestream_test.go @@ -0,0 +1,107 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package integration + +import ( + "fmt" + "path" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/tests/integration" +) + +var filestreamCleanInactiveCfg = ` +filebeat.inputs: + - type: filestream + id: "test-clean-inactive" + paths: + - %s + + clean_inactive: 3s + ignore_older: 2s + close.on_state_change.inactive: 1s + prospector.scanner.check_interval: 1s + +filebeat.registry: + cleanup_interval: 5s + flush: 1s + +queue.mem: + events: 32 + flush.min_events: 8 + flush.timeout: 0.1s + +path.home: %s + +output.file: + path: ${path.home} + filename: "output-file" + rotate_every_kb: 10000 + +logging: + level: debug + selectors: + - input + - input.filestream + metrics: + enabled: false +` + +func TestFilestreamCleanInactive(t *testing.T) { + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + tempDir := filebeat.TempDir() + + // 1. Generate the log file path, but do not write data to it + logFilePath := path.Join(tempDir, "log.log") + + // 2. Write configuration file and start Filebeat + filebeat.WriteConfigFile(fmt.Sprintf(filestreamCleanInactiveCfg, logFilePath, tempDir)) + filebeat.Start() + + // 3. Create the log file + integration.GenerateLogFile(t, logFilePath, 10, false) + + // 4. 
Wait for Filebeat to start scanning for files + // + filebeat.WaitForLogs( + fmt.Sprintf("A new file %s has been found", logFilePath), + 10*time.Second, + "Filebeat did not start looking for files to ingest") + + filebeat.WaitForLogs( + fmt.Sprintf("Reader was closed. Closing. Path='%s", logFilePath), + 10*time.Second, "Filebeat did not close the file") + + // 5. Now that the reader has been closed, nothing is holding the state + // of the file, so once the TTL of its state expires and the store GC runs, + // it will be removed from the registry. + // Wait for the log message stating 1 entry has been removed from the registry + filebeat.WaitForLogs("1 entries removed", 20*time.Second, "entry was not removed from registtry") + + // 6. Then assess it has been removed in the registry + registryFile := filepath.Join(filebeat.TempDir(), "data", "registry", "filebeat", "log.json") + filebeat.WaitFileContains(registryFile, `"op":"remove"`, time.Second) +}