Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve json extract #718

Merged
merged 9 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ It transforms `{"server":{"os":"linux","arch":"amd64"}}` into `{"server":"{\"os\

[More details...](plugin/action/json_encode/README.md)
## json_extract
It extracts a field from JSON-encoded event field and adds extracted field to the event root.
It extracts fields from JSON-encoded event field and adds extracted fields to the event root.
> If extracted field already exists in the event root, it will be overridden.

[More details...](plugin/action/json_extract/README.md)
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ It transforms `{"server":{"os":"linux","arch":"amd64"}}` into `{"server":"{\"os\

[More details...](plugin/action/json_encode/README.md)
## json_extract
It extracts a field from JSON-encoded event field and adds extracted field to the event root.
It extracts fields from JSON-encoded event field and adds extracted fields to the event root.
> If extracted field already exists in the event root, it will be overridden.

[More details...](plugin/action/json_extract/README.md)
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/json_extract/README.idoc.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# JSON extract plugin
@introduction

### Examples
## Examples
@examples

### Benchmarks
## Benchmarks
@benchmarks

### Config params
## Config params
@config-params|description
42 changes: 28 additions & 14 deletions plugin/action/json_extract/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# JSON extract plugin
It extracts a field from JSON-encoded event field and adds extracted field to the event root.
It extracts fields from JSON-encoded event field and adds extracted fields to the event root.
> If extracted field already exists in the event root, it will be overridden.

### Examples
## Examples
```yaml
pipelines:
example_pipeline:
...
actions:
- type: json_extract
field: log
extract_field: error.code
extract_fields:
- error.code
- level
...
```
The original event:
Expand All @@ -25,32 +27,44 @@ The resulting event:
{
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
"time": "2024-03-01T10:49:28.263317941Z",
"code": 2
"code": 2,
"level": "error"
}
```

### Benchmarks
## Benchmarks
Performance comparison of `json_extract` and `json_decode` plugins.
`json_extract` on average 3 times faster than `json_decode`.
`json_extract` on average 2.5 times faster than `json_decode` and
doesn't allocate memory during the extract process.

### Extract 1 field
| json (length) | json_extract (time ns) | json_decode (time ns) |
|---------------|------------------------|-----------------------|
| 129 | 33 | 176 |
| 309 | 264 | 520 |
| 2109 | 2263 | 6778 |
| 10909 | 11289 | 32205 |
| 21909 | 23277 | 62819 |
| 309 | 300 | 560 |
| 2109 | 2570 | 7250 |
| 10909 | 13550 | 34250 |
| 21909 | 26000 | 67940 |
| 237909 | 262500 | 741530 |

### Config params
### Extract 5 fields
| json (length) | json_extract (time ns) | json_decode (time ns) |
|---------------|------------------------|-----------------------|
| 309 | 450 | 685 |
| 2109 | 2990 | 7410 |
| 10909 | 14540 | 35000 |
| 21909 | 28340 | 69950 |
| 237909 | 286600 | 741600 |

## Config params
**`field`** *`cfg.FieldSelector`* *`required`*

The event field from which to extract. Must be a string.

<br>

**`extract_field`** *`cfg.FieldSelector`* *`required`*
**`extract_fields`** *`[]cfg.FieldSelector`* *`required`*

Field to extract.
Fields to extract.

<br>

Expand Down
103 changes: 67 additions & 36 deletions plugin/action/json_extract/json_extract.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package json_extract

import (
"bytes"

"github.com/go-faster/jx"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
Expand All @@ -11,7 +9,7 @@ import (
)

/*{ introduction
It extracts a field from JSON-encoded event field and adds extracted field to the event root.
It extracts fields from JSON-encoded event field and adds extracted fields to the event root.
> If extracted field already exists in the event root, it will be overridden.
}*/

Expand All @@ -23,7 +21,9 @@ pipelines:
actions:
- type: json_extract
field: log
extract_field: error.code
extract_fields:
- error.code
- level
...
```
The original event:
Expand All @@ -38,27 +38,41 @@ The resulting event:
{
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
"time": "2024-03-01T10:49:28.263317941Z",
"code": 2
"code": 2,
"level": "error"
}
```
}*/

/*{ benchmarks
Performance comparison of `json_extract` and `json_decode` plugins.
`json_extract` on average 3 times faster than `json_decode`.
`json_extract` on average 2.5 times faster than `json_decode` and
doesn't allocate memory during the extract process.

### Extract 1 field
| json (length) | json_extract (time ns) | json_decode (time ns) |
|---------------|------------------------|-----------------------|
| 309 | 300 | 560 |
| 2109 | 2570 | 7250 |
| 10909 | 13550 | 34250 |
| 21909 | 26000 | 67940 |
| 237909 | 262500 | 741530 |

### Extract 5 fields
| json (length) | json_extract (time ns) | json_decode (time ns) |
|---------------|------------------------|-----------------------|
| 129 | 33 | 176 |
| 309 | 264 | 520 |
| 2109 | 2263 | 6778 |
| 10909 | 11289 | 32205 |
| 21909 | 23277 | 62819 |
| 309 | 450 | 685 |
| 2109 | 2990 | 7410 |
| 10909 | 14540 | 35000 |
| 21909 | 28340 | 69950 |
| 237909 | 286600 | 741600 |
}*/

type Plugin struct {
config *Config
decoder *jx.Decoder
config *Config

extractFields *pathTree
decoder *jx.Decoder
}

// ! config-params
Expand All @@ -72,9 +86,8 @@ type Config struct {

// > @3@4@5@6
// >
// > Field to extract.
ExtractField cfg.FieldSelector `json:"extract_field" parse:"selector" required:"true"` // *
ExtractField_ []string
// > Fields to extract.
ExtractFields []cfg.FieldSelector `json:"extract_fields" slice:"true" required:"true"` // *
}

func init() {
Expand All @@ -91,6 +104,11 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
p.config = config.(*Config)
p.decoder = &jx.Decoder{}

p.extractFields = newPathTree()
for _, f := range p.config.ExtractFields {
p.extractFields.add(cfg.ParseFieldSelector(string(f)))
}
}

func (p *Plugin) Stop() {}
Expand All @@ -102,36 +120,48 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
}

p.decoder.ResetBytes(jsonNode.AsBytes())
extract(event.Root, p.decoder, p.config.ExtractField_, 0, false)
extract(event.Root, p.decoder, p.extractFields.root.children, false)
return pipeline.ActionPass
}

// extract extracts field from decoder and adds it to the root.
// `skipAddField` flag is required for proper benchmarking.
func extract(root *insaneJSON.Root, d *jx.Decoder, field []string, depth int, skipAddField bool) {
// extract extracts fields from decoder and adds it to the root.
//
// [skipAddField] flag is required for proper benchmarking.
func extract(root *insaneJSON.Root, d *jx.Decoder, fields pathNodes, skipAddField bool) {
objIter, err := d.ObjIter()
if err != nil {
return
}

processed := len(fields)
for objIter.Next() {
if bytes.Equal(objIter.Key(), pipeline.StringToByteUnsafe(field[depth])) {
if depth == len(field)-1 { // add field
if skipAddField {
_ = d.Skip()
} else {
addField(root, field[depth], d)
}
} else { // go deep
raw, err := d.Raw()
if err != nil {
break
}
d.ResetBytes(raw)
extract(root, d, field, depth+1, skipAddField)
n := fields.find(string(objIter.Key()))
if n == nil {
if err = d.Skip(); err != nil {
break
}
break
} else if err = d.Skip(); err != nil {
continue
}

// last field in path, add to root
if len(n.children) == 0 {
if skipAddField {
_ = d.Skip()
} else {
addField(root, n.data, d)
}
} else { // go deep
_ = d.Capture(func(d *jx.Decoder) error {
extract(root, d, n.children, skipAddField)
return nil
})
if err = d.Skip(); err != nil {
break
}
}

processed--
if processed == 0 {
break
}
}
Expand All @@ -154,6 +184,7 @@ func addField(root *insaneJSON.Root, field string, d *jx.Decoder) {
s, _ := d.StrBytes()
root.AddFieldNoAlloc(root, field).MutateToBytesCopy(root, s)
case jx.Null:
_ = d.Null()
root.AddFieldNoAlloc(root, field).MutateToNull()
case jx.Bool:
b, _ := d.Bool()
Expand Down
Loading
Loading