diff --git a/changelog/fragments/1737666699-Add-filesource-provider.yaml b/changelog/fragments/1737666699-Add-filesource-provider.yaml new file mode 100644 index 00000000000..6ee40c0e8db --- /dev/null +++ b/changelog/fragments/1737666699-Add-filesource-provider.yaml @@ -0,0 +1,34 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add filesource provider + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + filesource provider watches for changes of the files and updates the values of the variables + when the content of the file changes. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. 
+pr: https://github.com/elastic/elastic-agent/pull/6587 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6362 diff --git a/internal/pkg/agent/cmd/include.go b/internal/pkg/agent/cmd/include.go index 6750727626a..ab2ce160682 100644 --- a/internal/pkg/agent/cmd/include.go +++ b/internal/pkg/agent/cmd/include.go @@ -9,6 +9,7 @@ import ( _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/agent" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/docker" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/env" + _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/filesource" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/host" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/kubernetes" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/kubernetesleaderelection" diff --git a/internal/pkg/composable/providers/filesource/filesource.go b/internal/pkg/composable/providers/filesource/filesource.go new file mode 100644 index 00000000000..dbda0298c46 --- /dev/null +++ b/internal/pkg/composable/providers/filesource/filesource.go @@ -0,0 +1,286 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +package filesource + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "slices" + "strings" + + "github.com/fsnotify/fsnotify" + + "github.com/elastic/elastic-agent/internal/pkg/composable" + "github.com/elastic/elastic-agent/internal/pkg/config" + corecomp "github.com/elastic/elastic-agent/internal/pkg/core/composable" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +func init() { + // The filesource provider reads the files that are defined in the provider configuration and watches them for changes. + // + // To be notified when a file changes, the provider watches the parent directory of the file, so that a file + // that is replaced is read again with its new contents. If a file doesn't exist or the provider is unable to + // read the file then it reports the value as an empty string. + // + // If the provided path happens to be a directory then it also reports the value as an empty string. + composable.Providers.MustAddContextProvider("filesource", ContextProviderBuilder) +} + +const ( + // DefaultMaxSize is the maximum size, in bytes, that readContents uses when the configured max_size is zero or negative. + DefaultMaxSize = 4 * 1024 // 4KiB +) + +// fileSourceConfig is the configuration of a single named source. +type fileSourceConfig struct { + // Type must be empty or "raw"; ContextProviderBuilder rejects any other value. + Type string `config:"type"` + // Path is the file to read; ContextProviderBuilder rejects an empty path and converts a relative path to an absolute one. + Path string `config:"path"` +} + +// providerConfig is the configuration of the filesource provider. +type providerConfig struct { + Enabled bool `config:"enabled"` // handled by composable manager (but here to show that it is part of the config) + // Sources maps each source name to the configuration of the file that backs it. + Sources map[string]*fileSourceConfig `config:"sources"` + // MaxSize is the configured maximum size in bytes; zero or negative selects DefaultMaxSize. + MaxSize int `config:"max_size"` +} + +// contextProvider is the filesource context provider returned by ContextProviderBuilder. +type contextProvider struct { + logger *logger.Logger + + cfg providerConfig +} + +// Run runs the filesource context provider. It watches the parent directory of every configured source, +// publishes the initial values with comm.Set, and publishes again whenever a watched file is created, +// written, or removed and the value read for it differs from the previous one. It returns ctx.Err() once ctx is +// done, and returns early with an error if the watcher cannot be created, a directory cannot be watched, or +// publishing the initial values or an updated value fails (a failure while re-reading everything after a +// watcher overflow is only logged). 
+func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("failed to create watcher: %w", err) + } + defer watcher.Close() + + // invert the mapping to map paths to source names + inverted := make(map[string][]string, len(c.cfg.Sources)) + for sourceName, sourceCfg := range c.cfg.Sources { + sources, ok := inverted[sourceCfg.Path] + if !ok { + sources = []string{sourceName} + } else { + sources = append(sources, sourceName) + } + inverted[sourceCfg.Path] = sources + } + + // determine the paths to watch (watch is performed on the directories that contain the file) + // + // you cannot register the same directory multiple times so this ensures its only registered once + paths := make([]string, 0, len(c.cfg.Sources)) + for _, cfg := range c.cfg.Sources { + parent := filepath.Dir(cfg.Path) + if !slices.Contains(paths, parent) { + paths = append(paths, parent) + } + } + for _, path := range paths { + err = watcher.Add(path) + if err != nil { + return fmt.Errorf("failed to watch path %q: %w", path, err) + } + } + + // read the initial values after the watch has started + // this ensures that if the value changed between this code and the loop below + // the updated file changes will not be missed + current := make(map[string]interface{}, len(c.cfg.Sources)) + readAll := func() error { + for path, sources := range inverted { + value := c.readContents(path) + for _, source := range sources { + current[source] = value + } + } + err = comm.Set(current) + if err != nil { + return fmt.Errorf("failed to set current context: %w", err) + } + return nil + } + err = readAll() + if err != nil { + // context for the error already added + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err, ok := <-watcher.Errors: + if ok { + c.logger.Errorf("file watcher errored: %s", err) + if errors.Is(err, fsnotify.ErrEventOverflow) { + // the queue 
is full and some events have been dropped + // at this point we don't know what has changed + // clear the queue of events and read all again + c.logger.Debug("draining file watcher queue") + drainQueue(watcher.Events) + c.logger.Infof("reading all sources to handle overflow") + err = readAll() + if err != nil { + // context for the error already added + c.logger.Error(err) + } + } + } + case e, ok := <-watcher.Events: + if ok { + path := filepath.Clean(e.Name) + // Windows paths are case-insensitive + if runtime.GOOS == "windows" { + path = strings.ToLower(path) + } + sources, ok := inverted[path] + if !ok { + // watching the directory, it can contain files that we are not watching + // ignore these events unless we are actively watching this file + continue + } + + switch { + case e.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove) != 0: + // file was created, updated, or deleted (update the value) + changed := false + value := c.readContents(path) + for _, source := range sources { + previous := current[source] + if previous != value { + current[source] = value + changed = true + } + } + if changed { + err = comm.Set(current) + if err != nil { + return fmt.Errorf("failed to set current context from notify event: %w", err) + } + } + } + } + } + } +} + +// readContents reads the contents of the file but places a cap on the size of the data that +// is allowed to be read. If the file is larger than the max size then it will only read up to +// the maximum size. 
+func (c *contextProvider) readContents(path string) string { + maxSize := c.cfg.MaxSize + if maxSize <= 0 { + maxSize = DefaultMaxSize + } + + f, err := os.Open(path) + if err != nil { + c.logger.Errorf("failed to open file %q: %s", path, err) + } + defer f.Close() + + // determine the size needed in the buffer to read + var size int + if info, err := f.Stat(); err == nil { + size64 := info.Size() + if int64(int(size64)) == size64 { + size = int(size64) + } + } + size++ // one byte for final read at EOF + + // don't allow more than maxSize + if size > maxSize { + size = maxSize + } + + // If a file claims a small size, read at least 512 bytes. + // In particular, files in Linux's /proc claim size 0 but + // then do not work right if read in small pieces, + // so an initial read of 1 byte would not work correctly. + if size < 512 { + size = 512 + } + + data := make([]byte, 0, size) + for { + n, err := f.Read(data[len(data):cap(data)]) + data = data[:len(data)+n] + if err != nil { + if err == io.EOF { + err = nil + } + if err != nil { + c.logger.Errorf("failed to read file %q: %s", path, err) + return "" + } + return string(data) + } + if len(data) >= cap(data) { + d := append(data[:cap(data)], 0) + data = d[:len(data)] + } + } +} + +// ContextProviderBuilder builds the context provider. 
+func ContextProviderBuilder(log *logger.Logger, c *config.Config, _ bool) (corecomp.ContextProvider, error) { + p := &contextProvider{ + logger: log, + } + if c != nil { + err := c.UnpackTo(&p.cfg) + if err != nil { + return nil, fmt.Errorf("failed to unpack config: %w", err) + } + } + for sourceName, sourceCfg := range p.cfg.Sources { + if sourceCfg.Type != "" && sourceCfg.Type != "raw" { + return nil, fmt.Errorf("%q defined an unsupported type %q", sourceName, sourceCfg.Type) + } + if sourceCfg.Path == "" { + return nil, fmt.Errorf("%q is missing a defined path", sourceName) + } + // only use an absolute path (convert from relative) + if !filepath.IsAbs(sourceCfg.Path) { + path, err := filepath.Abs(sourceCfg.Path) + if err != nil { + return nil, fmt.Errorf("%q failed to determine absolute path for %q: %w", sourceName, sourceCfg.Path, err) + } + sourceCfg.Path = path + } + // Run registers its watch on the parent directory of the path, so reject a path without a usable parent + path := filepath.Dir(sourceCfg.Path) + if path == "" || path == "." { + return nil, fmt.Errorf("%q has a path %q that is invalid", sourceName, sourceCfg.Path) + } + // Windows paths are case-insensitive, force lower here to simplify the implementation + // (Run lower-cases the path of each file event on Windows before looking it up) + if runtime.GOOS == "windows" { + sourceCfg.Path = strings.ToLower(sourceCfg.Path) + } + // sourceCfg is a pointer, so the changes above are already visible through the map; the store makes that explicit + p.cfg.Sources[sourceName] = sourceCfg + } + return p, nil +} + +// drainQueue discards the events currently queued on e without blocking. It returns as soon as no event is +// immediately available or the channel has been closed. +func drainQueue(e <-chan fsnotify.Event) { + for { + select { + case _, ok := <-e: + if !ok { + return + } + default: + return + } + } +} diff --git a/internal/pkg/composable/providers/filesource/filesource_test.go b/internal/pkg/composable/providers/filesource/filesource_test.go new file mode 100644 index 00000000000..12ab839a9b0 --- /dev/null +++ b/internal/pkg/composable/providers/filesource/filesource_test.go @@ -0,0 +1,183 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +package filesource + +import ( + "context" + "errors" + "os" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent/internal/pkg/composable" + ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing" + "github.com/elastic/elastic-agent/internal/pkg/config" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +func TestContextProvider_Config(t *testing.T) { + scenarios := []struct { + Name string + Config *config.Config + Err error + }{ + { + Name: "no path", + Config: config.MustNewConfigFrom(map[string]interface{}{ + "sources": map[string]interface{}{ + "one": map[string]interface{}{}, + }, + }), + Err: errors.New(`"one" is missing a defined path`), + }, + { + Name: "invalid type", + Config: config.MustNewConfigFrom(map[string]interface{}{ + "sources": map[string]interface{}{ + "one": map[string]interface{}{ + "type": "json", + "path": "/etc/agent/content", + }, + }, + }), + Err: errors.New(`"one" defined an unsupported type "json"`), + }, + // the remaining validation errors (absolute path resolution, invalid parent directory) are + // defensive checks that are hard to trigger from a test, so they are not covered here + { + Name: "valid path", + Config: config.MustNewConfigFrom(map[string]interface{}{ + "sources": map[string]interface{}{ + "one": map[string]interface{}{ + "path": "/etc/agent/content1", + }, + "two": map[string]interface{}{ + "path": "/etc/agent/content2", + }, + }, + }), + }, + } + for _, s := range scenarios { + t.Run(s.Name, func(t *testing.T) { + log, err := logger.New("filesource_test", false) + require.NoError(t, err) + + builder, _ := composable.Providers.GetContextProvider("filesource") + _, err = builder(log, s.Config, true) + if s.Err != nil { + require.Equal(t, s.Err, err) + } else { + require.NoError(t, err) + } + }) + } +} + +// TestContextProvider runs the provider against two files in a temporary directory and checks that the +// initial values, and then an update written to each file in turn, are delivered through Set. +func TestContextProvider(t *testing.T) { + const testTimeout = 3 * time.Second + + tmpDir := t.TempDir() + value1 := "value1" + file1 := filepath.Join(tmpDir, 
"vAlUe1_path") + require.NoError(t, os.WriteFile(file1, []byte(value1), 0o644)) + value2 := "value2" + file2 := filepath.Join(tmpDir, "vAlUe2_path") + require.NoError(t, os.WriteFile(file2, []byte(value2), 0o644)) + + log, err := logger.New("filesource_test", false) + require.NoError(t, err) + + osPath := func(path string) string { + return path + } + if runtime.GOOS == "windows" { + // on Windows configure the path as lower case even though it + // is written as non-lower case to ensure that on Windows the + // filewatcher observes the correct path + osPath = func(path string) string { + return strings.ToLower(path) + } + } + c, err := config.NewConfigFrom(map[string]interface{}{ + "sources": map[string]interface{}{ + "one": map[string]interface{}{ + "path": osPath(file1), + }, + "two": map[string]interface{}{ + "path": osPath(file2), + }, + }, + }) + require.NoError(t, err) + builder, _ := composable.Providers.GetContextProvider("filesource") + provider, err := builder(log, c, true) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + comm := ctesting.NewContextComm(ctx) + // setChan is unbuffered, so the callback below blocks until the test receives the value + setChan := make(chan map[string]interface{}) + comm.CallOnSet(func(value map[string]interface{}) { + // Forward Set's input to the test channel + setChan <- value + }) + + // the provider runs in the background until the deferred cancel stops it; its return value is not checked + go func() { + _ = provider.Run(ctx, comm) + }() + + // wait for it to be called once + var current map[string]interface{} + select { + case current = <-setChan: + case <-time.After(testTimeout): + require.FailNow(t, "timeout waiting for provider to call Set") + } + + require.Equal(t, value1, current["one"]) + require.Equal(t, value2, current["two"]) + + // update the value in one + value1 = "update1" + require.NoError(t, os.WriteFile(file1, []byte(value1), 0o644)) + + // wait for file1 to be updated; Set calls that do not yet carry both expected values are skipped + for { + var oneUpdated map[string]interface{} + select { + case oneUpdated = <-setChan: + case <-time.After(testTimeout): + require.FailNow(t, "timeout waiting for provider to 
call Set") + } + + if value1 == oneUpdated["one"] && value2 == oneUpdated["two"] { + break + } + } + + // update the value in two + value2 = "update2" + require.NoError(t, os.WriteFile(file2, []byte(value2), 0o644)) + + for { + // wait for file2 to be updated + var twoUpdated map[string]interface{} + select { + case twoUpdated = <-setChan: + case <-time.After(testTimeout): + require.FailNow(t, "timeout waiting for provider to call Set") + } + + if value1 == twoUpdated["one"] && value2 == twoUpdated["two"] { + break + } + } +}