Skip to content

Commit

Permalink
Fix shutdown of beats receivers (#42652)
Browse files Browse the repository at this point in the history
According to the otel component interface, Shutdown should wait for
global resources to be freed. In particular, Shutdown returning means
that the collector may create and start another component with the same
name, which in case of beats receivers leads to a panic due to registry
conflicts.

(cherry picked from commit 7514fa9)
  • Loading branch information
swiatekm authored and mergify[bot] committed Feb 10, 2025
1 parent bf91bc4 commit 7089bf1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
5 changes: 5 additions & 0 deletions x-pack/filebeat/fbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package fbreceiver

import (
"context"
"sync"

"github.com/elastic/beats/v7/libbeat/beat"

Expand All @@ -17,10 +18,13 @@ type filebeatReceiver struct {
beat *beat.Beat
beater beat.Beater
logger *zap.Logger
wg sync.WaitGroup
}

func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error {
fb.wg.Add(1)
go func() {
defer fb.wg.Done()
fb.logger.Info("starting filebeat receiver")
err := fb.beater.Run(fb.beat)
if err != nil {
Expand All @@ -33,5 +37,6 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro
func (fb *filebeatReceiver) Shutdown(ctx context.Context) error {
fb.logger.Info("stopping filebeat receiver")
fb.beater.Stop()
fb.wg.Wait()
return nil
}
5 changes: 5 additions & 0 deletions x-pack/metricbeat/mbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package mbreceiver

import (
"context"
"sync"

"github.com/elastic/beats/v7/libbeat/beat"

Expand All @@ -17,10 +18,13 @@ type metricbeatReceiver struct {
beat *beat.Beat
beater beat.Beater
logger *zap.Logger
wg sync.WaitGroup
}

func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error {
mb.wg.Add(1)
go func() {
defer mb.wg.Done()
mb.logger.Info("starting metricbeat receiver")
err := mb.beater.Run(mb.beat)
if err != nil {
Expand All @@ -33,5 +37,6 @@ func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) er
func (mb *metricbeatReceiver) Shutdown(ctx context.Context) error {
mb.logger.Info("stopping metricbeat receiver")
mb.beater.Stop()
mb.wg.Wait()
return nil
}

0 comments on commit 7089bf1

Please sign in to comment.