create backoff for every batch worker
DmitryRomanov committed Dec 8, 2023
1 parent ea58228 · commit f8ff00c
Showing 10 changed files with 137 additions and 147 deletions.
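
Summary: until now each output plugin built a single backoff.BackOff in its Start method and shared it across all batcher workers. This commit moves the retry settings (Retry, RetryRetention, RetryRetentionExponentMultiplier) into BatcherOptions, so every worker goroutine constructs its own backoff inside Batcher.work and hands it to the plugin's OutFn. The cfg.GetBackoff helper is called but not defined in this diff; below is a minimal sketch of what it plausibly looks like, assuming it wraps ExponentialBackOff from github.com/cenkalti/backoff/v3 (the exact field choices are assumptions):

package cfg

import (
	"time"

	"github.com/cenkalti/backoff/v3"
)

// GetBackoff returns a stateful exponential backoff capped at retriesCfg
// attempts. Sketch only: the real helper lives in file.d's cfg package and
// is not shown in this diff.
func GetBackoff(minRetention time.Duration, multiplier float64, retriesCfg uint64) backoff.BackOff {
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = minRetention // first retry waits the configured retention
	expBackoff.Multiplier = multiplier        // each later wait grows by this factor
	expBackoff.Reset()                        // apply InitialInterval to the current state
	return backoff.WithMaxRetries(expBackoff, retriesCfg)
}
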
38 changes: 25 additions & 13 deletions pipeline/batch.go
@@ -5,6 +5,8 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
@@ -133,21 +135,24 @@ type Batcher struct {
}

type (
-BatcherOutFn func(*WorkerData, *Batch)
+BatcherOutFn func(*WorkerData, *Batch, *backoff.BackOff)
BatcherMaintenanceFn func(*WorkerData)

BatcherOptions struct {
-PipelineName string
-OutputType string
-OutFn BatcherOutFn
-MaintenanceFn BatcherMaintenanceFn
-Controller OutputPluginController
-Workers int
-BatchSizeCount int
-BatchSizeBytes int
-FlushTimeout time.Duration
-MaintenanceInterval time.Duration
-MetricCtl *metric.Ctl
+PipelineName string
+OutputType string
+OutFn BatcherOutFn
+MaintenanceFn BatcherMaintenanceFn
+Controller OutputPluginController
+Workers int
+BatchSizeCount int
+BatchSizeBytes int
+FlushTimeout time.Duration
+MaintenanceInterval time.Duration
+MetricCtl *metric.Ctl
+Retry int
+RetryRetention time.Duration
+RetryRetentionExponentMultiplier int
}
)

@@ -197,12 +202,19 @@ func (b *Batcher) work() {

t := time.Now()
data := WorkerData(nil)
+workerBackoff := cfg.GetBackoff(
+b.opts.RetryRetention,
+float64(b.opts.RetryRetentionExponentMultiplier),
+uint64(b.opts.Retry),
+)

for batch := range b.fullBatches {
b.workersInProgress.Inc()

if batch.hasIterableEvents {
now := time.Now()
-b.opts.OutFn(&data, batch)
+workerBackoff.Reset()
+b.opts.OutFn(&data, batch, &workerBackoff)
b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
}

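Read together with the plugin hunks below, the new contract is: Batcher.work owns one backoff per worker goroutine and resets it before each batch, and every plugin's OutFn drives backoff.Retry with that instance. A condensed sketch of the consuming side, with sendBatch as a hypothetical stand-in for the plugin-specific flush:

package pipeline

import "github.com/cenkalti/backoff/v3"

// sendBatch is a hypothetical stand-in for a plugin-specific flush
// (ClickHouse insert, Elasticsearch bulk request, Kafka produce, ...).
func sendBatch(batch *Batch) error { return nil }

// outSketch has the shape every output plugin's OutFn takes after this
// commit: retry the flush using the worker-local backoff passed in by work().
func outSketch(workerData *WorkerData, batch *Batch, workerBackoff *backoff.BackOff) {
	(*workerBackoff).Reset() // defensive; work() already resets before calling OutFn
	_ = backoff.Retry(func() error {
		return sendBatch(batch)
	}, *workerBackoff)
}

Note that *backoff.BackOff is a pointer to an interface, which is unusual in Go; the plugins dereference it before use, as the hunks below show.
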
34 changes: 15 additions & 19 deletions plugin/output/clickhouse/clickhouse.go
@@ -45,7 +45,6 @@ type Plugin struct {
batcher *pipeline.Batcher
ctx context.Context
cancelFunc context.CancelFunc
-backoff backoff.BackOff

query string

@@ -370,22 +369,19 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
p.instances = append(p.instances, pool)
}

-p.backoff = cfg.GetBackoff(
-p.config.Retention_,
-float64(p.config.RetentionExponentMultiplier),
-uint64(p.config.Retry),
-)
-
p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
-PipelineName: params.PipelineName,
-OutputType: outPluginType,
-OutFn: p.out,
-Controller: params.Controller,
-Workers: p.config.WorkersCount_,
-BatchSizeCount: p.config.BatchSize_,
-BatchSizeBytes: p.config.BatchSizeBytes_,
-FlushTimeout: p.config.BatchFlushTimeout_,
-MetricCtl: params.MetricCtl,
+PipelineName: params.PipelineName,
+OutputType: outPluginType,
+OutFn: p.out,
+Controller: params.Controller,
+Workers: p.config.WorkersCount_,
+BatchSizeCount: p.config.BatchSize_,
+BatchSizeBytes: p.config.BatchSizeBytes_,
+FlushTimeout: p.config.BatchFlushTimeout_,
+MetricCtl: params.MetricCtl,
+Retry: p.config.Retry,
+RetryRetention: p.config.Retention_,
+RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
})

p.batcher.Start(p.ctx)
@@ -414,7 +410,7 @@ func (d data) reset() {
}
}

-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
// we don't check the error, schema already validated in the Start
columns, _ := inferInsaneColInputs(p.config.Columns)
@@ -454,8 +450,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}
})

-p.backoff.Reset()
try := 0
+(*workerBackoff).Reset()
err := backoff.Retry(func() error {
requestID := p.requestID.Inc()
clickhouse := p.getInstance(requestID, try)
@@ -470,7 +466,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
)
}
return err
-}, p.backoff)
+}, *workerBackoff)

if err != nil {
var errLogFunc func(msg string, fields ...zap.Field)
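The design rationale is visible in this hunk: ExponentialBackOff is stateful — every NextBackOff call escalates the current interval, and Reset rewinds it — so the old shared p.backoff field let concurrent workers interleave and distort each other's retry schedule. A toy illustration of that interference (not file.d code):

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v3"
)

func main() {
	shared := backoff.NewExponentialBackOff()
	shared.InitialInterval = 100 * time.Millisecond
	shared.RandomizationFactor = 0 // deterministic output for the demo
	shared.Multiplier = 2
	shared.Reset() // pick up the InitialInterval set above

	fmt.Println(shared.NextBackOff()) // worker A's first wait: 100ms
	fmt.Println(shared.NextBackOff()) // worker B's "first" wait: already 200ms
	// And a Reset() by either worker would restart the other's schedule too.
}
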
37 changes: 16 additions & 21 deletions plugin/output/elasticsearch/elasticsearch.go
@@ -49,7 +49,6 @@ type Plugin struct {
batcher *pipeline.Batcher
controller pipeline.OutputPluginController
mu *sync.Mutex
-backoff backoff.BackOff

// plugin metrics

@@ -229,28 +228,25 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {

p.logger.Infof("starting batcher: timeout=%d", p.config.BatchFlushTimeout_)
p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
-PipelineName: params.PipelineName,
-OutputType: outPluginType,
-OutFn: p.out,
-MaintenanceFn: p.maintenance,
-Controller: p.controller,
-Workers: p.config.WorkersCount_,
-BatchSizeCount: p.config.BatchSize_,
-BatchSizeBytes: p.config.BatchSizeBytes_,
-FlushTimeout: p.config.BatchFlushTimeout_,
-MaintenanceInterval: time.Minute,
-MetricCtl: params.MetricCtl,
+PipelineName: params.PipelineName,
+OutputType: outPluginType,
+OutFn: p.out,
+MaintenanceFn: p.maintenance,
+Controller: p.controller,
+Workers: p.config.WorkersCount_,
+BatchSizeCount: p.config.BatchSize_,
+BatchSizeBytes: p.config.BatchSizeBytes_,
+FlushTimeout: p.config.BatchFlushTimeout_,
+MaintenanceInterval: time.Minute,
+MetricCtl: params.MetricCtl,
+Retry: p.config.Retry,
+RetryRetention: p.config.Retention_,
+RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
})

ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

-p.backoff = cfg.GetBackoff(
-p.config.Retention_,
-float64(p.config.RetentionExponentMultiplier),
-uint64(p.config.Retry),
-)
-
p.batcher.Start(ctx)
}

@@ -268,7 +264,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error", "Number of elasticsearch indexing errors")
}

-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
@@ -286,15 +282,14 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data.outBuf = p.appendEvent(data.outBuf, event)
})

-p.backoff.Reset()
err := backoff.Retry(func() error {
err := p.send(data.outBuf)
if err != nil {
p.sendErrorMetric.WithLabelValues().Inc()
p.logger.Errorf("can't send to the elastic, will try other endpoint: %s", err.Error())
}
return err
-}, p.backoff)
+}, *workerBackoff)

if err != nil {
var errLogFunc func(args ...interface{})
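Worth noting: unlike the clickhouse, gelf, and kafka hunks, the elasticsearch out drops its p.backoff.Reset() call without adding a (*workerBackoff).Reset(), relying instead on the Reset() that Batcher.work now performs before every OutFn call.
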
3 changes: 2 additions & 1 deletion plugin/output/file/file.go
@@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/logger"
@@ -181,7 +182,7 @@ func (p *Plugin) Out(event *pipeline.Event) {
p.batcher.Add(event)
}

-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
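The file output only adopts the new OutFn signature; in the hunks shown it never touches workerBackoff, which is consistent with a plugin that writes to a local file and has no retry loop to drive.
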
38 changes: 17 additions & 21 deletions plugin/output/gelf/gelf.go
@@ -44,7 +44,6 @@ type Plugin struct {
avgEventSize int
batcher *pipeline.Batcher
controller pipeline.OutputPluginController
-backoff backoff.BackOff

// plugin metrics

@@ -204,12 +203,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
p.config = config.(*Config)
p.registerMetrics(params.MetricCtl)

-p.backoff = cfg.GetBackoff(
-p.config.Retention_,
-float64(p.config.RetentionExponentMultiplier),
-uint64(p.config.Retry),
-)
-
p.config.hostField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.HostField))
p.config.shortMessageField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.ShortMessageField))
p.config.defaultShortMessageValue = strings.TrimSpace(p.config.DefaultShortMessageValue)
@@ -223,17 +216,20 @@
p.config.levelField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.LevelField))

p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
-PipelineName: params.PipelineName,
-OutputType: outPluginType,
-OutFn: p.out,
-MaintenanceFn: p.maintenance,
-Controller: p.controller,
-Workers: p.config.WorkersCount_,
-BatchSizeCount: p.config.BatchSize_,
-BatchSizeBytes: p.config.BatchSizeBytes_,
-FlushTimeout: p.config.BatchFlushTimeout_,
-MaintenanceInterval: p.config.ReconnectInterval_,
-MetricCtl: params.MetricCtl,
+PipelineName: params.PipelineName,
+OutputType: outPluginType,
+OutFn: p.out,
+MaintenanceFn: p.maintenance,
+Controller: p.controller,
+Workers: p.config.WorkersCount_,
+BatchSizeCount: p.config.BatchSize_,
+BatchSizeBytes: p.config.BatchSizeBytes_,
+FlushTimeout: p.config.BatchFlushTimeout_,
+MaintenanceInterval: p.config.ReconnectInterval_,
+MetricCtl: params.MetricCtl,
+Retry: p.config.Retry,
+RetryRetention: p.config.Retention_,
+RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
})

p.batcher.Start(context.TODO())
@@ -251,7 +247,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounter("output_gelf_send_error", "Total GELF send errors")
}

-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
@@ -277,7 +273,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data.outBuf = outBuf
data.encodeBuf = encodeBuf

-p.backoff.Reset()
+(*workerBackoff).Reset()
err := backoff.Retry(func() error {
if data.gelf == nil {
p.logger.Infof("connecting to gelf address=%s", p.config.Endpoint)
@@ -303,7 +299,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}

return nil
-}, p.backoff)
+}, *workerBackoff)

if err != nil {
var errLogFunc func(args ...interface{})
33 changes: 15 additions & 18 deletions plugin/output/kafka/kafka.go
@@ -37,7 +37,6 @@ type Plugin struct {

producer sarama.SyncProducer
batcher *pipeline.Batcher
-backoff backoff.BackOff

// plugin metrics

@@ -173,25 +172,23 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
if p.config.Retention_ < 1 {
p.logger.Fatal("'retention' can't be <1")
}
-p.backoff = cfg.GetBackoff(
-p.config.Retention_,
-float64(p.config.RetentionExponentMultiplier),
-uint64(p.config.Retry),
-)

p.logger.Infof("workers count=%d, batch size=%d", p.config.WorkersCount_, p.config.BatchSize_)

p.producer = p.newProducer()
p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
-PipelineName: params.PipelineName,
-OutputType: outPluginType,
-OutFn: p.out,
-Controller: p.controller,
-Workers: p.config.WorkersCount_,
-BatchSizeCount: p.config.BatchSize_,
-BatchSizeBytes: p.config.BatchSizeBytes_,
-FlushTimeout: p.config.BatchFlushTimeout_,
-MetricCtl: params.MetricCtl,
+PipelineName: params.PipelineName,
+OutputType: outPluginType,
+OutFn: p.out,
+Controller: p.controller,
+Workers: p.config.WorkersCount_,
+BatchSizeCount: p.config.BatchSize_,
+BatchSizeBytes: p.config.BatchSizeBytes_,
+FlushTimeout: p.config.BatchFlushTimeout_,
+MetricCtl: params.MetricCtl,
+Retry: p.config.Retry,
+RetryRetention: p.config.Retention_,
+RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
})

p.batcher.Start(context.TODO())
@@ -205,7 +202,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounter("output_kafka_send_errors", "Total Kafka send errors")
}

-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = &data{
messages: make([]*sarama.ProducerMessage, p.config.BatchSize_),
@@ -243,7 +240,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

data.outBuf = outBuf

-p.backoff.Reset()
+(*workerBackoff).Reset()
err := backoff.Retry(func() error {
err := p.producer.SendMessages(data.messages[:i])
if err == nil {
@@ -261,7 +258,7 @@
)

return err
-}, p.backoff)
+}, *workerBackoff)

if err != nil {
var errLogFunc func(args ...interface{})
(The remaining 4 changed files are not shown.)
