From 87aa254997f144087af219b8736256519952fba7 Mon Sep 17 00:00:00 2001 From: Yaroslav Kirillov Date: Mon, 13 Nov 2023 11:48:22 +0500 Subject: [PATCH] Add move plugin (#534) * Add move plugin * Fix races in tests --------- Co-authored-by: Yaroslav Kirillov --- README.md | 2 +- _sidebar.md | 1 + plugin/README.md | 93 ++++++ plugin/action/README.md | 93 ++++++ .../add_file_name/add_file_name_test.go | 13 +- plugin/action/add_host/add_host_test.go | 9 +- .../action/convert_date/convert_date_test.go | 47 +-- .../convert_log_level_test.go | 6 +- plugin/action/discard/discard_test.go | 305 ++++++++---------- plugin/action/flatten/flatten_test.go | 23 +- plugin/action/join/join_test.go | 19 +- .../join_template/join_template_test.go | 15 +- plugin/action/json_decode/json_decode_test.go | 21 +- plugin/action/json_encode/json_encode_test.go | 21 +- plugin/action/keep_fields/keep_fields_test.go | 10 +- plugin/action/mask/mask_test.go | 67 ++-- plugin/action/modify/modify_test.go | 8 +- plugin/action/move/README.idoc.md | 5 + plugin/action/move/README.md | 116 +++++++ plugin/action/move/move.go | 224 +++++++++++++ plugin/action/move/move_test.go | 265 +++++++++++++++ plugin/action/parse_es/pipeline_test.go | 19 +- plugin/action/parse_re2/parse_re2_test.go | 49 +-- .../remove_fields/remove_fields_test.go | 13 +- plugin/action/rename/rename_test.go | 25 +- plugin/action/set_time/set_time_test.go | 24 +- plugin/action/throttle/throttle_test.go | 64 ++-- plugin/output/kafka/README.md | 2 +- 28 files changed, 1071 insertions(+), 488 deletions(-) create mode 100644 plugin/action/move/README.idoc.md create mode 100755 plugin/action/move/README.md create mode 100644 plugin/action/move/move.go create mode 100644 plugin/action/move/move_test.go diff --git a/README.md b/README.md index 3bad61fd9..4a3c25363 100755 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ TBD: throughput on production servers. 
**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md) -**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md) +**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md) **Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md) diff --git a/_sidebar.md b/_sidebar.md index 46663300a..3ca88fab9 100644 --- a/_sidebar.md +++ b/_sidebar.md @@ -36,6 +36,7 @@ - [keep_fields](plugin/action/keep_fields/README.md) - [mask](plugin/action/mask/README.md) - [modify](plugin/action/modify/README.md) + - [move](plugin/action/move/README.md) - [parse_es](plugin/action/parse_es/README.md) - [parse_re2](plugin/action/parse_re2/README.md) - [remove_fields](plugin/action/remove_fields/README.md) diff --git a/plugin/README.md b/plugin/README.md index 68fa73004..db6385eeb 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -357,6 +357,99 @@ The resulting event could look like: ``` [More details...](plugin/action/modify/README.md) +## move +It moves fields to the target field in a certain mode. 
+> In `allow` mode, the specified `fields` will be moved; +> in `block` mode, the unspecified `fields` will be moved. + +### Examples +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: move + mode: allow + target: other + fields: + - log.stream + - zone + ... +``` +The original event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z", + "stream": "stderr" + }, + "zone": "z501" +} +``` +The resulting event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z" + }, + "other": { + "stream": "stderr", + "zone": "z501" + } +} +``` +--- +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: move + mode: block + target: other + fields: + - log + ... +``` +The original event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z", + "stream": "stderr" + }, + "zone": "z501", + "other": { + "user": "ivanivanov" + } +} +``` +The resulting event: +```json +{ + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z" + }, + "other": { + "user": "ivanivanov", + "service": "test", + "zone": "z501" + } +} +``` + +[More details...](plugin/action/move/README.md) ## parse_es It parses HTTP input using Elasticsearch `/_bulk` API format. It converts sources defining create/index actions to the events. Update/delete actions are ignored. > Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html). diff --git a/plugin/action/README.md b/plugin/action/README.md index 60436e8a0..40374c9b5 100755 --- a/plugin/action/README.md +++ b/plugin/action/README.md @@ -192,6 +192,99 @@ The resulting event could look like: ``` [More details...](plugin/action/modify/README.md) +## move +It moves fields to the target field in a certain mode. +> In `allow` mode, the specified `fields` will be moved; +> in `block` mode, the unspecified `fields` will be moved. + +### Examples +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: move + mode: allow + target: other + fields: + - log.stream + - zone + ... +``` +The original event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z", + "stream": "stderr" + }, + "zone": "z501" +} +``` +The resulting event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z" + }, + "other": { + "stream": "stderr", + "zone": "z501" + } +} +``` +--- +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: move + mode: block + target: other + fields: + - log + ... +``` +The original event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z", + "stream": "stderr" + }, + "zone": "z501", + "other": { + "user": "ivanivanov" + } +} +``` +The resulting event: +```json +{ + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z" + }, + "other": { + "user": "ivanivanov", + "service": "test", + "zone": "z501" + } +} +``` + +[More details...](plugin/action/move/README.md) ## parse_es It parses HTTP input using Elasticsearch `/_bulk` API format. It converts sources defining create/index actions to the events. 
Update/delete actions are ignored. > Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html). diff --git a/plugin/action/add_file_name/add_file_name_test.go b/plugin/action/add_file_name/add_file_name_test.go index 13b4b7210..7278e1b16 100644 --- a/plugin/action/add_file_name/add_file_name_test.go +++ b/plugin/action/add_file_name/add_file_name_test.go @@ -1,7 +1,6 @@ package add_file_name import ( - "strings" "sync" "testing" @@ -16,19 +15,15 @@ func TestModify(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(2) - outEvents := make([]string, 0) + sourceName := "my_file" output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, strings.Clone(e.Root.Dig("file").AsString())) + assert.Equal(t, sourceName, e.Root.Dig("file").AsString(), "wrong field value") wg.Done() }) - input.In(0, "my_file", 0, []byte(`{"error":"info about error"}`)) - input.In(0, "my_file", 0, []byte(`{"file":"not_my_file"}`)) + input.In(0, sourceName, 0, []byte(`{"error":"info about error"}`)) + input.In(0, sourceName, 0, []byte(`{"file":"not_my_file"}`)) wg.Wait() p.Stop() - - assert.Equal(t, 2, len(outEvents), "wrong out events count") - assert.Equal(t, "my_file", outEvents[0], "wrong field value") - assert.Equal(t, "my_file", outEvents[1], "wrong field value") } diff --git a/plugin/action/add_host/add_host_test.go b/plugin/action/add_host/add_host_test.go index e8e0685aa..8f21492e0 100644 --- a/plugin/action/add_host/add_host_test.go +++ b/plugin/action/add_host/add_host_test.go @@ -16,9 +16,9 @@ func TestModify(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - outEvents := make([]*pipeline.Event, 0) + host, _ := os.Hostname() output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + assert.Equal(t, host, e.Root.Dig("hostname").AsString(), "wrong field value") wg.Done() }) @@ -26,9 +26,4 @@ func TestModify(t *testing.T) { wg.Wait() p.Stop() - - host, _ := os.Hostname() - - assert.Equal(t, 1, len(outEvents), "wrong out events count") - assert.Equal(t, host, outEvents[0].Root.Dig("hostname").AsString(), "wrong field value") } diff --git a/plugin/action/convert_date/convert_date_test.go b/plugin/action/convert_date/convert_date_test.go index 88ebfa1e2..54eee171c 100644 --- a/plugin/action/convert_date/convert_date_test.go +++ b/plugin/action/convert_date/convert_date_test.go @@ -4,33 +4,20 @@ import ( "sync" "testing" - "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" "github.com/stretchr/testify/assert" ) func TestConvert(t *testing.T) { - config := &Config{SourceFormats: []string{"rfc3339nano", "rfc3339", "ansic", pipeline.UnixTime, "nginx_errorlog"}} - - err := cfg.Parse(config, nil) - if err != nil { - logger.Panicf("wrong config") - } - + config := test.NewConfig(&Config{SourceFormats: []string{"rfc3339nano", "rfc3339", "ansic", pipeline.UnixTime, "nginx_errorlog"}}, nil) p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := &sync.WaitGroup{} wg.Add(3) - inEvents := 0 - input.SetInFn(func() { - inEvents++ - }) - - outEvents := make([]*pipeline.Event, 0) + outEvents := make([]string, 0, 3) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + outEvents = append(outEvents, e.Root.EncodeToString()) wg.Done() }) @@ -41,34 +28,20 @@ func TestConvert(t *testing.T) { wg.Wait() p.Stop() - assert.Equal(t, 3, inEvents, "wrong in 
events count") assert.Equal(t, 3, len(outEvents), "wrong out events count") - - assert.Equal(t, `{"time":998578502}`, outEvents[0].Root.EncodeToString(), "wrong out event") - assert.Equal(t, `{"time":998578999}`, outEvents[1].Root.EncodeToString(), "wrong out event") - assert.Equal(t, `{"time":1644239174}`, outEvents[2].Root.EncodeToString(), "wrong out event") + assert.Equal(t, `{"time":998578502}`, outEvents[0], "wrong out event") + assert.Equal(t, `{"time":998578999}`, outEvents[1], "wrong out event") + assert.Equal(t, `{"time":1644239174}`, outEvents[2], "wrong out event") } func TestConvertFail(t *testing.T) { - config := &Config{SourceFormats: []string{"rfc3339nano", "rfc3339", "ansic"}, RemoveOnFail: true} - - err := cfg.Parse(config, nil) - if err != nil { - logger.Panicf("wrong config") - } - + config := test.NewConfig(&Config{SourceFormats: []string{"rfc3339nano", "rfc3339", "ansic"}, RemoveOnFail: true}, nil) p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := &sync.WaitGroup{} wg.Add(1) - inEvents := 0 - input.SetInFn(func() { - inEvents++ - }) - - outEvents := make([]*pipeline.Event, 0) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + assert.Equal(t, `{}`, e.Root.EncodeToString(), "wrong out event") wg.Done() }) @@ -76,8 +49,4 @@ func TestConvertFail(t *testing.T) { wg.Wait() p.Stop() - - assert.Equal(t, 1, inEvents, "wrong in events count") - assert.Equal(t, 1, len(outEvents), "wrong out events count") - assert.Equal(t, `{}`, outEvents[0].Root.EncodeToString(), "wrong out event") } diff --git a/plugin/action/convert_log_level/convert_log_level_test.go b/plugin/action/convert_log_level/convert_log_level_test.go index b1241aa7a..ebe469b24 100644 --- a/plugin/action/convert_log_level/convert_log_level_test.go +++ b/plugin/action/convert_log_level/convert_log_level_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" "github.com/stretchr/testify/require" @@ -224,9 +223,8 @@ func TestDo(t *testing.T) { for _, tc := range tcs { t.Run(tc.Name, func(t *testing.T) { - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &tc.Config, pipeline.MatchModeAnd, nil, false)) - err := cfg.Parse(&tc.Config, nil) - require.NoError(t, err) + config := test.NewConfig(&tc.Config, nil) + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) outCounter := atomic.NewInt32(int32(len(tc.In))) diff --git a/plugin/action/discard/discard_test.go b/plugin/action/discard/discard_test.go index fe9e92691..e1e52f8e9 100644 --- a/plugin/action/discard/discard_test.go +++ b/plugin/action/discard/discard_test.go @@ -10,175 +10,150 @@ import ( "github.com/stretchr/testify/assert" ) -func TestDiscardAnd(t *testing.T) { - conds := pipeline.MatchConditions{ - pipeline.MatchCondition{ - Field: []string{"field1"}, - Values: []string{"value1"}, +func TestDiscard(t *testing.T) { + cases := []struct { + name string + + matchMode pipeline.MatchMode + matchConditions pipeline.MatchConditions + matchInvert bool + + passEvents []string + discardEvents []string + }{ + { + name: "match_and", + matchMode: pipeline.MatchModeAnd, + matchConditions: pipeline.MatchConditions{ + pipeline.MatchCondition{ + Field: []string{"field1"}, + Values: []string{"value1"}, + }, + pipeline.MatchCondition{ + Field: []string{"field2"}, + 
Values: []string{"value2"}, + }, + }, + passEvents: []string{ + `{"field1":"not_value1"}`, + `{"field2":"not_value2"}`, + `{"field1":"value1"}`, + `{"field2":"value2"}`, + }, + discardEvents: []string{ + `{"field1":"value1","field2":"value2"}`, + `{"field3":"value3","field1":"value1","field2":"value2"}`, + }, }, - pipeline.MatchCondition{ - Field: []string{"field2"}, - Values: []string{"value2"}, + { + name: "match_or", + matchMode: pipeline.MatchModeOr, + matchConditions: pipeline.MatchConditions{ + pipeline.MatchCondition{ + Field: []string{"field1"}, + Values: []string{"value1"}, + }, + pipeline.MatchCondition{ + Field: []string{"field2"}, + Values: []string{"value2"}, + }, + }, + passEvents: []string{ + `{"field1":"not_value1"}`, + `{"field2":"not_value2"}`, + }, + discardEvents: []string{ + `{"field1":"value1"}`, + `{"field2":"value2"}`, + `{"field1":"value1","field2":"value2"}`, + `{"field3":"value3","field1":"value1","field2":"value2"}`, + }, }, - } - - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, false)) - - wg := &sync.WaitGroup{} - wg.Add(10) - - inEvents := 0 - input.SetInFn(func() { - wg.Done() - inEvents++ - }) - - outEvents := make([]*pipeline.Event, 0) - output.SetOutFn(func(e *pipeline.Event) { - wg.Done() - outEvents = append(outEvents, e) - }) - - input.In(0, "test", 0, []byte(`{"field1":"not_value1"}`)) - input.In(0, "test", 0, []byte(`{"field2":"not_value2"}`)) - input.In(0, "test", 0, []byte(`{"field1":"value1"}`)) - input.In(0, "test", 0, []byte(`{"field2":"value2"}`)) - input.In(0, "test", 0, []byte(`{"field1":"value1","field2":"value2"}`)) - input.In(0, "test", 0, []byte(`{"field3":"value3","field1":"value1","field2":"value2"}`)) - - wg.Wait() - p.Stop() - - assert.Equal(t, 6, inEvents, "wrong in events count") - assert.Equal(t, 4, len(outEvents), "wrong out events count") - assert.Equal(t, `{"field1":"not_value1"}`, outEvents[0].Root.EncodeToString(), "wrong event json") -} - -func TestDiscardOr(t *testing.T) { - conds := pipeline.MatchConditions{ - pipeline.MatchCondition{ - Field: []string{"field1"}, - Values: []string{"value1"}, + { + name: "match_or_regex", + matchMode: pipeline.MatchModeOr, + matchConditions: pipeline.MatchConditions{ + pipeline.MatchCondition{ + Field: []string{"field1"}, + Regexp: regexp.MustCompile("(one|two|three)"), + }, + pipeline.MatchCondition{ + Field: []string{"field2", "field3"}, + Regexp: regexp.MustCompile("four"), + }, + }, + passEvents: []string{ + `{"field2":{"field3":"0000 one 0000"}}`, + `{"field1":"four"}`, + `{"field2":"... four ....","field3":"value2"}`, + `{"field3":"value3","field1":"value1","field2":"value2"}`, + }, + discardEvents: []string{ + `{"field1":"0000 one 0000"}`, + `{"field2":{"field3":"0000 four 0000"}}`, + `{"field1":". 
two ."}`, + }, }, - pipeline.MatchCondition{ - Field: []string{"field2"}, - Values: []string{"value2"}, + { + name: "match_and_invert", + matchMode: pipeline.MatchModeAnd, + matchConditions: pipeline.MatchConditions{ + pipeline.MatchCondition{ + Field: []string{"field2"}, + Values: []string{"value2"}, + }, + }, + matchInvert: true, + passEvents: []string{ + `{"field2":"value2"}`, + `{"field1":"value1","field2":"value2"}`, + `{"field3":"value3","field1":"value1","field2":"value2"}`, + }, + discardEvents: []string{ + `{"field1":"not_value1"}`, + `{"field2":"not_value2"}`, + `{"field1":"value1"}`, + }, }, } - - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false)) - - wg := &sync.WaitGroup{} - wg.Add(8) - - inEvents := 0 - input.SetInFn(func() { - wg.Done() - inEvents++ - }) - - outEvents := make([]*pipeline.Event, 0) - output.SetOutFn(func(e *pipeline.Event) { - wg.Done() - outEvents = append(outEvents, e) - }) - - input.In(0, "test.log", 0, []byte(`{"field1":"not_value1"}`)) - input.In(0, "test.log", 0, []byte(`{"field2":"not_value2"}`)) - input.In(0, "test.log", 0, []byte(`{"field1":"value1"}`)) - input.In(0, "test.log", 0, []byte(`{"field2":"value2"}`)) - input.In(0, "test.log", 0, []byte(`{"field1":"value1","field2":"value2"}`)) - input.In(0, "test.log", 0, []byte(`{"field3":"value3","field1":"value1","field2":"value2"}`)) - - wg.Wait() - p.Stop() - - assert.Equal(t, 6, inEvents, "wrong in events count") - assert.Equal(t, 2, len(outEvents), "wrong out events count") - assert.Equal(t, `{"field1":"not_value1"}`, outEvents[0].Root.EncodeToString(), "wrong event json") -} - -func TestDiscardRegex(t *testing.T) { - conds := pipeline.MatchConditions{ - pipeline.MatchCondition{ - Field: []string{"field1"}, - Regexp: regexp.MustCompile("(one|two|three)"), - }, - pipeline.MatchCondition{ - Field: []string{"field2", "field3"}, - Regexp: regexp.MustCompile("four"), - }, + for _, tt := range cases { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, tt.matchMode, tt.matchConditions, tt.matchInvert)) + + wg := &sync.WaitGroup{} + wg.Add(len(tt.discardEvents) + 2*len(tt.passEvents)) + + inEvents := 0 + input.SetInFn(func() { + inEvents++ + wg.Done() + }) + + outEvents := make([]string, 0, len(tt.passEvents)) + output.SetOutFn(func(e *pipeline.Event) { + outEvents = append(outEvents, e.Root.EncodeToString()) + wg.Done() + }) + + for _, e := range tt.passEvents { + input.In(0, "test", 0, []byte(e)) + } + for _, e := range tt.discardEvents { + input.In(0, "test", 0, []byte(e)) + } + + wg.Wait() + p.Stop() + + assert.Equal(t, len(tt.passEvents)+len(tt.discardEvents), inEvents, "wrong in events count") + assert.Equal(t, len(tt.passEvents), len(outEvents), "wrong out events count") + + for i := range outEvents { + assert.Equal(t, tt.passEvents[i], outEvents[i], "wrong event json") + } + }) } - - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false)) - - wg := &sync.WaitGroup{} - wg.Add(11) - - inEvents := 0 - input.SetInFn(func() { - wg.Done() - inEvents++ - }) - - cntOutEvents := 0 - output.SetOutFn(func(e *pipeline.Event) { - cntOutEvents++ - wg.Done() - }) - - input.In(0, "test.log", 0, []byte(`{"field1":"0000 one 0000"}`)) - input.In(0, "test.log", 0, []byte(`{"field2":{"field3": "0000 one 0000"}}`)) - input.In(0, "test.log", 0, []byte(`{"field2":{"field3": "0000 four 
0000"}}`)) - input.In(0, "test.log", 0, []byte(`{"field1":". two ."}`)) - input.In(0, "test.log", 0, []byte(`{"field1":"four"}`)) - input.In(0, "test.log", 0, []byte(`{"field2":"... four ....","field2":"value2"}`)) - input.In(0, "test.log", 0, []byte(`{"field3":"value3","field1":"value1","field2":"value2"}`)) - - wg.Wait() - p.Stop() - - assert.Equal(t, 7, inEvents, "wrong in events count") - assert.Equal(t, 4, cntOutEvents, "wrong out events count") -} - -func TestDiscardMatchInvert(t *testing.T) { - // only this value should appear - conds := pipeline.MatchConditions{ - pipeline.MatchCondition{ - Field: []string{"field2"}, - Values: []string{"value2"}, - }, - } - - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, true)) - - wg := &sync.WaitGroup{} - wg.Add(9) - - inEvents := 0 - input.SetInFn(func() { - wg.Done() - inEvents++ - }) - - outEvents := make([]*pipeline.Event, 0) - output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) - wg.Done() - }) - - input.In(0, "test", 0, []byte(`{"field1":"not_value1"}`)) - input.In(0, "test", 0, []byte(`{"field2":"not_value2"}`)) - input.In(0, "test", 0, []byte(`{"field1":"value1"}`)) - input.In(0, "test", 0, []byte(`{"field2":"value2"}`)) - input.In(0, "test", 0, []byte(`{"field1":"value1","field2":"value2"}`)) - input.In(0, "test", 0, []byte(`{"field3":"value3","field1":"value1","field2":"value2"}`)) - - wg.Wait() - p.Stop() - - assert.Equal(t, 6, inEvents, "wrong in events count") - assert.Equal(t, 3, len(outEvents), "wrong out events count") - assert.Equal(t, `{"field2":"value2"}`, outEvents[0].Root.EncodeToString(), "wrong event json") } diff --git a/plugin/action/flatten/flatten_test.go b/plugin/action/flatten/flatten_test.go index a84286e6d..ca5668be5 100644 --- a/plugin/action/flatten/flatten_test.go +++ b/plugin/action/flatten/flatten_test.go @@ -4,44 +4,29 @@ import ( "sync" "testing" - "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" "github.com/stretchr/testify/assert" ) func TestFlatten(t *testing.T) { - config := &Config{Field: "complex", Prefix: "flat_"} - err := cfg.Parse(config, nil) - if err != nil { - logger.Panicf("wrong config") - } - + config := test.NewConfig(&Config{Field: "complex", Prefix: "flat_"}, nil) p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := &sync.WaitGroup{} - acceptedEvents := 0 + wg.Add(2) + input.SetCommitFn(func(e *pipeline.Event) { - acceptedEvents++ wg.Done() }) - dumpedEvents := 0 - rawEvent := "" output.SetOutFn(func(e *pipeline.Event) { - rawEvent = e.Root.EncodeToString() - dumpedEvents++ + assert.Equal(t, `{"flat_a":"b","flat_c":"d"}`, e.Root.EncodeToString(), "wrong out event") wg.Done() }) input.In(0, "test.log", 0, []byte(`{"complex":{"a":"b","c":"d"}}`)) - wg.Add(2) wg.Wait() p.Stop() - - assert.Equal(t, 1, acceptedEvents) - assert.Equal(t, 1, dumpedEvents) - assert.Equal(t, `{"flat_a":"b","flat_c":"d"}`, rawEvent, "wrong out events count") } diff --git a/plugin/action/join/join_test.go b/plugin/action/join/join_test.go index 9ac3ee8a4..9207dcaaa 100644 --- a/plugin/action/join/join_test.go +++ b/plugin/action/join/join_test.go @@ -11,7 +11,6 @@ import ( "go.uber.org/atomic" "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" ) @@ -138,17 +137,12 @@ 
func TestSimpleJoin(t *testing.T) { lines = append(lines, fmt.Sprintf(format, line)) } - config := &Config{ + config := test.NewConfig(&Config{ Field: "log", Start: cfg.Regexp(tt.startPat), Continue: cfg.Regexp(tt.continuePat), Negate: tt.negate, - } - - err := cfg.Parse(config, nil) - if err != nil { - logger.Panic(err.Error()) - } + }, nil) p, input, output := test.NewPipelineMock( test.NewActionPluginStaticInfo( @@ -243,16 +237,11 @@ func TestJoinAfterNilNode(t *testing.T) { lines = append(lines, fmt.Sprintf(formatNode, line)) } - config := &Config{ + config := test.NewConfig(&Config{ Field: "log", Start: cfg.Regexp(tt.startPat), Continue: cfg.Regexp(tt.continuePat), - } - - err := cfg.Parse(config, nil) - if err != nil { - logger.Panic(err.Error()) - } + }, nil) p, input, output := test.NewPipelineMock( test.NewActionPluginStaticInfo( diff --git a/plugin/action/join_template/join_template_test.go b/plugin/action/join_template/join_template_test.go index ffc6646a4..32d04661c 100644 --- a/plugin/action/join_template/join_template_test.go +++ b/plugin/action/join_template/join_template_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" - "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" ) @@ -417,13 +416,10 @@ func TestSimpleJoin(t *testing.T) { lines = append(lines, fmt.Sprintf(format, line)) } - config := &Config{ + config := test.NewConfig(&Config{ Field: "log", Template: "go_panic", - } - - err := cfg.Parse(config, nil) - require.NoError(t, err) + }, nil) p, input, output := test.NewPipelineMock( test.NewActionPluginStaticInfo( @@ -507,13 +503,10 @@ func TestJoinAfterNilNode(t *testing.T) { lines = append(lines, fmt.Sprintf(formatNode, line)) } - config := &Config{ + config := test.NewConfig(&Config{ Field: "log", Template: "go_panic", - } - - err := cfg.Parse(config, nil) - require.NoError(t, err) + }, nil) p, input, output := test.NewPipelineMock( test.NewActionPluginStaticInfo( diff --git a/plugin/action/json_decode/json_decode_test.go b/plugin/action/json_decode/json_decode_test.go index d80185605..696d7eab8 100644 --- a/plugin/action/json_decode/json_decode_test.go +++ b/plugin/action/json_decode/json_decode_test.go @@ -4,32 +4,19 @@ import ( "sync" "testing" - "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" "github.com/stretchr/testify/assert" ) func TestDecode(t *testing.T) { - config := &Config{Field: "log", Prefix: "prefix."} + config := test.NewConfig(&Config{Field: "log", Prefix: "prefix."}, nil) p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := &sync.WaitGroup{} wg.Add(1) - err := cfg.Parse(config, nil) - if err != nil { - logger.Panicf("wrong config") - } - - inEvents := 0 - input.SetInFn(func() { - inEvents++ - }) - - outEvents := make([]*pipeline.Event, 0) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + assert.Equal(t, `{"prefix.field2":"value2","prefix.field3":"value3"}`, e.Root.EncodeToString(), "wrong out event") wg.Done() }) @@ -37,8 +24,4 @@ func TestDecode(t *testing.T) { wg.Wait() p.Stop() - - assert.Equal(t, 1, inEvents, "wrong in events count") - assert.Equal(t, 1, len(outEvents), "wrong out events count") - assert.Equal(t, `{"prefix.field2":"value2","prefix.field3":"value3"}`, outEvents[0].Root.EncodeToString(), "wrong out event") } diff --git 
a/plugin/action/json_encode/json_encode_test.go b/plugin/action/json_encode/json_encode_test.go index 49db9856b..02f1ab6bf 100644 --- a/plugin/action/json_encode/json_encode_test.go +++ b/plugin/action/json_encode/json_encode_test.go @@ -4,32 +4,19 @@ import ( "sync" "testing" - "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" "github.com/stretchr/testify/assert" ) func TestEncode(t *testing.T) { - config := &Config{Field: "server"} + config := test.NewConfig(&Config{Field: "server"}, nil) p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := &sync.WaitGroup{} wg.Add(1) - err := cfg.Parse(config, nil) - if err != nil { - logger.Panicf("wrong config") - } - - inEvents := 0 - input.SetInFn(func() { - inEvents++ - }) - - outEvents := make([]*pipeline.Event, 0) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + assert.Equal(t, `{"server":"{\"os\":\"linux\",\"arch\":\"amd64\"}"}`, e.Root.EncodeToString(), "wrong out event") wg.Done() }) @@ -37,8 +24,4 @@ func TestEncode(t *testing.T) { wg.Wait() p.Stop() - - assert.Equal(t, 1, inEvents, "wrong in events count") - assert.Equal(t, 1, len(outEvents), "wrong out events count") - assert.Equal(t, `{"server":"{\"os\":\"linux\",\"arch\":\"amd64\"}"}`, outEvents[0].Root.EncodeToString(), "wrong out event") } diff --git a/plugin/action/keep_fields/keep_fields_test.go b/plugin/action/keep_fields/keep_fields_test.go index b352960ed..60699b171 100644 --- a/plugin/action/keep_fields/keep_fields_test.go +++ b/plugin/action/keep_fields/keep_fields_test.go @@ -15,9 +15,9 @@ func TestKeepFields(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(3) - outEvents := make([]*pipeline.Event, 0) + outEvents := make([]string, 0, 3) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + outEvents = append(outEvents, e.Root.EncodeToString()) wg.Done() }) @@ -29,7 +29,7 @@ func TestKeepFields(t *testing.T) { p.Stop() assert.Equal(t, 3, len(outEvents), "wrong out events count") - assert.Equal(t, `{"field_1":"value_1"}`, outEvents[0].Root.EncodeToString(), "wrong event") - assert.Equal(t, `{"field_2":"value_2"}`, outEvents[1].Root.EncodeToString(), "wrong event") - assert.Equal(t, `{}`, outEvents[2].Root.EncodeToString(), "wrong event") + assert.Equal(t, `{"field_1":"value_1"}`, outEvents[0], "wrong event") + assert.Equal(t, `{"field_2":"value_2"}`, outEvents[1], "wrong event") + assert.Equal(t, `{}`, outEvents[2], "wrong event") } diff --git a/plugin/action/mask/mask_test.go b/plugin/action/mask/mask_test.go index 541bbc158..5a411d4d2 100644 --- a/plugin/action/mask/mask_test.go +++ b/plugin/action/mask/mask_test.go @@ -173,14 +173,14 @@ func TestMaskAddExtraField(t *testing.T) { var plugin Plugin - config := Config{ + config := test.NewConfig(&Config{ MaskAppliedField: key, MaskAppliedValue: val, Masks: []Mask{ {Re: kDefaultCardRegExp, Groups: []int{1, 2, 3, 4}}, }, - } - plugin.Start(&config, test.NewEmptyActionPluginParams()) + }, nil) + plugin.Start(config, test.NewEmptyActionPluginParams()) plugin.config.Masks[0].Re_ = regexp.MustCompile(plugin.config.Masks[0].Re) result := plugin.Do(event) @@ -387,7 +387,9 @@ func TestGetValueNodeList(t *testing.T) { for _, s := range suits { t.Run(s.name, func(t *testing.T) { root, err := insaneJSON.DecodeString(s.input) - assert.NoError(t, err, "error on parsing test json") + require.NoError(t, err) + defer 
insaneJSON.Release(root) + nodes := make([]*insaneJSON.Node, 0) nodes = getValueNodeList(root.Node, nodes) assert.Equal(t, len(nodes), len(s.expected), s.comment) @@ -454,21 +456,36 @@ func TestPlugin(t *testing.T) { }, } - config := createConfig() + config := test.NewConfig(&Config{ + Masks: []Mask{ + { + Re: `a(x*)b`, + Groups: []int{0}, + }, + { + Re: kDefaultCardRegExp, + Groups: []int{1, 2, 3, 4}, + }, + { + Re: kDefaultIDRegExp, + Groups: []int{0}, + }, + }, + }, nil) for _, s := range suits { t.Run(s.name, func(t *testing.T) { sut, input, output := test.NewPipelineMock( - test.NewActionPluginStaticInfo(factory, &config, + test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := sync.WaitGroup{} wg.Add(len(s.input)) - outEvents := make([]*pipeline.Event, 0) + outEvents := make([]string, 0, len(s.expected)) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + outEvents = append(outEvents, e.Root.EncodeToString()) wg.Done() }) @@ -480,32 +497,12 @@ func TestPlugin(t *testing.T) { sut.Stop() for i := range s.expected { - assert.Equal(t, s.expected[i], outEvents[i].Root.EncodeToString(), s.comment) + assert.Equal(t, s.expected[i], outEvents[i], s.comment) } }) } } -func createConfig() Config { - config := Config{ - Masks: []Mask{ - { - Re: `a(x*)b`, - Groups: []int{0}, - }, - { - Re: kDefaultCardRegExp, - Groups: []int{1, 2, 3, 4}, - }, - { - Re: kDefaultIDRegExp, - Groups: []int{0}, - }, - }, - } - return config -} - //nolint:funlen func TestPluginWithComplexMasks(t *testing.T) { suits := []struct { @@ -663,22 +660,22 @@ func TestPluginWithComplexMasks(t *testing.T) { for _, s := range suits { t.Run(s.name, func(t *testing.T) { - config := Config{ + config := test.NewConfig(&Config{ Masks: s.masks, AppliedMetricName: s.metricName, AppliedMetricLabels: s.metricLabels, - } + }, nil) sut, input, output := test.NewPipelineMock( - test.NewActionPluginStaticInfo(factory, &config, + test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) wg := sync.WaitGroup{} wg.Add(len(s.input)) - outEvents := make([]*pipeline.Event, 0) + outEvents := make([]string, 0, len(s.expected)) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + outEvents = append(outEvents, e.Root.EncodeToString()) wg.Done() }) @@ -690,7 +687,7 @@ func TestPluginWithComplexMasks(t *testing.T) { sut.Stop() for i := range s.expected { - assert.Equal(t, s.expected[i], outEvents[i].Root.EncodeToString(), s.comment) + assert.Equal(t, s.expected[i], outEvents[i], s.comment) } }) } diff --git a/plugin/action/modify/modify_test.go b/plugin/action/modify/modify_test.go index 5f8895d07..32a623d8e 100644 --- a/plugin/action/modify/modify_test.go +++ b/plugin/action/modify/modify_test.go @@ -15,9 +15,9 @@ func TestModify(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - outEvents := make([]*pipeline.Event, 0) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + assert.Equal(t, "new_value", e.Root.Dig("new_field").AsString(), "wrong event field") + assert.Equal(t, "existing_value", e.Root.Dig("substitution_field").AsString(), "wrong event field") wg.Done() }) @@ -25,8 +25,4 @@ func TestModify(t *testing.T) { wg.Wait() p.Stop() - - assert.Equal(t, 1, len(outEvents), "wrong out events count") - assert.Equal(t, "new_value", outEvents[0].Root.Dig("new_field").AsString(), "wrong field value") - assert.Equal(t, "existing_value", outEvents[0].Root.Dig("substitution_field").AsString(), "wrong field value") } diff 
--git a/plugin/action/move/README.idoc.md b/plugin/action/move/README.idoc.md new file mode 100644 index 000000000..1a2311ce7 --- /dev/null +++ b/plugin/action/move/README.idoc.md @@ -0,0 +1,5 @@ +# Move plugin +@introduction + +### Config params +@config-params|description \ No newline at end of file diff --git a/plugin/action/move/README.md b/plugin/action/move/README.md new file mode 100755 index 000000000..53f926f3f --- /dev/null +++ b/plugin/action/move/README.md @@ -0,0 +1,116 @@ +# Move plugin +It moves fields to the target field in a certain mode. +> In `allow` mode, the specified `fields` will be moved; +> in `block` mode, the unspecified `fields` will be moved. + +### Examples +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: move + mode: allow + target: other + fields: + - log.stream + - zone + ... +``` +The original event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z", + "stream": "stderr" + }, + "zone": "z501" +} +``` +The resulting event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z" + }, + "other": { + "stream": "stderr", + "zone": "z501" + } +} +``` +--- +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: move + mode: block + target: other + fields: + - log + ... +``` +The original event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z", + "stream": "stderr" + }, + "zone": "z501", + "other": { + "user": "ivanivanov" + } +} +``` +The resulting event: +```json +{ + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z" + }, + "other": { + "user": "ivanivanov", + "service": "test", + "zone": "z501" + } +} +``` + +### Config params +**`fields`** *`[]cfg.FieldSelector`* *`required`* + +The list of the fields to move. +> In `block` mode, the maximum `fields` depth is 1. + +
+ +**`mode`** *`string`* *`required`* + +The mode of the moving. Available modes are one of: `allow|block`. + +
+ +**`target`** *`cfg.FieldSelector`* *`required`* + +The target field of the moving. +> If the `target` field is existing non-object field, it will be overwritten as object field. + +> In `block` mode, the maximum `target` depth is 1. + +
+ +
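To make the depth limits above concrete, here is a sketch of a `block`-mode config (the pipeline and field names are taken from the examples above and are illustrative only). In `block` mode the `target` selector must stay at depth 1, otherwise config validation fails, and any `fields` selector deeper than one level is ignored, so its top-level parent field is still moved.

```yaml
pipelines:
  example_pipeline:
    ...
    actions:
    - type: move
      mode: block
      target: other        # must be a top-level field in block mode
      fields:
        - service          # depth-1 selector: "service" is kept in place
        - log.stream       # deeper selector: ignored in block mode, so "log" is moved
      ...
```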
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/action/move/move.go b/plugin/action/move/move.go new file mode 100644 index 000000000..c3e98769f --- /dev/null +++ b/plugin/action/move/move.go @@ -0,0 +1,224 @@ +package move + +import ( + "errors" + "fmt" + + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/pipeline" + insaneJSON "github.com/vitkovskii/insane-json" +) + +/*{ introduction +It moves fields to the target field in a certain mode. +> In `allow` mode, the specified `fields` will be moved; +> in `block` mode, the unspecified `fields` will be moved. + +### Examples +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: move + mode: allow + target: other + fields: + - log.stream + - zone + ... +``` +The original event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z", + "stream": "stderr" + }, + "zone": "z501" +} +``` +The resulting event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z" + }, + "other": { + "stream": "stderr", + "zone": "z501" + } +} +``` +--- +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: move + mode: block + target: other + fields: + - log + ... +``` +The original event: +```json +{ + "service": "test", + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z", + "stream": "stderr" + }, + "zone": "z501", + "other": { + "user": "ivanivanov" + } +} +``` +The resulting event: +```json +{ + "log": { + "level": "error", + "message": "error occurred", + "ts": "2023-10-30T13:35:33.638720813Z" + }, + "other": { + "user": "ivanivanov", + "service": "test", + "zone": "z501" + } +} +``` +}*/ + +type Plugin struct { + config *Config + fields map[string][]string +} + +const ( + modeAllow string = "allow" + modeBlock string = "block" +) + +// ! config-params +// ^ config-params +type Config struct { + // > @3@4@5@6 + // > + // > The list of the fields to move. + // >> In `block` mode, the maximum `fields` depth is 1. + Fields []cfg.FieldSelector `json:"fields" slice:"true" required:"true"` // * + + // > @3@4@5@6 + // > + // > The mode of the moving. Available modes are one of: `allow|block`. + Mode string `json:"mode" required:"true"` // * + + // > @3@4@5@6 + // > + // > The target field of the moving. + // >> If the `target` field is existing non-object field, it will be overwritten as object field. + // > + // >> In `block` mode, the maximum `target` depth is 1. 
+ Target cfg.FieldSelector `json:"target" parse:"selector" required:"true"` // * + Target_ []string +} + +func (c *Config) validate() error { + if !(c.Mode == modeAllow || c.Mode == modeBlock) { + return fmt.Errorf("invalid mode %q", c.Mode) + } + if c.Mode == modeBlock && len(c.Target_) > 1 { + return errors.New(`in "block" mode, the maximum "target" depth is 1`) + } + return nil +} + +func init() { + fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{ + Type: "move", + Factory: factory, + }) +} + +func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { + return &Plugin{}, &Config{} +} + +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { + p.config = config.(*Config) + if err := p.config.validate(); err != nil { + params.Logger.Fatalf("invalid config: %s", err.Error()) + } + + p.fields = make(map[string][]string) + isBlockMode := p.config.Mode == modeBlock + for _, fs := range p.config.Fields { + // in `block` mode, max field depth is 1 + if f := cfg.ParseFieldSelector(string(fs)); len(f) > 0 && (!isBlockMode || len(f) == 1) { + p.fields[f[len(f)-1]] = f + } + } +} + +func (p *Plugin) Stop() {} + +func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + targetNode := event.Root.Node + continueDig := true + for _, tField := range p.config.Target_ { + if !continueDig { + targetNode = targetNode.AddFieldNoAlloc(event.Root, tField).MutateToObject() + continue + } + + node := targetNode.Dig(tField) + if node == nil { + node = targetNode.AddFieldNoAlloc(event.Root, tField).MutateToObject() + continueDig = false + } else if !node.IsObject() { + node.MutateToObject() + } + targetNode = node + } + + moveNode := func(name string, node *insaneJSON.Node) { + node.Suicide() + targetNode.AddFieldNoAlloc(event.Root, name).MutateToNode(node) + } + + if p.config.Mode == modeAllow { + for name, field := range p.fields { + if node := event.Root.Dig(field...); node != nil && node != targetNode { + moveNode(name, node) + } + } + } else { + for _, node := range event.Root.AsFields() { + value := node.AsFieldValue() + if value == targetNode { + continue + } + + name := node.AsString() + if _, ok := p.fields[name]; !ok { + moveNode(name, value) + } + } + } + + return pipeline.ActionPass +} diff --git a/plugin/action/move/move_test.go b/plugin/action/move/move_test.go new file mode 100644 index 000000000..90dc7182e --- /dev/null +++ b/plugin/action/move/move_test.go @@ -0,0 +1,265 @@ +package move + +import ( + "errors" + "sync" + "testing" + + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/test" + "github.com/stretchr/testify/assert" +) + +func TestConfigValidate(t *testing.T) { + cases := []struct { + name string + config *Config + wantErr error + }{ + { + name: "valid_allow", + config: &Config{ + Mode: modeAllow, + Target: "target1.target2.target3", + }, + }, + { + name: "valid_block", + config: &Config{ + Mode: modeBlock, + Target: "target", + }, + }, + { + name: "invalid_mode", + config: &Config{ + Mode: "unknown", + Target: "target", + }, + wantErr: errors.New(`invalid mode "unknown"`), + }, + { + name: "invalid_block_target", + config: &Config{ + Mode: modeBlock, + Target: "target1.target2.target3", + }, + wantErr: errors.New(`in "block" mode, the maximum "target" depth is 1`), + }, + } + for _, tt := range cases { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + config := test.NewConfig(tt.config, nil).(*Config) + assert.Equal(t, tt.wantErr, config.validate()) + 
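+			// test.NewConfig is assumed to parse the config via cfg.Parse and fail the
+			// test if parsing returns an error, which is why the explicit
+			// cfg.Parse/logger.Panicf boilerplate is no longer needed in these tests.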
}) + } +} + +func TestMove(t *testing.T) { + cases := []struct { + name string + config *Config + in string + wantTarget map[string]string + }{ + { + name: "allow_simple", + config: &Config{ + Fields: []cfg.FieldSelector{"field1", "field3"}, + Mode: modeAllow, + Target: "target_field", + }, + in: `{"field1":"value1","field2":true,"field3":3}`, + wantTarget: map[string]string{ + "field1": "value1", + "field3": "3", + }, + }, + { + name: "block_simple", + config: &Config{ + Fields: []cfg.FieldSelector{"field1", "field3"}, + Mode: modeBlock, + Target: "target_field", + }, + in: `{"field1":"value1","field2":true,"field3":3}`, + wantTarget: map[string]string{ + "field2": "true", + }, + }, + { + name: "allow_deep_fields", + config: &Config{ + Fields: []cfg.FieldSelector{"field3", "field2.field2_1", "field2.field2_2.field2_2_2"}, + Mode: modeAllow, + Target: "target_field", + }, + in: `{"field1":"value1","field2":{"field2_1":"value2_1","field2_2":{"field2_2_1":100,"field2_2_2":"value2_2_2"}},"field3":3}`, + wantTarget: map[string]string{ + "field3": "3", + "field2_1": "value2_1", + "field2_2_2": "value2_2_2", + }, + }, + { + // in block mode max fields depth is 1, so deep fields will be ignored + name: "block_deep_fields", + config: &Config{ + Fields: []cfg.FieldSelector{"field1", "field2.field2_2"}, + Mode: modeBlock, + Target: "target_field", + }, + in: `{"field1":"value1","field2":{"field2_1":"value2_1","field2_2":{"field2_2_1":100,"field2_2_2":"value2_2_2"}},"field3":3}`, + wantTarget: map[string]string{ + "field2": `{"field2_1":"value2_1","field2_2":{"field2_2_1":100,"field2_2_2":"value2_2_2"}}`, + "field3": "3", + }, + }, + { + name: "allow_unknown_fields", + config: &Config{ + Fields: []cfg.FieldSelector{"unknown1", "unknown2"}, + Mode: modeAllow, + Target: "target_field", + }, + in: `{"field1":"value1","field2":true,"field3":3}`, + wantTarget: map[string]string{}, + }, + { + name: "block_all_fields", + config: &Config{ + Fields: []cfg.FieldSelector{"field1", "field2", "field3"}, + Mode: modeBlock, + Target: "target_field", + }, + in: `{"field1":"value1","field2":true,"field3":3}`, + wantTarget: map[string]string{}, + }, + { + name: "allow_empty_fields", + config: &Config{ + Mode: modeAllow, + Target: "target_field", + }, + in: `{"field1":"value1","field2":true,"field3":3}`, + wantTarget: map[string]string{}, + }, + { + name: "block_empty_fields", + config: &Config{ + Mode: modeBlock, + Target: "target_field", + }, + in: `{"field1":"value1","field2":true,"field3":3}`, + wantTarget: map[string]string{ + "field1": "value1", + "field2": "true", + "field3": "3", + }, + }, + { + name: "allow_deep_target", + config: &Config{ + Fields: []cfg.FieldSelector{"field1", "field3"}, + Mode: modeAllow, + Target: "target1.target2.target3", + }, + in: `{"field1":"value1","field2":true,"field3":3}`, + wantTarget: map[string]string{ + "field1": "value1", + "field3": "3", + }, + }, + { + name: "existing_target", + config: &Config{ + Fields: []cfg.FieldSelector{"field2"}, + Mode: modeAllow, + Target: "field3", + }, + in: `{"field1":"value1","field2":true,"field3":{"field3_1":3}}`, + wantTarget: map[string]string{ + "field3_1": "3", + "field2": "true", + }, + }, + { + name: "existing_target_not_object", + config: &Config{ + Fields: []cfg.FieldSelector{"field2"}, + Mode: modeAllow, + Target: "field3", + }, + in: `{"field1":"value1","field2":true,"field3":3}`, + wantTarget: map[string]string{ + "field2": "true", + }, + }, + { + name: "allow_target_in_fields", + config: &Config{ + Fields: 
[]cfg.FieldSelector{"field2", "field3"}, + Mode: modeAllow, + Target: "field3", + }, + in: `{"field1":"value1","field2":true,"field3":{"field3_1":3}}`, + wantTarget: map[string]string{ + "field3_1": "3", + "field2": "true", + }, + }, + { + name: "block_target_in_fields", + config: &Config{ + Fields: []cfg.FieldSelector{"field1", "field3"}, + Mode: modeBlock, + Target: "field3", + }, + in: `{"field1":"value1","field2":true,"field3":{"field3_1":3}}`, + wantTarget: map[string]string{ + "field3_1": "3", + "field2": "true", + }, + }, + } + for _, tt := range cases { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + config := test.NewConfig(tt.config, nil) + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) + + wg := &sync.WaitGroup{} + wg.Add(1) + + output.SetOutFn(func(e *pipeline.Event) { + target := e.Root.Dig(tt.config.Target_...) + assert.NotNil(t, target, "target is nil") + + assert.Equal(t, len(tt.wantTarget), len(target.AsFields()), "wrong target nodes count") + for name, val := range tt.wantTarget { + node := target.Dig(name) + assert.NotNil(t, node, "node is nil") + + if node.IsObject() { + assert.Equal(t, val, node.EncodeToString(), "wrong node value") + } else { + assert.Equal(t, val, node.AsString(), "wrong node value") + } + } + + wg.Done() + }) + + input.In(0, "test.log", 0, []byte(tt.in)) + + wg.Wait() + p.Stop() + }) + } +} diff --git a/plugin/action/parse_es/pipeline_test.go b/plugin/action/parse_es/pipeline_test.go index cfc11154c..9d4f967b4 100644 --- a/plugin/action/parse_es/pipeline_test.go +++ b/plugin/action/parse_es/pipeline_test.go @@ -6,8 +6,6 @@ import ( "github.com/stretchr/testify/assert" - "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" ) @@ -92,28 +90,23 @@ func TestPipeline(t *testing.T) { for _, tCase := range cases { tCase := tCase t.Run(tCase.name, func(t *testing.T) { - config := &Config{} + config := test.NewConfig(&Config{}, nil) p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeOr, nil, false)) - err := cfg.Parse(config, nil) - if err != nil { - logger.Panicf("wrong config") - } - wg := &sync.WaitGroup{} wg.Add(tCase.eventsInCount) wg.Add(tCase.eventsOutCount) eventsIn := 0 input.SetInFn(func() { - wg.Done() eventsIn++ + wg.Done() }) - outEvents := make([]*pipeline.Event, 0) + outEvents := 0 output.SetOutFn(func(e *pipeline.Event) { + outEvents++ wg.Done() - outEvents = append(outEvents, e) }) for _, event := range tCase.eventsIn { @@ -123,8 +116,8 @@ func TestPipeline(t *testing.T) { wg.Wait() p.Stop() - assert.Equal(t, 2, eventsIn, "wrong eventsIn events count") - assert.Equal(t, tCase.eventsOutCount, len(outEvents), "wrong out events count") + assert.Equal(t, tCase.eventsInCount, eventsIn, "wrong eventsIn events count") + assert.Equal(t, tCase.eventsOutCount, outEvents, "wrong out events count") }) } } diff --git a/plugin/action/parse_re2/parse_re2_test.go b/plugin/action/parse_re2/parse_re2_test.go index a6a33efae..54f2d3198 100644 --- a/plugin/action/parse_re2/parse_re2_test.go +++ b/plugin/action/parse_re2/parse_re2_test.go @@ -4,36 +4,24 @@ import ( "sync" "testing" - "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/test" "github.com/stretchr/testify/assert" ) func TestDecode(t *testing.T) { - config := &Config{ + config 
:= test.NewConfig(&Config{
 		Field:  "log",
 		Prefix: "prefix.",
 		Re2:    "(?P<date>[\\d]{4}-[\\d]{2}-[\\d]{2} [\\d]{2}:[\\d]{2}:[\\d]{2} GMT) \\[(?P<pid>[\\d]+)\\] => \\[(?P<pid_message_number>[\\d-]+)\\] client=(?P<client>[^,]*),db=(?P<db>[^,]*),user=(?P<user>[^,]*) (LOG|HINT): (?P<message>.+)",
-	}
+	}, nil)
 
 	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
 	wg := &sync.WaitGroup{}
 	wg.Add(1)
 
-	err := cfg.Parse(config, nil)
-	if err != nil {
-		logger.Panicf("wrong config")
-	}
-
-	inEvents := 0
-	input.SetInFn(func() {
-		inEvents++
-	})
-
-	outEvents := make([]*pipeline.Event, 0)
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		assert.Equal(t, `{"prefix.date":"2021-06-22 16:24:27 GMT","prefix.pid":"7291","prefix.pid_message_number":"2-1","prefix.client":"test_client","prefix.db":"test_db","prefix.user":"test_user","prefix.message":"listening on IPv4 address \"0.0.0.0\", port 5432"}`,
+			e.Root.EncodeToString(), "wrong out event")
 		wg.Done()
 	})
 
@@ -41,34 +29,20 @@ func TestDecode(t *testing.T) {
 	wg.Wait()
 	p.Stop()
-
-	assert.Equal(t, 1, inEvents, "wrong in events count")
-	assert.Equal(t, 1, len(outEvents), "wrong out events count")
-	assert.Equal(t, `{"prefix.date":"2021-06-22 16:24:27 GMT","prefix.pid":"7291","prefix.pid_message_number":"2-1","prefix.client":"test_client","prefix.db":"test_db","prefix.user":"test_user","prefix.message":"listening on IPv4 address \"0.0.0.0\", port 5432"}`, outEvents[0].Root.EncodeToString(), "wrong out event")
 }
 
 func TestDecodeAccessLogsJira(t *testing.T) {
-	config := &Config{
+	config := test.NewConfig(&Config{
 		Field: "message",
 		Re2:   "(?P<origin_ip>[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}) (?P<request_id>\\w+) (?P<username>(\\w+|-)) \\[(?P<timestamp>[\\d]{2}/[[:alpha:]]{1,3}/[\\d]{4}:[\\d]{2}:[\\d]{2}:[\\d]{2} [+-][\\d]{4})\\] \"(?P<method_and_endpoint>.+)\" (?P<status_code>([\\d]+|-)) (?P<bytes_sent>([\\d]+|-)) (?P<processing_time>([\\d]+|-)) \"(?P<accessed_url>.+)\" \"(?P<client>.+)\" \"(?P<session_id>.+)\"",
-	}
+	}, nil)
 
 	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
 
-	err := cfg.Parse(config, nil)
-	if err != nil {
-		logger.Panicf("wrong config")
-	}
-
-	inEvents := 0
-	input.SetInFn(func() {
-		inEvents++
-	})
-
-	outEvents := make([]*pipeline.Event, 0)
+	outEvents := make([]string, 0, 2)
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		outEvents = append(outEvents, e.Root.EncodeToString())
 		wg.Done()
 	})
 
@@ -78,8 +52,9 @@ func TestDecodeAccessLogsJira(t *testing.T) {
 	wg.Wait()
 	p.Stop()
 
-	assert.Equal(t, 2, inEvents, "wrong in events count")
 	assert.Equal(t, 2, len(outEvents), "wrong out events count")
-	assert.Equal(t, `{"origin_ip":"10.115.195.13","request_id":"0x51320775x2","username":"jira_robot","timestamp":"07/Nov/2022:00:00:00 +0300","method_and_endpoint":"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1","status_code":"200","bytes_sent":"198","processing_time":"20","accessed_url":"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365","client":"Apache-HttpClient/4.5.13 (Java/11.0.9)","session_id":"nj56zg"}`, outEvents[0].Root.EncodeToString(), "wrong out event")
-	assert.Equal(t, `{"origin_ip":"10.115.195.12","request_id":"0x51320774x2","username":"ezabelin","timestamp":"07/Nov/2022:00:00:00 +0300","method_and_endpoint":"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1","status_code":"201","bytes_sent":"158","processing_time":"15","accessed_url":"-","client":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36","session_id":"1tmznt9"}`, outEvents[1].Root.EncodeToString(), "wrong out event")
+	assert.Equal(t, `{"origin_ip":"10.115.195.13","request_id":"0x51320775x2","username":"jira_robot","timestamp":"07/Nov/2022:00:00:00 +0300","method_and_endpoint":"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1","status_code":"200","bytes_sent":"198","processing_time":"20","accessed_url":"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365","client":"Apache-HttpClient/4.5.13 (Java/11.0.9)","session_id":"nj56zg"}`,
+		outEvents[0], "wrong out event")
+	assert.Equal(t, `{"origin_ip":"10.115.195.12","request_id":"0x51320774x2","username":"ezabelin","timestamp":"07/Nov/2022:00:00:00 +0300","method_and_endpoint":"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1","status_code":"201","bytes_sent":"158","processing_time":"15","accessed_url":"-","client":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36","session_id":"1tmznt9"}`,
+		outEvents[1], "wrong out event")
 }
diff --git a/plugin/action/remove_fields/remove_fields_test.go b/plugin/action/remove_fields/remove_fields_test.go
index 01c8f4206..9d7c24858 100644
--- a/plugin/action/remove_fields/remove_fields_test.go
+++ b/plugin/action/remove_fields/remove_fields_test.go
@@ -10,13 +10,14 @@ import (
 )
 
 func TestRemoveFields(t *testing.T) {
-	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{Fields: []string{"field_1", "field_2"}}, pipeline.MatchModeAnd, nil, false))
+	config := test.NewConfig(&Config{Fields: []string{"field_1", "field_2"}}, nil)
+	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
 	wg := &sync.WaitGroup{}
 	wg.Add(3)
 
-	outEvents := make([]*pipeline.Event, 0)
+	outEvents := make([]string, 0, 3)
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		outEvents = append(outEvents, e.Root.EncodeToString())
 		wg.Done()
 	})
 
@@ -28,7 +29,7 @@ func TestRemoveFields(t *testing.T) {
 	p.Stop()
 
 	assert.Equal(t, 3, len(outEvents), "wrong out events count")
-	assert.Equal(t, `{"a":"b"}`, outEvents[0].Root.EncodeToString(), "wrong event")
-	assert.Equal(t, `{"b":"c"}`, outEvents[1].Root.EncodeToString(), "wrong event")
-	assert.Equal(t, `{"field_3":"value_3","a":"b"}`, outEvents[2].Root.EncodeToString(), "wrong event")
+	assert.Equal(t, `{"a":"b"}`, outEvents[0], "wrong event")
+	assert.Equal(t, `{"b":"c"}`, outEvents[1], "wrong event")
+	assert.Equal(t, `{"field_3":"value_3","a":"b"}`, outEvents[2], "wrong event")
 }
diff --git a/plugin/action/rename/rename_test.go b/plugin/action/rename/rename_test.go
index 34588a9f7..841f6ff4a 100644
--- a/plugin/action/rename/rename_test.go
+++ b/plugin/action/rename/rename_test.go
@@ -25,9 +25,9 @@ func TestRename(t *testing.T) {
 	wg := &sync.WaitGroup{}
 	wg.Add(5)
 
-	outEvents := make([]*pipeline.Event, 0)
+	outEvents := make([]string, 0, 5)
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		outEvents = append(outEvents, e.Root.EncodeToString())
 		wg.Done()
 	})
 
@@ -41,16 +41,11 @@ func TestRename(t *testing.T) {
 	p.Stop()
 
 	assert.Equal(t, 5, len(outEvents), "wrong out events count")
-	assert.Equal(t, "value_1", outEvents[0].Root.Dig("renamed_field_1").AsString(), "wrong field value")
-	assert.Equal(t, "value_2", outEvents[1].Root.Dig("renamed_field_2").AsString(), "wrong field value")
-	assert.Equal(t, "value_3", outEvents[2].Root.Dig("field_3").AsString(), "wrong field value")
-	assert.Equal(t, "value_5", outEvents[3].Root.Dig("renamed_field_5").AsString(), "wrong field value")
-	assert.Equal(t, "value_6", outEvents[4].Root.Dig("renamed_field.escaped").AsString(), "wrong field value")
-	assert.Nil(t, outEvents[0].Root.Dig("field_1"), "field isn't nil")
-	assert.Nil(t, outEvents[1].Root.Dig("field_2"), "field isn't nil")
-	assert.Nil(t, outEvents[2].Root.Dig("renamed_field_3"), "field isn't nil")
-	assert.Nil(t, outEvents[3].Root.Dig("field_4", "field_5"), "field isn't nil")
-	assert.Nil(t, outEvents[4].Root.Dig("k8s_node_label_topology\\.kubernetes\\.io/zone"), "field isn't nil")
+	assert.Equal(t, `{"renamed_field_1":"value_1"}`, outEvents[0], "wrong event json")
+	assert.Equal(t, `{"renamed_field_2":"value_2"}`, outEvents[1], "wrong event json")
+	assert.Equal(t, `{"field_3":"value_3"}`, outEvents[2], "wrong event json")
+	assert.Equal(t, `{"field_4":{},"renamed_field_5":"value_5"}`, outEvents[3], "wrong event json")
+	assert.Equal(t, `{"renamed_field.escaped":"value_6"}`, outEvents[4], "wrong event json")
 }
 
 func TestRenamingSequence(t *testing.T) {
@@ -71,9 +66,8 @@ func TestRenamingSequence(t *testing.T) {
 	wg := sync.WaitGroup{}
 	wg.Add(1)
 
-	outEvents := make([]string, 0)
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e.Root.EncodeToString())
+		r.Equal(`{"key8":"value_1"}`, e.Root.EncodeToString())
 		wg.Done()
 	})
 
@@ -81,9 +75,6 @@ func TestRenamingSequence(t *testing.T) {
 	wg.Wait()
 	p.Stop()
-
-	r.Equal(1, len(outEvents))
-	r.Equal(`{"key8":"value_1"}`, outEvents[0])
 }
 
 func TestUnescapeMap(t *testing.T) {
diff --git a/plugin/action/set_time/set_time_test.go b/plugin/action/set_time/set_time_test.go
index 80e384fdb..cb91f099e 100644
--- a/plugin/action/set_time/set_time_test.go
+++ b/plugin/action/set_time/set_time_test.go
@@ -5,7 +5,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ozontech/file.d/cfg"
 	"github.com/ozontech/file.d/pipeline"
 	"github.com/ozontech/file.d/test"
 	"github.com/stretchr/testify/require"
@@ -116,17 +115,14 @@ func TestPlugin_Do(t *testing.T) {
 		t.Run(tc.Name, func(t *testing.T) {
 			require.NoError(t, root.DecodeString(tc.Root))
 
-			err := cfg.Parse(tc.Config, nil)
-			require.NoError(t, err)
+			config := test.NewConfig(tc.Config, nil)
 
-			plugin := &Plugin{
-				config: tc.Config,
-			}
+			plugin := &Plugin{}
 
 			event := &pipeline.Event{
 				Root: root,
 			}
 
-			plugin.Start(tc.Config, nil)
+			plugin.Start(config, nil)
 
 			result := plugin.do(event, now)
 			require.Equal(t, tc.ExpResult, result)
@@ -136,17 +132,12 @@ func TestPlugin_Do(t *testing.T) {
 }
 
 func TestE2E_Plugin(t *testing.T) {
-	config := &Config{Format: pipeline.UnixTime, Field: "timestamp"}
-
-	err := cfg.Parse(config, nil)
-	require.NoError(t, err)
-
+	config := test.NewConfig(&Config{Format: pipeline.UnixTime, Field: "timestamp"}, nil)
 	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
 
-	outEvents := make([]*pipeline.Event, 0)
 	counter := atomic.Int32{}
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		require.NotEqual(t, "", e.Root.Dig("timestamp").AsString(), "wrong out event")
 		counter.Dec()
 	})
 
@@ -157,9 +148,4 @@ func TestE2E_Plugin(t *testing.T) {
 		time.Sleep(time.Millisecond * 10)
 	}
 	p.Stop()
-
-	require.Equal(t, 1, len(outEvents), "wrong out events count")
-	timestamp := outEvents[0].Root.Dig("timestamp")
-	require.NotNil(t, timestamp)
-	require.NotEqual(t, "", timestamp.EncodeToString(), "wrong out event")
 }
diff --git a/plugin/action/throttle/throttle_test.go b/plugin/action/throttle/throttle_test.go
index b13c857fa..5f9832d48 100644
--- a/plugin/action/throttle/throttle_test.go
+++ b/plugin/action/throttle/throttle_test.go
@@ -247,7 +247,7 @@ func TestRedisThrottle(t *testing.T) {
 	defaultLimit := 3
 	eventsTotal := 3
 
-	config := &Config{
+	config := test.NewConfig(&Config{
 		Rules: []RuleConfig{
 			{Limit: int64(defaultLimit), LimitKind: "count"},
 		},
@@ -263,16 +263,12 @@ func TestRedisThrottle(t *testing.T) {
 		ThrottleField: "k8s_pod",
 		TimeField: "",
 		DefaultLimit: int64(defaultLimit),
-	}
-	err = cfg.Parse(config, nil)
-	if err != nil {
-		logger.Panic(err.Error())
-	}
+	}, nil)
 
 	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
-	outEvents := make([]*pipeline.Event, 0)
+	outEvents := 0
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		outEvents++
 	})
 
 	sourceNames := []string{
@@ -297,7 +293,7 @@ func TestRedisThrottle(t *testing.T) {
 	p.Stop()
 
-	assert.Greater(t, eventsTotal, len(outEvents), "wrong in events count")
+	assert.Greater(t, eventsTotal, outEvents, "wrong in events count")
 	t.Cleanup(func() {
 		throttleMapsCleanup()
 	})
@@ -310,7 +306,7 @@ func TestRedisThrottleMultiPipes(t *testing.T) {
 	defaultLimit := 20
 
-	config := Config{
+	config := test.NewConfig(&Config{
 		Rules: []RuleConfig{
 			{Limit: int64(defaultLimit), LimitKind: "count"},
 		},
@@ -326,26 +322,24 @@ func TestRedisThrottleMultiPipes(t *testing.T) {
 		ThrottleField: "k8s_pod",
 		TimeField: "",
 		DefaultLimit: int64(defaultLimit),
-	}
-	err = cfg.Parse(&config, nil)
-	require.NoError(t, err)
+	}, nil)
 
 	muFirstPipe := sync.Mutex{}
-	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &config, pipeline.MatchModeAnd, nil, false), "name")
-	outEvents := make([]*pipeline.Event, 0)
+	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false), "name")
+	outEvents := 0
 	output.SetOutFn(func(e *pipeline.Event) {
 		muFirstPipe.Lock()
 		defer muFirstPipe.Unlock()
 
-		outEvents = append(outEvents, e)
+		outEvents++
 	})
 
 	muSecPipe := sync.Mutex{}
-	pSec, inputSec, outputSec := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &config, pipeline.MatchModeAnd, nil, false), "name")
-	outEventsSec := make([]*pipeline.Event, 0)
+	pSec, inputSec, outputSec := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false), "name")
+	outEventsSec := 0
 	outputSec.SetOutFn(func(e *pipeline.Event) {
 		muSecPipe.Lock()
 		defer muSecPipe.Unlock()
 
-		outEventsSec = append(outEventsSec, e)
+		outEventsSec++
 	})
 
 	// set distributed redis limit
@@ -377,7 +371,7 @@ func TestRedisThrottleMultiPipes(t *testing.T) {
 		time.Sleep(100 * time.Millisecond)
 	}
 	// limit is 1 while events count is 3
-	assert.Greater(t, len(firstPipeEvents), len(outEvents), "wrong in events count")
+	assert.Greater(t, len(firstPipeEvents), outEvents, "wrong in events count")
 
 	for i := 0; i < len(secondPipeEvents); i++ {
 		json := fmt.Sprintf(secondPipeEvents[i], time.Now().Format(time.RFC3339Nano))
@@ -390,7 +384,7 @@ func TestRedisThrottleMultiPipes(t *testing.T) {
 	defer muSecPipe.Unlock()
 
 	// limit is 10 while events count 4, all passed
-	assert.Equal(t, len(secondPipeEvents), len(outEventsSec), "wrong in events count")
+	assert.Equal(t, len(secondPipeEvents), outEventsSec, "wrong in events count")
 	t.Cleanup(func() {
 		throttleMapsCleanup()
 	})
@@ -406,7 +400,7 @@ func TestRedisThrottleWithCustomLimitData(t *testing.T) {
 	defaultLimit := 3
 	eventsTotal := 3
 
-	config := &Config{
+	config := test.NewConfig(&Config{
 		Rules: []RuleConfig{
 			{Limit: int64(defaultLimit), LimitKind: "count"},
 		},
@@ -425,19 +419,15 @@ func TestRedisThrottleWithCustomLimitData(t *testing.T) {
 		ThrottleField: "k8s_pod",
 		TimeField: "",
 		DefaultLimit: int64(defaultLimit),
-	}
-	err = cfg.Parse(config, nil)
-	if err != nil {
-		logger.Panic(err.Error())
-	}
+	}, nil)
 
 	p, input, output := test.NewPipelineMock(
 		test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false),
 		"name",
 	)
-	outEvents := make([]*pipeline.Event, 0)
+	outEvents := 0
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		outEvents++
 	})
 
 	sourceNames := []string{
@@ -463,7 +453,7 @@ func TestRedisThrottleWithCustomLimitData(t *testing.T) {
 	p.Stop()
 
-	assert.Greater(t, eventsTotal, len(outEvents), "wrong in events count")
+	assert.Greater(t, eventsTotal, outEvents, "wrong in events count")
 	t.Cleanup(func() {
 		throttleMapsCleanup()
 	})
@@ -472,7 +462,7 @@ func TestRedisThrottleWithCustomLimitData(t *testing.T) {
 func TestThrottleLimiterExpiration(t *testing.T) {
 	defaultLimit := 3
 	eventsTotal := 3
-	config := &Config{
+	config := test.NewConfig(&Config{
 		Rules: []RuleConfig{
 			{Limit: int64(defaultLimit), LimitKind: "count"},
 		},
@@ -482,20 +472,12 @@ func TestThrottleLimiterExpiration(t *testing.T) {
 		TimeField: "",
 		DefaultLimit: int64(defaultLimit),
 		LimiterExpiration: "300ms",
-	}
-	err := cfg.Parse(config, nil)
-	if err != nil {
-		logger.Panic(err.Error())
-	}
+	}, nil)
 
-	p, input, output := test.NewPipelineMock(
+	p, input, _ := test.NewPipelineMock(
 		test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false),
 		"name",
 	)
-	outEvents := make([]*pipeline.Event, 0)
-	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
-	})
 
 	sourceNames := []string{
 		`source_1`,
diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md
index f0a68b60c..c538a1ead 100755
--- a/plugin/output/kafka/README.md
+++ b/plugin/output/kafka/README.md
@@ -57,7 +57,7 @@ If set, the plugin will use SASL authentications mechanism.
-**`sasl_mechanism`** *`string`* *`default=SCRAM-SHA-512`* *`options=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512`*
+**`sasl_mechanism`** *`string`* *`default=SCRAM-SHA-512`* *`options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512`*
 
 SASL mechanism to use.
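
For reference, a minimal sketch of how the `sasl_mechanism` option documented in the hunk above might be set in a pipeline config. Only `sasl_mechanism` and its allowed values are taken from the README; the surrounding field names (`brokers`, `default_topic`, `is_sasl_enabled`, `sasl_username`, `sasl_password`) are assumptions for illustration and may differ from the actual kafka output config.

```yaml
# Hypothetical pipeline snippet; only sasl_mechanism and its options
# come from the README hunk above, the other keys are assumed.
pipelines:
  example:
    output:
      type: kafka
      brokers:
        - "kafka-1:9092"
      default_topic: logs
      is_sasl_enabled: true
      sasl_username: user
      sasl_password: password
      sasl_mechanism: SCRAM-SHA-256  # one of PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
```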