[WIP] Fix panic when more than 32767 clients are active
belimawr committed Mar 22, 2024
1 parent 62235ef commit bd92336
Showing 12 changed files with 26 additions and 103 deletions.
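Background for reviewers: the panic in the title comes from the pipeline's signal-propagation goroutine (removed below in libbeat/publisher/pipeline/pipeline.go). It multiplexed every client's close signals through a single reflect.Select call, registering two reflect.SelectCase entries per connected client (one for closeRef.Done(), one for the client's own done channel) plus one case for sigNewClient. reflect.Select supports at most 65536 cases, so 32767 clients (2*32767+1 = 65535 cases) still fit, while 32768 clients (65537 cases) panic. A minimal standalone reproduction of the standard-library limit (not beats code):

package main

import "reflect"

func main() {
	ch := make(chan struct{})

	// The pipeline built two receive cases per connected client, plus one
	// for its sigNewClient channel; this mimics 32768 clients.
	cases := make([]reflect.SelectCase, 2*32768+1) // 65537 cases, one over the cap
	for i := range cases {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}

	// Panics: "reflect.Select: too many cases (max 65536)".
	reflect.Select(cases)
}

The WIP fix removes that goroutine entirely and shifts responsibility for closing clients to their owners, which is why the inputs below gain defer client.Close() and lose CloseRef.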
@@ -219,7 +219,7 @@ func startHarvester(
defer releaseResource(resource)

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
-	CloseRef: ctx.Cancelation,
+	// CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(hg.ackCH),
})
if err != nil {
3 changes: 2 additions & 1 deletion filebeat/input/kafka/input.go
@@ -120,12 +120,13 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error {
}
}),
),
-	CloseRef: ctx.Cancelation,
+	// CloseRef: ctx.Cancelation,
WaitClose: input.config.WaitClose,
})
if err != nil {
return err
}
+	defer client.Close()

log.Info("Starting Kafka input")
defer log.Info("Kafka input stopped")
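With CloseRef commented out, the pipeline no longer disconnects a client when its input's cancellation fires; the defer client.Close() added above covers the normal return path. For an input that relied on CloseRef to unblock a pending Publish during shutdown, one possible caller-side pattern is sketched below. The package, interface, and function names are illustrative, and treating ctx.Cancelation like a context.Context (it exposes Done() in the v2 input API) is the assumption:

package input

import "context"

// client stands in for beat.Client; only Close matters here.
type client interface{ Close() error }

func runWithCancelClose(ctx context.Context, c client) {
	// The input owns the client now: close it when Run returns ...
	defer c.Close()

	// ... and also when the input is cancelled, so a Publish call blocked
	// on a full queue returns. Close is idempotent (guarded by sync.Once).
	go func() {
		<-ctx.Done()
		c.Close()
	}()

	// ... consume and publish events here ...
}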
2 changes: 1 addition & 1 deletion filebeat/input/v2/input-cursor/input.go
@@ -146,7 +146,7 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
-	CloseRef: ctx.Cancelation,
+	// CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(ctx.Logger),
})
if err != nil {
2 changes: 1 addition & 1 deletion filebeat/input/v2/input-stateless/stateless.go
@@ -87,7 +87,7 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (
PublishMode: beat.DefaultGuarantees,

// configure pipeline to disconnect input on stop signal.
-	CloseRef: ctx.Cancelation,
+	// CloseRef: ctx.Cancelation,
})
if err != nil {
return err
6 changes: 4 additions & 2 deletions libbeat/publisher/pipeline/client.go
@@ -45,7 +45,7 @@ type client struct {
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once
closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
-	done      chan struct{}  // the done channel will be closed if the closeRef gets closed, or Close is run.
+	// done chan struct{} // the done channel will be closed if the closeRef gets closed, or Close is run.

observer observer
eventListener beat.EventListener
@@ -137,7 +137,7 @@ func (c *client) Close() error {
// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
-		close(c.done)
+		// This is not needed any more as the pipeline does not
+		// keep any list of clients
+		// close(c.done)

c.isOpen.Store(false)
c.onClosing()
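The done channel was the client's half of the pipeline's bookkeeping handshake; once the pipeline stops tracking clients, closeOnce alone is what keeps Close safe to call from multiple paths (the input's defer plus any cancellation hook). A stripped-down sketch of that guard, not the full beats client:

package main

import (
	"fmt"
	"sync"
)

type client struct {
	closeOnce sync.Once
}

func (c *client) Close() error {
	c.closeOnce.Do(func() {
		// Stop accepting events, wait for pending ACKs, detach from the queue.
		fmt.Println("shutdown sequence runs exactly once")
	})
	return nil
}

func main() {
	c := &client{}
	c.Close()
	c.Close() // no-op: the Once already fired, so no double shutdown
}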
96 changes: 5 additions & 91 deletions libbeat/publisher/pipeline/pipeline.go
@@ -22,7 +22,6 @@ package pipeline

import (
"fmt"
"reflect"
"sync"
"time"

@@ -197,9 +196,6 @@ func (p *Pipeline) Close() error {
p.outputController.Close()

p.observer.cleanup()
-	if p.sigNewClient != nil {
-		close(p.sigNewClient)
-	}
return nil
}

@@ -212,6 +208,8 @@ func (p *Pipeline) Connect() (beat.Client, error) {
// The client behavior on close and ACK handling can be configured by setting
// the appropriate fields in the passed ClientConfig.
// If not set otherwise the default publish mode is OutputChooses.
+//
+// It is the responsibility of the caller to close the client.
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
var (
canDrop bool
@@ -238,9 +236,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

	client := &client{
-		logger:   p.monitors.Logger,
-		closeRef: cfg.CloseRef,
-		done:     make(chan struct{}),
+		debugID:  clientDebugID.Inc(),
[Check failure on line 239 in libbeat/publisher/pipeline/pipeline.go, GitHub Actions / lint (darwin): unknown field debugID in struct literal of type client (typecheck)]
+		logger:   p.monitors.Logger,
+		// closeRef: cfg.CloseRef,
		isOpen:         atomic.MakeBool(true),
		clientListener: cfg.ClientListener,
		processors:     processors,
@@ -295,93 +293,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

p.observer.clientConnected()

-	if client.closeRef != nil {
-		p.registerSignalPropagation(client)
-	}

return client, nil
}

-func (p *Pipeline) registerSignalPropagation(c *client) {
-	p.guardStartSigPropagation.Do(func() {
-		p.sigNewClient = make(chan *client, 1)
-		go p.runSignalPropagation()
-	})
-	p.sigNewClient <- c
-}
-
-func (p *Pipeline) runSignalPropagation() {
-	var channels []reflect.SelectCase
-	var clients []*client
-
-	channels = append(channels, reflect.SelectCase{
-		Dir:  reflect.SelectRecv,
-		Chan: reflect.ValueOf(p.sigNewClient),
-	})
-
-	for {
-		chosen, recv, recvOK := reflect.Select(channels)
-		if chosen == 0 {
-			if !recvOK {
-				// sigNewClient was closed
-				return
-			}
-
-			// new client -> register client for signal propagation.
-			if client := recv.Interface().(*client); client != nil {
-				channels = append(channels,
-					reflect.SelectCase{
-						Dir:  reflect.SelectRecv,
-						Chan: reflect.ValueOf(client.closeRef.Done()),
-					},
-					reflect.SelectCase{
-						Dir:  reflect.SelectRecv,
-						Chan: reflect.ValueOf(client.done),
-					},
-				)
-				clients = append(clients, client)
-			}
-			continue
-		}
-
-		// find client we received a signal for. If client.done was closed, then
-		// we have to remove the client only. But if closeRef did trigger the signal, then
-		// we have to propagate the async close to the client.
-		// In either case, the client will be removed
-
-		i := (chosen - 1) / 2
-		isSig := (chosen & 1) == 1
-		if isSig {
-			client := clients[i]
-			client.Close()
-		}
-
-		// remove:
-		last := len(clients) - 1
-		ch1 := i*2 + 1
-		ch2 := ch1 + 1
-		lastCh1 := last*2 + 1
-		lastCh2 := lastCh1 + 1
-
-		clients[i], clients[last] = clients[last], nil
-		channels[ch1], channels[lastCh1] = channels[lastCh1], reflect.SelectCase{}
-		channels[ch2], channels[lastCh2] = channels[lastCh2], reflect.SelectCase{}
-
-		clients = clients[:last]
-		channels = channels[:lastCh1]
-		if cap(clients) > 10 && len(clients) <= cap(clients)/2 {
-			clientsTmp := make([]*client, len(clients))
-			copy(clientsTmp, clients)
-			clients = clientsTmp
-
-			channelsTmp := make([]reflect.SelectCase, len(channels))
-			copy(channelsTmp, channels)
-			channels = channelsTmp
-		}
-	}
-}

func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) {
if p.processors == nil {
return nil, nil
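The deleted fan-in kept one pair of reflect.SelectCase entries per client, hence i := (chosen - 1) / 2 to map a fired case back to its client and the swap-remove of both cases on cleanup; that single reflect.Select is what hits the 65536-case cap. If close propagation is ever wanted back, a goroutine per client avoids the cap entirely at the cost of one goroutine per connection. A sketch under that assumption (this WIP commit itself simply drops propagation, and the interface below mirrors only the Done() half of beat.CloseRef):

package pipeline

// closeRef mirrors the Done() side of beat.CloseRef.
type closeRef interface{ Done() <-chan struct{} }

type client struct {
	done chan struct{} // closed when the client shuts itself down
}

func (c *client) Close() error {
	// Real implementation: idempotent shutdown guarded by sync.Once.
	return nil
}

// watchCloseRef propagates an external close signal to a single client.
// Goroutines have no 65536 limit, so the client count no longer matters.
func watchCloseRef(ref closeRef, c *client) {
	go func() {
		select {
		case <-ref.Done():
			c.Close() // external cancellation: close the client
		case <-c.done:
			// client closed on its own; just stop watching
		}
	}()
}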
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awscloudwatch/input.go
@@ -112,7 +112,7 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
-	CloseRef: inputContext.Cancelation,
+	// CloseRef: inputContext.Cancelation,
EventListener: awscommon.NewEventACKHandler(),
})
if err != nil {
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input.go
@@ -156,7 +156,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
-	CloseRef: inputContext.Cancelation,
+	// CloseRef: inputContext.Cancelation,
EventListener: awscommon.NewEventACKHandler(),
Processing: beat.ProcessingConfig{
// This input only produces events with basic types so normalization
@@ -65,9 +65,13 @@ func (n *input) Run(runCtx v2.Context, connector beat.PipelineConnector) (err error) {
}()

client, err := connector.ConnectWith(beat.ClientConfig{
-	CloseRef: runCtx.Cancelation,
+	// CloseRef: runCtx.Cancelation,
EventListener: NewTxACKHandler(),
})
if err != nil {
return fmt.Errorf("could not connect to publishing pipeline: %s", err)
}
+	defer client.Close()

dataDir := paths.Resolve(paths.Data, "kvstore")
if err = os.MkdirAll(dataDir, 0700); err != nil {
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/lumberjack/input.go
@@ -62,7 +62,7 @@ func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline)

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
-	CloseRef: inputCtx.Cancelation,
+	// CloseRef: inputCtx.Cancelation,
EventListener: newEventACKHandler(),
})
if err != nil {
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/netflow/input.go
@@ -120,14 +120,15 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) error {
// is not required.
EventNormalization: boolPtr(false),
},
-	CloseRef: ctx.Cancelation,
+	// CloseRef: ctx.Cancelation,
EventListener: nil,
})
if err != nil {
n.logger.Errorw("Failed connecting to beat event publishing", "error", err)
n.stop()
return err
}
+	defer client.Close()

const pollInterval = time.Minute
udpMetrics := netmetrics.NewUDP("netflow", ctx.ID, n.cfg.Host, uint64(n.cfg.ReadBuffer), pollInterval, n.logger)
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/shipper/input.go
@@ -174,11 +174,12 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
DisableType: true,
},

-	CloseRef: inputContext.Cancelation,
+	// CloseRef: inputContext.Cancelation,
})
if err != nil {
return fmt.Errorf("error creating client for stream %s: %w", streamID, err)
}
+	defer client.Close()
in.log.Infof("Creating beat client for stream %s", streamID)

newStreamData := streamData{client: client, index: in.streams[streamID].index, processors: in.streams[streamID].processors}
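One WIP caveat: the streamID in the error message suggests this ConnectWith runs once per stream inside a loop, and a defer inside a loop only fires when Run returns, so every stream's client would stay open for the whole life of the input. If streams can be added and removed dynamically, explicit tracking closes them promptly; a hypothetical sketch, not from this commit:

package shipper

// beatClient stands in for beat.Client.
type beatClient interface{ Close() error }

type streamClients struct {
	clients map[string]beatClient
}

// closeStream closes one stream's client as soon as the stream goes away,
// instead of waiting for Run to return.
func (s *streamClients) closeStream(id string) {
	if c, ok := s.clients[id]; ok {
		c.Close()
		delete(s.clients, id)
	}
}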
