Skip to content

Commit

Permalink
Fix panic when more than 32767 pipeline clients are active (#38556)
Browse files Browse the repository at this point in the history
The publishing pipeline would panic when more than 32767 clients were
active, that happened because each client would add two channels to a
slice and an infinity loop would use reflect.Select on this
list. reflect.Select supports a maximum of 65536 cases, if there are
more it panics.

This PR fixes this by removing the need for this list. The pipeline
used an infinity loop to detect if the CloseRef from the pipeline
client was closed, if it happened, then it would call client.Close. By
analysing the code setting CloseRef we identified there was no need
for the pipeline to be responsible for propagating this signal, most
of the times the pipeline client was created, there was already a
defer to close it, in the few places where it was not present we added
one.

Now the pipeline is not responsible for closing any client, whomever
creates a client is responsible for correctly closing it.
  • Loading branch information
belimawr authored Apr 12, 2024
1 parent 84d0eec commit e993e09
Show file tree
Hide file tree
Showing 19 changed files with 121 additions and 134 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903]
- `queue.ACKListener` has been removed. Queue configurations now accept an explicit callback function for ACK handling. {pull}35078[35078]
- Split split httpmon out of x-pack/filebeat/input/internal/httplog. {pull}36385[36385]
- Beats publishing pipeline does not propagate the close signal to its clients any more. It's responsibility of the user to close the pipeline client. {issue}38197[38197] {pull}38556[38556]

==== Bugfixes

Expand Down Expand Up @@ -93,6 +94,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Make winlogbeat/sys/wineventlog follow the unsafe.Pointer rules. {pull}36650[36650]
- Cleaned up documentation errors & fixed a minor bug in Filebeat Azure blob storage input. {pull}36714[36714]
- Fix copy arguments for strict aligned architectures. {pull}36976[36976]
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]

==== Added

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
- Fix indexing failures by re-enabling event normalisation in netflow input. {issue}38703[38703] {pull}38780[38780]
- Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416]
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ func startHarvester(
defer releaseResource(resource)

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(hg.ackCH),
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error {
}
}),
),
CloseRef: ctx.Cancelation,
WaitClose: input.config.WaitClose,
})
if err != nil {
return err
}
defer client.Close()

log.Info("Starting Kafka input")
defer log.Info("Kafka input stopped")
Expand Down
1 change: 0 additions & 1 deletion filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(ctx.Logger),
})
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions filebeat/input/v2/input-stateless/stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (

client, err := pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.DefaultGuarantees,

// configure pipeline to disconnect input on stop signal.
CloseRef: ctx.Cancelation,
})
if err != nil {
return err
Expand Down
9 changes: 5 additions & 4 deletions filebeat/input/v2/input-stateless/stateless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,23 @@ func TestStateless_Run(t *testing.T) {
},
}), nil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// connector creates a client the blocks forever until the shutdown signal is received
var publishCalls atomic.Int
connector := pubtest.FakeConnector{
ConnectFunc: func(config beat.ClientConfig) (beat.Client, error) {
return &pubtest.FakeClient{
PublishFunc: func(event beat.Event) {
publishCalls.Inc()
<-config.CloseRef.Done()
// Unlock Publish once the input has been cancelled
<-ctx.Done()
},
}, nil
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
var err error
wg.Add(1)
Expand Down
9 changes: 0 additions & 9 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ type ClientConfig struct {

Processing ProcessingConfig

CloseRef CloseRef

// WaitClose sets the maximum duration to wait on ACK, if client still has events
// active non-acknowledged events in the publisher pipeline.
// WaitClose is only effective if one of ACKCount, ACKEvents and ACKLastEvents
Expand Down Expand Up @@ -91,13 +89,6 @@ type EventListener interface {
ClientClosed()
}

// CloseRef allows users to close the client asynchronously.
// A CloseRef implements a subset of function required for context.Context.
type CloseRef interface {
Done() <-chan struct{}
Err() error
}

// ProcessingConfig provides additional event processing settings a client can
// pass to the publisher pipeline on Connect.
type ProcessingConfig struct {
Expand Down
8 changes: 2 additions & 6 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ type client struct {
eventWaitGroup *sync.WaitGroup

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once
closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once

observer observer
eventListener beat.EventListener
Expand Down Expand Up @@ -137,8 +135,6 @@ func (c *client) Close() error {
// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
close(c.done)

c.isOpen.Store(false)
c.onClosing()

Expand Down
15 changes: 4 additions & 11 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package pipeline

import (
"context"
"errors"
"io"
"sync"
Expand Down Expand Up @@ -95,15 +94,7 @@ func TestClient(t *testing.T) {
pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

var ctx context.Context
var cancel func()
if test.context {
ctx, cancel = context.WithCancel(context.Background())
}

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx,
})
client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}
Expand All @@ -116,7 +107,9 @@ func TestClient(t *testing.T) {
client.Publish(beat.Event{})
}()

test.close(client, cancel)
test.close(client, func() {
client.Close()
})
wg.Wait()
})
}
Expand Down
92 changes: 2 additions & 90 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package pipeline

import (
"fmt"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -197,9 +196,6 @@ func (p *Pipeline) Close() error {
p.outputController.Close()

p.observer.cleanup()
if p.sigNewClient != nil {
close(p.sigNewClient)
}
return nil
}

Expand All @@ -212,6 +208,8 @@ func (p *Pipeline) Connect() (beat.Client, error) {
// The client behavior on close and ACK handling can be configured by setting
// the appropriate fields in the passed ClientConfig.
// If not set otherwise the defaut publish mode is OutputChooses.
//
// It is responsibility of the caller to close the client.
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
var (
canDrop bool
Expand Down Expand Up @@ -239,8 +237,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {

client := &client{
logger: p.monitors.Logger,
closeRef: cfg.CloseRef,
done: make(chan struct{}),
isOpen: atomic.MakeBool(true),
clientListener: cfg.ClientListener,
processors: processors,
Expand Down Expand Up @@ -295,93 +291,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

p.observer.clientConnected()

if client.closeRef != nil {
p.registerSignalPropagation(client)
}

return client, nil
}

func (p *Pipeline) registerSignalPropagation(c *client) {
p.guardStartSigPropagation.Do(func() {
p.sigNewClient = make(chan *client, 1)
go p.runSignalPropagation()
})
p.sigNewClient <- c
}

func (p *Pipeline) runSignalPropagation() {
var channels []reflect.SelectCase
var clients []*client

channels = append(channels, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(p.sigNewClient),
})

for {
chosen, recv, recvOK := reflect.Select(channels)
if chosen == 0 {
if !recvOK {
// sigNewClient was closed
return
}

// new client -> register client for signal propagation.
if client := recv.Interface().(*client); client != nil {
channels = append(channels,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(client.closeRef.Done()),
},
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(client.done),
},
)
clients = append(clients, client)
}
continue
}

// find client we received a signal for. If client.done was closed, then
// we have to remove the client only. But if closeRef did trigger the signal, then
// we have to propagate the async close to the client.
// In either case, the client will be removed

i := (chosen - 1) / 2
isSig := (chosen & 1) == 1
if isSig {
client := clients[i]
client.Close()
}

// remove:
last := len(clients) - 1
ch1 := i*2 + 1
ch2 := ch1 + 1
lastCh1 := last*2 + 1
lastCh2 := lastCh1 + 1

clients[i], clients[last] = clients[last], nil
channels[ch1], channels[lastCh1] = channels[lastCh1], reflect.SelectCase{}
channels[ch2], channels[lastCh2] = channels[lastCh2], reflect.SelectCase{}

clients = clients[:last]
channels = channels[:lastCh1]
if cap(clients) > 10 && len(clients) <= cap(clients)/2 {
clientsTmp := make([]*client, len(clients))
copy(clientsTmp, clients)
clients = clientsTmp

channelsTmp := make([]reflect.SelectCase, len(channels))
copy(channelsTmp, channels)
channels = channelsTmp
}
}
}

func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) {
if p.processors == nil {
return nil, nil
Expand Down
Loading

0 comments on commit e993e09

Please sign in to comment.