Skip to content

Commit

Permalink
Fix after merge build errors
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Aug 28, 2023
1 parent 15178c0 commit 3d9f55e
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 12 deletions.
3 changes: 0 additions & 3 deletions e2e/split_join/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ pipelines:
event_timeout: 1h
input:
type: file
persistence_mode: async
watching_dir: SOME_DIR
offsets_file: SOME_FILE
offsets_op: reset
actions:
- type: split
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
func (c *Config) Send(t *testing.T) {
file, err := os.Create(path.Join(c.inputDir, "input.log"))
require.NoError(t, err)
defer file.Close()
defer func(file *os.File) {
_ = file.Close()
}(file)

for i := 0; i < c.count; i++ {
_, err = file.WriteString(`{ "data": [ { "hello": "world" }, { "message": "start " }, { "message": "continue" }, { "file": ".d" }, { "open": "source" } ] }` + "\n")
Expand Down
Empty file removed longpanic/longpanic.go
Empty file.
2 changes: 1 addition & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func (p *Pipeline) Error(err string) {
}

func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
if event.IsTimeoutKind() {
if event.IsTimeoutKind() || event.IsChildKind() {
return
}

Expand Down
8 changes: 4 additions & 4 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,16 +352,16 @@ func (p *processor) Propagate(event *Event) {
// Any attempts to ActionHold or ActionCollapse the event will be suppressed by timeout events.
func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
parent.SetChildParentKind()
nextActionIdx := parent.action.Load() + 1
nextActionIdx := parent.action + 1

for _, node := range nodes {
child := newEvent()
parent.children = append(parent.children, child)
child.Root.MutateToNode(node)
child.SetChildKind()
child.action.Store(nextActionIdx)
child.action = nextActionIdx

ok := p.doActions(child)
ok, _ := p.doActions(child)
if ok {
child.stage = eventStageOutput
p.output.Out(child)
Expand All @@ -378,7 +378,7 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
}

timeout := newTimeoutEvent(parent.stream)
timeout.action.Store(int64(i))
timeout.action = i
p.doActions(timeout)

Check warning on line 382 in pipeline/processor.go

View check run for this annotation

Codecov / codecov/patch

pipeline/processor.go#L380-L382

Added lines #L380 - L382 were not covered by tests
}
}
Expand Down
2 changes: 0 additions & 2 deletions plugin/action/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -38,7 +37,6 @@ type Plugin struct {
config *Config
logger *zap.Logger
pluginController pipeline.ActionPluginController
plugin.NoMetricsPlugin
}

// ! config-params
Expand Down
3 changes: 2 additions & 1 deletion plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data := (*workerData).(data)
data.reset()

for _, event := range batch.Events {
for batch.Next() {
event := batch.Value()
for _, col := range data.cols {
node := event.Root.Dig(col.Name)

Expand Down

0 comments on commit 3d9f55e

Please sign in to comment.