diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 8b0b0ab724f0..8875b834e66a 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -66,6 +66,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903] - `queue.ACKListener` has been removed. Queue configurations now accept an explicit callback function for ACK handling. {pull}35078[35078] - Split split httpmon out of x-pack/filebeat/input/internal/httplog. {pull}36385[36385] +- Beats publishing pipeline does not propagate the close signal to its clients any more. It's responsibility of the user to close the pipeline client. {issue}38197[38197] {pull}38556[38556] ==== Bugfixes @@ -93,6 +94,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Make winlogbeat/sys/wineventlog follow the unsafe.Pointer rules. {pull}36650[36650] - Cleaned up documentation errors & fixed a minor bug in Filebeat Azure blob storage input. {pull}36714[36714] - Fix copy arguments for strict aligned architectures. {pull}36976[36976] +- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556] ==== Added diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0c9b3d89cb20..1b61e70423c6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -101,6 +101,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] - Fix indexing failures by re-enabling event normalisation in netflow input. {issue}38703[38703] {pull}38780[38780] - Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416] +- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556] *Heartbeat* diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index b3f54e655947..41cfc83857f3 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -219,7 +219,6 @@ func startHarvester( defer releaseResource(resource) client, err := hg.pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: ctx.Cancelation, EventListener: newInputACKHandler(hg.ackCH), }) if err != nil { diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 83114f2c630e..e2a04b5fa499 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -120,12 +120,12 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error { } }), ), - CloseRef: ctx.Cancelation, WaitClose: input.config.WaitClose, }) if err != nil { return err } + defer client.Close() log.Info("Starting Kafka input") defer log.Info("Kafka input stopped") diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go index 88e28dde2fba..37036e983c63 100644 --- a/filebeat/input/v2/input-cursor/input.go +++ b/filebeat/input/v2/input-cursor/input.go @@ -146,7 +146,6 @@ func (inp *managedInput) runSource( }() client, err := pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: ctx.Cancelation, EventListener: newInputACKHandler(ctx.Logger), }) if err != nil { diff --git a/filebeat/input/v2/input-stateless/stateless.go b/filebeat/input/v2/input-stateless/stateless.go index c9d51143de38..4bc6c79c243e 100644 --- a/filebeat/input/v2/input-stateless/stateless.go +++ b/filebeat/input/v2/input-stateless/stateless.go @@ -85,9 +85,6 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) ( client, err := pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.DefaultGuarantees, - - // configure pipeline to disconnect input on stop signal. - CloseRef: ctx.Cancelation, }) if err != nil { return err diff --git a/filebeat/input/v2/input-stateless/stateless_test.go b/filebeat/input/v2/input-stateless/stateless_test.go index 13627338c69f..2febcb7e1b6d 100644 --- a/filebeat/input/v2/input-stateless/stateless_test.go +++ b/filebeat/input/v2/input-stateless/stateless_test.go @@ -107,6 +107,9 @@ func TestStateless_Run(t *testing.T) { }, }), nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // connector creates a client the blocks forever until the shutdown signal is received var publishCalls atomic.Int connector := pubtest.FakeConnector{ @@ -114,15 +117,13 @@ func TestStateless_Run(t *testing.T) { return &pubtest.FakeClient{ PublishFunc: func(event beat.Event) { publishCalls.Inc() - <-config.CloseRef.Done() + // Unlock Publish once the input has been cancelled + <-ctx.Done() }, }, nil }, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - var wg sync.WaitGroup var err error wg.Add(1) diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 8e8b285042c4..0917001a86c2 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -49,8 +49,6 @@ type ClientConfig struct { Processing ProcessingConfig - CloseRef CloseRef - // WaitClose sets the maximum duration to wait on ACK, if client still has events // active non-acknowledged events in the publisher pipeline. // WaitClose is only effective if one of ACKCount, ACKEvents and ACKLastEvents @@ -91,13 +89,6 @@ type EventListener interface { ClientClosed() } -// CloseRef allows users to close the client asynchronously. -// A CloseRef implements a subset of function required for context.Context. -type CloseRef interface { - Done() <-chan struct{} - Err() error -} - // ProcessingConfig provides additional event processing settings a client can // pass to the publisher pipeline on Connect. type ProcessingConfig struct { diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index c566a07942fd..a5c02faace6d 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -42,10 +42,8 @@ type client struct { eventWaitGroup *sync.WaitGroup // Open state, signaling, and sync primitives for coordinating client Close. - isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore. - closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once - closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed. - done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run. + isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore. + closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once observer observer eventListener beat.EventListener @@ -137,8 +135,6 @@ func (c *client) Close() error { // first stop ack handling. ACK handler might block on wait (with timeout), waiting // for pending events to be ACKed. c.closeOnce.Do(func() { - close(c.done) - c.isOpen.Store(false) c.onClosing() diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index 4ed45d25628b..25080c90615e 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -18,7 +18,6 @@ package pipeline import ( - "context" "errors" "io" "sync" @@ -95,15 +94,7 @@ func TestClient(t *testing.T) { pipeline := makePipeline(t, Settings{}, makeTestQueue()) defer pipeline.Close() - var ctx context.Context - var cancel func() - if test.context { - ctx, cancel = context.WithCancel(context.Background()) - } - - client, err := pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: ctx, - }) + client, err := pipeline.ConnectWith(beat.ClientConfig{}) if err != nil { t.Fatal(err) } @@ -116,7 +107,9 @@ func TestClient(t *testing.T) { client.Publish(beat.Event{}) }() - test.close(client, cancel) + test.close(client, func() { + client.Close() + }) wg.Wait() }) } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 37bf437395c2..85eeb0e64977 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -22,7 +22,6 @@ package pipeline import ( "fmt" - "reflect" "sync" "time" @@ -197,9 +196,6 @@ func (p *Pipeline) Close() error { p.outputController.Close() p.observer.cleanup() - if p.sigNewClient != nil { - close(p.sigNewClient) - } return nil } @@ -212,6 +208,8 @@ func (p *Pipeline) Connect() (beat.Client, error) { // The client behavior on close and ACK handling can be configured by setting // the appropriate fields in the passed ClientConfig. // If not set otherwise the defaut publish mode is OutputChooses. +// +// It is responsibility of the caller to close the client. func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { var ( canDrop bool @@ -239,8 +237,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { client := &client{ logger: p.monitors.Logger, - closeRef: cfg.CloseRef, - done: make(chan struct{}), isOpen: atomic.MakeBool(true), clientListener: cfg.ClientListener, processors: processors, @@ -295,93 +291,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } p.observer.clientConnected() - - if client.closeRef != nil { - p.registerSignalPropagation(client) - } - return client, nil } -func (p *Pipeline) registerSignalPropagation(c *client) { - p.guardStartSigPropagation.Do(func() { - p.sigNewClient = make(chan *client, 1) - go p.runSignalPropagation() - }) - p.sigNewClient <- c -} - -func (p *Pipeline) runSignalPropagation() { - var channels []reflect.SelectCase - var clients []*client - - channels = append(channels, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(p.sigNewClient), - }) - - for { - chosen, recv, recvOK := reflect.Select(channels) - if chosen == 0 { - if !recvOK { - // sigNewClient was closed - return - } - - // new client -> register client for signal propagation. - if client := recv.Interface().(*client); client != nil { - channels = append(channels, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(client.closeRef.Done()), - }, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(client.done), - }, - ) - clients = append(clients, client) - } - continue - } - - // find client we received a signal for. If client.done was closed, then - // we have to remove the client only. But if closeRef did trigger the signal, then - // we have to propagate the async close to the client. - // In either case, the client will be removed - - i := (chosen - 1) / 2 - isSig := (chosen & 1) == 1 - if isSig { - client := clients[i] - client.Close() - } - - // remove: - last := len(clients) - 1 - ch1 := i*2 + 1 - ch2 := ch1 + 1 - lastCh1 := last*2 + 1 - lastCh2 := lastCh1 + 1 - - clients[i], clients[last] = clients[last], nil - channels[ch1], channels[lastCh1] = channels[lastCh1], reflect.SelectCase{} - channels[ch2], channels[lastCh2] = channels[lastCh2], reflect.SelectCase{} - - clients = clients[:last] - channels = channels[:lastCh1] - if cap(clients) > 10 && len(clients) <= cap(clients)/2 { - clientsTmp := make([]*client, len(clients)) - copy(clientsTmp, clients) - clients = clientsTmp - - channelsTmp := make([]reflect.SelectCase, len(channels)) - copy(channelsTmp, channels) - channels = channelsTmp - } - } -} - func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) { if p.processors == nil { return nil, nil diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go index 5a236acc9c0a..feb01c4fa6e0 100644 --- a/libbeat/publisher/pipeline/pipeline_test.go +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -18,12 +18,104 @@ package pipeline import ( + "runtime" "sync" + "testing" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/beats/v7/libbeat/tests/resources" + "github.com/elastic/elastic-agent-libs/mapstr" ) +func TestPipelineAcceptsAnyNumberOfClients(t *testing.T) { + routinesChecker := resources.NewGoroutinesChecker() + defer routinesChecker.Check(t) + + pipeline := makePipeline(t, Settings{}, makeDiscardQueue()) + + defer pipeline.Close() + + n := 66000 + clients := []beat.Client{} + for i := 0; i < n; i++ { + c, err := pipeline.ConnectWith(beat.ClientConfig{}) + if err != nil { + t.Fatalf("Could not connect to pipeline: %s", err) + } + clients = append(clients, c) + } + + for i, c := range clients { + c.Publish(beat.Event{ + Fields: mapstr.M{ + "count": i, + }, + }) + } + + // Close the first 105 clients + nn := 105 + clientsToClose := clients[:n] + clients = clients[nn:] + + for _, c := range clientsToClose { + c.Close() + } + + // Let other goroutines run + runtime.Gosched() + runtime.Gosched() + + // Make sure all clients are closed + for _, c := range clients { + c.Close() + } +} + +// makeDiscardQueue returns a queue that always discards all events +// the producers are assigned an unique incremental ID, when their +// close method is called, this ID is returned +func makeDiscardQueue() queue.Queue { + var wg sync.WaitGroup + producerID := atomic.NewInt(0) + + return &testQueue{ + close: func() error { + // Wait for all producers to finish + wg.Wait() + return nil + }, + get: func(count int) (queue.Batch, error) { + return nil, nil + }, + + producer: func(cfg queue.ProducerConfig) queue.Producer { + producerID.Inc() + id := producerID.Load() + + // count is a counter that increments on every published event + // it's also the returned Event ID + count := uint64(0) + producer := &testProducer{ + publish: func(try bool, event queue.Entry) (queue.EntryID, bool) { + count++ + return queue.EntryID(count), true + }, + cancel: func() int { + + wg.Done() + return id + }, + } + + wg.Add(1) + return producer + }, + } +} + type testQueue struct { close func() error bufferConfig func() queue.BufferConfig diff --git a/libbeat/publisher/testing/testing.go b/libbeat/publisher/testing/testing.go index 0c64e4601d5e..09c1fdb6b116 100644 --- a/libbeat/publisher/testing/testing.go +++ b/libbeat/publisher/testing/testing.go @@ -19,6 +19,8 @@ package testing // ChanClient implements Client interface, forwarding published events to some import ( + "sync" + "github.com/elastic/beats/v7/libbeat/beat" ) @@ -31,6 +33,7 @@ type ChanClient struct { done chan struct{} Channel chan beat.Event publishCallback func(event beat.Event) + closeOnce sync.Once } func PublisherWithClient(client beat.Client) beat.Pipeline { @@ -68,7 +71,9 @@ func NewChanClientWith(ch chan beat.Event) *ChanClient { } func (c *ChanClient) Close() error { - close(c.done) + c.closeOnce.Do(func() { + close(c.done) + }) return nil } diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index f9d69fe1184f..4ee9daa05ad0 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -112,7 +112,6 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) // Create client for publishing events and receive notification of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: inputContext.Cancelation, EventListener: awscommon.NewEventACKHandler(), }) if err != nil { diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 5fc1c1f0491c..855403e5dc46 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -156,7 +156,6 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" { // Create client for publishing events and receive notification of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: inputContext.Cancelation, EventListener: awscommon.NewEventACKHandler(), Processing: beat.ProcessingConfig{ // This input only produces events with basic types so normalization diff --git a/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go b/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go index b786acf29c79..8b9828980dbb 100644 --- a/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go +++ b/x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go @@ -65,9 +65,12 @@ func (n *input) Run(runCtx v2.Context, connector beat.PipelineConnector) (err er }() client, err := connector.ConnectWith(beat.ClientConfig{ - CloseRef: runCtx.Cancelation, EventListener: NewTxACKHandler(), }) + if err != nil { + return fmt.Errorf("could not connect to publishing pipeline: %w", err) + } + defer client.Close() dataDir := paths.Resolve(paths.Data, "kvstore") if err = os.MkdirAll(dataDir, 0700); err != nil { diff --git a/x-pack/filebeat/input/lumberjack/input.go b/x-pack/filebeat/input/lumberjack/input.go index caa966a3814c..d42ee406562d 100644 --- a/x-pack/filebeat/input/lumberjack/input.go +++ b/x-pack/filebeat/input/lumberjack/input.go @@ -62,7 +62,6 @@ func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline) // Create client for publishing events and receive notification of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: inputCtx.Cancelation, EventListener: newEventACKHandler(), }) if err != nil { diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index 52c6f8a76849..6d479bb243f2 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -118,7 +118,6 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err Processing: beat.ProcessingConfig{ EventNormalization: boolPtr(true), }, - CloseRef: ctx.Cancelation, EventListener: nil, }) if err != nil { @@ -126,6 +125,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err n.stop() return err } + defer client.Close() const pollInterval = time.Minute udpMetrics := netmetrics.NewUDP("netflow", ctx.ID, n.cfg.Host, uint64(n.cfg.ReadBuffer), pollInterval, n.logger) diff --git a/x-pack/filebeat/input/shipper/input.go b/x-pack/filebeat/input/shipper/input.go index 5cece851d9c2..f2472ccf7f27 100644 --- a/x-pack/filebeat/input/shipper/input.go +++ b/x-pack/filebeat/input/shipper/input.go @@ -173,12 +173,11 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err DisableHost: true, DisableType: true, }, - - CloseRef: inputContext.Cancelation, }) if err != nil { return fmt.Errorf("error creating client for stream %s: %w", streamID, err) } + defer client.Close() in.log.Infof("Creating beat client for stream %s", streamID) newStreamData := streamData{client: client, index: in.streams[streamID].index, processors: in.streams[streamID].processors}