From 9b9198000a47a989b446b5ffa422a766cad19bad Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 23 Jan 2025 16:05:04 -0500 Subject: [PATCH 1/8] Add filesource provider. --- internal/pkg/agent/cmd/include.go | 1 + .../providers/filesource/filesource.go | 177 ++++++++++++++++++ .../providers/filesource/filesource_test.go | 157 ++++++++++++++++ 3 files changed, 335 insertions(+) create mode 100644 internal/pkg/composable/providers/filesource/filesource.go create mode 100644 internal/pkg/composable/providers/filesource/filesource_test.go diff --git a/internal/pkg/agent/cmd/include.go b/internal/pkg/agent/cmd/include.go index 6750727626a..ab2ce160682 100644 --- a/internal/pkg/agent/cmd/include.go +++ b/internal/pkg/agent/cmd/include.go @@ -9,6 +9,7 @@ import ( _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/agent" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/docker" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/env" + _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/filesource" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/host" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/kubernetes" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/kubernetesleaderelection" diff --git a/internal/pkg/composable/providers/filesource/filesource.go b/internal/pkg/composable/providers/filesource/filesource.go new file mode 100644 index 00000000000..b058590acd2 --- /dev/null +++ b/internal/pkg/composable/providers/filesource/filesource.go @@ -0,0 +1,177 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package filesource + +import ( + "context" + "fmt" + "os" + "path/filepath" + "slices" + + "github.com/fsnotify/fsnotify" + + "github.com/elastic/elastic-agent/internal/pkg/composable" + "github.com/elastic/elastic-agent/internal/pkg/config" + corecomp "github.com/elastic/elastic-agent/internal/pkg/core/composable" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +func init() { + // filesource provider reads and watches for changes on files that are defined in the provider configuration. + // + // To be notified when a file is change the provider will watch the parent directory of the file so if the file + // is replaced that it will read the new contents. If a file doesn't exist or the provider is unable to read + // the file then it will report the value as an empty string. + // + // If the provided path happens to be a directory then it just report the value as an empty string. + composable.Providers.MustAddContextProvider("filesource", ContextProviderBuilder) +} + +type fileSourceConfig struct { + Type string `config:"type"` + Path string `config:"path"` +} + +type contextProvider struct { + logger *logger.Logger + + sources map[string]fileSourceConfig +} + +// Run runs the filesource context provider. +func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("failed to create watcher: %w", err) + } + defer watcher.Close() + + // invert the mapping to map paths to source names + inverted := make(map[string][]string, len(c.sources)) + for sourceName, sourceCfg := range c.sources { + sources, ok := inverted[sourceCfg.Path] + if !ok { + sources = []string{sourceName} + } else { + sources = append(sources, sourceName) + } + inverted[sourceCfg.Path] = sources + } + + // determine the paths to watch (watch is performed on the directories that contain the file) + // + // you cannot register the same directory multiple times so this ensures its only registered once + paths := make([]string, 0, len(c.sources)) + for _, cfg := range c.sources { + parent := filepath.Dir(cfg.Path) + if !slices.Contains(paths, parent) { + paths = append(paths, parent) + } + } + for _, path := range paths { + err = watcher.Add(path) + if err != nil { + return fmt.Errorf("failed to watch path %q: %w", path, err) + } + } + + // read the initial values after the watch has started + // this ensures that if the value changed between this code and the loop below + // the updated file changes will not be missed + current := make(map[string]interface{}, len(c.sources)) + for path, sources := range inverted { + value := c.readContents(path) + for _, source := range sources { + current[source] = value + } + } + err = comm.Set(current) + if err != nil { + return fmt.Errorf("failed to set current context: %w", err) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err, ok := <-watcher.Errors: + if !ok { + // watcher was closed + return nil + } + c.logger.Errorf("file watcher errored: %s", err) + case e, ok := <-watcher.Events: + if !ok { // Channel was closed (i.e. Watcher.Close() was called). + // watcher was closed + return nil + } + + path := filepath.Clean(e.Name) + sources, ok := inverted[path] + if !ok { + // watching the directory, it can contain files that we are not watching + // ignore these events unless we are actively watching this file + continue + } + + switch { + case e.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove) != 0: + // file was created, updated, or deleted (update the value) + value := c.readContents(path) + for _, source := range sources { + current[source] = value + } + err = comm.Set(current) + if err != nil { + return fmt.Errorf("failed to set current context from notify event: %w", err) + } + } + } + } +} + +func (c *contextProvider) readContents(path string) string { + data, err := os.ReadFile(path) + if err != nil { + c.logger.Errorf("failed to read file %q: %s", path, err) + return "" + } + return string(data) +} + +// ContextProviderBuilder builds the context provider. +func ContextProviderBuilder(log *logger.Logger, c *config.Config, _ bool) (corecomp.ContextProvider, error) { + p := &contextProvider{ + logger: log, + } + if c != nil { + err := c.UnpackTo(&p.sources) + if err != nil { + return nil, fmt.Errorf("failed to unpack config: %w", err) + } + } + for sourceName, sourceCfg := range p.sources { + if sourceCfg.Type != "" && sourceCfg.Type != "raw" { + return nil, fmt.Errorf("%q defined an unsupported type %q", sourceName, sourceCfg.Type) + } + if sourceCfg.Path == "" { + return nil, fmt.Errorf("%q is missing a defined path", sourceName) + } + // only use an absolute path (convert from relative) + if !filepath.IsAbs(sourceCfg.Path) { + path, err := filepath.Abs(sourceCfg.Path) + if err != nil { + return nil, fmt.Errorf("%q failed to determine absolute path for %q: %w", sourceName, sourceCfg.Path, err) + } + sourceCfg.Path = path + } + path := filepath.Dir(sourceCfg.Path) + if path == "" || path == "." { + return nil, fmt.Errorf("%q has a path %q that is invalid", sourceName, sourceCfg.Path) + } + } + return p, nil +} diff --git a/internal/pkg/composable/providers/filesource/filesource_test.go b/internal/pkg/composable/providers/filesource/filesource_test.go new file mode 100644 index 00000000000..b9d5babda99 --- /dev/null +++ b/internal/pkg/composable/providers/filesource/filesource_test.go @@ -0,0 +1,157 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package filesource + +import ( + "context" + "errors" + "os" + "path/filepath" + "testing" + "time" + + ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent/internal/pkg/composable" + "github.com/elastic/elastic-agent/internal/pkg/config" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +func TestContextProvider_Config(t *testing.T) { + scenarios := []struct { + Name string + Config *config.Config + Err error + }{ + { + Name: "no path", + Config: config.MustNewConfigFrom(map[string]interface{}{ + "one": map[string]interface{}{}, + }), + Err: errors.New(`"one" is missing a defined path`), + }, + { + Name: "invalid type", + Config: config.MustNewConfigFrom(map[string]interface{}{ + "one": map[string]interface{}{ + "type": "json", + "path": "/etc/agent/content", + }, + }), + Err: errors.New(`"one" defined an unsupported type "json"`), + }, + // other errors in the config validation are hard to validate in a test + // they are just very defensive + { + Name: "valid path", + Config: config.MustNewConfigFrom(map[string]interface{}{ + "one": map[string]interface{}{ + "path": "/etc/agent/content1", + }, + "two": map[string]interface{}{ + "path": "/etc/agent/content2", + }, + }), + }, + } + for _, s := range scenarios { + t.Run(s.Name, func(t *testing.T) { + log, err := logger.New("filesource_test", false) + require.NoError(t, err) + + builder, _ := composable.Providers.GetContextProvider("filesource") + _, err = builder(log, s.Config, true) + if s.Err != nil { + require.Equal(t, s.Err, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestContextProvider(t *testing.T) { + const testTimeout = 1 * time.Second + + tmpDir := t.TempDir() + value1 := "value1" + file1 := filepath.Join(tmpDir, "value1_path") + require.NoError(t, os.WriteFile(file1, []byte(value1), 0o644)) + value2 := "value2" + file2 := filepath.Join(tmpDir, "value2_path") + require.NoError(t, os.WriteFile(file2, []byte(value2), 0o644)) + + log, err := logger.New("filesource_test", false) + require.NoError(t, err) + + c, err := config.NewConfigFrom(map[string]interface{}{ + "one": map[string]interface{}{ + "path": file1, + }, + "two": map[string]interface{}{ + "path": file2, + }, + }) + require.NoError(t, err) + builder, _ := composable.Providers.GetContextProvider("filesource") + provider, err := builder(log, c, true) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + comm := ctesting.NewContextComm(ctx) + setChan := make(chan map[string]interface{}) + comm.CallOnSet(func(value map[string]interface{}) { + // Forward Set's input to the test channel + setChan <- value + }) + + go func() { + _ = provider.Run(ctx, comm) + }() + + // wait for it to be called once + var current map[string]interface{} + select { + case current = <-setChan: + case <-time.After(testTimeout): + require.FailNow(t, "timeout waiting for provider to call Set") + } + + require.Equal(t, value1, current["one"]) + require.Equal(t, value2, current["two"]) + + // update the value in one + value1 = "update1" + require.NoError(t, os.WriteFile(file1, []byte(value1), 0o644)) + + // wait for file1 to be updated + var oneUpdated map[string]interface{} + select { + case oneUpdated = <-setChan: + case <-time.After(testTimeout): + require.FailNow(t, "timeout waiting for provider to call Set") + } + + require.Equal(t, value1, oneUpdated["one"]) + require.Equal(t, value2, oneUpdated["two"]) + + // update the value in two + value2 = "update2" + require.NoError(t, os.WriteFile(file2, []byte(value2), 0o644)) + + // wait for file2 to be updated + var twoUpdated map[string]interface{} + select { + case twoUpdated = <-setChan: + case <-time.After(testTimeout): + require.FailNow(t, "timeout waiting for provider to call Set") + } + + require.Equal(t, value1, twoUpdated["one"]) + require.Equal(t, value2, twoUpdated["two"]) +} From b57448b0cd13e9c2ab91b7759aee834bafa08f8f Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 23 Jan 2025 16:12:36 -0500 Subject: [PATCH 2/8] Add changelog. --- .../1737666699-Add-filesource-provider.yaml | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 changelog/fragments/1737666699-Add-filesource-provider.yaml diff --git a/changelog/fragments/1737666699-Add-filesource-provider.yaml b/changelog/fragments/1737666699-Add-filesource-provider.yaml new file mode 100644 index 00000000000..6ee40c0e8db --- /dev/null +++ b/changelog/fragments/1737666699-Add-filesource-provider.yaml @@ -0,0 +1,34 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add filesource provider + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + filesource provider watches for changes of the files and updates the values of the variables + when the content of the file changes. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/6587 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6362 From 8e429c6dc7507d0212e3ec6a17a61e370a4eaa10 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 23 Jan 2025 20:16:27 -0500 Subject: [PATCH 3/8] Add more time to tests for CI runners. --- internal/pkg/composable/providers/filesource/filesource_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/filesource/filesource_test.go b/internal/pkg/composable/providers/filesource/filesource_test.go index b9d5babda99..8f243250ec5 100644 --- a/internal/pkg/composable/providers/filesource/filesource_test.go +++ b/internal/pkg/composable/providers/filesource/filesource_test.go @@ -75,7 +75,7 @@ func TestContextProvider_Config(t *testing.T) { } func TestContextProvider(t *testing.T) { - const testTimeout = 1 * time.Second + const testTimeout = 10 * time.Second tmpDir := t.TempDir() value1 := "value1" From 425c4dc945038c30b64d768768fd004e231ef0a5 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 23 Jan 2025 20:21:15 -0500 Subject: [PATCH 4/8] Fix unit test. --- .../providers/filesource/filesource_test.go | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/internal/pkg/composable/providers/filesource/filesource_test.go b/internal/pkg/composable/providers/filesource/filesource_test.go index 8f243250ec5..ab57560b157 100644 --- a/internal/pkg/composable/providers/filesource/filesource_test.go +++ b/internal/pkg/composable/providers/filesource/filesource_test.go @@ -75,7 +75,7 @@ func TestContextProvider_Config(t *testing.T) { } func TestContextProvider(t *testing.T) { - const testTimeout = 10 * time.Second + const testTimeout = 3 * time.Second tmpDir := t.TempDir() value1 := "value1" @@ -130,28 +130,34 @@ func TestContextProvider(t *testing.T) { require.NoError(t, os.WriteFile(file1, []byte(value1), 0o644)) // wait for file1 to be updated - var oneUpdated map[string]interface{} - select { - case oneUpdated = <-setChan: - case <-time.After(testTimeout): - require.FailNow(t, "timeout waiting for provider to call Set") + for { + var oneUpdated map[string]interface{} + select { + case oneUpdated = <-setChan: + case <-time.After(testTimeout): + require.FailNow(t, "timeout waiting for provider to call Set") + } + + if value1 == oneUpdated["one"] && value2 == oneUpdated["two"] { + break + } } - require.Equal(t, value1, oneUpdated["one"]) - require.Equal(t, value2, oneUpdated["two"]) - // update the value in two value2 = "update2" require.NoError(t, os.WriteFile(file2, []byte(value2), 0o644)) - // wait for file2 to be updated - var twoUpdated map[string]interface{} - select { - case twoUpdated = <-setChan: - case <-time.After(testTimeout): - require.FailNow(t, "timeout waiting for provider to call Set") + for { + // wait for file2 to be updated + var twoUpdated map[string]interface{} + select { + case twoUpdated = <-setChan: + case <-time.After(testTimeout): + require.FailNow(t, "timeout waiting for provider to call Set") + } + + if value1 == twoUpdated["one"] && value2 == twoUpdated["two"] { + break + } } - - require.Equal(t, value1, twoUpdated["one"]) - require.Equal(t, value2, twoUpdated["two"]) } From 8a9b06d648bed8bfcb0f5065382ac8715e8522c4 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 24 Jan 2025 08:59:35 -0500 Subject: [PATCH 5/8] Improvements from code review. --- .../providers/filesource/filesource.go | 150 +++++++++++++----- .../providers/filesource/filesource_test.go | 56 ++++--- 2 files changed, 150 insertions(+), 56 deletions(-) diff --git a/internal/pkg/composable/providers/filesource/filesource.go b/internal/pkg/composable/providers/filesource/filesource.go index b058590acd2..832d2c8319f 100644 --- a/internal/pkg/composable/providers/filesource/filesource.go +++ b/internal/pkg/composable/providers/filesource/filesource.go @@ -7,9 +7,12 @@ package filesource import ( "context" "fmt" + "io" "os" "path/filepath" + "runtime" "slices" + "strings" "github.com/fsnotify/fsnotify" @@ -30,15 +33,25 @@ func init() { composable.Providers.MustAddContextProvider("filesource", ContextProviderBuilder) } +const ( + DefaultMaxSize = 4 * 1024 // 4KiB +) + type fileSourceConfig struct { Type string `config:"type"` Path string `config:"path"` } +type providerConfig struct { + Enabled bool `config:"enabled"` // handled by composable manager (but here to show that it is part of the config) + Sources map[string]*fileSourceConfig `config:"sources"` + MaxSize uint `config:"max_size"` +} + type contextProvider struct { logger *logger.Logger - sources map[string]fileSourceConfig + cfg providerConfig } // Run runs the filesource context provider. @@ -50,8 +63,8 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider defer watcher.Close() // invert the mapping to map paths to source names - inverted := make(map[string][]string, len(c.sources)) - for sourceName, sourceCfg := range c.sources { + inverted := make(map[string][]string, len(c.cfg.Sources)) + for sourceName, sourceCfg := range c.cfg.Sources { sources, ok := inverted[sourceCfg.Path] if !ok { sources = []string{sourceName} @@ -64,8 +77,8 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider // determine the paths to watch (watch is performed on the directories that contain the file) // // you cannot register the same directory multiple times so this ensures its only registered once - paths := make([]string, 0, len(c.sources)) - for _, cfg := range c.sources { + paths := make([]string, 0, len(c.cfg.Sources)) + for _, cfg := range c.cfg.Sources { parent := filepath.Dir(cfg.Path) if !slices.Contains(paths, parent) { paths = append(paths, parent) @@ -81,7 +94,7 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider // read the initial values after the watch has started // this ensures that if the value changed between this code and the loop below // the updated file changes will not be missed - current := make(map[string]interface{}, len(c.sources)) + current := make(map[string]interface{}, len(c.cfg.Sources)) for path, sources := range inverted { value := c.readContents(path) for _, source := range sources { @@ -98,48 +111,104 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider case <-ctx.Done(): return ctx.Err() case err, ok := <-watcher.Errors: - if !ok { - // watcher was closed - return nil + if ok { + c.logger.Errorf("file watcher errored: %s", err) } - c.logger.Errorf("file watcher errored: %s", err) case e, ok := <-watcher.Events: - if !ok { // Channel was closed (i.e. Watcher.Close() was called). - // watcher was closed - return nil - } - - path := filepath.Clean(e.Name) - sources, ok := inverted[path] - if !ok { - // watching the directory, it can contain files that we are not watching - // ignore these events unless we are actively watching this file - continue - } - - switch { - case e.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove) != 0: - // file was created, updated, or deleted (update the value) - value := c.readContents(path) - for _, source := range sources { - current[source] = value + if ok { + path := filepath.Clean(e.Name) + // Windows paths are case-insensitive + if runtime.GOOS == "windows" { + path = strings.ToLower(path) + } + sources, ok := inverted[path] + if !ok { + // watching the directory, it can contain files that we are not watching + // ignore these events unless we are actively watching this file + continue } - err = comm.Set(current) - if err != nil { - return fmt.Errorf("failed to set current context from notify event: %w", err) + + switch { + case e.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove) != 0: + // file was created, updated, or deleted (update the value) + changed := false + value := c.readContents(path) + for _, source := range sources { + previous, _ := current[source] + if previous != value { + current[source] = value + changed = true + } + } + if changed { + err = comm.Set(current) + if err != nil { + return fmt.Errorf("failed to set current context from notify event: %w", err) + } + } } } } } } +// readContents reads the contents of the file but places a cap on the size of the data that +// is allowed to be read. If the file is larger than the max size then it will only read up to +// the maximum size. func (c *contextProvider) readContents(path string) string { - data, err := os.ReadFile(path) + maxSize := c.cfg.MaxSize + if maxSize == 0 { + maxSize = DefaultMaxSize + } + + f, err := os.Open(path) if err != nil { - c.logger.Errorf("failed to read file %q: %s", path, err) - return "" + c.logger.Errorf("failed to open file %q: %s", path, err) + } + defer f.Close() + + // determine the size needed in the buffer to read + var size int + if info, err := f.Stat(); err == nil { + size64 := info.Size() + if int64(int(size64)) == size64 { + size = int(size64) + } + } + size++ // one byte for final read at EOF + + // don't allow more than maxSize + if uint(size) > maxSize { + size = int(maxSize) + } + + // If a file claims a small size, read at least 512 bytes. + // In particular, files in Linux's /proc claim size 0 but + // then do not work right if read in small pieces, + // so an initial read of 1 byte would not work correctly. + if size < 512 { + size = 512 + } + + data := make([]byte, 0, size) + for { + n, err := f.Read(data[len(data):cap(data)]) + data = data[:len(data)+n] + if err != nil { + if err == io.EOF { + err = nil + } + if err != nil { + c.logger.Errorf("failed to read file %q: %s", path, err) + return "" + } + return string(data) + } + if len(data) >= cap(data) { + d := append(data[:cap(data)], 0) + data = d[:len(data)] + } } - return string(data) } // ContextProviderBuilder builds the context provider. @@ -148,12 +217,12 @@ func ContextProviderBuilder(log *logger.Logger, c *config.Config, _ bool) (corec logger: log, } if c != nil { - err := c.UnpackTo(&p.sources) + err := c.UnpackTo(&p.cfg) if err != nil { return nil, fmt.Errorf("failed to unpack config: %w", err) } } - for sourceName, sourceCfg := range p.sources { + for sourceName, sourceCfg := range p.cfg.Sources { if sourceCfg.Type != "" && sourceCfg.Type != "raw" { return nil, fmt.Errorf("%q defined an unsupported type %q", sourceName, sourceCfg.Type) } @@ -172,6 +241,11 @@ func ContextProviderBuilder(log *logger.Logger, c *config.Config, _ bool) (corec if path == "" || path == "." { return nil, fmt.Errorf("%q has a path %q that is invalid", sourceName, sourceCfg.Path) } + // Windows paths are case-insensitive, force lower here to simplify the implementation + if runtime.GOOS == "windows" { + sourceCfg.Path = strings.ToLower(sourceCfg.Path) + } + p.cfg.Sources[sourceName] = sourceCfg } return p, nil } diff --git a/internal/pkg/composable/providers/filesource/filesource_test.go b/internal/pkg/composable/providers/filesource/filesource_test.go index ab57560b157..12ab839a9b0 100644 --- a/internal/pkg/composable/providers/filesource/filesource_test.go +++ b/internal/pkg/composable/providers/filesource/filesource_test.go @@ -9,14 +9,15 @@ import ( "errors" "os" "path/filepath" + "runtime" + "strings" "testing" "time" - ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing" - "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent/internal/pkg/composable" + ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -30,16 +31,20 @@ func TestContextProvider_Config(t *testing.T) { { Name: "no path", Config: config.MustNewConfigFrom(map[string]interface{}{ - "one": map[string]interface{}{}, + "sources": map[string]interface{}{ + "one": map[string]interface{}{}, + }, }), Err: errors.New(`"one" is missing a defined path`), }, { Name: "invalid type", Config: config.MustNewConfigFrom(map[string]interface{}{ - "one": map[string]interface{}{ - "type": "json", - "path": "/etc/agent/content", + "sources": map[string]interface{}{ + "one": map[string]interface{}{ + "type": "json", + "path": "/etc/agent/content", + }, }, }), Err: errors.New(`"one" defined an unsupported type "json"`), @@ -49,11 +54,13 @@ func TestContextProvider_Config(t *testing.T) { { Name: "valid path", Config: config.MustNewConfigFrom(map[string]interface{}{ - "one": map[string]interface{}{ - "path": "/etc/agent/content1", - }, - "two": map[string]interface{}{ - "path": "/etc/agent/content2", + "sources": map[string]interface{}{ + "one": map[string]interface{}{ + "path": "/etc/agent/content1", + }, + "two": map[string]interface{}{ + "path": "/etc/agent/content2", + }, }, }), }, @@ -79,21 +86,34 @@ func TestContextProvider(t *testing.T) { tmpDir := t.TempDir() value1 := "value1" - file1 := filepath.Join(tmpDir, "value1_path") + file1 := filepath.Join(tmpDir, "vAlUe1_path") require.NoError(t, os.WriteFile(file1, []byte(value1), 0o644)) value2 := "value2" - file2 := filepath.Join(tmpDir, "value2_path") + file2 := filepath.Join(tmpDir, "vAlUe2_path") require.NoError(t, os.WriteFile(file2, []byte(value2), 0o644)) log, err := logger.New("filesource_test", false) require.NoError(t, err) + osPath := func(path string) string { + return path + } + if runtime.GOOS == "windows" { + // on Windows configure the path as lower case even though it + // is written as non-lower case to ensure that on Windows the + // filewatcher observes the correct path + osPath = func(path string) string { + return strings.ToLower(path) + } + } c, err := config.NewConfigFrom(map[string]interface{}{ - "one": map[string]interface{}{ - "path": file1, - }, - "two": map[string]interface{}{ - "path": file2, + "sources": map[string]interface{}{ + "one": map[string]interface{}{ + "path": osPath(file1), + }, + "two": map[string]interface{}{ + "path": osPath(file2), + }, }, }) require.NoError(t, err) From 578e3bc259c17f002f25a545733d5a987fad50ea Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 24 Jan 2025 10:16:25 -0500 Subject: [PATCH 6/8] Fix lint. --- .../pkg/composable/providers/filesource/filesource.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/composable/providers/filesource/filesource.go b/internal/pkg/composable/providers/filesource/filesource.go index 832d2c8319f..6877920169f 100644 --- a/internal/pkg/composable/providers/filesource/filesource.go +++ b/internal/pkg/composable/providers/filesource/filesource.go @@ -45,7 +45,7 @@ type fileSourceConfig struct { type providerConfig struct { Enabled bool `config:"enabled"` // handled by composable manager (but here to show that it is part of the config) Sources map[string]*fileSourceConfig `config:"sources"` - MaxSize uint `config:"max_size"` + MaxSize int `config:"max_size"` } type contextProvider struct { @@ -134,7 +134,7 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider changed := false value := c.readContents(path) for _, source := range sources { - previous, _ := current[source] + previous := current[source] if previous != value { current[source] = value changed = true @@ -157,7 +157,7 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider // the maximum size. func (c *contextProvider) readContents(path string) string { maxSize := c.cfg.MaxSize - if maxSize == 0 { + if maxSize <= 0 { maxSize = DefaultMaxSize } @@ -178,8 +178,8 @@ func (c *contextProvider) readContents(path string) string { size++ // one byte for final read at EOF // don't allow more than maxSize - if uint(size) > maxSize { - size = int(maxSize) + if size > maxSize { + size = maxSize } // If a file claims a small size, read at least 512 bytes. From af797d421ec3eb81035e47157cd423d9c6cb506c Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 27 Jan 2025 11:20:27 -0500 Subject: [PATCH 7/8] Handle ErrEventOverflow. --- .../providers/filesource/filesource.go | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/internal/pkg/composable/providers/filesource/filesource.go b/internal/pkg/composable/providers/filesource/filesource.go index 6877920169f..b17d34920d1 100644 --- a/internal/pkg/composable/providers/filesource/filesource.go +++ b/internal/pkg/composable/providers/filesource/filesource.go @@ -6,6 +6,7 @@ package filesource import ( "context" + "errors" "fmt" "io" "os" @@ -95,15 +96,23 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider // this ensures that if the value changed between this code and the loop below // the updated file changes will not be missed current := make(map[string]interface{}, len(c.cfg.Sources)) - for path, sources := range inverted { - value := c.readContents(path) - for _, source := range sources { - current[source] = value + readAll := func() error { + for path, sources := range inverted { + value := c.readContents(path) + for _, source := range sources { + current[source] = value + } + } + err = comm.Set(current) + if err != nil { + return fmt.Errorf("failed to set current context: %w", err) } + return nil } - err = comm.Set(current) + err = readAll() if err != nil { - return fmt.Errorf("failed to set current context: %w", err) + // context for the error already added + return err } for { @@ -113,6 +122,19 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider case err, ok := <-watcher.Errors: if ok { c.logger.Errorf("file watcher errored: %s", err) + if errors.Is(err, fsnotify.ErrEventOverflow) { + // the queue is full and some events have been dropped + // at this point we don't know what has changed + // clear the queue of events and read all again + c.logger.Debug("draining file watcher queue") + drainQueue(watcher.Events) + c.logger.Infof("reading all sources to handle overflow") + err = readAll() + if err != nil { + // context for the error already added + c.logger.Error(err) + } + } } case e, ok := <-watcher.Events: if ok { @@ -249,3 +271,13 @@ func ContextProviderBuilder(log *logger.Logger, c *config.Config, _ bool) (corec } return p, nil } + +func drainQueue(e <-chan fsnotify.Event) { + for { + select { + case _, _ = <-e: + default: + return + } + } +} From 4662af4a79cb74027fd117bf86f3341ab467164e Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 27 Jan 2025 18:03:29 -0500 Subject: [PATCH 8/8] Handle closed channel on drain. --- internal/pkg/composable/providers/filesource/filesource.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/filesource/filesource.go b/internal/pkg/composable/providers/filesource/filesource.go index b17d34920d1..dbda0298c46 100644 --- a/internal/pkg/composable/providers/filesource/filesource.go +++ b/internal/pkg/composable/providers/filesource/filesource.go @@ -275,7 +275,10 @@ func ContextProviderBuilder(log *logger.Logger, c *config.Config, _ bool) (corec func drainQueue(e <-chan fsnotify.Event) { for { select { - case _, _ = <-e: + case _, ok := <-e: + if !ok { + return + } default: return }