From 7089bf195802d03d02f5d3064cde3d06d8a13ed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 10 Feb 2025 17:32:54 +0100 Subject: [PATCH] Fix shutdown of beats receivers (#42652) According to the otel component interface, Shutdown should wait for global resources to be freed. In particular, Shutdown returning means that the collector may create and start another component with the same name, which in case of beats receivers leads to a panic due to registry conflicts. (cherry picked from commit 7514fa92158ad062feb7c7b308740cdd47bf4dd6) --- x-pack/filebeat/fbreceiver/receiver.go | 5 +++++ x-pack/metricbeat/mbreceiver/receiver.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/x-pack/filebeat/fbreceiver/receiver.go b/x-pack/filebeat/fbreceiver/receiver.go index eeba511dd1e7..495877abf43b 100644 --- a/x-pack/filebeat/fbreceiver/receiver.go +++ b/x-pack/filebeat/fbreceiver/receiver.go @@ -6,6 +6,7 @@ package fbreceiver import ( "context" + "sync" "github.com/elastic/beats/v7/libbeat/beat" @@ -17,10 +18,13 @@ type filebeatReceiver struct { beat *beat.Beat beater beat.Beater logger *zap.Logger + wg sync.WaitGroup } func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error { + fb.wg.Add(1) go func() { + defer fb.wg.Done() fb.logger.Info("starting filebeat receiver") err := fb.beater.Run(fb.beat) if err != nil { @@ -33,5 +37,6 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro func (fb *filebeatReceiver) Shutdown(ctx context.Context) error { fb.logger.Info("stopping filebeat receiver") fb.beater.Stop() + fb.wg.Wait() return nil } diff --git a/x-pack/metricbeat/mbreceiver/receiver.go b/x-pack/metricbeat/mbreceiver/receiver.go index 848ca9347aab..de0f7d9e1968 100644 --- a/x-pack/metricbeat/mbreceiver/receiver.go +++ b/x-pack/metricbeat/mbreceiver/receiver.go @@ -6,6 +6,7 @@ package mbreceiver import ( "context" + "sync" "github.com/elastic/beats/v7/libbeat/beat" @@ -17,10 +18,13 @@ type metricbeatReceiver struct { beat *beat.Beat beater beat.Beater logger *zap.Logger + wg sync.WaitGroup } func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error { + mb.wg.Add(1) go func() { + defer mb.wg.Done() mb.logger.Info("starting metricbeat receiver") err := mb.beater.Run(mb.beat) if err != nil { @@ -33,5 +37,6 @@ func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) er func (mb *metricbeatReceiver) Shutdown(ctx context.Context) error { mb.logger.Info("stopping metricbeat receiver") mb.beater.Stop() + mb.wg.Wait() return nil }