Skip to content

Commit

Permalink
Fix race in error handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
blakerouse committed Feb 19, 2025
1 parent f7fc674 commit 3cc8fa1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 26 deletions.
50 changes: 25 additions & 25 deletions internal/pkg/otel/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type OTelManager struct {
func NewOTelManager(logger *logger.Logger) *OTelManager {
return &OTelManager{
logger: logger,
errCh: make(chan error),
errCh: make(chan error, 1), // holds at most one error
cfgCh: make(chan *confmap.Conf),
statusCh: make(chan *status.AggregateStatus),
doneChan: make(chan struct{}),
Expand Down Expand Up @@ -83,16 +83,10 @@ func (m *OTelManager) Run(ctx context.Context) error {
// it's failing to run). we do not retry creation on failure,
// as it will always fail; a new configuration is required for
// it not to fail (a new configuration will result in the retry)
select {
case m.errCh <- err:
case <-ctx.Done():
}
m.reportErr(ctx, err)
} else {
// all good at the moment (possible that it will fail)
select {
case m.errCh <- nil:
case <-ctx.Done():
}
m.reportErr(ctx, nil)
}
} else {
// error occurred while running the collector, this occurs in the
Expand All @@ -114,10 +108,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
}
// pass the error to the errCh so the coordinator sees it, unless it's a cancel error
if !errors.Is(err, context.Canceled) {
select {
case m.errCh <- err:
case <-ctx.Done():
}
m.reportErr(ctx, err)
}
}
case cfg := <-m.cfgCh:
Expand All @@ -141,10 +132,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
}
// ensure that the coordinator knows that there is no error
// as the collector is not running anymore
select {
case m.errCh <- nil:
case <-ctx.Done():
}
m.reportErr(ctx, nil)
} else {
// either a new configuration or the first configuration
// that results in the collector being started
Expand All @@ -157,16 +145,10 @@ func (m *OTelManager) Run(ctx context.Context) error {
// it's failing to run). we do not retry creation on failure,
// as it will always fail; a new configuration is required for
// it not to fail (a new configuration will result in the retry)
select {
case m.errCh <- err:
case <-ctx.Done():
}
m.reportErr(ctx, err)
} else {
// all good at the moment (possible that it will fail)
select {
case m.errCh <- nil:
case <-ctx.Done():
}
m.reportErr(ctx, nil)
}
} else {
// collector is already running so only the configuration
Expand Down Expand Up @@ -226,3 +208,21 @@ func (m *OTelManager) startCollector(cfg *confmap.Conf, errCh chan error) (conte
}()
return cancel, ap, nil
}

// reportErr reports an error to the service that is controlling this manager.
//
// the manager can be blocked doing other work like sending this manager a new configuration
// so we do not want error reporting to be a blocking send over the channel
//
// the manager really only needs the most recent error, so if it misses an error it's not a big
// deal, what matters is that it has the current error for the state of this manager
func (m *OTelManager) reportErr(ctx context.Context, err error) {
	// non-blocking drain: discard any stale, unread error so the buffered
	// slot in errCh is free for the value we are about to send
	select {
	case <-m.errCh:
	default:
	}
	// publish the latest error state (nil means "healthy"); give up instead
	// of blocking forever when the manager is shutting down and nobody is
	// reading errCh anymore
	//
	// NOTE(review): assumes this goroutine is the only sender on errCh —
	// with a second concurrent sender the slot could refill between the
	// drain and the send and this send could block until a read or cancel
	select {
	case m.errCh <- err:
	case <-ctx.Done():
	}
}
}
1 change: 0 additions & 1 deletion internal/pkg/otel/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ var (
)

func TestOTelManager_Run(t *testing.T) {
t.Skip("Flaky test") // https://github.com/elastic/elastic-agent/issues/6119
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l, _ := loggertest.New("otel")
Expand Down

0 comments on commit 3cc8fa1

Please sign in to comment.