diff --git a/x-pack/filebeat/fbreceiver/receiver.go b/x-pack/filebeat/fbreceiver/receiver.go index eeba511dd1e7..495877abf43b 100644 --- a/x-pack/filebeat/fbreceiver/receiver.go +++ b/x-pack/filebeat/fbreceiver/receiver.go @@ -6,6 +6,7 @@ package fbreceiver import ( "context" + "sync" "github.com/elastic/beats/v7/libbeat/beat" @@ -17,10 +18,13 @@ type filebeatReceiver struct { beat *beat.Beat beater beat.Beater logger *zap.Logger + wg sync.WaitGroup } func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error { + fb.wg.Add(1) go func() { + defer fb.wg.Done() fb.logger.Info("starting filebeat receiver") err := fb.beater.Run(fb.beat) if err != nil { @@ -33,5 +37,6 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro func (fb *filebeatReceiver) Shutdown(ctx context.Context) error { fb.logger.Info("stopping filebeat receiver") fb.beater.Stop() + fb.wg.Wait() return nil } diff --git a/x-pack/metricbeat/mbreceiver/receiver.go b/x-pack/metricbeat/mbreceiver/receiver.go index 848ca9347aab..de0f7d9e1968 100644 --- a/x-pack/metricbeat/mbreceiver/receiver.go +++ b/x-pack/metricbeat/mbreceiver/receiver.go @@ -6,6 +6,7 @@ package mbreceiver import ( "context" + "sync" "github.com/elastic/beats/v7/libbeat/beat" @@ -17,10 +18,13 @@ type metricbeatReceiver struct { beat *beat.Beat beater beat.Beater logger *zap.Logger + wg sync.WaitGroup } func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error { + mb.wg.Add(1) go func() { + defer mb.wg.Done() mb.logger.Info("starting metricbeat receiver") err := mb.beater.Run(mb.beat) if err != nil { @@ -33,5 +37,6 @@ func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) er func (mb *metricbeatReceiver) Shutdown(ctx context.Context) error { mb.logger.Info("stopping metricbeat receiver") mb.beater.Stop() + mb.wg.Wait() return nil }