Commit

Implement iterator in the batcher
vadimalekseev committed Feb 15, 2023
1 parent 9019033 commit ab4592a
Showing 20 changed files with 182 additions and 66 deletions.
1 change: 1 addition & 0 deletions cmd/file.d/file.d.go
@@ -31,6 +31,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/modify"
_ "github.com/ozontech/file.d/plugin/action/parse_es"
_ "github.com/ozontech/file.d/plugin/action/parse_re2"
_ "github.com/ozontech/file.d/plugin/action/remap"
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"
19 changes: 19 additions & 0 deletions e2e/remap_join/config.yml
@@ -0,0 +1,19 @@
pipelines:
remap_join:
settings:
event_timeout: 10s
input:
type: file
persistence_mode: async
watching_dir: SOME_DIR
offsets_file: SOME_FILE
offsets_op: reset
actions:
- type: remap
field: data
- type: join
field: message
start: '/^start/'
continue: '/^continue/'
output:
type: file
56 changes: 56 additions & 0 deletions e2e/remap_join/remap_join.go
@@ -0,0 +1,56 @@
package remap_join

import (
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type Config struct {
inputDir string
outputDir string
Count int
}

func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
c.inputDir = t.TempDir()
c.outputDir = t.TempDir()
offsetsDir := t.TempDir()

input := conf.Pipelines[pipelineName].Raw.Get("input")
input.Set("watching_dir", c.inputDir)
input.Set("filename_pattern", "input.log")
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))

output := conf.Pipelines[pipelineName].Raw.Get("output")
output.Set("target_file", path.Join(c.outputDir, "output.log"))
}

func (c *Config) Send(t *testing.T) {
file, err := os.Create(path.Join(c.inputDir, "input.log"))
require.NoError(t, err)
defer file.Close()

for i := 0; i < c.Count; i++ {
_, err = file.WriteString(`{ "data": [ {"message":"start "}, {"message":"continue"} ] }` + "\n")
_ = file.Sync()
require.NoError(t, err)
}
}

func (c *Config) Validate(t *testing.T) {
logFilePattern := path.Join(c.outputDir, "*")

expectedEvents := 2 * c.Count

test.WaitProcessEvents(t, expectedEvents, 50*time.Millisecond, 50*time.Second, logFilePattern)
got := test.CountLines(t, logFilePattern)
assert.Equal(t, expectedEvents, got)
}
1 change: 1 addition & 0 deletions e2e/start_work_test.go
@@ -30,6 +30,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/modify"
_ "github.com/ozontech/file.d/plugin/action/parse_es"
_ "github.com/ozontech/file.d/plugin/action/parse_re2"
_ "github.com/ozontech/file.d/plugin/action/remap"
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"
3 changes: 2 additions & 1 deletion longpanic/longpanic.go
@@ -6,6 +6,7 @@ import (
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/ozontech/file.d/logger"
)
@@ -86,7 +87,7 @@ func (l *LongPanic) recoverUntilTimeout() {
l.panicHandler(err)
}

logger.Error(err.Error())
logger.Error(err.Error(), zap.Stack("stacktrace"))
logger.Error("wait for somebody to restart plugins via endpoint")

l.shouldPanic.Store(true)
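The only behavioral change in this hunk is the extra field on the recovery log line. For context, zap.Stack captures the calling goroutine's stack trace into a string field. A minimal standalone sketch, using plain zap rather than file.d's logger wrapper (which accepts the same field types):

```go
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer func() { _ = logger.Sync() }()

	// zap.Stack attaches the current goroutine's stack trace as a field,
	// so a recovered panic is logged together with where it happened.
	logger.Error("recovered from panic", zap.Stack("stacktrace"))
}
```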
53 changes: 46 additions & 7 deletions pipeline/batch.go
@@ -11,7 +11,10 @@ import (
)

type Batch struct {
Events []*Event
events []*Event
iteratorIndex int
parentsCount int

// eventsSize contains total size of the Events in bytes
eventsSize int
seq int64
@@ -24,6 +27,13 @@ type Batch struct {
maxSizeBytes int
}

func NewPreparedBatch(events []*Event) *Batch {
b := &Batch{}
b.reset()
b.events = events
return b
}

func newBatch(maxSizeCount int, maxSizeBytes int, timeout time.Duration) *Batch {
if maxSizeCount < 0 {
logger.Fatalf("why batch max count less than 0?")
@@ -35,32 +45,58 @@ func newBatch(maxSizeCount int, maxSizeBytes int, timeout time.Duration) *Batch
logger.Fatalf("batch limits are not set")
}

return &Batch{
b := &Batch{
maxSizeCount: maxSizeCount,
maxSizeBytes: maxSizeBytes,
timeout: timeout,
Events: make([]*Event, 0, maxSizeCount),
events: make([]*Event, 0, maxSizeCount),
}
b.reset()

return b
}

func (b *Batch) reset() {
b.Events = b.Events[:0]
b.events = b.events[:0]
b.iteratorIndex = -1
b.parentsCount = 0
b.eventsSize = 0
b.startTime = time.Now()
}

func (b *Batch) append(e *Event) {
b.Events = append(b.Events, e)
if e.IsChildParentKind() {
b.parentsCount++
}

b.events = append(b.events, e)
b.eventsSize += e.Size
}

func (b *Batch) isReady() bool {
l := len(b.Events)
l := len(b.events) - b.parentsCount
isFull := (b.maxSizeCount != 0 && l == b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize)
isTimeout := l > 0 && time.Since(b.startTime) > b.timeout
return isFull || isTimeout
}

func (b *Batch) Next() bool {
b.iteratorIndex++
for ; b.iteratorIndex < len(b.events); b.iteratorIndex++ {
next := b.events[b.iteratorIndex]
if next.IsChildParentKind() {
continue
}
break
}

return b.iteratorIndex < len(b.events)
}

func (b *Batch) Value() *Event {
return b.events[b.iteratorIndex]
}

type Batcher struct {
opts BatcherOptions

@@ -140,7 +176,7 @@ func (b *Batcher) work() {
func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event {
// we need to release batch first and then commit events
// so lets swap local slice with batch slice to avoid data copying
events, batch.Events = batch.Events, events
events, batch.events = batch.events, events

batchSeq := batch.seq

@@ -152,6 +188,9 @@ func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event {
b.commitSeq++

for _, e := range events {
if e.IsChildKind() {
continue
}
b.opts.Controller.Commit(e)
}

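Taken together, the batch changes set up two complementary filters. Next/Value skip child-parent events, so output plugins never write the parents; commitBatch skips child events, so each source event is acked exactly once, through its parent. isReady likewise subtracts parentsCount so that parents do not count toward the batch size limit. Below is a runnable sketch of the two filters with simplified stand-in types (not the real pipeline package):

```go
package main

import "fmt"

type kind int

const (
	kindRegular kind = iota
	kindChild
	kindChildParent
)

type event struct {
	name string
	kind kind
}

type batch struct {
	events        []event
	iteratorIndex int // -1 after reset, as in Batch.reset above
}

// next mirrors Batch.Next: advance past child-parent events so that
// consumers only ever see regular and child events.
func (b *batch) next() bool {
	for b.iteratorIndex++; b.iteratorIndex < len(b.events); b.iteratorIndex++ {
		if b.events[b.iteratorIndex].kind != kindChildParent {
			break
		}
	}
	return b.iteratorIndex < len(b.events)
}

func (b *batch) value() event { return b.events[b.iteratorIndex] }

func main() {
	b := &batch{
		iteratorIndex: -1,
		events: []event{
			{"parent", kindChildParent}, // spawned the two children below
			{"child-1", kindChild},
			{"child-2", kindChild},
			{"plain", kindRegular},
		},
	}

	// Output side: iterate with next/value; the parent is skipped.
	for b.next() {
		fmt.Println("output:", b.value().name)
	}

	// Commit side mirrors commitBatch: children are skipped, so the source
	// event behind parent and child-1/child-2 is committed exactly once.
	for _, e := range b.events {
		if e.kind == kindChild {
			continue
		}
		fmt.Println("commit:", e.name)
	}
}
```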
10 changes: 5 additions & 5 deletions pipeline/event.go
@@ -44,7 +44,7 @@ const (
const (
eventKindRegular int32 = iota
eventKindChild
eventKindLastChild
eventKindChildParent
eventKindTimeout
eventKindUnlock
)
@@ -145,12 +145,12 @@ func (e *Event) IsChildKind() bool {
return e.kind.Load() == eventKindChild
}

func (e *Event) SetLastChildKind() {
e.kind.Swap(eventKindLastChild)
func (e *Event) SetChildParentKind() {
e.kind.Swap(eventKindChildParent)
}

func (e *Event) IsLastChildKind() bool {
return e.kind.Load() == eventKindLastChild
func (e *Event) IsChildParentKind() bool {
return e.kind.Load() == eventKindChildParent
}

func (e *Event) parseJSON(json []byte) error {
2 changes: 1 addition & 1 deletion pipeline/pipeline.go
@@ -482,7 +482,7 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
// todo: avoid event.stream.commit(event)
event.stream.commit(event)

if !backEvent || kind == eventKindLastChild {
if !backEvent {
return
}

10 changes: 2 additions & 8 deletions pipeline/processor.go
@@ -355,16 +355,10 @@ func (p *processor) Propagate(event *Event) {
}

func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
for i, node := range nodes {
for _, node := range nodes {
child := *parent
child.Root = &insaneJSON.Root{Node: node}

if i == len(nodes)-1 {
child.SetChildKind()
} else {
child.SetLastChildKind()
}

child.SetChildKind()
p.Propagate(&child)
}
}
12 changes: 7 additions & 5 deletions plugin/action/remap/remap.go
@@ -79,21 +79,23 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
}

nodeArray := data.AsArray()
dataElements := make([]*insaneJSON.Node, 0, len(nodeArray))
children := make([]*insaneJSON.Node, 0, len(nodeArray))
for _, elem := range nodeArray {
if !elem.IsObject() {
p.logger.Warn("skip an event because the field is not an object", zap.String("type", data.TypeStr()))
continue
}
dataElements = append(dataElements, elem)
children = append(children, elem)
}

if len(dataElements) == 0 {
if len(children) == 0 {
// zero array or an array that does not contain objects
return pipeline.ActionPass
}

p.pluginController.Spawn(event, dataElements)
p.pluginController.Spawn(event, children)

return pipeline.ActionDiscard
event.SetChildParentKind()

return pipeline.ActionPass
}
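The contract change here is subtle: remap used to return ActionDiscard after spawning children, which dropped the parent before its offset could be committed. Now the parent is marked child-parent and passed on, so it rides along with the batch for commit while output plugins skip it during iteration. A hypothetical illustration of the resulting data flow, using stdlib JSON rather than file.d's insaneJSON:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// An incoming event whose "data" field holds an array of objects.
	in := []byte(`{"data":[{"message":"go"},{"message":"rust"}]}`)

	var parent struct {
		Data []json.RawMessage `json:"data"`
	}
	if err := json.Unmarshal(in, &parent); err != nil {
		panic(err)
	}

	// Each object in the array is propagated as its own child event,
	// the way Spawn does for the collected children above.
	for _, child := range parent.Data {
		fmt.Println("child event:", string(child))
	}

	// The original event is marked child-parent and passed along: it is
	// committed with the batch, but batch iteration never yields it.
}
```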
11 changes: 9 additions & 2 deletions plugin/action/remap/remap_test.go
@@ -15,7 +15,11 @@ func TestPlugin_Do(t *testing.T) {
config := test.NewConfig(&Config{Field: field}, nil)
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
wg := &sync.WaitGroup{}
wg.Add(6)

const children = 6
const parents = 2
const total = children + parents
wg.Add(total)

outEvents := make([]*pipeline.Event, 0)
output.SetOutFn(func(e *pipeline.Event) {
@@ -41,10 +45,13 @@
wg.Wait()
p.Stop()

require.Equal(t, 6, len(outEvents))
require.Equal(t, total, len(outEvents))

result := make([]string, 0, len(outEvents))
for _, e := range outEvents {
if e.IsChildParentKind() {
continue
}
result = append(result, strings.Clone(e.Root.Dig("message").AsString()))
}
require.Equal(t, []string{"go", "rust", "c++", "python", "ruby", "js"}, result)
4 changes: 2 additions & 2 deletions plugin/output/elasticsearch/elasticsearch.go
@@ -249,8 +249,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}

data.outBuf = data.outBuf[:0]
for _, event := range batch.Events {
data.outBuf = p.appendEvent(data.outBuf, event)
for batch.Next() {
data.outBuf = p.appendEvent(data.outBuf, batch.Value())
}

for {
4 changes: 2 additions & 2 deletions plugin/output/file/file.go
@@ -201,8 +201,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

outBuf := data.outBuf[:0]

for _, event := range batch.Events {
outBuf, _ = event.Encode(outBuf)
for batch.Next() {
outBuf, _ = batch.Value().Encode(outBuf)
outBuf = append(outBuf, byte('\n'))
}
data.outBuf = outBuf
3 changes: 2 additions & 1 deletion plugin/output/gelf/gelf.go
@@ -234,7 +234,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

outBuf := data.outBuf[:0]
encodeBuf := data.encodeBuf[:0]
for _, event := range batch.Events {
for batch.Next() {
event := batch.Value()
encodeBuf = p.formatEvent(encodeBuf, event)
outBuf, _ = event.Encode(outBuf)
outBuf = append(outBuf, byte(0))
8 changes: 5 additions & 3 deletions plugin/output/kafka/kafka.go
@@ -167,7 +167,7 @@ func (p *Plugin) RegisterMetrics(ctl *metric.Ctl) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
if *workerData == nil {
*workerData = &data{
messages: make([]*sarama.ProducerMessage, p.config.BatchSize_),
messages: make([]*sarama.ProducerMessage, 0, p.config.BatchSize_),
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
}
}
@@ -180,7 +180,9 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

outBuf := data.outBuf[:0]
start := 0
for i, event := range batch.Events {
i := 0
for ; batch.Next(); i++ {
event := batch.Value()
outBuf, start = event.Encode(outBuf)

topic := p.config.DefaultTopic
Expand All @@ -200,7 +202,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

data.outBuf = outBuf

err := p.producer.SendMessages(data.messages[:len(batch.Events)])
err := p.producer.SendMessages(data.messages[:i])
if err != nil {
errs := err.(sarama.ProducerErrors)
for _, e := range errs {
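Besides adopting the iterator, this hunk fixes the messages allocation: make([]*sarama.ProducerMessage, p.config.BatchSize_) created a slice of BatchSize_ nil placeholders, while the new form starts empty with the same capacity. SendMessages is also sliced by the iterator's count i instead of len(batch.Events), since skipped child-parent events mean fewer produced messages than batch entries. A small sketch of the length-versus-capacity difference (stand-in element type, no sarama dependency):

```go
package main

import "fmt"

func main() {
	const batchSize = 4

	// Old: length batchSize. The slice starts as nil placeholders, and
	// slicing it by the raw batch length can expose nil messages once
	// some events are skipped.
	old := make([]*struct{}, batchSize)

	// New: length 0, capacity batchSize. Only messages actually produced
	// are visible, matching the iterator's count i.
	fresh := make([]*struct{}, 0, batchSize)

	fmt.Println(len(old), cap(old))     // 4 4
	fmt.Println(len(fresh), cap(fresh)) // 0 4
}
```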