Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for variables in outputs and default provider #6602

Merged
merged 17 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add context variable support to outputs

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Adds support for using context variable providers in the outputs section of a policy. Includes fallback support
to reference env provider when no provider prefix is provided in the variable.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6602

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6376
62 changes: 46 additions & 16 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ type ConfigManager interface {
type VarsManager interface {
Runner

// DefaultProvider returns the default provider that the variable manager is configured to use.
DefaultProvider() string

// Observe instructs the variables to observe.
Observe([]string)
Observe(context.Context, []string) ([]*transpiler.Vars, error)

// Watch returns the chanel to watch for variable changes.
Watch() <-chan []*transpiler.Vars
Expand Down Expand Up @@ -1244,7 +1247,11 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config
}

// pass the observed vars from the AST to the varsMgr
c.observeASTVars()
err = c.observeASTVars(ctx)
if err != nil {
// only possible error here is the context being cancelled
return err
}

// Disabled for 8.8.0 release in order to limit the surface
// https://github.com/elastic/security-team/issues/6501
Expand Down Expand Up @@ -1327,25 +1334,32 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
// observeASTVars identifies the variables that are referenced in the computed AST and passed to
// the varsMgr so it knows what providers are being referenced. If a providers is not being
// referenced then the provider does not need to be running.
func (c *Coordinator) observeASTVars() {
func (c *Coordinator) observeASTVars(ctx context.Context) error {
if c.varsMgr == nil {
// No varsMgr (only happens in testing)
return
return nil
}
if c.ast == nil {
// No AST; no vars
c.varsMgr.Observe(nil)
return
var vars []string
if c.ast != nil {
inputs, ok := transpiler.Lookup(c.ast, "inputs")
if ok {
vars = inputs.Vars(vars, c.varsMgr.DefaultProvider())
}
outputs, ok := transpiler.Lookup(c.ast, "outputs")
if ok {
vars = outputs.Vars(vars, c.varsMgr.DefaultProvider())
}
}
inputs, ok := transpiler.Lookup(c.ast, "inputs")
if !ok {
// No inputs; no vars
c.varsMgr.Observe(nil)
return
updated, err := c.varsMgr.Observe(ctx, vars)
if err != nil {
// context cancel
return err
}
var vars []string
vars = inputs.Vars(vars)
c.varsMgr.Observe(vars)
if updated != nil {
// provided an updated set of vars (observed changed)
c.vars = updated
}
return nil
}

// processVars updates the transpiler vars in the Coordinator.
Expand Down Expand Up @@ -1421,6 +1435,8 @@ func (c *Coordinator) generateComponentModel() (err error) {
}()

ast := c.ast.ShallowClone()

// perform variable substitution for inputs
inputs, ok := transpiler.Lookup(ast, "inputs")
if ok {
renderedInputs, err := transpiler.RenderInputs(inputs, c.vars)
Expand All @@ -1433,6 +1449,20 @@ func (c *Coordinator) generateComponentModel() (err error) {
}
}

// perform variable substitution for outputs
// outputs only support the context variables (dynamic provides are not provide to the outputs)
outputs, ok := transpiler.Lookup(ast, "outputs")
if ok {
renderedOutputs, err := transpiler.RenderOutputs(outputs, c.vars)
if err != nil {
return fmt.Errorf("rendering outputs failed: %w", err)
}
err = transpiler.Insert(ast, renderedOutputs, "outputs")
if err != nil {
return fmt.Errorf("inserting rendered outputs failed: %w", err)
}
}

cfg, err := ast.Map()
if err != nil {
return fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err)
Expand Down
17 changes: 15 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ func BenchmarkCoordinator_generateComponentModel(b *testing.B) {
require.NoError(b, err)
vars := make([]*transpiler.Vars, len(varsMaps))
for i, vm := range varsMaps {
vars[i], err = transpiler.NewVars(fmt.Sprintf("%d", i), vm, mapstr.M{})
vars[i], err = transpiler.NewVars(fmt.Sprintf("%d", i), vm, mapstr.M{}, "")
require.NoError(b, err)
}

Expand Down Expand Up @@ -1188,6 +1188,9 @@ func (l *configChange) Fail(err error) {
}

type fakeVarsManager struct {
varsMx sync.RWMutex
vars []*transpiler.Vars

varsCh chan []*transpiler.Vars
errCh chan error

Expand Down Expand Up @@ -1222,19 +1225,29 @@ func (f *fakeVarsManager) Watch() <-chan []*transpiler.Vars {
return f.varsCh
}

func (f *fakeVarsManager) Observe(observed []string) {
func (f *fakeVarsManager) Observe(ctx context.Context, observed []string) ([]*transpiler.Vars, error) {
f.observedMx.Lock()
defer f.observedMx.Unlock()
f.observed = observed
f.varsMx.RLock()
defer f.varsMx.RUnlock()
return f.vars, nil
}

func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) {
f.varsMx.Lock()
f.vars = vars
f.varsMx.Unlock()
select {
case <-ctx.Done():
case f.varsCh <- vars:
}
}

func (f *fakeVarsManager) DefaultProvider() string {
return ""
}

type fakeOTelManager struct {
updateCallback func(*confmap.Conf) error
result error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ inputs:
components = nil
vars, err := transpiler.NewVars("", map[string]interface{}{
"TEST_VAR": "input-id",
}, nil)
}, nil, "")
require.NoError(t, err, "Vars creation must succeed")
varsChan <- []*transpiler.Vars{vars}
coord.runLoopIteration(ctx)
Expand All @@ -1121,7 +1121,7 @@ inputs:
components = nil
vars, err = transpiler.NewVars("", map[string]interface{}{
"TEST_VAR": "changed-input-id",
}, nil)
}, nil, "")
require.NoError(t, err, "Vars creation must succeed")
varsChan <- []*transpiler.Vars{vars}
coord.runLoopIteration(ctx)
Expand Down Expand Up @@ -1239,7 +1239,7 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) {
// (Coordinator will only regenerate its component model when it has non-nil
// vars).
func emptyVars(t *testing.T) []*transpiler.Vars {
vars, err := transpiler.NewVars("", map[string]interface{}{}, nil)
vars, err := transpiler.NewVars("", map[string]interface{}{}, nil, "")
require.NoError(t, err, "Vars creation must succeed")
return []*transpiler.Vars{vars}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestDiagnosticVariables(t *testing.T) {
map[string]interface{}{
"testvar": "testvalue",
},
nil)
nil, "")
require.NoError(t, err)

expected := `
Expand Down
26 changes: 13 additions & 13 deletions internal/pkg/agent/transpiler/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Node interface {

// Vars adds to the array with the variables identified in the node. Returns the array in-case
// the capacity of the array had to be changed.
Vars([]string) []string
Vars([]string, string) []string

// Apply apply the current vars, returning the new value for the node. This does not modify the original Node.
Apply(*Vars) (Node, error)
Expand Down Expand Up @@ -182,10 +182,10 @@ func (d *Dict) Hash64With(h *xxhash.Digest) error {
}

// Vars returns a list of all variables referenced in the dictionary.
func (d *Dict) Vars(vars []string) []string {
func (d *Dict) Vars(vars []string, defaultProvider string) []string {
for _, v := range d.value {
k := v.(*Key)
vars = k.Vars(vars)
vars = k.Vars(vars, defaultProvider)
}
return vars
}
Expand Down Expand Up @@ -318,11 +318,11 @@ func (k *Key) Hash64With(h *xxhash.Digest) error {
}

// Vars returns a list of all variables referenced in the value.
func (k *Key) Vars(vars []string) []string {
func (k *Key) Vars(vars []string, defaultProvider string) []string {
if k.value == nil {
return vars
}
return k.value.Vars(vars)
return k.value.Vars(vars, defaultProvider)
}

// Apply applies the vars to the value. This does not modify the original node.
Expand Down Expand Up @@ -463,9 +463,9 @@ func (l *List) ShallowClone() Node {
}

// Vars returns a list of all variables referenced in the list.
func (l *List) Vars(vars []string) []string {
func (l *List) Vars(vars []string, defaultProvider string) []string {
for _, v := range l.value {
vars = v.Vars(vars)
vars = v.Vars(vars, defaultProvider)
}
return vars
}
Expand Down Expand Up @@ -552,12 +552,12 @@ func (s *StrVal) Hash64With(h *xxhash.Digest) error {
}

// Vars returns a list of all variables referenced in the string.
func (s *StrVal) Vars(vars []string) []string {
func (s *StrVal) Vars(vars []string, defaultProvider string) []string {
// errors are ignored (if there is an error determine the vars it will also error computing the policy)
_, _ = replaceVars(s.value, func(variable string) (Node, Processors, bool) {
vars = append(vars, variable)
return nil, nil, false
}, false)
}, false, defaultProvider)
return vars
}

Expand Down Expand Up @@ -613,7 +613,7 @@ func (s *IntVal) ShallowClone() Node {
}

// Vars does nothing. Cannot have variable in an IntVal.
func (s *IntVal) Vars(vars []string) []string {
func (s *IntVal) Vars(vars []string, defaultProvider string) []string {
return vars
}

Expand Down Expand Up @@ -691,7 +691,7 @@ func (s *UIntVal) Hash64With(h *xxhash.Digest) error {
}

// Vars does nothing. Cannot have variable in an UIntVal.
func (s *UIntVal) Vars(vars []string) []string {
func (s *UIntVal) Vars(vars []string, defaultProvider string) []string {
return vars
}

Expand Down Expand Up @@ -764,7 +764,7 @@ func (s *FloatVal) hashString() string {
}

// Vars does nothing. Cannot have variable in an FloatVal.
func (s *FloatVal) Vars(vars []string) []string {
func (s *FloatVal) Vars(vars []string, defaultProvider string) []string {
return vars
}

Expand Down Expand Up @@ -843,7 +843,7 @@ func (s *BoolVal) Hash64With(h *xxhash.Digest) error {
}

// Vars does nothing. Cannot have variable in an BoolVal.
func (s *BoolVal) Vars(vars []string) []string {
func (s *BoolVal) Vars(vars []string, defaultProvider string) []string {
return vars
}

Expand Down
Loading
Loading