Skip to content

Commit

Permalink
Add move plugin (#534)
Browse files Browse the repository at this point in the history
* Add move plugin

* Fix races in tests

---------

Co-authored-by: Yaroslav Kirillov <[email protected]>
  • Loading branch information
kirillov6 and Yaroslav Kirillov authored Nov 13, 2023
1 parent beb2d16 commit 87aa254
Show file tree
Hide file tree
Showing 28 changed files with 1,071 additions and 488 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ TBD: throughput on production servers.

**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)

**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)

**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)

Expand Down
1 change: 1 addition & 0 deletions _sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- [keep_fields](plugin/action/keep_fields/README.md)
- [mask](plugin/action/mask/README.md)
- [modify](plugin/action/modify/README.md)
- [move](plugin/action/move/README.md)
- [parse_es](plugin/action/parse_es/README.md)
- [parse_re2](plugin/action/parse_re2/README.md)
- [remove_fields](plugin/action/remove_fields/README.md)
Expand Down
93 changes: 93 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,99 @@ The resulting event could look like:
```

[More details...](plugin/action/modify/README.md)
## move
It moves fields to the target field in a certain mode.
> In `allow` mode, the specified `fields` will be moved;
> in `block` mode, the unspecified `fields` will be moved.

### Examples
```yaml
pipelines:
example_pipeline:
...
actions:
- type: move
mode: allow
target: other
fields:
- log.stream
- zone
...
```
The original event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z",
"stream": "stderr"
},
"zone": "z501"
}
```
The resulting event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z"
},
"other": {
"stream": "stderr",
"zone": "z501"
}
}
```
---
```yaml
pipelines:
example_pipeline:
...
actions:
- type: move
mode: block
target: other
fields:
- log
...
```
The original event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z",
"stream": "stderr"
},
"zone": "z501",
"other": {
"user": "ivanivanov"
}
}
```
The resulting event:
```json
{
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z"
},
"other": {
"user": "ivanivanov",
"service": "test",
"zone": "z501"
}
}
```

[More details...](plugin/action/move/README.md)
## parse_es
It parses HTTP input using Elasticsearch `/_bulk` API format. It converts sources defining create/index actions to the events. Update/delete actions are ignored.
> Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).
Expand Down
93 changes: 93 additions & 0 deletions plugin/action/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,99 @@ The resulting event could look like:
```

[More details...](plugin/action/modify/README.md)
## move
It moves fields to the target field in a certain mode.
> In `allow` mode, the specified `fields` will be moved;
> in `block` mode, the unspecified `fields` will be moved.

### Examples
```yaml
pipelines:
example_pipeline:
...
actions:
- type: move
mode: allow
target: other
fields:
- log.stream
- zone
...
```
The original event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z",
"stream": "stderr"
},
"zone": "z501"
}
```
The resulting event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z"
},
"other": {
"stream": "stderr",
"zone": "z501"
}
}
```
---
```yaml
pipelines:
example_pipeline:
...
actions:
- type: move
mode: block
target: other
fields:
- log
...
```
The original event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z",
"stream": "stderr"
},
"zone": "z501",
"other": {
"user": "ivanivanov"
}
}
```
The resulting event:
```json
{
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z"
},
"other": {
"user": "ivanivanov",
"service": "test",
"zone": "z501"
}
}
```

[More details...](plugin/action/move/README.md)
## parse_es
It parses HTTP input using Elasticsearch `/_bulk` API format. It converts sources defining create/index actions to the events. Update/delete actions are ignored.
> Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).
Expand Down
13 changes: 4 additions & 9 deletions plugin/action/add_file_name/add_file_name_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package add_file_name

import (
"strings"
"sync"
"testing"

Expand All @@ -16,19 +15,15 @@ func TestModify(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(2)

outEvents := make([]string, 0)
sourceName := "my_file"
output.SetOutFn(func(e *pipeline.Event) {
outEvents = append(outEvents, strings.Clone(e.Root.Dig("file").AsString()))
assert.Equal(t, sourceName, e.Root.Dig("file").AsString(), "wrong field value")
wg.Done()
})

input.In(0, "my_file", 0, []byte(`{"error":"info about error"}`))
input.In(0, "my_file", 0, []byte(`{"file":"not_my_file"}`))
input.In(0, sourceName, 0, []byte(`{"error":"info about error"}`))
input.In(0, sourceName, 0, []byte(`{"file":"not_my_file"}`))

wg.Wait()
p.Stop()

assert.Equal(t, 2, len(outEvents), "wrong out events count")
assert.Equal(t, "my_file", outEvents[0], "wrong field value")
assert.Equal(t, "my_file", outEvents[1], "wrong field value")
}
9 changes: 2 additions & 7 deletions plugin/action/add_host/add_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@ func TestModify(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)

outEvents := make([]*pipeline.Event, 0)
host, _ := os.Hostname()
output.SetOutFn(func(e *pipeline.Event) {
outEvents = append(outEvents, e)
assert.Equal(t, host, e.Root.Dig("hostname").AsString(), "wrong field value")
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{}`))

wg.Wait()
p.Stop()

host, _ := os.Hostname()

assert.Equal(t, 1, len(outEvents), "wrong out events count")
assert.Equal(t, host, outEvents[0].Root.Dig("hostname").AsString(), "wrong field value")
}
47 changes: 8 additions & 39 deletions plugin/action/convert_date/convert_date_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,20 @@ import (
"sync"
"testing"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
)

func TestConvert(t *testing.T) {
config := &Config{SourceFormats: []string{"rfc3339nano", "rfc3339", "ansic", pipeline.UnixTime, "nginx_errorlog"}}

err := cfg.Parse(config, nil)
if err != nil {
logger.Panicf("wrong config")
}

config := test.NewConfig(&Config{SourceFormats: []string{"rfc3339nano", "rfc3339", "ansic", pipeline.UnixTime, "nginx_errorlog"}}, nil)
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
wg := &sync.WaitGroup{}
wg.Add(3)

inEvents := 0
input.SetInFn(func() {
inEvents++
})

outEvents := make([]*pipeline.Event, 0)
outEvents := make([]string, 0, 3)
output.SetOutFn(func(e *pipeline.Event) {
outEvents = append(outEvents, e)
outEvents = append(outEvents, e.Root.EncodeToString())
wg.Done()
})

Expand All @@ -41,43 +28,25 @@ func TestConvert(t *testing.T) {
wg.Wait()
p.Stop()

assert.Equal(t, 3, inEvents, "wrong in events count")
assert.Equal(t, 3, len(outEvents), "wrong out events count")

assert.Equal(t, `{"time":998578502}`, outEvents[0].Root.EncodeToString(), "wrong out event")
assert.Equal(t, `{"time":998578999}`, outEvents[1].Root.EncodeToString(), "wrong out event")
assert.Equal(t, `{"time":1644239174}`, outEvents[2].Root.EncodeToString(), "wrong out event")
assert.Equal(t, `{"time":998578502}`, outEvents[0], "wrong out event")
assert.Equal(t, `{"time":998578999}`, outEvents[1], "wrong out event")
assert.Equal(t, `{"time":1644239174}`, outEvents[2], "wrong out event")
}

func TestConvertFail(t *testing.T) {
config := &Config{SourceFormats: []string{"rfc3339nano", "rfc3339", "ansic"}, RemoveOnFail: true}

err := cfg.Parse(config, nil)
if err != nil {
logger.Panicf("wrong config")
}

config := test.NewConfig(&Config{SourceFormats: []string{"rfc3339nano", "rfc3339", "ansic"}, RemoveOnFail: true}, nil)
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
wg := &sync.WaitGroup{}
wg.Add(1)

inEvents := 0
input.SetInFn(func() {
inEvents++
})

outEvents := make([]*pipeline.Event, 0)
output.SetOutFn(func(e *pipeline.Event) {
outEvents = append(outEvents, e)
assert.Equal(t, `{}`, e.Root.EncodeToString(), "wrong out event")
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"time":"XXX"}`))

wg.Wait()
p.Stop()

assert.Equal(t, 1, inEvents, "wrong in events count")
assert.Equal(t, 1, len(outEvents), "wrong out events count")
assert.Equal(t, `{}`, outEvents[0].Root.EncodeToString(), "wrong out event")
}
6 changes: 2 additions & 4 deletions plugin/action/convert_log_level/convert_log_level_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -224,9 +223,8 @@ func TestDo(t *testing.T) {

for _, tc := range tcs {
t.Run(tc.Name, func(t *testing.T) {
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &tc.Config, pipeline.MatchModeAnd, nil, false))
err := cfg.Parse(&tc.Config, nil)
require.NoError(t, err)
config := test.NewConfig(&tc.Config, nil)
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))

outCounter := atomic.NewInt32(int32(len(tc.In)))

Expand Down
Loading

0 comments on commit 87aa254

Please sign in to comment.