Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Feb 17, 2023
1 parent 0837d53 commit 263371a
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 5 deletions.
2 changes: 1 addition & 1 deletion e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func startForTest(t *testing.T, test E2ETest, num int) *fd.FileD {
test.Configure(t, conf, test.name)

// for each file.d its own port
filed := fd.New(conf, ":808"+strconv.Itoa(num))
filed := fd.New(conf, ":1508"+strconv.Itoa(num))
filed.Start()
return filed
}
16 changes: 13 additions & 3 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,11 @@ func (p *Pipeline) Error(err string) {

func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
kind := event.kind.Load()
if kind == eventKindTimeout || kind == eventKindChild {
if kind == eventKindTimeout {
return
}
if kind == eventKindChild {
p.commitSample(event)
return
}

Expand All @@ -470,8 +474,8 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
p.outputEvents.Inc()
p.outputSize.Add(int64(event.Size))

if len(p.outSample) == 0 && rand.Int()&1 == 1 {
p.outSample = event.Root.Encode(p.outSample)
if kind != eventKindChildParent {
p.commitSample(event)
}

if event.Size > p.maxSize {
Expand All @@ -495,6 +499,12 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
p.eventPool.back(event)
}

func (p *Pipeline) commitSample(event *Event) {
if len(p.outSample) == 0 && rand.Int()&1 == 1 {
p.outSample = event.Root.Encode(p.outSample)
}
}

func (p *Pipeline) AddAction(info *ActionPluginStaticInfo) {
p.actionInfos = append(p.actionInfos, info)
p.metricsHolder.AddAction(info.MetricName, info.MetricLabels)
Expand Down
1 change: 1 addition & 0 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
child := *parent
child.Root = &insaneJSON.Root{Node: node}
child.SetChildKind()

nextActionIdx := child.action.Inc()
p.tryResetBusy(int(nextActionIdx - 1))

Expand Down
8 changes: 8 additions & 0 deletions plugin/action/remap/remap.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ func (p *Plugin) Stop() {
}

func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
if event.IsChildKind() {
return pipeline.ActionPass
}

data := event.Root.Dig(p.config.Field_...)
if data == nil {
return pipeline.ActionPass
}

if !data.IsArray() {
p.logger.Warn("skip an event because is not an array", zap.String("type", data.TypeStr()))
return pipeline.ActionPass
Expand Down
2 changes: 1 addition & 1 deletion plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (jp *jobProvider) commit(event *pipeline.Event) {

job.mu.Lock()
// commit offsets only not ignored AND regular events
if !event.IsRegularKind() || event.SeqID <= job.ignoreEventsLE {
if (!event.IsRegularKind() && !event.IsChildParentKind()) || event.SeqID <= job.ignoreEventsLE {
job.mu.Unlock()
return
}
Expand Down

0 comments on commit 263371a

Please sign in to comment.