Fix deadlock in error handling in OTelManager #6927

Merged · 4 commits · Feb 20, 2025
34 changes: 34 additions & 0 deletions changelog/fragments/1739981369-Fix-deadlock-in-OTelManager.yaml
@@ -0,0 +1,34 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an ~80-character description of the change.
summary: Fix deadlock in OTelManager

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
  Fixes a deadlock in the OTelManager where an Update arriving while an error is being
  reported could block all further communication with the OTelManager.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present, the tooling automatically fills it by finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6927

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, the tooling automatically fills it with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
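For context on the deadlock described in the fragment above, here is a minimal, standalone Go sketch (hypothetical names, not code from this repository) of the failure mode: with unbuffered channels, a manager goroutine blocked on sending an error and a coordinator blocked on sending a new configuration wait on each other forever.

```go
package main

import "fmt"

func main() {
	errCh := make(chan error)  // manager -> coordinator, unbuffered
	cfgCh := make(chan string) // coordinator -> manager; a string stands in for a real config

	// "manager": reports an error before it is willing to accept the next config.
	go func() {
		errCh <- fmt.Errorf("collector failed") // blocks: coordinator is not receiving yet
		<-cfgCh
	}()

	// "coordinator": pushes a new config before it is willing to read the error.
	cfgCh <- "new config" // blocks: manager is not receiving, so both sides are stuck
	<-errCh
}
```

Running this aborts with Go's `fatal error: all goroutines are asleep - deadlock!`; buffering the error channel and keeping only the latest value, as this PR does, removes the blocking send on the manager's side.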
50 changes: 25 additions & 25 deletions internal/pkg/otel/manager/manager.go
@@ -45,7 +45,7 @@ type OTelManager struct {
func NewOTelManager(logger *logger.Logger) *OTelManager {
return &OTelManager{
logger: logger,
- errCh: make(chan error),
+ errCh: make(chan error, 1), // holds at most one error
cfgCh: make(chan *confmap.Conf),
statusCh: make(chan *status.AggregateStatus),
doneChan: make(chan struct{}),
@@ -83,16 +83,10 @@ func (m *OTelManager) Run(ctx context.Context) error {
// it's failing to run). we do not retry creation on failure
// as it will always fail; a new configuration is required for
// it not to fail (a new configuration will result in the retry)
- select {
- case m.errCh <- err:
- case <-ctx.Done():
- }
+ m.reportErr(ctx, err)
} else {
// all good at the moment (possible that it will fail)
- select {
- case m.errCh <- nil:
- case <-ctx.Done():
- }
+ m.reportErr(ctx, nil)
}
} else {
// error occurred while running the collector, this occurs in the
@@ -114,10 +108,7 @@
}
// pass the error to the errCh so the coordinator sees it, unless it's a cancel error
if !errors.Is(err, context.Canceled) {
- select {
- case m.errCh <- err:
- case <-ctx.Done():
- }
+ m.reportErr(ctx, err)
}
}
case cfg := <-m.cfgCh:
@@ -141,10 +132,7 @@
}
// ensure that the coordinator knows that there is no error
// as the collector is not running anymore
- select {
- case m.errCh <- nil:
- case <-ctx.Done():
- }
+ m.reportErr(ctx, nil)
} else {
// either a new configuration or the first configuration
// that results in the collector being started
@@ -157,16 +145,10 @@
// it's failing to run). we do not retry creation on failure
// as it will always fail; a new configuration is required for
// it not to fail (a new configuration will result in the retry)
- select {
- case m.errCh <- err:
- case <-ctx.Done():
- }
+ m.reportErr(ctx, err)
} else {
// all good at the moment (possible that it will fail)
- select {
- case m.errCh <- nil:
- case <-ctx.Done():
- }
+ m.reportErr(ctx, nil)
}
} else {
// collector is already running so only the configuration
@@ -226,3 +208,21 @@ func (m *OTelManager) startCollector(cfg *confmap.Conf, errCh chan error) (conte
}()
return cancel, ap, nil
}

+ // reportErr reports an error to the service that is controlling this manager
+ //
+ // the manager can be blocked doing other work like sending this manager a new configuration
+ // so we do not want error reporting to be a blocking send over the channel
+ //
+ // the manager really only needs the most recent error, so if it misses an error it's not a big
+ // deal, what matters is that it has the current error for the state of this manager
+ func (m *OTelManager) reportErr(ctx context.Context, err error) {
+ select {
+ case <-m.errCh:
+ default:
+ }
+ select {
+ case m.errCh <- err:
+ case <-ctx.Done():
+ }
+ }
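As an aside on the pattern the new reportErr method uses, the sketch below (a hypothetical reportLatest helper, not the manager's API) shows the drain-then-send idiom on a channel of capacity 1: a stale, unread value is dropped so the producer never blocks, and the consumer always sees the most recent report. It assumes a single producing goroutine, matching the manager's Run loop.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// reportLatest drops any unread value and queues err, so the caller never
// blocks on a full channel. Safe for a single producer goroutine.
func reportLatest(ctx context.Context, ch chan error, err error) {
	// drain a previously queued, unread value (if any)
	select {
	case <-ch:
	default:
	}
	// queue the new value; the slot is now free, so this only waits on ctx
	select {
	case ch <- err:
	case <-ctx.Done():
	}
}

func main() {
	ch := make(chan error, 1) // capacity 1: holds at most the latest report
	ctx := context.Background()

	reportLatest(ctx, ch, errors.New("first error"))
	reportLatest(ctx, ch, nil)                        // overwrites the unread first error
	reportLatest(ctx, ch, errors.New("second error")) // overwrites the unread nil

	fmt.Println(<-ch) // prints "second error": only the most recent report survives
}
```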
19 changes: 16 additions & 3 deletions internal/pkg/otel/manager/manager_test.go
@@ -63,7 +63,6 @@
)

func TestOTelManager_Run(t *testing.T) {
- t.Skip("Flaky test") // https://github.com/elastic/elastic-agent/issues/6119
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l, _ := loggertest.New("otel")
@@ -206,8 +205,22 @@ func TestOTelManager_ConfigError(t *testing.T) {
}
}()

- cfg := confmap.New() // invalid config
- m.Update(cfg)
+ // Errors channel is non-blocking, should be able to send an Update that causes an error multiple
+ // times without it blocking on sending over the errCh.
+ for range 3 {
+ cfg := confmap.New() // invalid config
+ m.Update(cfg)
+
+ // delay between updates to ensure the collector will have to fail
+ <-time.After(100 * time.Millisecond)
+ }
+
+ // because of the retry logic and timing we need to ensure
+ // that this keeps retrying to see the error and only store
+ // an actual error
+ //
+ // a nil error just means that the collector is trying to restart
+ // which clears the error on the restart loop
timeoutCh := time.After(time.Second * 5)
var err error
outer:
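The truncated test above waits through nil reports (the collector clearing its error while restarting) until a real error shows up or the five-second timeout fires. A minimal, hypothetical sketch of that wait pattern, with names like waitForError that are not part of the actual test, could look like this:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForError drains reports until it sees a non-nil error or the timeout fires.
// A nil report only means the collector is (re)starting, so it is skipped.
func waitForError(reports <-chan error, timeout time.Duration) (error, bool) {
	deadline := time.After(timeout)
	for {
		select {
		case err := <-reports:
			if err != nil {
				return err, true
			}
			// nil: the restart loop cleared the error; keep waiting
		case <-deadline:
			return nil, false
		}
	}
}

func main() {
	reports := make(chan error, 1)
	go func() {
		reports <- nil // collector restarting, error cleared
		time.Sleep(50 * time.Millisecond)
		reports <- errors.New("invalid config") // the failure we are waiting for
	}()

	if err, ok := waitForError(reports, 5*time.Second); ok {
		fmt.Println("got error:", err)
	} else {
		fmt.Println("timed out without seeing an error")
	}
}
```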