From e993e09c5e6b1a3d61c824f16e2fe43d57b2b99d Mon Sep 17 00:00:00 2001
From: Tiago Queiroz
Date: Fri, 12 Apr 2024 17:05:04 +0200
Subject: [PATCH] Fix panic when more than 32767 pipeline clients are active (#38556)

The publishing pipeline would panic when more than 32767 clients were
active. That happened because each client added two channels to a
slice, and an infinite loop used reflect.Select on this list.
reflect.Select supports a maximum of 65536 cases; if there are more,
it panics.

This PR fixes the panic by removing the need for this list. The
pipeline used an infinite loop to detect whether the CloseRef from the
pipeline client was closed; if it was, the pipeline would call
client.Close. By analysing the code that sets CloseRef, we identified
that there was no need for the pipeline to be responsible for
propagating this signal: in most places where a pipeline client was
created, there was already a defer to close it, and in the few places
where it was missing we added one.

Now the pipeline is not responsible for closing any client; whoever
creates a client is responsible for correctly closing it.
---
 CHANGELOG-developer.next.asciidoc             |  2 +
 CHANGELOG.next.asciidoc                       |  1 +
 .../internal/input-logfile/harvester.go       |  1 -
 filebeat/input/kafka/input.go                 |  2 +-
 filebeat/input/v2/input-cursor/input.go       |  1 -
 .../input/v2/input-stateless/stateless.go     |  3 -
 .../v2/input-stateless/stateless_test.go      |  9 +-
 libbeat/beat/pipeline.go                      |  9 --
 libbeat/publisher/pipeline/client.go          |  8 +-
 libbeat/publisher/pipeline/client_test.go     | 15 +--
 libbeat/publisher/pipeline/pipeline.go        | 92 +------------------
 libbeat/publisher/pipeline/pipeline_test.go   | 92 +++++++++++++++++++
 libbeat/publisher/testing/testing.go          |  7 +-
 x-pack/filebeat/input/awscloudwatch/input.go  |  1 -
 x-pack/filebeat/input/awss3/input.go          |  1 -
 .../entityanalytics/internal/kvstore/input.go |  5 +-
 x-pack/filebeat/input/lumberjack/input.go     |  1 -
 x-pack/filebeat/input/netflow/input.go        |  2 +-
 x-pack/filebeat/input/shipper/input.go        |  3 +-
 19 files changed, 121 insertions(+), 134 deletions(-)

diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index 8b0b0ab724f0..8875b834e66a 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -66,6 +66,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 - Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903]
 - `queue.ACKListener` has been removed. Queue configurations now accept an explicit callback function for ACK handling. {pull}35078[35078]
 - Split split httpmon out of x-pack/filebeat/input/internal/httplog. {pull}36385[36385]
+- Beats publishing pipeline no longer propagates the close signal to its clients. It is the responsibility of the user to close the pipeline client. {issue}38197[38197] {pull}38556[38556]

 ==== Bugfixes

@@ -93,6 +94,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 - Make winlogbeat/sys/wineventlog follow the unsafe.Pointer rules. {pull}36650[36650]
 - Cleaned up documentation errors & fixed a minor bug in Filebeat Azure blob storage input. {pull}36714[36714]
 - Fix copy arguments for strict aligned architectures. {pull}36976[36976]
+- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]

 ==== Added

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 0c9b3d89cb20..1b61e70423c6 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -101,6 +101,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
 - Fix indexing failures by re-enabling event normalisation in netflow input. {issue}38703[38703] {pull}38780[38780]
 - Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416]
+- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]

 *Heartbeat*

diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go
index b3f54e655947..41cfc83857f3 100644
--- a/filebeat/input/filestream/internal/input-logfile/harvester.go
+++ b/filebeat/input/filestream/internal/input-logfile/harvester.go
@@ -219,7 +219,6 @@ func startHarvester(
 	defer releaseResource(resource)

 	client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
-		CloseRef:      ctx.Cancelation,
 		EventListener: newInputACKHandler(hg.ackCH),
 	})
 	if err != nil {
diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go
index 83114f2c630e..e2a04b5fa499 100644
--- a/filebeat/input/kafka/input.go
+++ b/filebeat/input/kafka/input.go
@@ -120,12 +120,12 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error {
 				}
 			}),
 		),
-		CloseRef:  ctx.Cancelation,
 		WaitClose: input.config.WaitClose,
 	})
 	if err != nil {
 		return err
 	}
+	defer client.Close()

 	log.Info("Starting Kafka input")
 	defer log.Info("Kafka input stopped")
diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go
index 88e28dde2fba..37036e983c63 100644
--- a/filebeat/input/v2/input-cursor/input.go
+++ b/filebeat/input/v2/input-cursor/input.go
@@ -146,7 +146,6 @@ func (inp *managedInput) runSource(
 	}()

 	client, err := pipeline.ConnectWith(beat.ClientConfig{
-		CloseRef:      ctx.Cancelation,
 		EventListener: newInputACKHandler(ctx.Logger),
 	})
 	if err != nil {
diff --git a/filebeat/input/v2/input-stateless/stateless.go b/filebeat/input/v2/input-stateless/stateless.go
index c9d51143de38..4bc6c79c243e 100644
--- a/filebeat/input/v2/input-stateless/stateless.go
+++ b/filebeat/input/v2/input-stateless/stateless.go
@@ -85,9 +85,6 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (
 	client, err := pipeline.ConnectWith(beat.ClientConfig{
 		PublishMode: beat.DefaultGuarantees,
-
-		// configure pipeline to disconnect input on stop signal.
-		CloseRef: ctx.Cancelation,
 	})
 	if err != nil {
 		return err
diff --git a/filebeat/input/v2/input-stateless/stateless_test.go b/filebeat/input/v2/input-stateless/stateless_test.go
index 13627338c69f..2febcb7e1b6d 100644
--- a/filebeat/input/v2/input-stateless/stateless_test.go
+++ b/filebeat/input/v2/input-stateless/stateless_test.go
@@ -107,6 +107,9 @@ func TestStateless_Run(t *testing.T) {
 		},
 	}), nil)

+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	// connector creates a client the blocks forever until the shutdown signal is received
 	var publishCalls atomic.Int
 	connector := pubtest.FakeConnector{
@@ -114,15 +117,13 @@ func TestStateless_Run(t *testing.T) {
 			return &pubtest.FakeClient{
 				PublishFunc: func(event beat.Event) {
 					publishCalls.Inc()
-					<-config.CloseRef.Done()
+					// Unblock Publish once the input has been cancelled
+					<-ctx.Done()
 				},
 			}, nil
 		},
 	}

-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	var wg sync.WaitGroup
 	var err error
 	wg.Add(1)
diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go
index 8e8b285042c4..0917001a86c2 100644
--- a/libbeat/beat/pipeline.go
+++ b/libbeat/beat/pipeline.go
@@ -49,8 +49,6 @@ type ClientConfig struct {

 	Processing ProcessingConfig

-	CloseRef CloseRef
-
 	// WaitClose sets the maximum duration to wait on ACK, if client still has events
 	// active non-acknowledged events in the publisher pipeline.
 	// WaitClose is only effective if one of ACKCount, ACKEvents and ACKLastEvents
@@ -91,13 +89,6 @@ type EventListener interface {
 	ClientClosed()
 }

-// CloseRef allows users to close the client asynchronously.
-// A CloseRef implements a subset of function required for context.Context.
-type CloseRef interface {
-	Done() <-chan struct{}
-	Err() error
-}
-
 // ProcessingConfig provides additional event processing settings a client can
 // pass to the publisher pipeline on Connect.
 type ProcessingConfig struct {
diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go
index c566a07942fd..a5c02faace6d 100644
--- a/libbeat/publisher/pipeline/client.go
+++ b/libbeat/publisher/pipeline/client.go
@@ -42,10 +42,8 @@ type client struct {
 	eventWaitGroup *sync.WaitGroup

 	// Open state, signaling, and sync primitives for coordinating client Close.
-	isOpen    atomic.Bool   // set to false during shutdown, such that no new events will be accepted anymore.
-	closeOnce sync.Once     // closeOnce ensure that the client shutdown sequence is only executed once
-	closeRef  beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
-	done      chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.
+	isOpen    atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
+	closeOnce sync.Once   // closeOnce ensures that the client shutdown sequence is only executed once

 	observer      observer
 	eventListener beat.EventListener
@@ -137,8 +135,6 @@ func (c *client) Close() error {
 	// first stop ack handling. ACK handler might block on wait (with timeout), waiting
 	// for pending events to be ACKed.
 	c.closeOnce.Do(func() {
-		close(c.done)
-
 		c.isOpen.Store(false)
 		c.onClosing()
diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go
index 4ed45d25628b..25080c90615e 100644
--- a/libbeat/publisher/pipeline/client_test.go
+++ b/libbeat/publisher/pipeline/client_test.go
@@ -18,7 +18,6 @@
 package pipeline

 import (
-	"context"
 	"errors"
 	"io"
 	"sync"
@@ -95,15 +94,7 @@ func TestClient(t *testing.T) {
 			pipeline := makePipeline(t, Settings{}, makeTestQueue())
 			defer pipeline.Close()

-			var ctx context.Context
-			var cancel func()
-			if test.context {
-				ctx, cancel = context.WithCancel(context.Background())
-			}
-
-			client, err := pipeline.ConnectWith(beat.ClientConfig{
-				CloseRef: ctx,
-			})
+			client, err := pipeline.ConnectWith(beat.ClientConfig{})
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -116,7 +107,9 @@ func TestClient(t *testing.T) {
 				client.Publish(beat.Event{})
 			}()

-			test.close(client, cancel)
+			test.close(client, func() {
+				client.Close()
+			})
 			wg.Wait()
 		})
 	}
diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go
index 37bf437395c2..85eeb0e64977 100644
--- a/libbeat/publisher/pipeline/pipeline.go
+++ b/libbeat/publisher/pipeline/pipeline.go
@@ -22,7 +22,6 @@ package pipeline

 import (
 	"fmt"
-	"reflect"
 	"sync"
 	"time"

@@ -197,9 +196,6 @@ func (p *Pipeline) Close() error {
 	p.outputController.Close()

 	p.observer.cleanup()
-	if p.sigNewClient != nil {
-		close(p.sigNewClient)
-	}
 	return nil
 }

@@ -212,6 +208,8 @@ func (p *Pipeline) Connect() (beat.Client, error) {
 // The client behavior on close and ACK handling can be configured by setting
 // the appropriate fields in the passed ClientConfig.
 // If not set otherwise the defaut publish mode is OutputChooses.
+//
+// It is the responsibility of the caller to close the client.
 func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
 	var (
 		canDrop bool
@@ -239,8 +237,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {

 	client := &client{
 		logger:         p.monitors.Logger,
-		closeRef:       cfg.CloseRef,
-		done:           make(chan struct{}),
 		isOpen:         atomic.MakeBool(true),
 		clientListener: cfg.ClientListener,
 		processors:     processors,
@@ -295,93 +291,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
 	}

 	p.observer.clientConnected()
-
-	if client.closeRef != nil {
-		p.registerSignalPropagation(client)
-	}
-
 	return client, nil
 }

-func (p *Pipeline) registerSignalPropagation(c *client) {
-	p.guardStartSigPropagation.Do(func() {
-		p.sigNewClient = make(chan *client, 1)
-		go p.runSignalPropagation()
-	})
-	p.sigNewClient <- c
-}
-
-func (p *Pipeline) runSignalPropagation() {
-	var channels []reflect.SelectCase
-	var clients []*client
-
-	channels = append(channels, reflect.SelectCase{
-		Dir:  reflect.SelectRecv,
-		Chan: reflect.ValueOf(p.sigNewClient),
-	})
-
-	for {
-		chosen, recv, recvOK := reflect.Select(channels)
-		if chosen == 0 {
-			if !recvOK {
-				// sigNewClient was closed
-				return
-			}
-
-			// new client -> register client for signal propagation.
-			if client := recv.Interface().(*client); client != nil {
-				channels = append(channels,
-					reflect.SelectCase{
-						Dir:  reflect.SelectRecv,
-						Chan: reflect.ValueOf(client.closeRef.Done()),
-					},
-					reflect.SelectCase{
-						Dir:  reflect.SelectRecv,
-						Chan: reflect.ValueOf(client.done),
-					},
-				)
-				clients = append(clients, client)
-			}
-			continue
-		}
-
-		// find client we received a signal for. If client.done was closed, then
-		// we have to remove the client only. But if closeRef did trigger the signal, then
-		// we have to propagate the async close to the client.
-		// In either case, the client will be removed
-
-		i := (chosen - 1) / 2
-		isSig := (chosen & 1) == 1
-		if isSig {
-			client := clients[i]
-			client.Close()
-		}
-
-		// remove:
-		last := len(clients) - 1
-		ch1 := i*2 + 1
-		ch2 := ch1 + 1
-		lastCh1 := last*2 + 1
-		lastCh2 := lastCh1 + 1
-
-		clients[i], clients[last] = clients[last], nil
-		channels[ch1], channels[lastCh1] = channels[lastCh1], reflect.SelectCase{}
-		channels[ch2], channels[lastCh2] = channels[lastCh2], reflect.SelectCase{}
-
-		clients = clients[:last]
-		channels = channels[:lastCh1]
-		if cap(clients) > 10 && len(clients) <= cap(clients)/2 {
-			clientsTmp := make([]*client, len(clients))
-			copy(clientsTmp, clients)
-			clients = clientsTmp
-
-			channelsTmp := make([]reflect.SelectCase, len(channels))
-			copy(channelsTmp, channels)
-			channels = channelsTmp
-		}
-	}
-}
-
 func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) {
 	if p.processors == nil {
 		return nil, nil
diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go
index 5a236acc9c0a..feb01c4fa6e0 100644
--- a/libbeat/publisher/pipeline/pipeline_test.go
+++ b/libbeat/publisher/pipeline/pipeline_test.go
@@ -18,12 +18,104 @@
 package pipeline

 import (
+	"runtime"
 	"sync"
+	"testing"

+	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common/atomic"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
+	"github.com/elastic/beats/v7/libbeat/tests/resources"
+	"github.com/elastic/elastic-agent-libs/mapstr"
 )

+func TestPipelineAcceptsAnyNumberOfClients(t *testing.T) {
+	routinesChecker := resources.NewGoroutinesChecker()
+	defer routinesChecker.Check(t)
+
+	pipeline := makePipeline(t, Settings{}, makeDiscardQueue())
+
+	defer pipeline.Close()
+
+	n := 66000
+	clients := []beat.Client{}
+	for i := 0; i < n; i++ {
+		c, err := pipeline.ConnectWith(beat.ClientConfig{})
+		if err != nil {
+			t.Fatalf("Could not connect to pipeline: %s", err)
+		}
+		clients = append(clients, c)
+	}
+
+	for i, c := range clients {
+		c.Publish(beat.Event{
+			Fields: mapstr.M{
+				"count": i,
+			},
+		})
+	}
+
+	// Close the first 105 clients
+	nn := 105
+	clientsToClose := clients[:nn]
+	clients = clients[nn:]
+
+	for _, c := range clientsToClose {
+		c.Close()
+	}
+
+	// Let other goroutines run
+	runtime.Gosched()
+	runtime.Gosched()
+
+	// Make sure all clients are closed
+	for _, c := range clients {
+		c.Close()
+	}
+}
+
+// makeDiscardQueue returns a queue that always discards all events.
+// Producers are assigned a unique incremental ID; when their
+// cancel function is called, this ID is returned.
+func makeDiscardQueue() queue.Queue {
+	var wg sync.WaitGroup
+	producerID := atomic.NewInt(0)
+
+	return &testQueue{
+		close: func() error {
+			// Wait for all producers to finish
+			wg.Wait()
+			return nil
+		},
+		get: func(count int) (queue.Batch, error) {
+			return nil, nil
+		},
+
+		producer: func(cfg queue.ProducerConfig) queue.Producer {
+			producerID.Inc()
+			id := producerID.Load()
+
+			// count is a counter that increments on every published event;
+			// it's also the returned event ID
+			count := uint64(0)
+			producer := &testProducer{
+				publish: func(try bool, event queue.Entry) (queue.EntryID, bool) {
+					count++
+					return queue.EntryID(count), true
+				},
+				cancel: func() int {
+					wg.Done()
+					return id
+				},
+			}
+
+			wg.Add(1)
+			return producer
+		},
+	}
+}
+
 type testQueue struct {
 	close        func() error
 	bufferConfig func() queue.BufferConfig
diff --git a/libbeat/publisher/testing/testing.go b/libbeat/publisher/testing/testing.go
index 0c64e4601d5e..09c1fdb6b116 100644
--- a/libbeat/publisher/testing/testing.go
+++ b/libbeat/publisher/testing/testing.go
@@ -19,6 +19,8 @@ package testing

 // ChanClient implements Client interface, forwarding published events to some
 import (
+	"sync"
+
 	"github.com/elastic/beats/v7/libbeat/beat"
 )
@@ -31,6 +33,7 @@ type ChanClient struct {
 	done            chan struct{}
 	Channel         chan beat.Event
 	publishCallback func(event beat.Event)
+	closeOnce       sync.Once
 }

 func PublisherWithClient(client beat.Client) beat.Pipeline {
@@ -68,7 +71,9 @@ func NewChanClientWith(ch chan beat.Event) *ChanClient {
 }

 func (c *ChanClient) Close() error {
-	close(c.done)
+	c.closeOnce.Do(func() {
+		close(c.done)
+	})
 	return nil
 }
diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go
index f9d69fe1184f..4ee9daa05ad0 100644
--- a/x-pack/filebeat/input/awscloudwatch/input.go
+++ b/x-pack/filebeat/input/awscloudwatch/input.go
@@ -112,7 +112,6 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)

 	// Create client for publishing events and receive notification of their ACKs.
 	client, err := pipeline.ConnectWith(beat.ClientConfig{
-		CloseRef:      inputContext.Cancelation,
 		EventListener: awscommon.NewEventACKHandler(),
 	})
 	if err != nil {
diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go
index 5fc1c1f0491c..855403e5dc46 100644
--- a/x-pack/filebeat/input/awss3/input.go
+++ b/x-pack/filebeat/input/awss3/input.go
@@ -156,7 +156,6 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
 	if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
 		// Create client for publishing events and receive notification of their ACKs.
 		client, err := pipeline.ConnectWith(beat.ClientConfig{
-			CloseRef:      inputContext.Cancelation,
 			EventListener: awscommon.NewEventACKHandler(),
 			Processing: beat.ProcessingConfig{
 				// This input only produces events with basic types so normalization
diff --git a/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go b/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go
index b786acf29c79..8b9828980dbb 100644
--- a/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go
+++ b/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go
@@ -65,9 +65,12 @@ func (n *input) Run(runCtx v2.Context, connector beat.PipelineConnector) (err er
 	}()

 	client, err := connector.ConnectWith(beat.ClientConfig{
-		CloseRef:      runCtx.Cancelation,
 		EventListener: NewTxACKHandler(),
 	})
+	if err != nil {
+		return fmt.Errorf("could not connect to publishing pipeline: %w", err)
+	}
+	defer client.Close()

 	dataDir := paths.Resolve(paths.Data, "kvstore")
 	if err = os.MkdirAll(dataDir, 0700); err != nil {
diff --git a/x-pack/filebeat/input/lumberjack/input.go b/x-pack/filebeat/input/lumberjack/input.go
index caa966a3814c..d42ee406562d 100644
--- a/x-pack/filebeat/input/lumberjack/input.go
+++ b/x-pack/filebeat/input/lumberjack/input.go
@@ -62,7 +62,6 @@ func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline)

 	// Create client for publishing events and receive notification of their ACKs.
 	client, err := pipeline.ConnectWith(beat.ClientConfig{
-		CloseRef:      inputCtx.Cancelation,
 		EventListener: newEventACKHandler(),
 	})
 	if err != nil {
diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go
index 52c6f8a76849..6d479bb243f2 100644
--- a/x-pack/filebeat/input/netflow/input.go
+++ b/x-pack/filebeat/input/netflow/input.go
@@ -118,7 +118,6 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
 		Processing: beat.ProcessingConfig{
 			EventNormalization: boolPtr(true),
 		},
-		CloseRef:      ctx.Cancelation,
 		EventListener: nil,
 	})
 	if err != nil {
@@ -126,6 +125,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
 		n.stop()
 		return err
 	}
+	defer client.Close()

 	const pollInterval = time.Minute
 	udpMetrics := netmetrics.NewUDP("netflow", ctx.ID, n.cfg.Host, uint64(n.cfg.ReadBuffer), pollInterval, n.logger)
diff --git a/x-pack/filebeat/input/shipper/input.go b/x-pack/filebeat/input/shipper/input.go
index 5cece851d9c2..f2472ccf7f27 100644
--- a/x-pack/filebeat/input/shipper/input.go
+++ b/x-pack/filebeat/input/shipper/input.go
@@ -173,12 +173,11 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err
 			DisableHost: true,
 			DisableType: true,
 		},
-
-		CloseRef: inputContext.Cancelation,
 	})
 	if err != nil {
 		return fmt.Errorf("error creating client for stream %s: %w", streamID, err)
 	}
+	defer client.Close()

 	in.log.Infof("Creating beat client for stream %s", streamID)
 	newStreamData := streamData{client: client, index: in.streams[streamID].index, processors: in.streams[streamID].processors}
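
For context on the failure mode this patch removes: reflect.Select is documented to support at most 65536 cases and panics past that limit. The deleted runSignalPropagation loop registered one case for sigNewClient plus two per client (closeRef.Done() and done), so 32768 clients pushed the case list to 65537. The following standalone program is a minimal sketch of that limit, illustrative only and using nothing beyond the Go standard library:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Build one receive case per channel: 65537 cases is one past
	// reflect.Select's maximum of 65536.
	cases := make([]reflect.SelectCase, 65537)
	for i := range cases {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(make(chan struct{})),
		}
	}

	defer func() {
		// Expected panic: "reflect.Select: too many cases (max 65536)".
		fmt.Println("recovered:", recover())
	}()

	// The case-count check runs before any channel operation is
	// attempted, so this panics instead of blocking.
	reflect.Select(cases)
}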
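
The ownership rule the patch establishes looks the same at every call site touched above: whoever connects a client also closes it, typically with a defer right after the error check. A hedged sketch of the pattern for a v2 input follows; runExample is a hypothetical function written for illustration, while v2.Context, beat.Pipeline, and Cancelation are the real types and fields used in the diff.

package example

import (
	"fmt"

	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
	"github.com/elastic/beats/v7/libbeat/beat"
)

func runExample(ctx v2.Context, pipeline beat.Pipeline) error {
	client, err := pipeline.ConnectWith(beat.ClientConfig{})
	if err != nil {
		return fmt.Errorf("could not connect to publishing pipeline: %w", err)
	}
	// No CloseRef any more: the pipeline will not close this client,
	// so the input must release it on the way out.
	defer client.Close()

	client.Publish(beat.Event{})

	// Shut down when the input is cancelled.
	<-ctx.Cancelation.Done()
	return ctx.Cancelation.Err()
}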