From aaf5e692e9d313bc57b678905d3bf52804e7b3e0 Mon Sep 17 00:00:00 2001 From: Vadim Alekseev Date: Wed, 15 Feb 2023 19:58:43 +0300 Subject: [PATCH] The plugin now works with the join plugin --- e2e/remap_join/config.yml | 2 +- e2e/remap_join/remap_join.go | 17 +++++++++++++---- e2e/start_work_test.go | 8 ++++++++ pipeline/processor.go | 22 +++++++++++++++++++++- 4 files changed, 43 insertions(+), 6 deletions(-) diff --git a/e2e/remap_join/config.yml b/e2e/remap_join/config.yml index 75719cb2e..d77859721 100644 --- a/e2e/remap_join/config.yml +++ b/e2e/remap_join/config.yml @@ -1,7 +1,7 @@ pipelines: remap_join: settings: - event_timeout: 10s + event_timeout: 1h input: type: file persistence_mode: async diff --git a/e2e/remap_join/remap_join.go b/e2e/remap_join/remap_join.go index a12e0f05d..b4b4a3b11 100644 --- a/e2e/remap_join/remap_join.go +++ b/e2e/remap_join/remap_join.go @@ -4,12 +4,12 @@ import ( "os" "path" "path/filepath" + "strings" "testing" "time" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/test" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -39,7 +39,7 @@ func (c *Config) Send(t *testing.T) { defer file.Close() for i := 0; i < c.Count; i++ { - _, err = file.WriteString(`{ "data": [ {"message":"start "}, {"message":"continue"} ]` + "\n") + _, err = file.WriteString(`{ "data": [{ "message": "start " }, { "message": "continue" }] }` + "\n") _ = file.Sync() require.NoError(t, err) } @@ -48,9 +48,18 @@ func (c *Config) Send(t *testing.T) { func (c *Config) Validate(t *testing.T) { logFilePattern := path.Join(c.outputDir, "*") - expectedEvents := 2 * c.Count + expectedEvents := c.Count test.WaitProcessEvents(t, expectedEvents, 50*time.Millisecond, 50*time.Second, logFilePattern) got := test.CountLines(t, logFilePattern) - assert.Equal(t, expectedEvents, got) + + files := test.GetMatches(t, logFilePattern) + + require.Equal(t, 1, len(files)) + outputFile := files[0] + outputFileContent, err := os.ReadFile(outputFile) + require.NoError(t, err) + require.Equal(t, strings.Repeat(`{"message":"start continue"}`+"\n", expectedEvents), string(outputFileContent)) + + require.Equal(t, expectedEvents, got) } diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index aeec179f4..8025170ca 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -14,6 +14,7 @@ import ( "github.com/ozontech/file.d/e2e/http_file" "github.com/ozontech/file.d/e2e/join_throttle" "github.com/ozontech/file.d/e2e/kafka_file" + "github.com/ozontech/file.d/e2e/remap_join" "github.com/ozontech/file.d/fd" _ "github.com/ozontech/file.d/plugin/action/add_host" _ "github.com/ozontech/file.d/plugin/action/convert_date" @@ -108,6 +109,13 @@ func TestE2EStabilityWorkCase(t *testing.T) { }, cfgPath: "./join_throttle/config.yml", }, + { + name: "remap_join", + e2eTest: &remap_join.Config{ + Count: 100, + }, + cfgPath: "./remap_join/config.yml", + }, } for num, test := range testsList { diff --git a/pipeline/processor.go b/pipeline/processor.go index 951644888..a1a41893a 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -359,7 +359,27 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { child := *parent child.Root = &insaneJSON.Root{Node: node} child.SetChildKind() - p.Propagate(&child) + nextActionIdx := child.action.Inc() + p.tryResetBusy(int(nextActionIdx - 1)) + + ok := p.doActions(&child) + if ok { + p.output.Out(&child) + } + } + + if p.busyActionsTotal == 0 { + return + } + + for i, busy := range p.busyActions { + if !busy { + continue + } + + timeout := newTimeoutEvent(parent.stream) + timeout.action.Store(int64(i)) + p.doActions(timeout) } }