diff --git a/changelog/fragments/1737816507-Add-context-variable-support-to-outputs.yaml b/changelog/fragments/1737816507-Add-context-variable-support-to-outputs.yaml new file mode 100644 index 00000000000..9d861edf943 --- /dev/null +++ b/changelog/fragments/1737816507-Add-context-variable-support-to-outputs.yaml @@ -0,0 +1,34 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add context variable support to outputs + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Adds support for using context variable providers in the outputs section of a policy. Includes fallback support + to reference env provider when no provider prefix is provided in the variable. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/6602 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6376 diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 0394d301df9..e83547ddadf 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -172,8 +172,11 @@ type ConfigManager interface { type VarsManager interface { Runner + // DefaultProvider returns the default provider that the variable manager is configured to use. + DefaultProvider() string + // Observe instructs the variables to observe. - Observe([]string) + Observe(context.Context, []string) ([]*transpiler.Vars, error) // Watch returns the chanel to watch for variable changes. Watch() <-chan []*transpiler.Vars @@ -1244,7 +1247,11 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config } // pass the observed vars from the AST to the varsMgr - c.observeASTVars() + err = c.observeASTVars(ctx) + if err != nil { + // only possible error here is the context being cancelled + return err + } // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 @@ -1327,25 +1334,32 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) { // observeASTVars identifies the variables that are referenced in the computed AST and passed to // the varsMgr so it knows what providers are being referenced. If a providers is not being // referenced then the provider does not need to be running. -func (c *Coordinator) observeASTVars() { +func (c *Coordinator) observeASTVars(ctx context.Context) error { if c.varsMgr == nil { // No varsMgr (only happens in testing) - return + return nil } - if c.ast == nil { - // No AST; no vars - c.varsMgr.Observe(nil) - return + var vars []string + if c.ast != nil { + inputs, ok := transpiler.Lookup(c.ast, "inputs") + if ok { + vars = inputs.Vars(vars, c.varsMgr.DefaultProvider()) + } + outputs, ok := transpiler.Lookup(c.ast, "outputs") + if ok { + vars = outputs.Vars(vars, c.varsMgr.DefaultProvider()) + } } - inputs, ok := transpiler.Lookup(c.ast, "inputs") - if !ok { - // No inputs; no vars - c.varsMgr.Observe(nil) - return + updated, err := c.varsMgr.Observe(ctx, vars) + if err != nil { + // context cancel + return err } - var vars []string - vars = inputs.Vars(vars) - c.varsMgr.Observe(vars) + if updated != nil { + // provided an updated set of vars (observed changed) + c.vars = updated + } + return nil } // processVars updates the transpiler vars in the Coordinator. @@ -1421,6 +1435,8 @@ func (c *Coordinator) generateComponentModel() (err error) { }() ast := c.ast.ShallowClone() + + // perform variable substitution for inputs inputs, ok := transpiler.Lookup(ast, "inputs") if ok { renderedInputs, err := transpiler.RenderInputs(inputs, c.vars) @@ -1433,6 +1449,20 @@ func (c *Coordinator) generateComponentModel() (err error) { } } + // perform variable substitution for outputs + // outputs only support the context variables (dynamic provides are not provide to the outputs) + outputs, ok := transpiler.Lookup(ast, "outputs") + if ok { + renderedOutputs, err := transpiler.RenderOutputs(outputs, c.vars) + if err != nil { + return fmt.Errorf("rendering outputs failed: %w", err) + } + err = transpiler.Insert(ast, renderedOutputs, "outputs") + if err != nil { + return fmt.Errorf("inserting rendered outputs failed: %w", err) + } + } + cfg, err := ast.Map() if err != nil { return fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err) diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 9c0973e4c26..4f46440e04f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -928,7 +928,7 @@ func BenchmarkCoordinator_generateComponentModel(b *testing.B) { require.NoError(b, err) vars := make([]*transpiler.Vars, len(varsMaps)) for i, vm := range varsMaps { - vars[i], err = transpiler.NewVars(fmt.Sprintf("%d", i), vm, mapstr.M{}) + vars[i], err = transpiler.NewVars(fmt.Sprintf("%d", i), vm, mapstr.M{}, "") require.NoError(b, err) } @@ -1188,6 +1188,9 @@ func (l *configChange) Fail(err error) { } type fakeVarsManager struct { + varsMx sync.RWMutex + vars []*transpiler.Vars + varsCh chan []*transpiler.Vars errCh chan error @@ -1222,19 +1225,29 @@ func (f *fakeVarsManager) Watch() <-chan []*transpiler.Vars { return f.varsCh } -func (f *fakeVarsManager) Observe(observed []string) { +func (f *fakeVarsManager) Observe(ctx context.Context, observed []string) ([]*transpiler.Vars, error) { f.observedMx.Lock() defer f.observedMx.Unlock() f.observed = observed + f.varsMx.RLock() + defer f.varsMx.RUnlock() + return f.vars, nil } func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) { + f.varsMx.Lock() + f.vars = vars + f.varsMx.Unlock() select { case <-ctx.Done(): case f.varsCh <- vars: } } +func (f *fakeVarsManager) DefaultProvider() string { + return "" +} + type fakeOTelManager struct { updateCallback func(*confmap.Conf) error result error diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index a4f998bd82b..762309b37ef 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -1104,7 +1104,7 @@ inputs: components = nil vars, err := transpiler.NewVars("", map[string]interface{}{ "TEST_VAR": "input-id", - }, nil) + }, nil, "") require.NoError(t, err, "Vars creation must succeed") varsChan <- []*transpiler.Vars{vars} coord.runLoopIteration(ctx) @@ -1121,7 +1121,7 @@ inputs: components = nil vars, err = transpiler.NewVars("", map[string]interface{}{ "TEST_VAR": "changed-input-id", - }, nil) + }, nil, "") require.NoError(t, err, "Vars creation must succeed") varsChan <- []*transpiler.Vars{vars} coord.runLoopIteration(ctx) @@ -1239,7 +1239,7 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) { // (Coordinator will only regenerate its component model when it has non-nil // vars). func emptyVars(t *testing.T) []*transpiler.Vars { - vars, err := transpiler.NewVars("", map[string]interface{}{}, nil) + vars, err := transpiler.NewVars("", map[string]interface{}{}, nil, "") require.NoError(t, err, "Vars creation must succeed") return []*transpiler.Vars{vars} } diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 6e7f4309c85..6448d2b526e 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -198,7 +198,7 @@ func TestDiagnosticVariables(t *testing.T) { map[string]interface{}{ "testvar": "testvalue", }, - nil) + nil, "") require.NoError(t, err) expected := ` diff --git a/internal/pkg/agent/transpiler/ast.go b/internal/pkg/agent/transpiler/ast.go index 82987336235..bacf1be208c 100644 --- a/internal/pkg/agent/transpiler/ast.go +++ b/internal/pkg/agent/transpiler/ast.go @@ -64,7 +64,7 @@ type Node interface { // Vars adds to the array with the variables identified in the node. Returns the array in-case // the capacity of the array had to be changed. - Vars([]string) []string + Vars([]string, string) []string // Apply apply the current vars, returning the new value for the node. This does not modify the original Node. Apply(*Vars) (Node, error) @@ -182,10 +182,10 @@ func (d *Dict) Hash64With(h *xxhash.Digest) error { } // Vars returns a list of all variables referenced in the dictionary. -func (d *Dict) Vars(vars []string) []string { +func (d *Dict) Vars(vars []string, defaultProvider string) []string { for _, v := range d.value { k := v.(*Key) - vars = k.Vars(vars) + vars = k.Vars(vars, defaultProvider) } return vars } @@ -318,11 +318,11 @@ func (k *Key) Hash64With(h *xxhash.Digest) error { } // Vars returns a list of all variables referenced in the value. -func (k *Key) Vars(vars []string) []string { +func (k *Key) Vars(vars []string, defaultProvider string) []string { if k.value == nil { return vars } - return k.value.Vars(vars) + return k.value.Vars(vars, defaultProvider) } // Apply applies the vars to the value. This does not modify the original node. @@ -463,9 +463,9 @@ func (l *List) ShallowClone() Node { } // Vars returns a list of all variables referenced in the list. -func (l *List) Vars(vars []string) []string { +func (l *List) Vars(vars []string, defaultProvider string) []string { for _, v := range l.value { - vars = v.Vars(vars) + vars = v.Vars(vars, defaultProvider) } return vars } @@ -552,12 +552,12 @@ func (s *StrVal) Hash64With(h *xxhash.Digest) error { } // Vars returns a list of all variables referenced in the string. -func (s *StrVal) Vars(vars []string) []string { +func (s *StrVal) Vars(vars []string, defaultProvider string) []string { // errors are ignored (if there is an error determine the vars it will also error computing the policy) _, _ = replaceVars(s.value, func(variable string) (Node, Processors, bool) { vars = append(vars, variable) return nil, nil, false - }, false) + }, false, defaultProvider) return vars } @@ -613,7 +613,7 @@ func (s *IntVal) ShallowClone() Node { } // Vars does nothing. Cannot have variable in an IntVal. -func (s *IntVal) Vars(vars []string) []string { +func (s *IntVal) Vars(vars []string, defaultProvider string) []string { return vars } @@ -691,7 +691,7 @@ func (s *UIntVal) Hash64With(h *xxhash.Digest) error { } // Vars does nothing. Cannot have variable in an UIntVal. -func (s *UIntVal) Vars(vars []string) []string { +func (s *UIntVal) Vars(vars []string, defaultProvider string) []string { return vars } @@ -764,7 +764,7 @@ func (s *FloatVal) hashString() string { } // Vars does nothing. Cannot have variable in an FloatVal. -func (s *FloatVal) Vars(vars []string) []string { +func (s *FloatVal) Vars(vars []string, defaultProvider string) []string { return vars } @@ -843,7 +843,7 @@ func (s *BoolVal) Hash64With(h *xxhash.Digest) error { } // Vars does nothing. Cannot have variable in an BoolVal. -func (s *BoolVal) Vars(vars []string) []string { +func (s *BoolVal) Vars(vars []string, defaultProvider string) []string { return vars } diff --git a/internal/pkg/agent/transpiler/ast_test.go b/internal/pkg/agent/transpiler/ast_test.go index 948df76c51e..3ea626c8624 100644 --- a/internal/pkg/agent/transpiler/ast_test.go +++ b/internal/pkg/agent/transpiler/ast_test.go @@ -882,7 +882,7 @@ func TestApplyDoesNotMutate(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - vars, err := NewVars("", map[string]any{"var": "value"}, mapstr.M{}) + vars, err := NewVars("", map[string]any{"var": "value"}, mapstr.M{}, "") require.NoError(t, err) applied, err := test.input.Apply(vars) require.NoError(t, err) @@ -961,8 +961,9 @@ func TestShallowClone(t *testing.T) { func TestVars(t *testing.T) { tests := map[string]struct { - input map[string]interface{} - result []string + input map[string]interface{} + result []string + defaultProvider string }{ "empty": { input: map[string]interface{}{}, @@ -1045,6 +1046,66 @@ func TestVars(t *testing.T) { }, result: []string{"var1", "var2", "var3", "var1", "var5", "var6", "var1"}, }, + "nested with default": { + input: map[string]interface{}{ + "novars": map[string]interface{}{ + "list1": []interface{}{ + map[string]interface{}{ + "int": 1, + "float": 1.1234, + "bool": true, + "str": "value1", + }, + }, + "list2": []interface{}{ + map[string]interface{}{ + "int": 2, + "float": 2.3456, + "bool": false, + "str": "value2", + }, + }, + }, + "vars1": map[string]interface{}{ + "list1": []interface{}{ + map[string]interface{}{ + "int": 1, + "float": 1.1234, + "bool": true, + "str": "${custom.var1|host.var2|'constant'}", + }, + }, + "list2": []interface{}{ + map[string]interface{}{ + "int": 2, + "float": 2.3456, + "bool": false, + "str": "${var3|custom.var1|'constant'}", + }, + }, + }, + "vars2": map[string]interface{}{ + "list1": []interface{}{ + map[string]interface{}{ + "int": 1, + "float": 1.1234, + "bool": true, + "str": "${host.var5|host.var6|'constant'}", + }, + }, + "list2": []interface{}{ + map[string]interface{}{ + "int": 2, + "float": 2.3456, + "bool": false, + "str": "${var1}", + }, + }, + }, + }, + result: []string{"custom.var1", "host.var2", "custom.var3", "custom.var1", "host.var5", "host.var6", "custom.var1"}, + defaultProvider: "custom", + }, } for name, test := range tests { @@ -1052,7 +1113,7 @@ func TestVars(t *testing.T) { ast, err := NewAST(test.input) require.NoError(t, err) var vars []string - vars = ast.root.Vars(vars) + vars = ast.root.Vars(vars, test.defaultProvider) assert.Equal(t, test.result, vars) }) } @@ -1146,7 +1207,15 @@ func TestCondition(t *testing.T) { } func mustMakeVars(mapping map[string]interface{}) *Vars { - v, err := NewVars("", mapping, nil) + v, err := NewVars("", mapping, nil, "") + if err != nil { + panic(err) + } + return v +} + +func mustMakeVarsWithDefault(mapping map[string]interface{}, defaultProvider string) *Vars { + v, err := NewVars("", mapping, nil, defaultProvider) if err != nil { panic(err) } diff --git a/internal/pkg/agent/transpiler/utils.go b/internal/pkg/agent/transpiler/inputs.go similarity index 100% rename from internal/pkg/agent/transpiler/utils.go rename to internal/pkg/agent/transpiler/inputs.go diff --git a/internal/pkg/agent/transpiler/utils_test.go b/internal/pkg/agent/transpiler/inputs_test.go similarity index 97% rename from internal/pkg/agent/transpiler/utils_test.go rename to internal/pkg/agent/transpiler/inputs_test.go index 921b133f941..09ff78cb3b5 100644 --- a/internal/pkg/agent/transpiler/utils_test.go +++ b/internal/pkg/agent/transpiler/inputs_test.go @@ -59,6 +59,25 @@ func TestRenderInputs(t *testing.T) { }), }, }, + "basic single var with default": { + input: NewKey("inputs", NewList([]Node{ + NewDict([]Node{ + NewKey("key", NewStrVal("${name}")), + }), + })), + expected: NewList([]Node{ + NewDict([]Node{ + NewKey("key", NewStrVal("value1")), + }), + }), + varsArray: []*Vars{ + mustMakeVarsWithDefault(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, "var1"), + }, + }, "duplicate result is removed": { input: NewKey("inputs", NewList([]Node{ NewDict([]Node{ @@ -785,7 +804,7 @@ func TestRenderInputs(t *testing.T) { } func mustMakeVarsP(id string, mapping map[string]interface{}, processorKey string, processors Processors) *Vars { - v, err := NewVarsWithProcessors(id, mapping, processorKey, processors, nil) + v, err := NewVarsWithProcessors(id, mapping, processorKey, processors, nil, "") if err != nil { panic(err) } diff --git a/internal/pkg/agent/transpiler/outputs.go b/internal/pkg/agent/transpiler/outputs.go new file mode 100644 index 00000000000..d59f8a0d5e1 --- /dev/null +++ b/internal/pkg/agent/transpiler/outputs.go @@ -0,0 +1,65 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package transpiler + +import ( + "fmt" +) + +// RenderOutputs renders outputs section. +// +// outputs are only rendered using the context variables and they do not support +// using dynamic provider variables. +func RenderOutputs(outputs Node, varsArray []*Vars) (Node, error) { + if len(varsArray) == 0 { + // no context vars (nothing to do) + return outputs, nil + } + + // outputs only operates on the first set of vars because those are always the + // context provider variables and never include dynamic provider variables + // + // dynamic provider variables cannot be used for outputs as we don't want outputs + // to be duplicated (unlike inputs) + vars := varsArray[0] + + d, ok := outputs.Value().(*Dict) + if !ok { + return nil, fmt.Errorf("outputs must be an dict, got %T instead", outputs.Value()) + } + nodes := d.Value().([]Node) + keys := make([]Node, len(nodes)) + for i, node := range nodes { + key, ok := node.(*Key) + if !ok { + // not possible, but be defensive + keys[i] = node + continue + } + if key.value == nil { + keys[i] = key + continue + } + dict, ok := key.value.(*Dict) + if !ok { + // not possible, but be defensive + keys[i] = key + continue + } + // Apply creates a new Node with a deep copy of all the values + value, err := dict.Apply(vars) + // inputs allows a variable not to match and it will be removed + // outputs are not that way, if an ErrNoMatch is returned we + // return it back to the caller + if err != nil { + return nil, fmt.Errorf("rendering output %q failed: %w", key.name, err) + } + keys[i] = &Key{ + name: key.name, + value: value, + } + } + return &Dict{keys, nil}, nil +} diff --git a/internal/pkg/agent/transpiler/outputs_test.go b/internal/pkg/agent/transpiler/outputs_test.go new file mode 100644 index 00000000000..56dd0b07ad5 --- /dev/null +++ b/internal/pkg/agent/transpiler/outputs_test.go @@ -0,0 +1,116 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package transpiler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRenderOutputs(t *testing.T) { + testcases := map[string]struct { + input Node + expected Node + vars *Vars + err bool + }{ + "outputs not dict": { + input: NewKey("outputs", NewStrVal("not dict")), + err: true, + vars: mustMakeVars(map[string]interface{}{}), + }, + "missing variable error": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${var1.missing}")), + })), + })), + err: true, + vars: mustMakeVars(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }), + }, + "bad variable error": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${var1.name|'missing ending quote}")), + })), + })), + err: true, + vars: mustMakeVars(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }), + }, + "basic single var": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${var1.name}")), + })), + })), + expected: NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("value1")), + })), + }), + vars: mustMakeVars(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }), + }, + "basic default var": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${var1.missing|'default'}")), + })), + })), + expected: NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("default")), + })), + }), + vars: mustMakeVars(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }), + }, + "basic no provider var": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${name}")), + })), + })), + expected: NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("value1")), + })), + }), + vars: mustMakeVarsWithDefault(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, "var1"), + }, + } + + for name, test := range testcases { + t.Run(name, func(t *testing.T) { + v, err := RenderOutputs(test.input, []*Vars{test.vars}) + if test.err { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected.String(), v.String()) + } + }) + } +} diff --git a/internal/pkg/agent/transpiler/vars.go b/internal/pkg/agent/transpiler/vars.go index 71bd8bd4cb6..332c843929b 100644 --- a/internal/pkg/agent/transpiler/vars.go +++ b/internal/pkg/agent/transpiler/vars.go @@ -5,6 +5,7 @@ package transpiler import ( + "errors" "fmt" "regexp" "strings" @@ -14,10 +15,12 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/core/composable" ) +const varsSeparator = "." + var varsRegex = regexp.MustCompile(`\$\$?{([\p{L}\d\s\\\-_|.'":\/]*)}`) // ErrNoMatch is return when the replace didn't fail, just that no vars match to perform the replace. -var ErrNoMatch = fmt.Errorf("no matching vars") +var ErrNoMatch = errors.New("no matching vars") // Vars is a context of variables that also contain a list of processors that go with the mapping. type Vars struct { @@ -26,30 +29,31 @@ type Vars struct { processorsKey string processors Processors fetchContextProviders mapstr.M + defaultProvider string } // NewVars returns a new instance of vars. -func NewVars(id string, mapping map[string]interface{}, fetchContextProviders mapstr.M) (*Vars, error) { - return NewVarsWithProcessors(id, mapping, "", nil, fetchContextProviders) +func NewVars(id string, mapping map[string]interface{}, fetchContextProviders mapstr.M, defaultProvider string) (*Vars, error) { + return NewVarsWithProcessors(id, mapping, "", nil, fetchContextProviders, defaultProvider) } // NewVarsFromAst returns a new instance of vars. It takes the mapping as an *AST. -func NewVarsFromAst(id string, tree *AST, fetchContextProviders mapstr.M) *Vars { - return &Vars{id, tree, "", nil, fetchContextProviders} +func NewVarsFromAst(id string, tree *AST, fetchContextProviders mapstr.M, defaultProvider string) *Vars { + return &Vars{id, tree, "", nil, fetchContextProviders, defaultProvider} } // NewVarsWithProcessors returns a new instance of vars with attachment of processors. -func NewVarsWithProcessors(id string, mapping map[string]interface{}, processorKey string, processors Processors, fetchContextProviders mapstr.M) (*Vars, error) { +func NewVarsWithProcessors(id string, mapping map[string]interface{}, processorKey string, processors Processors, fetchContextProviders mapstr.M, defaultProvider string) (*Vars, error) { tree, err := NewAST(mapping) if err != nil { return nil, err } - return &Vars{id, tree, processorKey, processors, fetchContextProviders}, nil + return &Vars{id, tree, processorKey, processors, fetchContextProviders, defaultProvider}, nil } // NewVarsWithProcessorsFromAst returns a new instance of vars with attachment of processors. It takes the mapping as an *AST. -func NewVarsWithProcessorsFromAst(id string, tree *AST, processorKey string, processors Processors, fetchContextProviders mapstr.M) *Vars { - return &Vars{id, tree, processorKey, processors, fetchContextProviders} +func NewVarsWithProcessorsFromAst(id string, tree *AST, processorKey string, processors Processors, fetchContextProviders mapstr.M, defaultProvider string) *Vars { + return &Vars{id, tree, processorKey, processors, fetchContextProviders, defaultProvider} } // Replace returns a new value based on variable replacement. @@ -61,7 +65,7 @@ func (v *Vars) Replace(value string) (Node, error) { processors = v.processors } return node, processors, ok - }, true) + }, true, v.defaultProvider) } // ID returns the unique ID for the vars. @@ -103,7 +107,7 @@ func (v *Vars) lookupNode(name string) (Node, bool) { return Lookup(v.tree, name) } -func replaceVars(value string, replacer func(variable string) (Node, Processors, bool), reqMatch bool) (Node, error) { +func replaceVars(value string, replacer func(variable string) (Node, Processors, bool), reqMatch bool, defaultProvider string) (Node, error) { var processors Processors matchIdxs := varsRegex.FindAllSubmatchIndex([]byte(value), -1) if !validBrackets(value, matchIdxs) { @@ -120,7 +124,7 @@ func replaceVars(value string, replacer func(variable string) (Node, Processors, continue } // match on a non-escaped var - vars, err := extractVars(value[r[i+2]:r[i+3]]) + vars, err := extractVars(value[r[i+2]:r[i+3]], defaultProvider) if err != nil { return nil, fmt.Errorf(`error parsing variable "%s": %w`, value[r[i]:r[i+1]], err) } @@ -151,7 +155,7 @@ func replaceVars(value string, replacer func(variable string) (Node, Processors, } } if !set && reqMatch { - return NewStrVal(""), ErrNoMatch + return NewStrVal(""), fmt.Errorf("%w: %s", ErrNoMatch, toRepresentation(vars)) } lastIndex = r[1] } @@ -159,6 +163,26 @@ func replaceVars(value string, replacer func(variable string) (Node, Processors, return NewStrValWithProcessors(result+value[lastIndex:], processors), nil } +func toRepresentation(vars []varI) string { + var sb strings.Builder + sb.WriteString("${") + for i, val := range vars { + switch val.(type) { + case *constString: + sb.WriteString(`'`) + sb.WriteString(val.Value()) + sb.WriteString(`'`) + case *varString: + sb.WriteString(val.Value()) + if i < len(vars)-1 { + sb.WriteString("|") + } + } + } + sb.WriteString("}") + return sb.String() +} + // nodeToValue ensures that the node is an actual value. func nodeToValue(node Node) Node { switch n := node.(type) { @@ -206,7 +230,7 @@ func (v *constString) Value() string { return v.value } -func extractVars(i string) ([]varI, error) { +func extractVars(i string, defaultProvider string) ([]varI, error) { const out = rune(0) quote := out @@ -226,7 +250,7 @@ func extractVars(i string) ([]varI, error) { if is[len(is)-1] == '.' { return nil, fmt.Errorf("variable cannot end with '.'") } - res = append(res, &varString{string(is)}) + res = append(res, &varString{maybeAddDefaultProvider(string(is), defaultProvider)}) } is = is[:0] // slice to zero length; to keep allocated memory constant = false @@ -267,12 +291,28 @@ func extractVars(i string) ([]varI, error) { if is[len(is)-1] == '.' { return nil, fmt.Errorf("variable cannot end with '.'") } - res = append(res, &varString{string(is)}) + res = append(res, &varString{maybeAddDefaultProvider(string(is), defaultProvider)}) } return res, nil } func varPrefixMatched(val string, key string) bool { - s := strings.SplitN(val, ".", 2) + s := strings.SplitN(val, varsSeparator, 2) return s[0] == key } + +// maybeAddDefaultProvider adds a defaultProvide as a prefix on the value only in the case that +// the defaultProvider is set and the val doesn't contain any varsSeparator. +// +// This is done here and not at resolve time of the variable because the Observe flow of the AST +// for the variables provider needs to known exactly which providers to run. It also is an issue with +// using fetch providers because we would have to hit each to determine if that variable was present first +// before apply the default and we do not want that behavior. +func maybeAddDefaultProvider(val string, defaultProvider string) string { + if defaultProvider == "" || strings.Contains(val, varsSeparator) { + // no default set or already has a provider in the variable name + return val + } + // at this point they variable doesn't have a provider + return fmt.Sprintf("%s.%s", defaultProvider, val) +} diff --git a/internal/pkg/agent/transpiler/vars_test.go b/internal/pkg/agent/transpiler/vars_test.go index fe0dcc7089e..97a28f3bce2 100644 --- a/internal/pkg/agent/transpiler/vars_test.go +++ b/internal/pkg/agent/transpiler/vars_test.go @@ -16,7 +16,7 @@ import ( ) func TestVars_Replace(t *testing.T) { - vars := mustMakeVars(map[string]interface{}{ + vars := mustMakeVarsWithDefault(map[string]interface{}{ "un-der_score": map[string]interface{}{ "key1": "data1", "key2": "data2", @@ -42,7 +42,7 @@ func TestVars_Replace(t *testing.T) { "key5": "${", "key6": "$${", }, - }) + }, "other") tests := []struct { Input string Result Node @@ -79,6 +79,14 @@ func TestVars_Replace(t *testing.T) { false, false, }, + { + // data will be resolved to other.data since 'other' is the default provider + // set at variable creation (see mustMakeVarsWithDefault call) + "${un-der_score.missing|un-der_score.missing2|data}", + NewStrVal("info"), + false, + false, + }, { "${un-der_score.missing|'fallback'}", NewStrVal("fallback"), @@ -185,13 +193,10 @@ func TestVars_Replace(t *testing.T) { false, }, { - `list inside string ${un-der_score.list} causes no match`, - NewList([]Node{ - NewStrVal("array1"), - NewStrVal("array2"), - }), + `list inside string ${un-der_score.list} strings array`, + NewStrVal(`list inside string [array1,array2] strings array`), + false, false, - true, }, { `${un-der_score.dict}`, @@ -203,13 +208,10 @@ func TestVars_Replace(t *testing.T) { false, }, { - `dict inside string ${un-der_score.dict} causes no match`, - NewDict([]Node{ - NewKey("key1", NewStrVal("value1")), - NewKey("key2", NewStrVal("value2")), - }), + `dict inside string ${un-der_score.dict} strings dict`, + NewStrVal(`dict inside string {key1:value1},{key2:value2} strings dict`), + false, false, - true, }, { `start $${keep} ${un-der_score.key1} $${un-der_score.key1}`, @@ -260,7 +262,7 @@ func TestVars_Replace(t *testing.T) { if test.Error { assert.Error(t, err) } else if test.NoMatch { - assert.Error(t, ErrNoMatch, err) + assert.ErrorIs(t, err, ErrNoMatch) } else { require.NoError(t, err) assert.Equal(t, test.Result, res) @@ -297,13 +299,17 @@ func TestVars_ReplaceWithProcessors(t *testing.T) { }, "dynamic", processers, - nil) + nil, "testing") require.NoError(t, err) res, err := vars.Replace("${testing.key1}") require.NoError(t, err) assert.Equal(t, NewStrVal("data1"), res) + res, err = vars.Replace("${key1}") + require.NoError(t, err) + assert.Equal(t, NewStrVal("data1"), res) + res, err = vars.Replace("${dynamic.key1}") require.NoError(t, err) assert.Equal(t, NewStrValWithProcessors("dynamic1", processers), res) @@ -364,7 +370,7 @@ func TestVars_ReplaceWithFetchContextProvider(t *testing.T) { }, "dynamic", processers, - fetchContextProviders) + fetchContextProviders, "") require.NoError(t, err) res, err := vars.Replace("${testing.key1}") diff --git a/internal/pkg/composable/benchmark_test.go b/internal/pkg/composable/benchmark_test.go index 913d8d4fbd1..db3caaefa09 100644 --- a/internal/pkg/composable/benchmark_test.go +++ b/internal/pkg/composable/benchmark_test.go @@ -76,6 +76,6 @@ func BenchmarkGenerateVars100Pods(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - c.generateVars(mapstr.M{}) + c.generateVars(mapstr.M{}, "") } } diff --git a/internal/pkg/composable/config.go b/internal/pkg/composable/config.go index 04f1b38e0a2..b5b7393f837 100644 --- a/internal/pkg/composable/config.go +++ b/internal/pkg/composable/config.go @@ -15,4 +15,5 @@ type Config struct { Providers map[string]*config.Config `config:"providers"` ProvidersInitialDefault *bool `config:"agent.providers.initial_default"` ProvidersRestartInterval *time.Duration `config:"agent.providers.restart_interval"` + ProvidersDefaultProvider *string `config:"agent.providers.default"` } diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 2743eee9b62..453a3d47fb7 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -23,7 +23,8 @@ import ( ) const ( - defaultRetryInterval = 30 * time.Second + defaultRetryInterval = 30 * time.Second + defaultDefaultProvider = "env" ) // Controller manages the state of the providers current context. @@ -39,17 +40,37 @@ type Controller interface { // Watch returns the channel to watch for variable changes. Watch() <-chan []*transpiler.Vars - // Observe instructs the variables to observe. - Observe([]string) + // Observe instructs the controller to enable the observed providers. + // + // This is a blocking call until the observation is handled and the most recent + // set of variables are returned the caller in the case a change occurred. If no change occurred then + // it will return with a nil array. If changed the current observed state of variables + // that is returned is not sent over the Watch channel, the caller should coordinate this fact. + // + // Maximum amount of time for resolve is 500ms as that is the debounce window for variable resolution. + // + // Only error that is returned from this function is the result of the passed context. + Observe(context.Context, []string) ([]*transpiler.Vars, error) + + // DefaultProvider returns the default provider used by the controller. + // + // This is used by any variable reference that doesn't add a provider prefix. + DefaultProvider() string +} + +type observer struct { + vars map[string]bool + result chan []*transpiler.Vars } // controller manages the state of the providers current context. type controller struct { logger *logger.Logger ch chan []*transpiler.Vars - observedCh chan map[string]bool + observedCh chan observer errCh chan error restartInterval time.Duration + defaultProvider string managed bool contextProviderBuilders map[string]contextProvider @@ -82,6 +103,11 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) restartInterval = *providersCfg.ProvidersRestartInterval } + defaultProvider := defaultDefaultProvider + if providersCfg.ProvidersDefaultProvider != nil { + defaultProvider = *providersCfg.ProvidersDefaultProvider + } + // build all the context providers contextProviders := map[string]contextProvider{} for name, builder := range Providers.contextProviders { @@ -113,10 +139,11 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) return &controller{ logger: l, ch: make(chan []*transpiler.Vars, 1), - observedCh: make(chan map[string]bool, 1), + observedCh: make(chan observer), errCh: make(chan error), managed: managed, restartInterval: restartInterval, + defaultProvider: defaultProvider, contextProviderBuilders: contextProviders, dynamicProviderBuilders: dynamicProviders, contextProviderStates: make(map[string]*contextProviderState), @@ -183,7 +210,7 @@ func (c *controller) Run(ctx context.Context) error { // send initial vars state fetchProvidersLock.RLock() - err := c.sendVars(ctx, fetchProviders) + err := c.sendVars(ctx, nil, fetchProviders) if err != nil { fetchProvidersLock.RUnlock() // only error is context cancel, no need to add error message context @@ -192,6 +219,7 @@ func (c *controller) Run(ctx context.Context) error { fetchProvidersLock.RUnlock() // performs debounce of notifies; accumulates them into 100 millisecond chunks + var observedResult chan []*transpiler.Vars for { DEBOUNCE: for { @@ -199,12 +227,21 @@ func (c *controller) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case observed := <-c.observedCh: - changed := c.handleObserved(localCtx, &wg, fetchCh, stateChangedChan, observed) + // observedResult holds the channel to send the latest observed results on + // if nothing is changed then nil will be sent over the channel if the set of running + // providers does change then the latest observed variables will be sent over the channel + observedResult = observed.result + changed := c.handleObserved(localCtx, &wg, fetchCh, stateChangedChan, observed.vars) if changed { t.Reset(100 * time.Millisecond) c.logger.Debugf("Observed state changed for composable inputs; debounce started") drainChan(stateChangedChan) break DEBOUNCE + } else { + // nothing changed send nil to alert the caller + // observedResult must be set to nil here so on next loop it is not set + observedResult <- nil + observedResult = nil } case <-stateChangedChan: t.Reset(100 * time.Millisecond) @@ -223,9 +260,10 @@ func (c *controller) Run(ctx context.Context) error { // batching done, gather results } - // send the vars to the watcher + // send the vars to the watcher or the observer caller fetchProvidersLock.RLock() - err := c.sendVars(ctx, fetchProviders) + err := c.sendVars(ctx, observedResult, fetchProviders) + observedResult = nil if err != nil { fetchProvidersLock.RUnlock() // only error is context cancel, no need to add error message context @@ -235,9 +273,24 @@ func (c *controller) Run(ctx context.Context) error { } } -func (c *controller) sendVars(ctx context.Context, fetchContextProviders mapstr.M) error { +func (c *controller) sendVars(ctx context.Context, observedResult chan []*transpiler.Vars, fetchContextProviders mapstr.M) error { c.logger.Debugf("Computing new variable state for composable inputs") - vars := c.generateVars(fetchContextProviders) + vars := c.generateVars(fetchContextProviders, c.defaultProvider) + if observedResult != nil { + // drain any vars sitting on the watch channel + // this new set of vars replaces that set if that current + // value has not been read then it will result in vars state being incorrect + select { + case <-c.ch: + default: + } + select { + case <-ctx.Done(): + return ctx.Err() + case observedResult <- vars: + return nil + } + } for { select { case c.ch <- vars: @@ -267,10 +320,17 @@ func (c *controller) Watch() <-chan []*transpiler.Vars { return c.ch } -// Observe sends the observed variables from the AST to the controller. +// Observe instructs the controller to enable the observed providers. +// +// This is a blocking call until the observation is handled and the most recent +// set of variables are returned the caller in the case a change occurred. If no change occurred then +// it will return with a nil array. If changed the current observed state of variables +// that is returned is not sent over the Watch channel, the caller should coordinate this fact. // -// Based on this information it will determine which providers should even be running. -func (c *controller) Observe(vars []string) { +// Maximum amount of time for resolve is 500ms as that is the debounce window for variable resolution. +// +// Only error that is returned from this function is the result of the passed context. +func (c *controller) Observe(ctx context.Context, vars []string) ([]*transpiler.Vars, error) { // only need the top-level variables to determine which providers to run // // future: possible that all vars could be organized and then passed to each provider to @@ -280,9 +340,24 @@ func (c *controller) Observe(vars []string) { vs := strings.SplitN(v, ".", 2) topLevel[vs[0]] = true } - // drain the channel first, if the previous vars had not been used yet the new list should be used instead - drainChan(c.observedCh) - c.observedCh <- topLevel + // blocks waiting for an updated set of variables + ch := make(chan []*transpiler.Vars) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case c.observedCh <- observer{topLevel, ch}: + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case vars := <-ch: + return vars, nil + } +} + +// DefaultProvider returns the default provider being used by the controller. +func (c *controller) DefaultProvider() string { + return c.defaultProvider } func (c *controller) handleObserved(ctx context.Context, wg *sync.WaitGroup, fetchCh chan fetchProvider, stateChangedChan chan bool, observed map[string]bool) bool { @@ -317,24 +392,28 @@ func (c *controller) handleObserved(ctx context.Context, wg *sync.WaitGroup, fet continue } + found := false contextInfo, ok := c.contextProviderBuilders[name] if ok { + found = true state := c.startContextProvider(ctx, wg, fetchCh, stateChangedChan, name, contextInfo) if state != nil { changed = true c.contextProviderStates[name] = state - } } dynamicInfo, ok := c.dynamicProviderBuilders[name] if ok { + found = true state := c.startDynamicProvider(ctx, wg, stateChangedChan, name, dynamicInfo) if state != nil { changed = true c.dynamicProviderStates[name] = state } } - c.logger.Warnf("provider %q referenced in policy but no provider exists or was explicitly disabled", name) + if !found { + c.logger.Warnf("provider %q referenced in policy but no provider exists or was explicitly disabled", name) + } } // running remaining need to be stopped @@ -470,14 +549,14 @@ func (c *controller) startDynamicProvider(ctx context.Context, wg *sync.WaitGrou return state } -func (c *controller) generateVars(fetchContextProviders mapstr.M) []*transpiler.Vars { +func (c *controller) generateVars(fetchContextProviders mapstr.M, defaultProvider string) []*transpiler.Vars { // build the vars list of mappings vars := make([]*transpiler.Vars, 1) mapping, _ := transpiler.NewAST(map[string]any{}) for name, state := range c.contextProviderStates { _ = mapping.Insert(state.Current(), name) } - vars[0] = transpiler.NewVarsFromAst("", mapping, fetchContextProviders) + vars[0] = transpiler.NewVarsFromAst("", mapping, fetchContextProviders, defaultProvider) // add to the vars list for each dynamic providers mappings for name, state := range c.dynamicProviderStates { @@ -485,7 +564,7 @@ func (c *controller) generateVars(fetchContextProviders mapstr.M) []*transpiler. local := mapping.ShallowClone() _ = local.Insert(mappings.mapping, name) id := fmt.Sprintf("%s-%s", name, mappings.id) - v := transpiler.NewVarsWithProcessorsFromAst(id, local, name, mappings.processors, fetchContextProviders) + v := transpiler.NewVarsWithProcessorsFromAst(id, local, name, mappings.processors, fetchContextProviders, defaultProvider) vars = append(vars, v) } } diff --git a/internal/pkg/composable/controller_test.go b/internal/pkg/composable/controller_test.go index 3d3f532dc6b..2f83c7ac486 100644 --- a/internal/pkg/composable/controller_test.go +++ b/internal/pkg/composable/controller_test.go @@ -86,22 +86,17 @@ func TestController(t *testing.T) { var setVars2 []*transpiler.Vars var setVars3 []*transpiler.Vars go func() { - for { - select { - case <-ctx.Done(): - return - case vars := <-c.Watch(): - if setVars1 == nil { - setVars1 = vars - c.Observe([]string{"local.vars.key1", "local_dynamic.vars.key1"}) // observed local and local_dynamic - } else if setVars2 == nil { - setVars2 = vars - c.Observe(nil) // no observed (will turn off those providers) - } else { - setVars3 = vars - cancel() - } - } + defer cancel() + select { + case <-ctx.Done(): + return + case vars := <-c.Watch(): + // initial vars + setVars1 = vars + setVars2, err = c.Observe(ctx, []string{"local.vars.key1", "local_dynamic.vars.key1"}) // observed local and local_dynamic + require.NoError(t, err) + setVars3, err = c.Observe(ctx, nil) // no observed (will turn off those providers) + require.NoError(t, err) } }() @@ -241,36 +236,47 @@ func TestProvidersDefaultDisabled(t *testing.T) { c, err := composable.New(log, cfg, false) require.NoError(t, err) - c.Observe(tt.observed) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 1*time.Second) defer timeoutCancel() + errCh := make(chan error) + go func() { + errCh <- c.Run(ctx) + }() + var setVars []*transpiler.Vars go func() { defer cancel() + + observed := false for { select { case <-timeoutCtx.Done(): return case vars := <-c.Watch(): setVars = vars + default: + if !observed { + vars, err := c.Observe(timeoutCtx, tt.observed) + require.NoError(t, err) + if vars != nil { + setVars = vars + } + observed = true + } } } }() - errCh := make(chan error) - go func() { - errCh <- c.Run(ctx) - }() err = <-errCh if errors.Is(err, context.Canceled) { err = nil } require.NoError(t, err) + require.NotNil(t, setVars) if len(tt.context) > 0 { for _, name := range tt.context { @@ -379,3 +385,34 @@ func TestCancellation(t *testing.T) { } }) } + +func TestDefaultProvider(t *testing.T) { + log, err := logger.New("", false) + require.NoError(t, err) + + t.Run("default env", func(t *testing.T) { + c, err := composable.New(log, nil, false) + require.NoError(t, err) + assert.Equal(t, "env", c.DefaultProvider()) + }) + + t.Run("no default", func(t *testing.T) { + cfg, err := config.NewConfigFrom(map[string]interface{}{ + "agent.providers.default": "", + }) + require.NoError(t, err) + c, err := composable.New(log, cfg, false) + require.NoError(t, err) + assert.Equal(t, "", c.DefaultProvider()) + }) + + t.Run("custom default", func(t *testing.T) { + cfg, err := config.NewConfigFrom(map[string]interface{}{ + "agent.providers.default": "custom", + }) + require.NoError(t, err) + c, err := composable.New(log, cfg, false) + require.NoError(t, err) + assert.Equal(t, "custom", c.DefaultProvider()) + }) +} diff --git a/internal/pkg/composable/providers/agent/agent.go b/internal/pkg/composable/providers/agent/agent.go index 2f4eec39e3e..b73de416116 100644 --- a/internal/pkg/composable/providers/agent/agent.go +++ b/internal/pkg/composable/providers/agent/agent.go @@ -41,7 +41,8 @@ func (*contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderCo if err != nil { return errors.New(err, "failed to set mapping", errors.TypeUnexpected) } - return nil + <-ctx.Done() + return ctx.Err() } // ContextProviderBuilder builds the context provider. diff --git a/internal/pkg/composable/providers/agent/agent_test.go b/internal/pkg/composable/providers/agent/agent_test.go index 9098fbfe5e1..44d812be037 100644 --- a/internal/pkg/composable/providers/agent/agent_test.go +++ b/internal/pkg/composable/providers/agent/agent_test.go @@ -7,6 +7,7 @@ package agent import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,12 +24,12 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, nil, true) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - comm := ctesting.NewContextComm(context.Background()) + comm := ctesting.NewContextComm(ctx) err = provider.Run(ctx, comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) current := comm.Current() _, hasID := current["id"] diff --git a/internal/pkg/composable/providers/env/env.go b/internal/pkg/composable/providers/env/env.go index 4eb6a73d0db..bdae491edc9 100644 --- a/internal/pkg/composable/providers/env/env.go +++ b/internal/pkg/composable/providers/env/env.go @@ -28,7 +28,8 @@ func (*contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderCo if err != nil { return errors.New(err, "failed to set mapping", errors.TypeUnexpected) } - return nil + <-ctx.Done() + return ctx.Err() } // ContextProviderBuilder builds the context provider. diff --git a/internal/pkg/composable/providers/env/env_test.go b/internal/pkg/composable/providers/env/env_test.go index c9b5bc95dae..627b6acb615 100644 --- a/internal/pkg/composable/providers/env/env_test.go +++ b/internal/pkg/composable/providers/env/env_test.go @@ -7,6 +7,7 @@ package env import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,12 +21,12 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, nil, true) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - comm := ctesting.NewContextComm(context.Background()) + comm := ctesting.NewContextComm(ctx) err = provider.Run(ctx, comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) assert.Equal(t, getEnvMapping(), comm.Current()) } diff --git a/internal/pkg/composable/providers/local/local.go b/internal/pkg/composable/providers/local/local.go index 470d6c0a312..7e88cbdec60 100644 --- a/internal/pkg/composable/providers/local/local.go +++ b/internal/pkg/composable/providers/local/local.go @@ -29,7 +29,8 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider if err != nil { return errors.New(err, "failed to set mapping", errors.TypeUnexpected) } - return nil + <-ctx.Done() + return ctx.Err() } // ContextProviderBuilder builds the context provider. diff --git a/internal/pkg/composable/providers/local/local_test.go b/internal/pkg/composable/providers/local/local_test.go index a1f2f04bc31..4a1b82e1e06 100644 --- a/internal/pkg/composable/providers/local/local_test.go +++ b/internal/pkg/composable/providers/local/local_test.go @@ -7,6 +7,7 @@ package local import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,12 +30,12 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, cfg, true) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - comm := ctesting.NewContextComm(context.Background()) + comm := ctesting.NewContextComm(ctx) err = provider.Run(ctx, comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) assert.Equal(t, mapping, comm.Current()) } diff --git a/internal/pkg/composable/providers/localdynamic/localdynamic.go b/internal/pkg/composable/providers/localdynamic/localdynamic.go index 6bef79633b7..e5897a85491 100644 --- a/internal/pkg/composable/providers/localdynamic/localdynamic.go +++ b/internal/pkg/composable/providers/localdynamic/localdynamic.go @@ -37,7 +37,8 @@ func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { return errors.New(err, fmt.Sprintf("failed to add mapping for index %d", i), errors.TypeUnexpected) } } - return nil + <-comm.Done() + return comm.Err() } // DynamicProviderBuilder builds the dynamic provider. diff --git a/internal/pkg/composable/providers/localdynamic/localdynamic_test.go b/internal/pkg/composable/providers/localdynamic/localdynamic_test.go index 313f29ff4ad..cf50b42ee39 100644 --- a/internal/pkg/composable/providers/localdynamic/localdynamic_test.go +++ b/internal/pkg/composable/providers/localdynamic/localdynamic_test.go @@ -7,6 +7,7 @@ package localdynamic import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -63,9 +64,11 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, cfg, true) require.NoError(t, err) - comm := ctesting.NewDynamicComm(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + comm := ctesting.NewDynamicComm(ctx) err = provider.Run(comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) curr1, ok1 := comm.Current("0") assert.True(t, ok1) diff --git a/internal/pkg/composable/providers/path/path.go b/internal/pkg/composable/providers/path/path.go index f84c04294ad..a0fdf43b63e 100644 --- a/internal/pkg/composable/providers/path/path.go +++ b/internal/pkg/composable/providers/path/path.go @@ -32,7 +32,8 @@ func (*contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderCo if err != nil { return errors.New(err, "failed to set mapping", errors.TypeUnexpected) } - return nil + <-ctx.Done() + return ctx.Err() } // ContextProviderBuilder builds the context provider. diff --git a/internal/pkg/composable/providers/path/path_test.go b/internal/pkg/composable/providers/path/path_test.go index c8d3c57b41a..00f3d6b03ef 100644 --- a/internal/pkg/composable/providers/path/path_test.go +++ b/internal/pkg/composable/providers/path/path_test.go @@ -7,6 +7,7 @@ package path import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,11 +22,11 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, nil, true) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - comm := ctesting.NewContextComm(context.Background()) + comm := ctesting.NewContextComm(ctx) err = provider.Run(ctx, comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) current := comm.Current() assert.Equal(t, paths.Home(), current["home"]) diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index b57a36eca88..9168426430d 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -49,7 +49,7 @@ var DefaultOptions = []interface{}{ ucfg.PathSep("."), ucfg.ResolveEnv, ucfg.VarExp, - VarSkipKeys("inputs"), + VarSkipKeys("inputs", "outputs"), ucfg.IgnoreCommas, OTelKeys("connectors", "receivers", "processors", "exporters", "extensions", "service"), } diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go index 55f5d60e7c8..9fa2961fd77 100644 --- a/internal/pkg/config/config_test.go +++ b/internal/pkg/config/config_test.go @@ -29,8 +29,8 @@ func TestInputsResolveNOOP(t *testing.T) { "default": map[string]interface{}{ "type": "elasticsearch", "hosts": []interface{}{"127.0.0.1:9200"}, - "username": "elastic", - "password": "changeme", + "username": "${env.ES_USER}", + "password": "${env.ES_PASSWORD}", }, }, "inputs": []interface{}{ diff --git a/pkg/component/component.go b/pkg/component/component.go index 21e87ea5233..bd1c4893171 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -712,7 +712,7 @@ type outputI struct { // varsForPlatform sets the runtime variables that are available in the // input specification runtime checks. This function should always be // edited in sync with the documentation in specs/README.md. -func varsForPlatform(platform PlatformDetail) (*transpiler.Vars, error) { +func varsForPlatform(platform PlatformDetail, defaultProvider string) (*transpiler.Vars, error) { return transpiler.NewVars("", map[string]interface{}{ "install": map[string]interface{}{ "in_default": paths.ArePathsEqual(paths.Top(), paths.InstallPath(paths.DefaultBasePath)) || platform.IsInstalledViaExternalPkgMgr, @@ -729,14 +729,14 @@ func varsForPlatform(platform PlatformDetail) (*transpiler.Vars, error) { "user": map[string]interface{}{ "root": platform.User.Root, }, - }, nil) + }, nil, defaultProvider) } func validateRuntimeChecks( runtime *RuntimeSpec, platform PlatformDetail, ) error { - vars, err := varsForPlatform(platform) + vars, err := varsForPlatform(platform, "") // no default provider if err != nil { return err } diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 995b223d91d..23c4685d36e 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -2390,7 +2390,7 @@ func TestPreventionsAreValid(t *testing.T) { "user": map[string]interface{}{ "root": false, }, - }, nil) + }, nil, "") require.NoError(t, err) for path, spec := range specFiles {