From 2ee6f87a6f0ec67da51d7bb54b37aa479b996918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 27 Jan 2025 11:36:31 +0100 Subject: [PATCH] Fix shutdown of beats receivers According to the otel component interface, Shutdown should wait for global resources to be freed. In particular, Shutdown returning means that the collector may create and start another component with the same name, which in case of beats receivers leads to a panic due to registry conflicts. --- x-pack/filebeat/fbreceiver/receiver.go | 5 +++++ x-pack/metricbeat/mbreceiver/receiver.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/x-pack/filebeat/fbreceiver/receiver.go b/x-pack/filebeat/fbreceiver/receiver.go index eeba511dd1e7..495877abf43b 100644 --- a/x-pack/filebeat/fbreceiver/receiver.go +++ b/x-pack/filebeat/fbreceiver/receiver.go @@ -6,6 +6,7 @@ package fbreceiver import ( "context" + "sync" "github.com/elastic/beats/v7/libbeat/beat" @@ -17,10 +18,13 @@ type filebeatReceiver struct { beat *beat.Beat beater beat.Beater logger *zap.Logger + wg sync.WaitGroup } func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error { + fb.wg.Add(1) go func() { + defer fb.wg.Done() fb.logger.Info("starting filebeat receiver") err := fb.beater.Run(fb.beat) if err != nil { @@ -33,5 +37,6 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro func (fb *filebeatReceiver) Shutdown(ctx context.Context) error { fb.logger.Info("stopping filebeat receiver") fb.beater.Stop() + fb.wg.Wait() return nil } diff --git a/x-pack/metricbeat/mbreceiver/receiver.go b/x-pack/metricbeat/mbreceiver/receiver.go index 848ca9347aab..de0f7d9e1968 100644 --- a/x-pack/metricbeat/mbreceiver/receiver.go +++ b/x-pack/metricbeat/mbreceiver/receiver.go @@ -6,6 +6,7 @@ package mbreceiver import ( "context" + "sync" "github.com/elastic/beats/v7/libbeat/beat" @@ -17,10 +18,13 @@ type metricbeatReceiver struct { beat *beat.Beat beater beat.Beater logger *zap.Logger + wg sync.WaitGroup } func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error { + mb.wg.Add(1) go func() { + defer mb.wg.Done() mb.logger.Info("starting metricbeat receiver") err := mb.beater.Run(mb.beat) if err != nil { @@ -33,5 +37,6 @@ func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) er func (mb *metricbeatReceiver) Shutdown(ctx context.Context) error { mb.logger.Info("stopping metricbeat receiver") mb.beater.Stop() + mb.wg.Wait() return nil }