From 2c31f82742c50c3b994cec1c484fd24db5d5ae53 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 24 Jan 2025 14:41:56 -0500 Subject: [PATCH 01/14] Add support for variables in outputs and default provider. --- .../application/coordinator/coordinator.go | 32 ++++- .../coordinator/coordinator_test.go | 6 +- .../coordinator/coordinator_unit_test.go | 6 +- .../coordinator/diagnostics_test.go | 2 +- internal/pkg/agent/transpiler/ast.go | 26 ++-- internal/pkg/agent/transpiler/ast_test.go | 79 +++++++++++- .../agent/transpiler/{utils.go => inputs.go} | 0 .../{utils_test.go => inputs_test.go} | 21 +++- internal/pkg/agent/transpiler/outputs.go | 62 ++++++++++ internal/pkg/agent/transpiler/outputs_test.go | 116 ++++++++++++++++++ internal/pkg/agent/transpiler/vars.go | 41 ++++--- internal/pkg/agent/transpiler/vars_test.go | 18 ++- internal/pkg/composable/benchmark_test.go | 2 +- internal/pkg/composable/config.go | 1 + internal/pkg/composable/controller.go | 28 ++++- internal/pkg/composable/controller_test.go | 31 +++++ pkg/component/component.go | 6 +- pkg/component/component_test.go | 2 +- 18 files changed, 421 insertions(+), 58 deletions(-) rename internal/pkg/agent/transpiler/{utils.go => inputs.go} (100%) rename internal/pkg/agent/transpiler/{utils_test.go => inputs_test.go} (97%) create mode 100644 internal/pkg/agent/transpiler/outputs.go create mode 100644 internal/pkg/agent/transpiler/outputs_test.go diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 0394d301df9..32ef899ad6b 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -172,6 +172,9 @@ type ConfigManager interface { type VarsManager interface { Runner + // DefaultProvider returns the default provider that the variable manager is configured to use. + DefaultProvider() string + // Observe instructs the variables to observe. Observe([]string) @@ -1337,14 +1340,15 @@ func (c *Coordinator) observeASTVars() { c.varsMgr.Observe(nil) return } + var vars []string inputs, ok := transpiler.Lookup(c.ast, "inputs") - if !ok { - // No inputs; no vars - c.varsMgr.Observe(nil) - return + if ok { + vars = inputs.Vars(vars, c.varsMgr.DefaultProvider()) + } + outputs, ok := transpiler.Lookup(c.ast, "outputs") + if ok { + vars = outputs.Vars(vars, c.varsMgr.DefaultProvider()) } - var vars []string - vars = inputs.Vars(vars) c.varsMgr.Observe(vars) } @@ -1421,6 +1425,8 @@ func (c *Coordinator) generateComponentModel() (err error) { }() ast := c.ast.ShallowClone() + + // perform variable substitution for inputs inputs, ok := transpiler.Lookup(ast, "inputs") if ok { renderedInputs, err := transpiler.RenderInputs(inputs, c.vars) @@ -1433,6 +1439,20 @@ func (c *Coordinator) generateComponentModel() (err error) { } } + // perform variable substitution for outputs + // outputs only support the context variables (dynamic provides are not provide to the outputs) + outputs, ok := transpiler.Lookup(ast, "outputs") + if ok { + renderedOutputs, err := transpiler.RenderOutputs(outputs, c.vars) + if err != nil { + return fmt.Errorf("rendering outputs failed: %w", err) + } + err = transpiler.Insert(ast, renderedOutputs, "outputs") + if err != nil { + return fmt.Errorf("inserting rendered outputs failed: %w", err) + } + } + cfg, err := ast.Map() if err != nil { return fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err) diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 9c0973e4c26..cb5659269ca 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -928,7 +928,7 @@ func BenchmarkCoordinator_generateComponentModel(b *testing.B) { require.NoError(b, err) vars := make([]*transpiler.Vars, len(varsMaps)) for i, vm := range varsMaps { - vars[i], err = transpiler.NewVars(fmt.Sprintf("%d", i), vm, mapstr.M{}) + vars[i], err = transpiler.NewVars(fmt.Sprintf("%d", i), vm, mapstr.M{}, "") require.NoError(b, err) } @@ -1235,6 +1235,10 @@ func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) { } } +func (f *fakeVarsManager) DefaultProvider() string { + return "" +} + type fakeOTelManager struct { updateCallback func(*confmap.Conf) error result error diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index a4f998bd82b..762309b37ef 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -1104,7 +1104,7 @@ inputs: components = nil vars, err := transpiler.NewVars("", map[string]interface{}{ "TEST_VAR": "input-id", - }, nil) + }, nil, "") require.NoError(t, err, "Vars creation must succeed") varsChan <- []*transpiler.Vars{vars} coord.runLoopIteration(ctx) @@ -1121,7 +1121,7 @@ inputs: components = nil vars, err = transpiler.NewVars("", map[string]interface{}{ "TEST_VAR": "changed-input-id", - }, nil) + }, nil, "") require.NoError(t, err, "Vars creation must succeed") varsChan <- []*transpiler.Vars{vars} coord.runLoopIteration(ctx) @@ -1239,7 +1239,7 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) { // (Coordinator will only regenerate its component model when it has non-nil // vars). func emptyVars(t *testing.T) []*transpiler.Vars { - vars, err := transpiler.NewVars("", map[string]interface{}{}, nil) + vars, err := transpiler.NewVars("", map[string]interface{}{}, nil, "") require.NoError(t, err, "Vars creation must succeed") return []*transpiler.Vars{vars} } diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 307831c1b50..14b3e535a76 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -196,7 +196,7 @@ func TestDiagnosticVariables(t *testing.T) { map[string]interface{}{ "testvar": "testvalue", }, - nil) + nil, "") require.NoError(t, err) expected := ` diff --git a/internal/pkg/agent/transpiler/ast.go b/internal/pkg/agent/transpiler/ast.go index 82987336235..bacf1be208c 100644 --- a/internal/pkg/agent/transpiler/ast.go +++ b/internal/pkg/agent/transpiler/ast.go @@ -64,7 +64,7 @@ type Node interface { // Vars adds to the array with the variables identified in the node. Returns the array in-case // the capacity of the array had to be changed. - Vars([]string) []string + Vars([]string, string) []string // Apply apply the current vars, returning the new value for the node. This does not modify the original Node. Apply(*Vars) (Node, error) @@ -182,10 +182,10 @@ func (d *Dict) Hash64With(h *xxhash.Digest) error { } // Vars returns a list of all variables referenced in the dictionary. -func (d *Dict) Vars(vars []string) []string { +func (d *Dict) Vars(vars []string, defaultProvider string) []string { for _, v := range d.value { k := v.(*Key) - vars = k.Vars(vars) + vars = k.Vars(vars, defaultProvider) } return vars } @@ -318,11 +318,11 @@ func (k *Key) Hash64With(h *xxhash.Digest) error { } // Vars returns a list of all variables referenced in the value. -func (k *Key) Vars(vars []string) []string { +func (k *Key) Vars(vars []string, defaultProvider string) []string { if k.value == nil { return vars } - return k.value.Vars(vars) + return k.value.Vars(vars, defaultProvider) } // Apply applies the vars to the value. This does not modify the original node. @@ -463,9 +463,9 @@ func (l *List) ShallowClone() Node { } // Vars returns a list of all variables referenced in the list. -func (l *List) Vars(vars []string) []string { +func (l *List) Vars(vars []string, defaultProvider string) []string { for _, v := range l.value { - vars = v.Vars(vars) + vars = v.Vars(vars, defaultProvider) } return vars } @@ -552,12 +552,12 @@ func (s *StrVal) Hash64With(h *xxhash.Digest) error { } // Vars returns a list of all variables referenced in the string. -func (s *StrVal) Vars(vars []string) []string { +func (s *StrVal) Vars(vars []string, defaultProvider string) []string { // errors are ignored (if there is an error determine the vars it will also error computing the policy) _, _ = replaceVars(s.value, func(variable string) (Node, Processors, bool) { vars = append(vars, variable) return nil, nil, false - }, false) + }, false, defaultProvider) return vars } @@ -613,7 +613,7 @@ func (s *IntVal) ShallowClone() Node { } // Vars does nothing. Cannot have variable in an IntVal. -func (s *IntVal) Vars(vars []string) []string { +func (s *IntVal) Vars(vars []string, defaultProvider string) []string { return vars } @@ -691,7 +691,7 @@ func (s *UIntVal) Hash64With(h *xxhash.Digest) error { } // Vars does nothing. Cannot have variable in an UIntVal. -func (s *UIntVal) Vars(vars []string) []string { +func (s *UIntVal) Vars(vars []string, defaultProvider string) []string { return vars } @@ -764,7 +764,7 @@ func (s *FloatVal) hashString() string { } // Vars does nothing. Cannot have variable in an FloatVal. -func (s *FloatVal) Vars(vars []string) []string { +func (s *FloatVal) Vars(vars []string, defaultProvider string) []string { return vars } @@ -843,7 +843,7 @@ func (s *BoolVal) Hash64With(h *xxhash.Digest) error { } // Vars does nothing. Cannot have variable in an BoolVal. -func (s *BoolVal) Vars(vars []string) []string { +func (s *BoolVal) Vars(vars []string, defaultProvider string) []string { return vars } diff --git a/internal/pkg/agent/transpiler/ast_test.go b/internal/pkg/agent/transpiler/ast_test.go index 948df76c51e..3ea626c8624 100644 --- a/internal/pkg/agent/transpiler/ast_test.go +++ b/internal/pkg/agent/transpiler/ast_test.go @@ -882,7 +882,7 @@ func TestApplyDoesNotMutate(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - vars, err := NewVars("", map[string]any{"var": "value"}, mapstr.M{}) + vars, err := NewVars("", map[string]any{"var": "value"}, mapstr.M{}, "") require.NoError(t, err) applied, err := test.input.Apply(vars) require.NoError(t, err) @@ -961,8 +961,9 @@ func TestShallowClone(t *testing.T) { func TestVars(t *testing.T) { tests := map[string]struct { - input map[string]interface{} - result []string + input map[string]interface{} + result []string + defaultProvider string }{ "empty": { input: map[string]interface{}{}, @@ -1045,6 +1046,66 @@ func TestVars(t *testing.T) { }, result: []string{"var1", "var2", "var3", "var1", "var5", "var6", "var1"}, }, + "nested with default": { + input: map[string]interface{}{ + "novars": map[string]interface{}{ + "list1": []interface{}{ + map[string]interface{}{ + "int": 1, + "float": 1.1234, + "bool": true, + "str": "value1", + }, + }, + "list2": []interface{}{ + map[string]interface{}{ + "int": 2, + "float": 2.3456, + "bool": false, + "str": "value2", + }, + }, + }, + "vars1": map[string]interface{}{ + "list1": []interface{}{ + map[string]interface{}{ + "int": 1, + "float": 1.1234, + "bool": true, + "str": "${custom.var1|host.var2|'constant'}", + }, + }, + "list2": []interface{}{ + map[string]interface{}{ + "int": 2, + "float": 2.3456, + "bool": false, + "str": "${var3|custom.var1|'constant'}", + }, + }, + }, + "vars2": map[string]interface{}{ + "list1": []interface{}{ + map[string]interface{}{ + "int": 1, + "float": 1.1234, + "bool": true, + "str": "${host.var5|host.var6|'constant'}", + }, + }, + "list2": []interface{}{ + map[string]interface{}{ + "int": 2, + "float": 2.3456, + "bool": false, + "str": "${var1}", + }, + }, + }, + }, + result: []string{"custom.var1", "host.var2", "custom.var3", "custom.var1", "host.var5", "host.var6", "custom.var1"}, + defaultProvider: "custom", + }, } for name, test := range tests { @@ -1052,7 +1113,7 @@ func TestVars(t *testing.T) { ast, err := NewAST(test.input) require.NoError(t, err) var vars []string - vars = ast.root.Vars(vars) + vars = ast.root.Vars(vars, test.defaultProvider) assert.Equal(t, test.result, vars) }) } @@ -1146,7 +1207,15 @@ func TestCondition(t *testing.T) { } func mustMakeVars(mapping map[string]interface{}) *Vars { - v, err := NewVars("", mapping, nil) + v, err := NewVars("", mapping, nil, "") + if err != nil { + panic(err) + } + return v +} + +func mustMakeVarsWithDefault(mapping map[string]interface{}, defaultProvider string) *Vars { + v, err := NewVars("", mapping, nil, defaultProvider) if err != nil { panic(err) } diff --git a/internal/pkg/agent/transpiler/utils.go b/internal/pkg/agent/transpiler/inputs.go similarity index 100% rename from internal/pkg/agent/transpiler/utils.go rename to internal/pkg/agent/transpiler/inputs.go diff --git a/internal/pkg/agent/transpiler/utils_test.go b/internal/pkg/agent/transpiler/inputs_test.go similarity index 97% rename from internal/pkg/agent/transpiler/utils_test.go rename to internal/pkg/agent/transpiler/inputs_test.go index 921b133f941..09ff78cb3b5 100644 --- a/internal/pkg/agent/transpiler/utils_test.go +++ b/internal/pkg/agent/transpiler/inputs_test.go @@ -59,6 +59,25 @@ func TestRenderInputs(t *testing.T) { }), }, }, + "basic single var with default": { + input: NewKey("inputs", NewList([]Node{ + NewDict([]Node{ + NewKey("key", NewStrVal("${name}")), + }), + })), + expected: NewList([]Node{ + NewDict([]Node{ + NewKey("key", NewStrVal("value1")), + }), + }), + varsArray: []*Vars{ + mustMakeVarsWithDefault(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, "var1"), + }, + }, "duplicate result is removed": { input: NewKey("inputs", NewList([]Node{ NewDict([]Node{ @@ -785,7 +804,7 @@ func TestRenderInputs(t *testing.T) { } func mustMakeVarsP(id string, mapping map[string]interface{}, processorKey string, processors Processors) *Vars { - v, err := NewVarsWithProcessors(id, mapping, processorKey, processors, nil) + v, err := NewVarsWithProcessors(id, mapping, processorKey, processors, nil, "") if err != nil { panic(err) } diff --git a/internal/pkg/agent/transpiler/outputs.go b/internal/pkg/agent/transpiler/outputs.go new file mode 100644 index 00000000000..73389869336 --- /dev/null +++ b/internal/pkg/agent/transpiler/outputs.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package transpiler + +import ( + "fmt" +) + +// RenderOutputs renders outputs section. +// +// outputs are only rendered using the context variables and they do not support +// using dynamic provider variables. +func RenderOutputs(outputs Node, varsArray []*Vars) (Node, error) { + if len(varsArray) == 0 { + // no context vars (nothing to do) + return outputs, nil + } + + // outputs only operates on the first set of vars because those are always the + // context provider variables and never include dynamic provider variables + // + // dynamic provider variables cannot be used for outputs as we don't want outputs + // to be duplicated (unlike inputs) + vars := varsArray[0] + + d, ok := outputs.Value().(*Dict) + if !ok { + return nil, fmt.Errorf("outputs must be an dict") + } + nodes := d.Value().([]Node) + keys := make([]Node, len(nodes)) + for i, node := range nodes { + key, ok := node.(*Key) + if !ok { + // not possible, but be defensive + continue + } + if key.value == nil { + keys[i] = key + continue + } + dict, ok := key.value.(*Dict) + if !ok { + // not possible, but be defensive + continue + } + // Apply creates a new Node with a deep copy of all the values + var err error + key.value, err = dict.Apply(vars) + // inputs allows a variable not to match and it will be removed + // outputs are not that way, if an ErrNoMatch is returned we + // return it back to the caller + if err != nil { + // another error that needs to be reported + return nil, err + } + keys[i] = key + } + return &Dict{keys, nil}, nil +} diff --git a/internal/pkg/agent/transpiler/outputs_test.go b/internal/pkg/agent/transpiler/outputs_test.go new file mode 100644 index 00000000000..56dd0b07ad5 --- /dev/null +++ b/internal/pkg/agent/transpiler/outputs_test.go @@ -0,0 +1,116 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package transpiler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRenderOutputs(t *testing.T) { + testcases := map[string]struct { + input Node + expected Node + vars *Vars + err bool + }{ + "outputs not dict": { + input: NewKey("outputs", NewStrVal("not dict")), + err: true, + vars: mustMakeVars(map[string]interface{}{}), + }, + "missing variable error": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${var1.missing}")), + })), + })), + err: true, + vars: mustMakeVars(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }), + }, + "bad variable error": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${var1.name|'missing ending quote}")), + })), + })), + err: true, + vars: mustMakeVars(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }), + }, + "basic single var": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${var1.name}")), + })), + })), + expected: NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("value1")), + })), + }), + vars: mustMakeVars(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }), + }, + "basic default var": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${var1.missing|'default'}")), + })), + })), + expected: NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("default")), + })), + }), + vars: mustMakeVars(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }), + }, + "basic no provider var": { + input: NewKey("outputs", NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("${name}")), + })), + })), + expected: NewDict([]Node{ + NewKey("default", NewDict([]Node{ + NewKey("key", NewStrVal("value1")), + })), + }), + vars: mustMakeVarsWithDefault(map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, "var1"), + }, + } + + for name, test := range testcases { + t.Run(name, func(t *testing.T) { + v, err := RenderOutputs(test.input, []*Vars{test.vars}) + if test.err { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected.String(), v.String()) + } + }) + } +} diff --git a/internal/pkg/agent/transpiler/vars.go b/internal/pkg/agent/transpiler/vars.go index 71bd8bd4cb6..a7d2dc00d25 100644 --- a/internal/pkg/agent/transpiler/vars.go +++ b/internal/pkg/agent/transpiler/vars.go @@ -26,30 +26,31 @@ type Vars struct { processorsKey string processors Processors fetchContextProviders mapstr.M + defaultProvider string } // NewVars returns a new instance of vars. -func NewVars(id string, mapping map[string]interface{}, fetchContextProviders mapstr.M) (*Vars, error) { - return NewVarsWithProcessors(id, mapping, "", nil, fetchContextProviders) +func NewVars(id string, mapping map[string]interface{}, fetchContextProviders mapstr.M, defaultProvider string) (*Vars, error) { + return NewVarsWithProcessors(id, mapping, "", nil, fetchContextProviders, defaultProvider) } // NewVarsFromAst returns a new instance of vars. It takes the mapping as an *AST. -func NewVarsFromAst(id string, tree *AST, fetchContextProviders mapstr.M) *Vars { - return &Vars{id, tree, "", nil, fetchContextProviders} +func NewVarsFromAst(id string, tree *AST, fetchContextProviders mapstr.M, defaultProvider string) *Vars { + return &Vars{id, tree, "", nil, fetchContextProviders, defaultProvider} } // NewVarsWithProcessors returns a new instance of vars with attachment of processors. -func NewVarsWithProcessors(id string, mapping map[string]interface{}, processorKey string, processors Processors, fetchContextProviders mapstr.M) (*Vars, error) { +func NewVarsWithProcessors(id string, mapping map[string]interface{}, processorKey string, processors Processors, fetchContextProviders mapstr.M, defaultProvider string) (*Vars, error) { tree, err := NewAST(mapping) if err != nil { return nil, err } - return &Vars{id, tree, processorKey, processors, fetchContextProviders}, nil + return &Vars{id, tree, processorKey, processors, fetchContextProviders, defaultProvider}, nil } // NewVarsWithProcessorsFromAst returns a new instance of vars with attachment of processors. It takes the mapping as an *AST. -func NewVarsWithProcessorsFromAst(id string, tree *AST, processorKey string, processors Processors, fetchContextProviders mapstr.M) *Vars { - return &Vars{id, tree, processorKey, processors, fetchContextProviders} +func NewVarsWithProcessorsFromAst(id string, tree *AST, processorKey string, processors Processors, fetchContextProviders mapstr.M, defaultProvider string) *Vars { + return &Vars{id, tree, processorKey, processors, fetchContextProviders, defaultProvider} } // Replace returns a new value based on variable replacement. @@ -61,7 +62,7 @@ func (v *Vars) Replace(value string) (Node, error) { processors = v.processors } return node, processors, ok - }, true) + }, true, v.defaultProvider) } // ID returns the unique ID for the vars. @@ -103,7 +104,7 @@ func (v *Vars) lookupNode(name string) (Node, bool) { return Lookup(v.tree, name) } -func replaceVars(value string, replacer func(variable string) (Node, Processors, bool), reqMatch bool) (Node, error) { +func replaceVars(value string, replacer func(variable string) (Node, Processors, bool), reqMatch bool, defaultProvider string) (Node, error) { var processors Processors matchIdxs := varsRegex.FindAllSubmatchIndex([]byte(value), -1) if !validBrackets(value, matchIdxs) { @@ -120,7 +121,7 @@ func replaceVars(value string, replacer func(variable string) (Node, Processors, continue } // match on a non-escaped var - vars, err := extractVars(value[r[i+2]:r[i+3]]) + vars, err := extractVars(value[r[i+2]:r[i+3]], defaultProvider) if err != nil { return nil, fmt.Errorf(`error parsing variable "%s": %w`, value[r[i]:r[i+1]], err) } @@ -206,7 +207,7 @@ func (v *constString) Value() string { return v.value } -func extractVars(i string) ([]varI, error) { +func extractVars(i string, defaultProvider string) ([]varI, error) { const out = rune(0) quote := out @@ -226,7 +227,7 @@ func extractVars(i string) ([]varI, error) { if is[len(is)-1] == '.' { return nil, fmt.Errorf("variable cannot end with '.'") } - res = append(res, &varString{string(is)}) + res = append(res, &varString{maybeAddDefaultProvider(string(is), defaultProvider)}) } is = is[:0] // slice to zero length; to keep allocated memory constant = false @@ -267,7 +268,7 @@ func extractVars(i string) ([]varI, error) { if is[len(is)-1] == '.' { return nil, fmt.Errorf("variable cannot end with '.'") } - res = append(res, &varString{string(is)}) + res = append(res, &varString{maybeAddDefaultProvider(string(is), defaultProvider)}) } return res, nil } @@ -276,3 +277,15 @@ func varPrefixMatched(val string, key string) bool { s := strings.SplitN(val, ".", 2) return s[0] == key } + +func maybeAddDefaultProvider(val string, defaultProvider string) string { + if defaultProvider == "" { + return val + } + if strings.Contains(val, ".") { + // already has a provider in the variable name + return val + } + // at this point they variable doesn't have a provider + return fmt.Sprintf("%s.%s", defaultProvider, val) +} diff --git a/internal/pkg/agent/transpiler/vars_test.go b/internal/pkg/agent/transpiler/vars_test.go index fe0dcc7089e..9ebbc517041 100644 --- a/internal/pkg/agent/transpiler/vars_test.go +++ b/internal/pkg/agent/transpiler/vars_test.go @@ -16,7 +16,7 @@ import ( ) func TestVars_Replace(t *testing.T) { - vars := mustMakeVars(map[string]interface{}{ + vars := mustMakeVarsWithDefault(map[string]interface{}{ "un-der_score": map[string]interface{}{ "key1": "data1", "key2": "data2", @@ -42,7 +42,7 @@ func TestVars_Replace(t *testing.T) { "key5": "${", "key6": "$${", }, - }) + }, "other") tests := []struct { Input string Result Node @@ -79,6 +79,12 @@ func TestVars_Replace(t *testing.T) { false, false, }, + { + "${un-der_score.missing|un-der_score.missing2|data}", + NewStrVal("info"), + false, + false, + }, { "${un-der_score.missing|'fallback'}", NewStrVal("fallback"), @@ -297,13 +303,17 @@ func TestVars_ReplaceWithProcessors(t *testing.T) { }, "dynamic", processers, - nil) + nil, "testing") require.NoError(t, err) res, err := vars.Replace("${testing.key1}") require.NoError(t, err) assert.Equal(t, NewStrVal("data1"), res) + res, err = vars.Replace("${key1}") + require.NoError(t, err) + assert.Equal(t, NewStrVal("data1"), res) + res, err = vars.Replace("${dynamic.key1}") require.NoError(t, err) assert.Equal(t, NewStrValWithProcessors("dynamic1", processers), res) @@ -364,7 +374,7 @@ func TestVars_ReplaceWithFetchContextProvider(t *testing.T) { }, "dynamic", processers, - fetchContextProviders) + fetchContextProviders, "") require.NoError(t, err) res, err := vars.Replace("${testing.key1}") diff --git a/internal/pkg/composable/benchmark_test.go b/internal/pkg/composable/benchmark_test.go index 913d8d4fbd1..db3caaefa09 100644 --- a/internal/pkg/composable/benchmark_test.go +++ b/internal/pkg/composable/benchmark_test.go @@ -76,6 +76,6 @@ func BenchmarkGenerateVars100Pods(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - c.generateVars(mapstr.M{}) + c.generateVars(mapstr.M{}, "") } } diff --git a/internal/pkg/composable/config.go b/internal/pkg/composable/config.go index 04f1b38e0a2..b5b7393f837 100644 --- a/internal/pkg/composable/config.go +++ b/internal/pkg/composable/config.go @@ -15,4 +15,5 @@ type Config struct { Providers map[string]*config.Config `config:"providers"` ProvidersInitialDefault *bool `config:"agent.providers.initial_default"` ProvidersRestartInterval *time.Duration `config:"agent.providers.restart_interval"` + ProvidersDefaultProvider *string `config:"agent.providers.default"` } diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 2743eee9b62..965d195deeb 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -23,7 +23,8 @@ import ( ) const ( - defaultRetryInterval = 30 * time.Second + defaultRetryInterval = 30 * time.Second + defaultDefaultProvider = "env" ) // Controller manages the state of the providers current context. @@ -41,6 +42,11 @@ type Controller interface { // Observe instructs the variables to observe. Observe([]string) + + // DefaultProvider returns the default provider used by the controller. + // + // This is used by any variable reference that doesn't add a provider prefix. + DefaultProvider() string } // controller manages the state of the providers current context. @@ -50,6 +56,7 @@ type controller struct { observedCh chan map[string]bool errCh chan error restartInterval time.Duration + defaultProvider string managed bool contextProviderBuilders map[string]contextProvider @@ -82,6 +89,11 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) restartInterval = *providersCfg.ProvidersRestartInterval } + defaultProvider := defaultDefaultProvider + if providersCfg.ProvidersDefaultProvider != nil { + defaultProvider = *providersCfg.ProvidersDefaultProvider + } + // build all the context providers contextProviders := map[string]contextProvider{} for name, builder := range Providers.contextProviders { @@ -117,6 +129,7 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) errCh: make(chan error), managed: managed, restartInterval: restartInterval, + defaultProvider: defaultProvider, contextProviderBuilders: contextProviders, dynamicProviderBuilders: dynamicProviders, contextProviderStates: make(map[string]*contextProviderState), @@ -237,7 +250,7 @@ func (c *controller) Run(ctx context.Context) error { func (c *controller) sendVars(ctx context.Context, fetchContextProviders mapstr.M) error { c.logger.Debugf("Computing new variable state for composable inputs") - vars := c.generateVars(fetchContextProviders) + vars := c.generateVars(fetchContextProviders, c.defaultProvider) for { select { case c.ch <- vars: @@ -285,6 +298,11 @@ func (c *controller) Observe(vars []string) { c.observedCh <- topLevel } +// DefaultProvider returns the default provider being used by the controller. +func (c *controller) DefaultProvider() string { + return c.defaultProvider +} + func (c *controller) handleObserved(ctx context.Context, wg *sync.WaitGroup, fetchCh chan fetchProvider, stateChangedChan chan bool, observed map[string]bool) bool { changed := false @@ -470,14 +488,14 @@ func (c *controller) startDynamicProvider(ctx context.Context, wg *sync.WaitGrou return state } -func (c *controller) generateVars(fetchContextProviders mapstr.M) []*transpiler.Vars { +func (c *controller) generateVars(fetchContextProviders mapstr.M, defaultProvider string) []*transpiler.Vars { // build the vars list of mappings vars := make([]*transpiler.Vars, 1) mapping, _ := transpiler.NewAST(map[string]any{}) for name, state := range c.contextProviderStates { _ = mapping.Insert(state.Current(), name) } - vars[0] = transpiler.NewVarsFromAst("", mapping, fetchContextProviders) + vars[0] = transpiler.NewVarsFromAst("", mapping, fetchContextProviders, defaultProvider) // add to the vars list for each dynamic providers mappings for name, state := range c.dynamicProviderStates { @@ -485,7 +503,7 @@ func (c *controller) generateVars(fetchContextProviders mapstr.M) []*transpiler. local := mapping.ShallowClone() _ = local.Insert(mappings.mapping, name) id := fmt.Sprintf("%s-%s", name, mappings.id) - v := transpiler.NewVarsWithProcessorsFromAst(id, local, name, mappings.processors, fetchContextProviders) + v := transpiler.NewVarsWithProcessorsFromAst(id, local, name, mappings.processors, fetchContextProviders, defaultProvider) vars = append(vars, v) } } diff --git a/internal/pkg/composable/controller_test.go b/internal/pkg/composable/controller_test.go index 3d3f532dc6b..b631efe1a5c 100644 --- a/internal/pkg/composable/controller_test.go +++ b/internal/pkg/composable/controller_test.go @@ -379,3 +379,34 @@ func TestCancellation(t *testing.T) { } }) } + +func TestDefaultProvider(t *testing.T) { + log, err := logger.New("", false) + require.NoError(t, err) + + t.Run("default env", func(t *testing.T) { + c, err := composable.New(log, nil, false) + require.NoError(t, err) + assert.Equal(t, "env", c.DefaultProvider()) + }) + + t.Run("no default", func(t *testing.T) { + cfg, err := config.NewConfigFrom(map[string]interface{}{ + "agent.providers.default": "", + }) + require.NoError(t, err) + c, err := composable.New(log, cfg, false) + require.NoError(t, err) + assert.Equal(t, "", c.DefaultProvider()) + }) + + t.Run("custom default", func(t *testing.T) { + cfg, err := config.NewConfigFrom(map[string]interface{}{ + "agent.providers.default": "custom", + }) + require.NoError(t, err) + c, err := composable.New(log, cfg, false) + require.NoError(t, err) + assert.Equal(t, "custom", c.DefaultProvider()) + }) +} diff --git a/pkg/component/component.go b/pkg/component/component.go index 21e87ea5233..bd1c4893171 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -712,7 +712,7 @@ type outputI struct { // varsForPlatform sets the runtime variables that are available in the // input specification runtime checks. This function should always be // edited in sync with the documentation in specs/README.md. -func varsForPlatform(platform PlatformDetail) (*transpiler.Vars, error) { +func varsForPlatform(platform PlatformDetail, defaultProvider string) (*transpiler.Vars, error) { return transpiler.NewVars("", map[string]interface{}{ "install": map[string]interface{}{ "in_default": paths.ArePathsEqual(paths.Top(), paths.InstallPath(paths.DefaultBasePath)) || platform.IsInstalledViaExternalPkgMgr, @@ -729,14 +729,14 @@ func varsForPlatform(platform PlatformDetail) (*transpiler.Vars, error) { "user": map[string]interface{}{ "root": platform.User.Root, }, - }, nil) + }, nil, defaultProvider) } func validateRuntimeChecks( runtime *RuntimeSpec, platform PlatformDetail, ) error { - vars, err := varsForPlatform(platform) + vars, err := varsForPlatform(platform, "") // no default provider if err != nil { return err } diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 995b223d91d..23c4685d36e 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -2390,7 +2390,7 @@ func TestPreventionsAreValid(t *testing.T) { "user": map[string]interface{}{ "root": false, }, - }, nil) + }, nil, "") require.NoError(t, err) for path, spec := range specFiles { From 89ea9103eb55800a95b1ce13130b3ae073d3cad0 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 24 Jan 2025 14:44:47 -0500 Subject: [PATCH 02/14] Remove variable expansion in outputs from go-ucfg parsing. --- internal/pkg/config/config.go | 2 +- internal/pkg/config/config_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index b57a36eca88..9168426430d 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -49,7 +49,7 @@ var DefaultOptions = []interface{}{ ucfg.PathSep("."), ucfg.ResolveEnv, ucfg.VarExp, - VarSkipKeys("inputs"), + VarSkipKeys("inputs", "outputs"), ucfg.IgnoreCommas, OTelKeys("connectors", "receivers", "processors", "exporters", "extensions", "service"), } diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go index 55f5d60e7c8..9fa2961fd77 100644 --- a/internal/pkg/config/config_test.go +++ b/internal/pkg/config/config_test.go @@ -29,8 +29,8 @@ func TestInputsResolveNOOP(t *testing.T) { "default": map[string]interface{}{ "type": "elasticsearch", "hosts": []interface{}{"127.0.0.1:9200"}, - "username": "elastic", - "password": "changeme", + "username": "${env.ES_USER}", + "password": "${env.ES_PASSWORD}", }, }, "inputs": []interface{}{ From 4ef799fa280b65145255a1e339a3ef585a2c78f2 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 24 Jan 2025 15:25:08 -0500 Subject: [PATCH 03/14] Fix some issues. --- internal/pkg/agent/transpiler/outputs.go | 13 ++++++++----- internal/pkg/agent/transpiler/vars.go | 18 +++++++++++++++++- internal/pkg/agent/transpiler/vars_test.go | 2 +- .../pkg/composable/providers/agent/agent.go | 3 ++- .../composable/providers/agent/agent_test.go | 7 ++++--- internal/pkg/composable/providers/env/env.go | 3 ++- .../pkg/composable/providers/env/env_test.go | 7 ++++--- .../pkg/composable/providers/local/local.go | 3 ++- .../composable/providers/local/local_test.go | 7 ++++--- .../providers/localdynamic/localdynamic.go | 3 ++- .../localdynamic/localdynamic_test.go | 7 +++++-- internal/pkg/composable/providers/path/path.go | 3 ++- .../pkg/composable/providers/path/path_test.go | 7 ++++--- 13 files changed, 57 insertions(+), 26 deletions(-) diff --git a/internal/pkg/agent/transpiler/outputs.go b/internal/pkg/agent/transpiler/outputs.go index 73389869336..b232b7ef5cf 100644 --- a/internal/pkg/agent/transpiler/outputs.go +++ b/internal/pkg/agent/transpiler/outputs.go @@ -35,6 +35,7 @@ func RenderOutputs(outputs Node, varsArray []*Vars) (Node, error) { key, ok := node.(*Key) if !ok { // not possible, but be defensive + keys[i] = node continue } if key.value == nil { @@ -44,19 +45,21 @@ func RenderOutputs(outputs Node, varsArray []*Vars) (Node, error) { dict, ok := key.value.(*Dict) if !ok { // not possible, but be defensive + keys[i] = key continue } // Apply creates a new Node with a deep copy of all the values - var err error - key.value, err = dict.Apply(vars) + value, err := dict.Apply(vars) // inputs allows a variable not to match and it will be removed // outputs are not that way, if an ErrNoMatch is returned we // return it back to the caller if err != nil { - // another error that needs to be reported - return nil, err + return nil, fmt.Errorf("rendering output %q failed: %w", key.name, err) + } + keys[i] = &Key{ + name: key.name, + value: value, } - keys[i] = key } return &Dict{keys, nil}, nil } diff --git a/internal/pkg/agent/transpiler/vars.go b/internal/pkg/agent/transpiler/vars.go index a7d2dc00d25..2f2240361c2 100644 --- a/internal/pkg/agent/transpiler/vars.go +++ b/internal/pkg/agent/transpiler/vars.go @@ -152,7 +152,23 @@ func replaceVars(value string, replacer func(variable string) (Node, Processors, } } if !set && reqMatch { - return NewStrVal(""), ErrNoMatch + var sb strings.Builder + sb.WriteString("${") + for i, val := range vars { + switch val.(type) { + case *constString: + sb.WriteString(`'`) + sb.WriteString(val.Value()) + sb.WriteString(`'`) + case *varString: + sb.WriteString(val.Value()) + if i < len(vars)-1 { + sb.WriteString("|") + } + } + } + sb.WriteString("}") + return NewStrVal(""), fmt.Errorf("%w: %s", ErrNoMatch, sb.String()) } lastIndex = r[1] } diff --git a/internal/pkg/agent/transpiler/vars_test.go b/internal/pkg/agent/transpiler/vars_test.go index 9ebbc517041..1438afc2215 100644 --- a/internal/pkg/agent/transpiler/vars_test.go +++ b/internal/pkg/agent/transpiler/vars_test.go @@ -266,7 +266,7 @@ func TestVars_Replace(t *testing.T) { if test.Error { assert.Error(t, err) } else if test.NoMatch { - assert.Error(t, ErrNoMatch, err) + assert.ErrorIs(t, ErrNoMatch, err) } else { require.NoError(t, err) assert.Equal(t, test.Result, res) diff --git a/internal/pkg/composable/providers/agent/agent.go b/internal/pkg/composable/providers/agent/agent.go index 2f4eec39e3e..b73de416116 100644 --- a/internal/pkg/composable/providers/agent/agent.go +++ b/internal/pkg/composable/providers/agent/agent.go @@ -41,7 +41,8 @@ func (*contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderCo if err != nil { return errors.New(err, "failed to set mapping", errors.TypeUnexpected) } - return nil + <-ctx.Done() + return ctx.Err() } // ContextProviderBuilder builds the context provider. diff --git a/internal/pkg/composable/providers/agent/agent_test.go b/internal/pkg/composable/providers/agent/agent_test.go index 9098fbfe5e1..44d812be037 100644 --- a/internal/pkg/composable/providers/agent/agent_test.go +++ b/internal/pkg/composable/providers/agent/agent_test.go @@ -7,6 +7,7 @@ package agent import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,12 +24,12 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, nil, true) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - comm := ctesting.NewContextComm(context.Background()) + comm := ctesting.NewContextComm(ctx) err = provider.Run(ctx, comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) current := comm.Current() _, hasID := current["id"] diff --git a/internal/pkg/composable/providers/env/env.go b/internal/pkg/composable/providers/env/env.go index 4eb6a73d0db..bdae491edc9 100644 --- a/internal/pkg/composable/providers/env/env.go +++ b/internal/pkg/composable/providers/env/env.go @@ -28,7 +28,8 @@ func (*contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderCo if err != nil { return errors.New(err, "failed to set mapping", errors.TypeUnexpected) } - return nil + <-ctx.Done() + return ctx.Err() } // ContextProviderBuilder builds the context provider. diff --git a/internal/pkg/composable/providers/env/env_test.go b/internal/pkg/composable/providers/env/env_test.go index c9b5bc95dae..627b6acb615 100644 --- a/internal/pkg/composable/providers/env/env_test.go +++ b/internal/pkg/composable/providers/env/env_test.go @@ -7,6 +7,7 @@ package env import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,12 +21,12 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, nil, true) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - comm := ctesting.NewContextComm(context.Background()) + comm := ctesting.NewContextComm(ctx) err = provider.Run(ctx, comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) assert.Equal(t, getEnvMapping(), comm.Current()) } diff --git a/internal/pkg/composable/providers/local/local.go b/internal/pkg/composable/providers/local/local.go index 470d6c0a312..7e88cbdec60 100644 --- a/internal/pkg/composable/providers/local/local.go +++ b/internal/pkg/composable/providers/local/local.go @@ -29,7 +29,8 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider if err != nil { return errors.New(err, "failed to set mapping", errors.TypeUnexpected) } - return nil + <-ctx.Done() + return ctx.Err() } // ContextProviderBuilder builds the context provider. diff --git a/internal/pkg/composable/providers/local/local_test.go b/internal/pkg/composable/providers/local/local_test.go index a1f2f04bc31..4a1b82e1e06 100644 --- a/internal/pkg/composable/providers/local/local_test.go +++ b/internal/pkg/composable/providers/local/local_test.go @@ -7,6 +7,7 @@ package local import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,12 +30,12 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, cfg, true) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - comm := ctesting.NewContextComm(context.Background()) + comm := ctesting.NewContextComm(ctx) err = provider.Run(ctx, comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) assert.Equal(t, mapping, comm.Current()) } diff --git a/internal/pkg/composable/providers/localdynamic/localdynamic.go b/internal/pkg/composable/providers/localdynamic/localdynamic.go index 6bef79633b7..e5897a85491 100644 --- a/internal/pkg/composable/providers/localdynamic/localdynamic.go +++ b/internal/pkg/composable/providers/localdynamic/localdynamic.go @@ -37,7 +37,8 @@ func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { return errors.New(err, fmt.Sprintf("failed to add mapping for index %d", i), errors.TypeUnexpected) } } - return nil + <-comm.Done() + return comm.Err() } // DynamicProviderBuilder builds the dynamic provider. diff --git a/internal/pkg/composable/providers/localdynamic/localdynamic_test.go b/internal/pkg/composable/providers/localdynamic/localdynamic_test.go index 313f29ff4ad..cf50b42ee39 100644 --- a/internal/pkg/composable/providers/localdynamic/localdynamic_test.go +++ b/internal/pkg/composable/providers/localdynamic/localdynamic_test.go @@ -7,6 +7,7 @@ package localdynamic import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -63,9 +64,11 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, cfg, true) require.NoError(t, err) - comm := ctesting.NewDynamicComm(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + comm := ctesting.NewDynamicComm(ctx) err = provider.Run(comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) curr1, ok1 := comm.Current("0") assert.True(t, ok1) diff --git a/internal/pkg/composable/providers/path/path.go b/internal/pkg/composable/providers/path/path.go index f84c04294ad..a0fdf43b63e 100644 --- a/internal/pkg/composable/providers/path/path.go +++ b/internal/pkg/composable/providers/path/path.go @@ -32,7 +32,8 @@ func (*contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderCo if err != nil { return errors.New(err, "failed to set mapping", errors.TypeUnexpected) } - return nil + <-ctx.Done() + return ctx.Err() } // ContextProviderBuilder builds the context provider. diff --git a/internal/pkg/composable/providers/path/path_test.go b/internal/pkg/composable/providers/path/path_test.go index c8d3c57b41a..00f3d6b03ef 100644 --- a/internal/pkg/composable/providers/path/path_test.go +++ b/internal/pkg/composable/providers/path/path_test.go @@ -7,6 +7,7 @@ package path import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,11 +22,11 @@ func TestContextProvider(t *testing.T) { provider, err := builder(nil, nil, true) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - comm := ctesting.NewContextComm(context.Background()) + comm := ctesting.NewContextComm(ctx) err = provider.Run(ctx, comm) - require.NoError(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) current := comm.Current() assert.Equal(t, paths.Home(), current["home"]) From 25547402b232e35947dbfd85128e2a71f0a1e7a8 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Sat, 25 Jan 2025 08:46:56 -0500 Subject: [PATCH 04/14] Fix tests. --- .../application/coordinator/coordinator.go | 29 ++++++-- .../coordinator/coordinator_test.go | 11 +++- internal/pkg/agent/transpiler/vars.go | 3 +- internal/pkg/agent/transpiler/vars_test.go | 2 +- internal/pkg/composable/controller.go | 66 ++++++++++++++----- internal/pkg/composable/controller_test.go | 46 +++++-------- 6 files changed, 101 insertions(+), 56 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 32ef899ad6b..88dbd7a63b8 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -176,7 +176,7 @@ type VarsManager interface { DefaultProvider() string // Observe instructs the variables to observe. - Observe([]string) + Observe(context.Context, []string) ([]*transpiler.Vars, error) // Watch returns the chanel to watch for variable changes. Watch() <-chan []*transpiler.Vars @@ -1247,7 +1247,11 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config } // pass the observed vars from the AST to the varsMgr - c.observeASTVars() + err = c.observeASTVars(ctx) + if err != nil { + // only possible error here is the context being cancelled + return err + } // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 @@ -1330,15 +1334,20 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) { // observeASTVars identifies the variables that are referenced in the computed AST and passed to // the varsMgr so it knows what providers are being referenced. If a providers is not being // referenced then the provider does not need to be running. -func (c *Coordinator) observeASTVars() { +func (c *Coordinator) observeASTVars(ctx context.Context) error { if c.varsMgr == nil { // No varsMgr (only happens in testing) - return + return nil } if c.ast == nil { // No AST; no vars - c.varsMgr.Observe(nil) - return + updated, err := c.varsMgr.Observe(ctx, nil) + if err != nil { + // context cancel + return err + } + c.vars = updated + return nil } var vars []string inputs, ok := transpiler.Lookup(c.ast, "inputs") @@ -1349,7 +1358,13 @@ func (c *Coordinator) observeASTVars() { if ok { vars = outputs.Vars(vars, c.varsMgr.DefaultProvider()) } - c.varsMgr.Observe(vars) + updated, err := c.varsMgr.Observe(ctx, vars) + if err != nil { + // context cancel + return err + } + c.vars = updated + return nil } // processVars updates the transpiler vars in the Coordinator. diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index cb5659269ca..4f46440e04f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1188,6 +1188,9 @@ func (l *configChange) Fail(err error) { } type fakeVarsManager struct { + varsMx sync.RWMutex + vars []*transpiler.Vars + varsCh chan []*transpiler.Vars errCh chan error @@ -1222,13 +1225,19 @@ func (f *fakeVarsManager) Watch() <-chan []*transpiler.Vars { return f.varsCh } -func (f *fakeVarsManager) Observe(observed []string) { +func (f *fakeVarsManager) Observe(ctx context.Context, observed []string) ([]*transpiler.Vars, error) { f.observedMx.Lock() defer f.observedMx.Unlock() f.observed = observed + f.varsMx.RLock() + defer f.varsMx.RUnlock() + return f.vars, nil } func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) { + f.varsMx.Lock() + f.vars = vars + f.varsMx.Unlock() select { case <-ctx.Done(): case f.varsCh <- vars: diff --git a/internal/pkg/agent/transpiler/vars.go b/internal/pkg/agent/transpiler/vars.go index 2f2240361c2..5a45704aa40 100644 --- a/internal/pkg/agent/transpiler/vars.go +++ b/internal/pkg/agent/transpiler/vars.go @@ -5,6 +5,7 @@ package transpiler import ( + "errors" "fmt" "regexp" "strings" @@ -17,7 +18,7 @@ import ( var varsRegex = regexp.MustCompile(`\$\$?{([\p{L}\d\s\\\-_|.'":\/]*)}`) // ErrNoMatch is return when the replace didn't fail, just that no vars match to perform the replace. -var ErrNoMatch = fmt.Errorf("no matching vars") +var ErrNoMatch = errors.New("no matching vars") // Vars is a context of variables that also contain a list of processors that go with the mapping. type Vars struct { diff --git a/internal/pkg/agent/transpiler/vars_test.go b/internal/pkg/agent/transpiler/vars_test.go index 1438afc2215..a42e74b4fb2 100644 --- a/internal/pkg/agent/transpiler/vars_test.go +++ b/internal/pkg/agent/transpiler/vars_test.go @@ -266,7 +266,7 @@ func TestVars_Replace(t *testing.T) { if test.Error { assert.Error(t, err) } else if test.NoMatch { - assert.ErrorIs(t, ErrNoMatch, err) + assert.ErrorIs(t, err, ErrNoMatch) } else { require.NoError(t, err) assert.Equal(t, test.Result, res) diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 965d195deeb..81094134095 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -40,8 +40,14 @@ type Controller interface { // Watch returns the channel to watch for variable changes. Watch() <-chan []*transpiler.Vars - // Observe instructs the variables to observe. - Observe([]string) + // Observe instructs the controller to enable the observed providers. + // + // This is a blocking call until the observation is handled and the most recent + // set of variables are returned the caller. This current observed state of variables + // that is returned is not sent over the Watch channel, the caller should coordinate this fact. + // + // Only error that is returned from this function is the result of the passed context. + Observe(context.Context, []string) ([]*transpiler.Vars, error) // DefaultProvider returns the default provider used by the controller. // @@ -49,11 +55,16 @@ type Controller interface { DefaultProvider() string } +type observer struct { + vars map[string]bool + result chan []*transpiler.Vars +} + // controller manages the state of the providers current context. type controller struct { logger *logger.Logger ch chan []*transpiler.Vars - observedCh chan map[string]bool + observedCh chan observer errCh chan error restartInterval time.Duration defaultProvider string @@ -125,7 +136,7 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) return &controller{ logger: l, ch: make(chan []*transpiler.Vars, 1), - observedCh: make(chan map[string]bool, 1), + observedCh: make(chan observer), errCh: make(chan error), managed: managed, restartInterval: restartInterval, @@ -196,7 +207,7 @@ func (c *controller) Run(ctx context.Context) error { // send initial vars state fetchProvidersLock.RLock() - err := c.sendVars(ctx, fetchProviders) + err := c.sendVars(ctx, nil, fetchProviders) if err != nil { fetchProvidersLock.RUnlock() // only error is context cancel, no need to add error message context @@ -205,6 +216,7 @@ func (c *controller) Run(ctx context.Context) error { fetchProvidersLock.RUnlock() // performs debounce of notifies; accumulates them into 100 millisecond chunks + var observedResult chan []*transpiler.Vars for { DEBOUNCE: for { @@ -212,7 +224,8 @@ func (c *controller) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case observed := <-c.observedCh: - changed := c.handleObserved(localCtx, &wg, fetchCh, stateChangedChan, observed) + observedResult = observed.result + changed := c.handleObserved(localCtx, &wg, fetchCh, stateChangedChan, observed.vars) if changed { t.Reset(100 * time.Millisecond) c.logger.Debugf("Observed state changed for composable inputs; debounce started") @@ -236,9 +249,10 @@ func (c *controller) Run(ctx context.Context) error { // batching done, gather results } - // send the vars to the watcher + // send the vars to the watcher or the observer caller fetchProvidersLock.RLock() - err := c.sendVars(ctx, fetchProviders) + err := c.sendVars(ctx, observedResult, fetchProviders) + observedResult = nil if err != nil { fetchProvidersLock.RUnlock() // only error is context cancel, no need to add error message context @@ -248,9 +262,17 @@ func (c *controller) Run(ctx context.Context) error { } } -func (c *controller) sendVars(ctx context.Context, fetchContextProviders mapstr.M) error { +func (c *controller) sendVars(ctx context.Context, observedResult chan []*transpiler.Vars, fetchContextProviders mapstr.M) error { c.logger.Debugf("Computing new variable state for composable inputs") vars := c.generateVars(fetchContextProviders, c.defaultProvider) + if observedResult != nil { + select { + case <-ctx.Done(): + return ctx.Err() + case observedResult <- vars: + return nil + } + } for { select { case c.ch <- vars: @@ -283,7 +305,7 @@ func (c *controller) Watch() <-chan []*transpiler.Vars { // Observe sends the observed variables from the AST to the controller. // // Based on this information it will determine which providers should even be running. -func (c *controller) Observe(vars []string) { +func (c *controller) Observe(ctx context.Context, vars []string) ([]*transpiler.Vars, error) { // only need the top-level variables to determine which providers to run // // future: possible that all vars could be organized and then passed to each provider to @@ -293,9 +315,19 @@ func (c *controller) Observe(vars []string) { vs := strings.SplitN(v, ".", 2) topLevel[vs[0]] = true } - // drain the channel first, if the previous vars had not been used yet the new list should be used instead - drainChan(c.observedCh) - c.observedCh <- topLevel + // blocks waiting for an updated set of variables + ch := make(chan []*transpiler.Vars) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case c.observedCh <- observer{topLevel, ch}: + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case vars := <-ch: + return vars, nil + } } // DefaultProvider returns the default provider being used by the controller. @@ -335,24 +367,28 @@ func (c *controller) handleObserved(ctx context.Context, wg *sync.WaitGroup, fet continue } + found := false contextInfo, ok := c.contextProviderBuilders[name] if ok { + found = true state := c.startContextProvider(ctx, wg, fetchCh, stateChangedChan, name, contextInfo) if state != nil { changed = true c.contextProviderStates[name] = state - } } dynamicInfo, ok := c.dynamicProviderBuilders[name] if ok { + found = true state := c.startDynamicProvider(ctx, wg, stateChangedChan, name, dynamicInfo) if state != nil { changed = true c.dynamicProviderStates[name] = state } } - c.logger.Warnf("provider %q referenced in policy but no provider exists or was explicitly disabled", name) + if !found { + c.logger.Warnf("provider %q referenced in policy but no provider exists or was explicitly disabled", name) + } } // running remaining need to be stopped diff --git a/internal/pkg/composable/controller_test.go b/internal/pkg/composable/controller_test.go index b631efe1a5c..3e843b7fca4 100644 --- a/internal/pkg/composable/controller_test.go +++ b/internal/pkg/composable/controller_test.go @@ -86,22 +86,17 @@ func TestController(t *testing.T) { var setVars2 []*transpiler.Vars var setVars3 []*transpiler.Vars go func() { - for { - select { - case <-ctx.Done(): - return - case vars := <-c.Watch(): - if setVars1 == nil { - setVars1 = vars - c.Observe([]string{"local.vars.key1", "local_dynamic.vars.key1"}) // observed local and local_dynamic - } else if setVars2 == nil { - setVars2 = vars - c.Observe(nil) // no observed (will turn off those providers) - } else { - setVars3 = vars - cancel() - } - } + defer cancel() + select { + case <-ctx.Done(): + return + case vars := <-c.Watch(): + // initial vars + setVars1 = vars + setVars2, err = c.Observe(ctx, []string{"local.vars.key1", "local_dynamic.vars.key1"}) // observed local and local_dynamic + require.NoError(t, err) + setVars3, err = c.Observe(ctx, nil) // no observed (will turn off those providers) + require.NoError(t, err) } }() @@ -241,31 +236,20 @@ func TestProvidersDefaultDisabled(t *testing.T) { c, err := composable.New(log, cfg, false) require.NoError(t, err) - c.Observe(tt.observed) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 1*time.Second) defer timeoutCancel() - var setVars []*transpiler.Vars - go func() { - defer cancel() - for { - select { - case <-timeoutCtx.Done(): - return - case vars := <-c.Watch(): - setVars = vars - } - } - }() - errCh := make(chan error) go func() { errCh <- c.Run(ctx) }() + + setVars, err := c.Observe(timeoutCtx, tt.observed) + require.NoError(t, err) + err = <-errCh if errors.Is(err, context.Canceled) { err = nil From 9b0dc7072643885ae15ea5d52883057ffe2dfe0a Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Sat, 25 Jan 2025 08:52:11 -0500 Subject: [PATCH 05/14] Fix incorrect tests. --- internal/pkg/agent/transpiler/vars_test.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/internal/pkg/agent/transpiler/vars_test.go b/internal/pkg/agent/transpiler/vars_test.go index a42e74b4fb2..503292c2eb5 100644 --- a/internal/pkg/agent/transpiler/vars_test.go +++ b/internal/pkg/agent/transpiler/vars_test.go @@ -191,13 +191,10 @@ func TestVars_Replace(t *testing.T) { false, }, { - `list inside string ${un-der_score.list} causes no match`, - NewList([]Node{ - NewStrVal("array1"), - NewStrVal("array2"), - }), + `list inside string ${un-der_score.list} strings array`, + NewStrVal(`list inside string [array1,array2] strings array`), + false, false, - true, }, { `${un-der_score.dict}`, @@ -209,13 +206,10 @@ func TestVars_Replace(t *testing.T) { false, }, { - `dict inside string ${un-der_score.dict} causes no match`, - NewDict([]Node{ - NewKey("key1", NewStrVal("value1")), - NewKey("key2", NewStrVal("value2")), - }), + `dict inside string ${un-der_score.dict} strings dict`, + NewStrVal(`dict inside string {key1:value1},{key2:value2} strings dict`), + false, false, - true, }, { `start $${keep} ${un-der_score.key1} $${un-der_score.key1}`, From f6fd32c0428c74dc1d1c6d73840eb8ed9d26349d Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Sat, 25 Jan 2025 09:47:29 -0500 Subject: [PATCH 06/14] Fix out of order vars publish. --- .../application/coordinator/coordinator.go | 31 ++++++++----------- internal/pkg/composable/controller.go | 22 +++++++++++-- internal/pkg/composable/controller_test.go | 26 ++++++++++++++-- 3 files changed, 56 insertions(+), 23 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 88dbd7a63b8..e83547ddadf 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1339,31 +1339,26 @@ func (c *Coordinator) observeASTVars(ctx context.Context) error { // No varsMgr (only happens in testing) return nil } - if c.ast == nil { - // No AST; no vars - updated, err := c.varsMgr.Observe(ctx, nil) - if err != nil { - // context cancel - return err - } - c.vars = updated - return nil - } var vars []string - inputs, ok := transpiler.Lookup(c.ast, "inputs") - if ok { - vars = inputs.Vars(vars, c.varsMgr.DefaultProvider()) - } - outputs, ok := transpiler.Lookup(c.ast, "outputs") - if ok { - vars = outputs.Vars(vars, c.varsMgr.DefaultProvider()) + if c.ast != nil { + inputs, ok := transpiler.Lookup(c.ast, "inputs") + if ok { + vars = inputs.Vars(vars, c.varsMgr.DefaultProvider()) + } + outputs, ok := transpiler.Lookup(c.ast, "outputs") + if ok { + vars = outputs.Vars(vars, c.varsMgr.DefaultProvider()) + } } updated, err := c.varsMgr.Observe(ctx, vars) if err != nil { // context cancel return err } - c.vars = updated + if updated != nil { + // provided an updated set of vars (observed changed) + c.vars = updated + } return nil } diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 81094134095..4eb80fd33f0 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -43,7 +43,8 @@ type Controller interface { // Observe instructs the controller to enable the observed providers. // // This is a blocking call until the observation is handled and the most recent - // set of variables are returned the caller. This current observed state of variables + // set of variables are returned the caller in the case a change occurred. If no change occurred then + // it will return with a nil array. If changed the current observed state of variables // that is returned is not sent over the Watch channel, the caller should coordinate this fact. // // Only error that is returned from this function is the result of the passed context. @@ -231,6 +232,9 @@ func (c *controller) Run(ctx context.Context) error { c.logger.Debugf("Observed state changed for composable inputs; debounce started") drainChan(stateChangedChan) break DEBOUNCE + } else { + observedResult <- nil + observedResult = nil } case <-stateChangedChan: t.Reset(100 * time.Millisecond) @@ -266,6 +270,13 @@ func (c *controller) sendVars(ctx context.Context, observedResult chan []*transp c.logger.Debugf("Computing new variable state for composable inputs") vars := c.generateVars(fetchContextProviders, c.defaultProvider) if observedResult != nil { + // drain any vars sitting on the watch channel + // this new set of vars replaces that set if that current + // value has not been read then it will result in vars state being incorrect + select { + case <-c.ch: + default: + } select { case <-ctx.Done(): return ctx.Err() @@ -302,9 +313,14 @@ func (c *controller) Watch() <-chan []*transpiler.Vars { return c.ch } -// Observe sends the observed variables from the AST to the controller. +// Observe instructs the controller to enable the observed providers. +// +// This is a blocking call until the observation is handled and the most recent +// set of variables are returned the caller in the case a change occurred. If no change occurred then +// it will return with a nil array. If changed the current observed state of variables +// that is returned is not sent over the Watch channel, the caller should coordinate this fact. // -// Based on this information it will determine which providers should even be running. +// Only error that is returned from this function is the result of the passed context. func (c *controller) Observe(ctx context.Context, vars []string) ([]*transpiler.Vars, error) { // only need the top-level variables to determine which providers to run // diff --git a/internal/pkg/composable/controller_test.go b/internal/pkg/composable/controller_test.go index 3e843b7fca4..2f83c7ac486 100644 --- a/internal/pkg/composable/controller_test.go +++ b/internal/pkg/composable/controller_test.go @@ -247,14 +247,36 @@ func TestProvidersDefaultDisabled(t *testing.T) { errCh <- c.Run(ctx) }() - setVars, err := c.Observe(timeoutCtx, tt.observed) - require.NoError(t, err) + var setVars []*transpiler.Vars + go func() { + defer cancel() + + observed := false + for { + select { + case <-timeoutCtx.Done(): + return + case vars := <-c.Watch(): + setVars = vars + default: + if !observed { + vars, err := c.Observe(timeoutCtx, tt.observed) + require.NoError(t, err) + if vars != nil { + setVars = vars + } + observed = true + } + } + } + }() err = <-errCh if errors.Is(err, context.Canceled) { err = nil } require.NoError(t, err) + require.NotNil(t, setVars) if len(tt.context) > 0 { for _, name := range tt.context { From f45d09156370d2e9cd7411ff8ab5ce80f0784a27 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Sat, 25 Jan 2025 09:50:00 -0500 Subject: [PATCH 07/14] Add changelog. --- ...d-context-variable-support-to-outputs.yaml | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 changelog/fragments/1737816507-Add-context-variable-support-to-outputs.yaml diff --git a/changelog/fragments/1737816507-Add-context-variable-support-to-outputs.yaml b/changelog/fragments/1737816507-Add-context-variable-support-to-outputs.yaml new file mode 100644 index 00000000000..9d861edf943 --- /dev/null +++ b/changelog/fragments/1737816507-Add-context-variable-support-to-outputs.yaml @@ -0,0 +1,34 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add context variable support to outputs + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Adds support for using context variable providers in the outputs section of a policy. Includes fallback support + to reference env provider when no provider prefix is provided in the variable. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/6602 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6376 From a162e8358aa5f4eec233ad1c10147e68300cbc52 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 31 Jan 2025 13:35:04 -0500 Subject: [PATCH 08/14] Fixes from code review. --- internal/pkg/agent/transpiler/outputs.go | 2 +- internal/pkg/agent/transpiler/vars.go | 38 ++++++++++++---------- internal/pkg/agent/transpiler/vars_test.go | 1 + internal/pkg/composable/controller.go | 5 +++ 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/internal/pkg/agent/transpiler/outputs.go b/internal/pkg/agent/transpiler/outputs.go index b232b7ef5cf..d59f8a0d5e1 100644 --- a/internal/pkg/agent/transpiler/outputs.go +++ b/internal/pkg/agent/transpiler/outputs.go @@ -27,7 +27,7 @@ func RenderOutputs(outputs Node, varsArray []*Vars) (Node, error) { d, ok := outputs.Value().(*Dict) if !ok { - return nil, fmt.Errorf("outputs must be an dict") + return nil, fmt.Errorf("outputs must be an dict, got %T instead", outputs.Value()) } nodes := d.Value().([]Node) keys := make([]Node, len(nodes)) diff --git a/internal/pkg/agent/transpiler/vars.go b/internal/pkg/agent/transpiler/vars.go index 5a45704aa40..6b72ff4cf8b 100644 --- a/internal/pkg/agent/transpiler/vars.go +++ b/internal/pkg/agent/transpiler/vars.go @@ -153,23 +153,7 @@ func replaceVars(value string, replacer func(variable string) (Node, Processors, } } if !set && reqMatch { - var sb strings.Builder - sb.WriteString("${") - for i, val := range vars { - switch val.(type) { - case *constString: - sb.WriteString(`'`) - sb.WriteString(val.Value()) - sb.WriteString(`'`) - case *varString: - sb.WriteString(val.Value()) - if i < len(vars)-1 { - sb.WriteString("|") - } - } - } - sb.WriteString("}") - return NewStrVal(""), fmt.Errorf("%w: %s", ErrNoMatch, sb.String()) + return NewStrVal(""), fmt.Errorf("%w: %s", ErrNoMatch, toRepresentation(vars)) } lastIndex = r[1] } @@ -177,6 +161,26 @@ func replaceVars(value string, replacer func(variable string) (Node, Processors, return NewStrValWithProcessors(result+value[lastIndex:], processors), nil } +func toRepresentation(vars []varI) string { + var sb strings.Builder + sb.WriteString("${") + for i, val := range vars { + switch val.(type) { + case *constString: + sb.WriteString(`'`) + sb.WriteString(val.Value()) + sb.WriteString(`'`) + case *varString: + sb.WriteString(val.Value()) + if i < len(vars)-1 { + sb.WriteString("|") + } + } + } + sb.WriteString("}") + return sb.String() +} + // nodeToValue ensures that the node is an actual value. func nodeToValue(node Node) Node { switch n := node.(type) { diff --git a/internal/pkg/agent/transpiler/vars_test.go b/internal/pkg/agent/transpiler/vars_test.go index 503292c2eb5..99571512369 100644 --- a/internal/pkg/agent/transpiler/vars_test.go +++ b/internal/pkg/agent/transpiler/vars_test.go @@ -80,6 +80,7 @@ func TestVars_Replace(t *testing.T) { false, }, { + // data will be resolved to other.data "${un-der_score.missing|un-der_score.missing2|data}", NewStrVal("info"), false, diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 4eb80fd33f0..2c397682618 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -225,6 +225,9 @@ func (c *controller) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case observed := <-c.observedCh: + // observedResult holds the channel to send the latest observed results on + // if nothing is changed then nil will be sent over the channel if the set of running + // providers does change then the latest observed variables will be sent over the channel observedResult = observed.result changed := c.handleObserved(localCtx, &wg, fetchCh, stateChangedChan, observed.vars) if changed { @@ -233,6 +236,8 @@ func (c *controller) Run(ctx context.Context) error { drainChan(stateChangedChan) break DEBOUNCE } else { + // nothing changed send nil to alert the caller + // observedResult must be set to nil here so on next loop it is not set observedResult <- nil observedResult = nil } From 950f5c86253a769a61efa6feb085f4f7dca99a1a Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 31 Jan 2025 13:38:45 -0500 Subject: [PATCH 09/14] Add const. --- internal/pkg/agent/transpiler/vars.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/pkg/agent/transpiler/vars.go b/internal/pkg/agent/transpiler/vars.go index 6b72ff4cf8b..399a0e33cd7 100644 --- a/internal/pkg/agent/transpiler/vars.go +++ b/internal/pkg/agent/transpiler/vars.go @@ -15,6 +15,8 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/core/composable" ) +const varsSeperator = "." + var varsRegex = regexp.MustCompile(`\$\$?{([\p{L}\d\s\\\-_|.'":\/]*)}`) // ErrNoMatch is return when the replace didn't fail, just that no vars match to perform the replace. @@ -295,16 +297,13 @@ func extractVars(i string, defaultProvider string) ([]varI, error) { } func varPrefixMatched(val string, key string) bool { - s := strings.SplitN(val, ".", 2) + s := strings.SplitN(val, varsSeperator, 2) return s[0] == key } func maybeAddDefaultProvider(val string, defaultProvider string) string { - if defaultProvider == "" { - return val - } - if strings.Contains(val, ".") { - // already has a provider in the variable name + if defaultProvider == "" || strings.Contains(val, varsSeperator) { + // no default set or already has a provider in the variable name return val } // at this point they variable doesn't have a provider From 1ac5474ee2ebb465845b655e5f3c88ce70359f76 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 5 Feb 2025 13:48:53 -0500 Subject: [PATCH 10/14] Fixes from code review. --- internal/pkg/agent/transpiler/vars.go | 13 ++++++++++--- internal/pkg/agent/transpiler/vars_test.go | 3 ++- internal/pkg/composable/controller.go | 4 ++++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/internal/pkg/agent/transpiler/vars.go b/internal/pkg/agent/transpiler/vars.go index 399a0e33cd7..332c843929b 100644 --- a/internal/pkg/agent/transpiler/vars.go +++ b/internal/pkg/agent/transpiler/vars.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/core/composable" ) -const varsSeperator = "." +const varsSeparator = "." var varsRegex = regexp.MustCompile(`\$\$?{([\p{L}\d\s\\\-_|.'":\/]*)}`) @@ -297,12 +297,19 @@ func extractVars(i string, defaultProvider string) ([]varI, error) { } func varPrefixMatched(val string, key string) bool { - s := strings.SplitN(val, varsSeperator, 2) + s := strings.SplitN(val, varsSeparator, 2) return s[0] == key } +// maybeAddDefaultProvider adds a defaultProvide as a prefix on the value only in the case that +// the defaultProvider is set and the val doesn't contain any varsSeparator. +// +// This is done here and not at resolve time of the variable because the Observe flow of the AST +// for the variables provider needs to known exactly which providers to run. It also is an issue with +// using fetch providers because we would have to hit each to determine if that variable was present first +// before apply the default and we do not want that behavior. func maybeAddDefaultProvider(val string, defaultProvider string) string { - if defaultProvider == "" || strings.Contains(val, varsSeperator) { + if defaultProvider == "" || strings.Contains(val, varsSeparator) { // no default set or already has a provider in the variable name return val } diff --git a/internal/pkg/agent/transpiler/vars_test.go b/internal/pkg/agent/transpiler/vars_test.go index 99571512369..97a28f3bce2 100644 --- a/internal/pkg/agent/transpiler/vars_test.go +++ b/internal/pkg/agent/transpiler/vars_test.go @@ -80,7 +80,8 @@ func TestVars_Replace(t *testing.T) { false, }, { - // data will be resolved to other.data + // data will be resolved to other.data since 'other' is the default provider + // set at variable creation (see mustMakeVarsWithDefault call) "${un-der_score.missing|un-der_score.missing2|data}", NewStrVal("info"), false, diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 2c397682618..453a3d47fb7 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -47,6 +47,8 @@ type Controller interface { // it will return with a nil array. If changed the current observed state of variables // that is returned is not sent over the Watch channel, the caller should coordinate this fact. // + // Maximum amount of time for resolve is 500ms as that is the debounce window for variable resolution. + // // Only error that is returned from this function is the result of the passed context. Observe(context.Context, []string) ([]*transpiler.Vars, error) @@ -325,6 +327,8 @@ func (c *controller) Watch() <-chan []*transpiler.Vars { // it will return with a nil array. If changed the current observed state of variables // that is returned is not sent over the Watch channel, the caller should coordinate this fact. // +// Maximum amount of time for resolve is 500ms as that is the debounce window for variable resolution. +// // Only error that is returned from this function is the result of the passed context. func (c *controller) Observe(ctx context.Context, vars []string) ([]*transpiler.Vars, error) { // only need the top-level variables to determine which providers to run From 5e25eba435e83b093419dc558e62518de079b800 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 5 Feb 2025 13:06:37 +0100 Subject: [PATCH 11/14] build(deps): bump github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor from 0.3.0 to 0.4.0 (#6533) * build(deps): bump github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor Bumps [github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor](https://github.com/elastic/opentelemetry-collector-components) from 0.3.0 to 0.4.0. - [Release notes](https://github.com/elastic/opentelemetry-collector-components/releases) - [Commits](https://github.com/elastic/opentelemetry-collector-components/compare/processor/lsmintervalprocessor/v0.3.0...processor/lsmintervalprocessor/v0.4.0) --- updated-dependencies: - dependency-name: github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] * Update NOTICE.txt * Update otel README.md --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- NOTICE.txt | 70 ++++++++++++++++++++++++++++++++++--- go.mod | 6 ++-- go.sum | 12 ++++--- internal/pkg/otel/README.md | 2 +- 4 files changed, 79 insertions(+), 11 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index de0a7beff68..17e1265c4b3 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -3454,11 +3454,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/opentelemetry-c -------------------------------------------------------------------------------- Dependency : github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor -Version: v0.3.0 +Version: v0.4.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor@v0.3.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor@v0.4.0/LICENSE: Apache License Version 2.0, January 2004 @@ -36490,6 +36490,34 @@ Contents of probable licence file $GOMODCACHE/github.com/aws/smithy-go@v1.22.1/L of your accepting any such warranty or additional liability. +-------------------------------------------------------------------------------- +Dependency : github.com/axiomhq/hyperloglog +Version: v0.2.0 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/axiomhq/hyperloglog@v0.2.0/LICENSE: + +Copyright (c) 2021, Axiom, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + -------------------------------------------------------------------------------- Dependency : github.com/bboreham/go-loser Version: v0.0.0-20230920113527-fcc2c21820a3 @@ -38500,11 +38528,11 @@ Contents of probable licence file $GOMODCACHE/github.com/cockroachdb/logtags@v0. -------------------------------------------------------------------------------- Dependency : github.com/cockroachdb/pebble -Version: v1.1.2 +Version: v1.1.3 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/cockroachdb/pebble@v1.1.2/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/cockroachdb/pebble@v1.1.3/LICENSE: Copyright (c) 2011 The LevelDB-Go Authors. All rights reserved. @@ -41237,6 +41265,40 @@ THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/dgryski/go-metro +Version: v0.0.0-20180109044635-280f6062b5bc +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/dgryski/go-metro@v0.0.0-20180109044635-280f6062b5bc/LICENSE: + +This package is a mechanical translation of the reference C++ code for +MetroHash, available at https://github.com/jandrewrogers/MetroHash + +The MIT License (MIT) + +Copyright (c) 2016 Damian Gryski + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : github.com/dgryski/go-rendezvous Version: v0.0.0-20200823014737-9f7001d12a5f diff --git a/go.mod b/go.mod index 190d17cdc44..acff3164a15 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/elastic/opentelemetry-collector-components/connector/signaltometricsconnector v0.3.0 github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor v0.13.0 github.com/elastic/opentelemetry-collector-components/processor/elastictraceprocessor v0.3.0 - github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor v0.3.0 + github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor v0.4.0 github.com/fatih/color v1.17.0 github.com/fsnotify/fsnotify v1.8.0 github.com/go-viper/mapstructure/v2 v2.2.1 @@ -236,6 +236,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 // indirect github.com/aws/smithy-go v1.22.1 // indirect + github.com/axiomhq/hyperloglog v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect @@ -246,7 +247,7 @@ require ( github.com/cockroachdb/errors v1.11.3 // indirect github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect - github.com/cockroachdb/pebble v1.1.2 // indirect + github.com/cockroachdb/pebble v1.1.3 // indirect github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/containerd/containerd v1.7.18 // indirect @@ -260,6 +261,7 @@ require ( github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2 // indirect github.com/dgraph-io/badger/v4 v4.5.0 // indirect github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect + github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/digitalocean/go-libvirt v0.0.0-20240709142323-d8406205c752 // indirect github.com/digitalocean/godo v1.122.0 // indirect diff --git a/go.sum b/go.sum index 2eb88c7839c..45bc5d4a941 100644 --- a/go.sum +++ b/go.sum @@ -287,6 +287,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 h1:Xgv/hyNgvLda/M9l9qxXc4UFSgpp github.com/aws/aws-sdk-go-v2/service/sts v1.33.3/go.mod h1:5Gn+d+VaaRgsjewpMvGazt0WfcFO+Md4wLOuBfGR9Bc= github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo= +github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -346,8 +348,8 @@ github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/e github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= -github.com/cockroachdb/pebble v1.1.2 h1:CUh2IPtR4swHlEj48Rhfzw6l/d0qA31fItcIszQVIsA= -github.com/cockroachdb/pebble v1.1.2/go.mod h1:4exszw1r40423ZsmkG/09AFEG83I0uDgfujJdbL6kYU= +github.com/cockroachdb/pebble v1.1.3 h1:GM5YY3Yb09KCGUQoyWdi3vsLErXHsmc3qRRWsX+tBqw= +github.com/cockroachdb/pebble v1.1.3/go.mod h1:4exszw1r40423ZsmkG/09AFEG83I0uDgfujJdbL6kYU= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= @@ -393,6 +395,8 @@ github.com/dgraph-io/ristretto/v2 v2.0.0 h1:l0yiSOtlJvc0otkqyMaDNysg8E9/F/TYZwMb github.com/dgraph-io/ristretto/v2 v2.0.0/go.mod h1:FVFokF2dRqXyPyeMnK1YDy8Fc6aTe0IKgbcd03CYeEk= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/digitalocean/go-libvirt v0.0.0-20240709142323-d8406205c752 h1:NI7XEcHzWVvBfVjSVK6Qk4wmrUfoyQxCNpBjrHelZFk= @@ -502,8 +506,8 @@ github.com/elastic/opentelemetry-collector-components/processor/elasticinframetr github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor v0.13.0/go.mod h1:iLTitn5RdFtBHYLC2YVAgjv2OxCKY51rdhtrsT1hQ8I= github.com/elastic/opentelemetry-collector-components/processor/elastictraceprocessor v0.3.0 h1:LgVBXE5ASDsheFLyM4ILG3jTuK3eJ6S67/NcKsHKP2Q= github.com/elastic/opentelemetry-collector-components/processor/elastictraceprocessor v0.3.0/go.mod h1:G0l0UkTeT1lxWCvURonQQLgNcaWX6bBuQexp5tubUZ8= -github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor v0.3.0 h1:kesYkHn4cTMyRwwVCihaqARHx0zn3/sl32TfdU/ugdQ= -github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor v0.3.0/go.mod h1:9FNHYqG5Z+4jZTuEJUrgEYYPQm7J5XZYvH2eUHwTfRg= +github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor v0.4.0 h1:ha16pisdi6FOsMLe7cJViNKzmrxmq1iNqRLuzS/Rqv8= +github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor v0.4.0/go.mod h1:iXCsBaErHjeNYUnygpK3MOqsmfxAh4oOmuexF1pS0Wo= github.com/elastic/opentelemetry-lib v0.14.0 h1:4P5q3RzwZTbAclHBmQp2dXxSsOMBQXZgkDStIR2iZnM= github.com/elastic/opentelemetry-lib v0.14.0/go.mod h1:/FfOjBoi8gaKQrkhFxzxQzP5g7soH/tShRWDxfeIUq8= github.com/elastic/pkcs8 v1.0.0 h1:HhitlUKxhN288kcNcYkjW6/ouvuwJWd9ioxpjnD9jVA= diff --git a/internal/pkg/otel/README.md b/internal/pkg/otel/README.md index ee726a8731c..7ccc37ca35f 100644 --- a/internal/pkg/otel/README.md +++ b/internal/pkg/otel/README.md @@ -75,7 +75,7 @@ This section provides a summary of components included in the Elastic Distributi | [filterprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/processor/filterprocessor/v0.117.0/processor/filterprocessor/README.md) | v0.117.0 | | [geoipprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/processor/geoipprocessor/v0.117.0/processor/geoipprocessor/README.md) | v0.117.0 | | [k8sattributesprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/processor/k8sattributesprocessor/v0.117.0/processor/k8sattributesprocessor/README.md) | v0.117.0 | -| [lsmintervalprocessor](https://github.com/elastic/opentelemetry-collector-components/blob/processor/lsmintervalprocessor/v0.3.0/processor/lsmintervalprocessor/README.md) | v0.3.0 | +| [lsmintervalprocessor](https://github.com/elastic/opentelemetry-collector-components/blob/processor/lsmintervalprocessor/v0.4.0/processor/lsmintervalprocessor/README.md) | v0.4.0 | | [memorylimiterprocessor](https://github.com/open-telemetry/opentelemetry-collector/blob/processor/memorylimiterprocessor/v0.117.0/processor/memorylimiterprocessor/README.md) | v0.117.0 | | [resourcedetectionprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/processor/resourcedetectionprocessor/v0.117.0/processor/resourcedetectionprocessor/README.md) | v0.117.0 | | [resourceprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/processor/resourceprocessor/v0.117.0/processor/resourceprocessor/README.md) | v0.117.0 | From 3885b01bf8362b26d31844752f4982953a5c0f46 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Wed, 5 Feb 2025 14:35:19 +0100 Subject: [PATCH 12/14] updatecli: set force to false, it won't try to update the base branch (#6705) --- .ci/updatecli/updatecli-bump-golang.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.ci/updatecli/updatecli-bump-golang.yml b/.ci/updatecli/updatecli-bump-golang.yml index a5af6980968..be86caf4d20 100644 --- a/.ci/updatecli/updatecli-bump-golang.yml +++ b/.ci/updatecli/updatecli-bump-golang.yml @@ -13,6 +13,7 @@ scms: token: '{{ requiredEnv "GITHUB_TOKEN" }}' commitusingapi: true branch: main + force: false actions: elastic-agent: From ed6a47518a34f59b75471d5898a5ee3676fde0a3 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Wed, 5 Feb 2025 16:16:58 +0100 Subject: [PATCH 13/14] github-actions: filter by current active branches in the repository (#6326) --- .github/workflows/bump-agent-versions.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/bump-agent-versions.yml b/.github/workflows/bump-agent-versions.yml index a58bc50dcaf..43e35c06d67 100644 --- a/.github/workflows/bump-agent-versions.yml +++ b/.github/workflows/bump-agent-versions.yml @@ -12,11 +12,14 @@ jobs: timeout-minutes: 1 outputs: matrix: ${{ steps.generator.outputs.matrix }} + permissions: + contents: read steps: - id: generator uses: elastic/oblt-actions/elastic/active-branches@v1 with: exclude-branches: "7.17" + filter-branches: true update_versions: runs-on: ubuntu-latest From bf5978415f812c8ed8566bb08f61e23e09f78e99 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Wed, 5 Feb 2025 16:22:22 +0100 Subject: [PATCH 14/14] mergify: support backport-active-8, backport-active-9 and backport-active-all (#6708) --- .mergify.yml | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/.mergify.yml b/.mergify.yml index 3b523032f6f..6d3baa92534 100644 --- a/.mergify.yml +++ b/.mergify.yml @@ -122,7 +122,11 @@ pull_request_rules: This pull request does not have a backport label. Could you fix it @{{author}}? 🙏 To fixup this pull request, you need to add the backport labels for the needed branches, such as: - * `backport-./d./d` is the label to automatically backport to the `8./d` branch. `/d` is the digit + * `backport-./d./d` is the label that automatically backports to the `8./d` branch. `/d` is the digit + * `backport-active-all` is the label that automatically backports to all active branches. + * `backport-active-8` is the label that automatically backports to all active minor branches for the 8 major. + * `backport-active-9` is the label that automatically backports to all active minor branches for the 9 major. + - name: backport patches to 7.17 branch conditions: - merged @@ -370,3 +374,56 @@ pull_request_rules: labels: - "backport" title: "[{{ destination_branch }}](backport #{{ number }}) {{ title }}" + + - name: backport patches to all active minor branches for the 8 major. + conditions: + - merged + - label=backport-active-8 + actions: + backport: + assignees: + - "{{ author }}" + # NOTE: this list needs to be changed when a new minor branch is created + # or an existing minor branch reached EOL. + branches: + - "8.18" + - "8.17" + - "8.16" + labels: + - "backport" + title: "[{{ destination_branch }}](backport #{{ number }}) {{ title }}" + - name: backport patches to all active minor branches for the 9 major. + conditions: + - merged + - label=backport-active-9 + actions: + backport: + assignees: + - "{{ author }}" + # NOTE: this list needs to be changed when a new minor branch is created + # or an existing minor branch reached EOL. + branches: + - "9.0" + labels: + - "backport" + title: "[{{ destination_branch }}](backport #{{ number }}) {{ title }}" + - name: backport patches to all active branches + conditions: + - merged + - label=backport-active-all + actions: + backport: + assignees: + - "{{ author }}" + # NOTE: this list needs to be changed when a new minor branch is created + # or an existing release branch reached EOL. + branches: + - "9.0" + - "8.18" + - "8.17" + - "8.16" + - "8.x" + - "7.17" + labels: + - "backport" + title: "[{{ destination_branch }}](backport #{{ number }}) {{ title }}"