Skip to content

Commit

Permalink
The plugin now works with the join plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Feb 15, 2023
1 parent ab4592a commit aaf5e69
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 6 deletions.
2 changes: 1 addition & 1 deletion e2e/remap_join/config.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pipelines:
remap_join:
settings:
event_timeout: 10s
event_timeout: 1h
input:
type: file
persistence_mode: async
Expand Down
17 changes: 13 additions & 4 deletions e2e/remap_join/remap_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -39,7 +39,7 @@ func (c *Config) Send(t *testing.T) {
defer file.Close()

for i := 0; i < c.Count; i++ {
_, err = file.WriteString(`{ "data": [ {"message":"start "}, {"message":"continue"} ]` + "\n")
_, err = file.WriteString(`{ "data": [{ "message": "start " }, { "message": "continue" }] }` + "\n")
_ = file.Sync()
require.NoError(t, err)
}
Expand All @@ -48,9 +48,18 @@ func (c *Config) Send(t *testing.T) {
func (c *Config) Validate(t *testing.T) {
logFilePattern := path.Join(c.outputDir, "*")

expectedEvents := 2 * c.Count
expectedEvents := c.Count

test.WaitProcessEvents(t, expectedEvents, 50*time.Millisecond, 50*time.Second, logFilePattern)
got := test.CountLines(t, logFilePattern)
assert.Equal(t, expectedEvents, got)

files := test.GetMatches(t, logFilePattern)

require.Equal(t, 1, len(files))
outputFile := files[0]
outputFileContent, err := os.ReadFile(outputFile)
require.NoError(t, err)
require.Equal(t, strings.Repeat(`{"message":"start continue"}`+"\n", expectedEvents), string(outputFileContent))

require.Equal(t, expectedEvents, got)
}
8 changes: 8 additions & 0 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/e2e/remap_join"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_host"
_ "github.com/ozontech/file.d/plugin/action/convert_date"
Expand Down Expand Up @@ -108,6 +109,13 @@ func TestE2EStabilityWorkCase(t *testing.T) {
},
cfgPath: "./join_throttle/config.yml",
},
{
name: "remap_join",
e2eTest: &remap_join.Config{
Count: 100,
},
cfgPath: "./remap_join/config.yml",
},
}

for num, test := range testsList {
Expand Down
22 changes: 21 additions & 1 deletion pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,27 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
child := *parent
child.Root = &insaneJSON.Root{Node: node}
child.SetChildKind()
p.Propagate(&child)
nextActionIdx := child.action.Inc()
p.tryResetBusy(int(nextActionIdx - 1))

ok := p.doActions(&child)
if ok {
p.output.Out(&child)
}
}

if p.busyActionsTotal == 0 {
return
}

for i, busy := range p.busyActions {
if !busy {
continue
}

timeout := newTimeoutEvent(parent.stream)
timeout.action.Store(int64(i))
p.doActions(timeout)
}
}

Expand Down

0 comments on commit aaf5e69

Please sign in to comment.