Skip to content

Commit

Permalink
Merge branch 'master' into output-retry-exponentially
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov authored Nov 22, 2023
2 parents 04fc065 + d790b89 commit 16ddc6f
Show file tree
Hide file tree
Showing 37 changed files with 1,198 additions and 538 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ TBD: throughput on production servers.

**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)

**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)

**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)

Expand Down
1 change: 1 addition & 0 deletions _sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- [keep_fields](plugin/action/keep_fields/README.md)
- [mask](plugin/action/mask/README.md)
- [modify](plugin/action/modify/README.md)
- [move](plugin/action/move/README.md)
- [parse_es](plugin/action/parse_es/README.md)
- [parse_re2](plugin/action/parse_re2/README.md)
- [remove_fields](plugin/action/remove_fields/README.md)
Expand Down
3 changes: 2 additions & 1 deletion cmd/file.d/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/keep_fields"
_ "github.com/ozontech/file.d/plugin/action/mask"
_ "github.com/ozontech/file.d/plugin/action/modify"
_ "github.com/ozontech/file.d/plugin/action/move"
_ "github.com/ozontech/file.d/plugin/action/parse_es"
_ "github.com/ozontech/file.d/plugin/action/parse_re2"
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
Expand Down Expand Up @@ -72,7 +73,7 @@ func main() {
kingpin.Version(buildinfo.Version)
kingpin.Parse()

logger.Infof("Hi! I'm file.d version=%s %s", buildinfo.Version)
logger.Infof("Hi! I'm file.d version=%s", buildinfo.Version)

setRuntimeSettings()
insaneJSON.DisableBeautifulErrors = true
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.25.0
golang.org/x/net v0.14.0
golang.org/x/net v0.17.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.27.4
k8s.io/apimachinery v0.27.4
Expand Down Expand Up @@ -133,13 +133,13 @@ require (
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/term v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/tools v0.7.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,8 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
Expand Down Expand Up @@ -497,8 +497,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.5.0 h1:HuArIo48skDwlrvM3sEdHXElYslAMsf3KwRkkW4MC4s=
golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I=
Expand Down Expand Up @@ -541,14 +541,14 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -557,8 +557,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
50 changes: 29 additions & 21 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"time"

"github.com/Shopify/sarama"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -33,27 +34,34 @@ func init() {

Level = zap.NewAtomicLevelAt(level)

Instance = zap.New(
zapcore.NewCore(
zapcore.NewJSONEncoder(zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: func(time time.Time, encoder zapcore.PrimitiveArrayEncoder) {
zapcore.RFC3339NanoTimeEncoder(time.UTC(), encoder)
},
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}),
zapcore.AddSync(os.Stderr),
Level,
),
).Sugar().Named("fd")
config := zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: func(time time.Time, encoder zapcore.PrimitiveArrayEncoder) {
zapcore.RFC3339NanoTimeEncoder(time.UTC(), encoder)
},
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}

core := zapcore.NewCore(zapcore.NewJSONEncoder(config), zapcore.AddSync(os.Stderr), Level)
Instance = zap.New(core).
Sugar().
Named("fd")

saramaConfig := config
// omit level key for sarama logger
saramaConfig.LevelKey = zapcore.OmitKey
saramaCore := zapcore.NewCore(zapcore.NewJSONEncoder(saramaConfig), zapcore.AddSync(os.Stderr), Level)
saramaLogger := zap.New(saramaCore).Named("sarama")
sarama.Logger, _ = zap.NewStdLogAt(saramaLogger, zapcore.InfoLevel)
sarama.DebugLogger, _ = zap.NewStdLogAt(saramaLogger, zapcore.DebugLevel)

Instance.Infof("Logger initialized with level: %s", level)
}
Expand Down
13 changes: 8 additions & 5 deletions pipeline/metrics_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
"go.uber.org/atomic"
)

const PromNamespace = "file_d"
const (
namespace = "file_d"
subsystemPrefix = "pipeline_"
)

// metricHolder has nextMetricsGen method that creates new prometheus.CounterOpts,
// differs form previous only by prometheus.CounterOpts.ConstLabels: {"gen": incValue}.
Expand Down Expand Up @@ -108,16 +111,16 @@ func (m *metricsHolder) nextMetricsGen() {
cnt.totalCounter[string(st)] = atomic.NewUint64(0)
}
opts := prometheus.CounterOpts{
Namespace: PromNamespace,
Subsystem: "pipeline_" + m.pipelineName,
Namespace: namespace,
Subsystem: subsystemPrefix + m.pipelineName,
Name: metrics.name + "_events_count_total",
Help: fmt.Sprintf("how many events processed by pipeline %q and #%d action", m.pipelineName, index),
ConstLabels: map[string]string{"gen": metricsGen, "version": buildinfo.Version},
}
cnt.count = prometheus.NewCounterVec(opts, labels)
opts = prometheus.CounterOpts{
Namespace: PromNamespace,
Subsystem: "pipeline_" + m.pipelineName,
Namespace: namespace,
Subsystem: subsystemPrefix + m.pipelineName,
Name: metrics.name + "_events_size_total",
Help: fmt.Sprintf("total size of events processed by pipeline %q and #%d action", m.pipelineName, index),
ConstLabels: map[string]string{"gen": metricsGen, "version": buildinfo.Version},
Expand Down
93 changes: 93 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,99 @@ The resulting event could look like:
```

[More details...](plugin/action/modify/README.md)
## move
It moves fields to the target field in a certain mode.
> In `allow` mode, the specified `fields` will be moved;
> in `block` mode, the unspecified `fields` will be moved.

### Examples
```yaml
pipelines:
example_pipeline:
...
actions:
- type: move
mode: allow
target: other
fields:
- log.stream
- zone
...
```
The original event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z",
"stream": "stderr"
},
"zone": "z501"
}
```
The resulting event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z"
},
"other": {
"stream": "stderr",
"zone": "z501"
}
}
```
---
```yaml
pipelines:
example_pipeline:
...
actions:
- type: move
mode: block
target: other
fields:
- log
...
```
The original event:
```json
{
"service": "test",
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z",
"stream": "stderr"
},
"zone": "z501",
"other": {
"user": "ivanivanov"
}
}
```
The resulting event:
```json
{
"log": {
"level": "error",
"message": "error occurred",
"ts": "2023-10-30T13:35:33.638720813Z"
},
"other": {
"user": "ivanivanov",
"service": "test",
"zone": "z501"
}
}
```

[More details...](plugin/action/move/README.md)
## parse_es
It parses HTTP input using Elasticsearch `/_bulk` API format. It converts sources defining create/index actions to the events. Update/delete actions are ignored.
> Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).
Expand Down
Loading

0 comments on commit 16ddc6f

Please sign in to comment.