Skip to content

Commit

Permalink
Merge branch 'master' into 410-metricholder
Browse files Browse the repository at this point in the history
# Conflicts:
#	plugin/input/file/file.go
#	plugin/input/file/provider.go
#	plugin/output/elasticsearch/elasticsearch.go
  • Loading branch information
Yaroslav Kirillov committed Dec 13, 2023
2 parents a6262ef + 43aa597 commit ea087c0
Show file tree
Hide file tree
Showing 70 changed files with 4,313 additions and 600 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
cache: true

- name: Run Kafka
run: docker compose -f ./e2e/kafka_file/docker-compose-kafka.yml up -d
run: docker compose -f ./e2e/kafka_file/docker-compose.yml up -d

- name: Run Clickhouse
run: docker compose -f ./e2e/file_clickhouse/docker-compose.yml up -d
Expand Down
6 changes: 6 additions & 0 deletions Insanedocfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ extractors:
config-params: '"config-params" /json:\"([a-z_]+)\"/ #2 /default:\"([^"]+)\"/ /(required):\"true\"/ /options:\"([^"]+)\"/'
fn-list: '"fn-list" #4 /Plugin\)\s(.+)\s{/'
match-modes: '"match-modes" /MatchMode(.*),/ /\"(.*)\"/'
do-if-node: '"do-if-node" /DoIfNode(\w+)\s/'
do-if-field-op: '"do-if-field-op" /doIfField(\w+)OpBytes\s/'
do-if-logical-op: '"do-if-logical-op" /doIfLogical(\w+)Bytes\s/'
decorators:
config-params: '_ _ /*`%s`* / /*`default=%s`* / /*`%s`* / /*`options=%s`* /'
fn-list: '_ _ /`%s`/'
match-modes: '_ /%s/ /`match_mode: %s`/'
do-if-node: '_ /%s/'
do-if-field-op: '_ /%s/'
do-if-logical-op: '_ /%s/'
templates:
- template: docs/*.idoc.md
files: ["../pipeline/*.go"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ TBD: throughput on production servers.

**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)

**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)

**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)

Expand Down
4 changes: 4 additions & 0 deletions _sidebar.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
- Output
@global-contents-table-plugin-output|links-list

- **Pipeline**
- [Match modes](pipeline/README.md#match-modes)
- [Experimental: Do If rules](pipeline/README.md#experimental-do-if-rules)

- **Other**
- [Contributing](/docs/contributing.md)
- [License](/docs/license.md)
6 changes: 6 additions & 0 deletions _sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
- [keep_fields](plugin/action/keep_fields/README.md)
- [mask](plugin/action/mask/README.md)
- [modify](plugin/action/modify/README.md)
- [move](plugin/action/move/README.md)
- [parse_es](plugin/action/parse_es/README.md)
- [parse_re2](plugin/action/parse_re2/README.md)
- [remove_fields](plugin/action/remove_fields/README.md)
- [rename](plugin/action/rename/README.md)
- [set_time](plugin/action/set_time/README.md)
- [split](plugin/action/split/README.md)
- [throttle](plugin/action/throttle/README.md)

- Output
Expand All @@ -56,6 +58,10 @@
- [stdout](plugin/output/stdout/README.md)


- **Pipeline**
- [Match modes](pipeline/README.md#match-modes)
- [Experimental: Do If rules](pipeline/README.md#experimental-do-if-rules)

- **Other**
- [Contributing](/docs/contributing.md)
- [License](/docs/license.md)
2 changes: 2 additions & 0 deletions cmd/file.d/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
_ "github.com/ozontech/file.d/plugin/action/keep_fields"
_ "github.com/ozontech/file.d/plugin/action/mask"
_ "github.com/ozontech/file.d/plugin/action/modify"
_ "github.com/ozontech/file.d/plugin/action/move"
_ "github.com/ozontech/file.d/plugin/action/parse_es"
_ "github.com/ozontech/file.d/plugin/action/parse_re2"
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"
_ "github.com/ozontech/file.d/plugin/action/split"
_ "github.com/ozontech/file.d/plugin/action/throttle"
_ "github.com/ozontech/file.d/plugin/input/dmesg"
_ "github.com/ozontech/file.d/plugin/input/fake"
Expand Down
File renamed without changes.
22 changes: 22 additions & 0 deletions e2e/split_join/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
pipelines:
split_join:
settings:
event_timeout: 1h
capacity: 128
input:
type: file
offsets_op: reset
maintenance_interval: 1m
actions:
- type: debug
message: input event sample
- type: split
field: data
- type: join
field: message
start: '/^start/'
continue: '/^continue/'
- type: debug
message: output event sample
output:
type: kafka
19 changes: 19 additions & 0 deletions e2e/split_join/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package split_join

import (
"github.com/Shopify/sarama"
)

type handlerFunc func(message *sarama.ConsumerMessage)

func (h handlerFunc) Setup(_ sarama.ConsumerGroupSession) error { return nil }

func (h handlerFunc) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h handlerFunc) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
h(msg)
session.MarkMessage(msg, "")
}
return nil
}
120 changes: 120 additions & 0 deletions e2e/split_join/split_join.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package split_join

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/ozontech/file.d/cfg"
"github.com/stretchr/testify/require"
)

const (
brokerHost = "localhost:9092"
group = "file_d_test_split_join_client"

arrayLen = 4
sample = `{ "data": [ { "first": "1" }, { "message": "start " }, { "message": "continue" }, { "second": "2" }, { "third": "3" } ] }`

messages = 10
)

type Config struct {
inputDir string
consumer sarama.ConsumerGroup
topic string
}

func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
r := require.New(t)

c.inputDir = t.TempDir()
offsetsDir := t.TempDir()
c.topic = fmt.Sprintf("file_d_test_split_join_%d", time.Now().UnixNano())
t.Logf("generated topic: %s", c.topic)

input := conf.Pipelines[pipelineName].Raw.Get("input")
input.Set("watching_dir", c.inputDir)
input.Set("filename_pattern", "input.log")
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))

output := conf.Pipelines[pipelineName].Raw.Get("output")
output.Set("brokers", []string{brokerHost})
output.Set("default_topic", c.topic)

addrs := []string{brokerHost}
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest

admin, err := sarama.NewClusterAdmin(addrs, config)
r.NoError(err)
r.NoError(admin.CreateTopic(c.topic, &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false))

c.consumer, err = sarama.NewConsumerGroup(addrs, group, config)
r.NoError(err)
}

func (c *Config) Send(t *testing.T) {
file, err := os.Create(path.Join(c.inputDir, "input.log"))
require.NoError(t, err)
defer func(file *os.File) {
_ = file.Close()
}(file)

for i := 0; i < messages; i++ {
_, err = file.WriteString(sample + "\n")
require.NoError(t, err)
}
}

func (c *Config) Validate(t *testing.T) {
r := require.New(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

expectedEventsCount := messages * arrayLen

strBuilder := strings.Builder{}
gotEvents := 0
done := make(chan struct{})

go func() {
r.NoError(c.consumer.Consume(ctx, []string{c.topic}, handlerFunc(func(msg *sarama.ConsumerMessage) {
strBuilder.Write(msg.Value)
strBuilder.WriteString("\n")
gotEvents++
if gotEvents == expectedEventsCount {
close(done)
}
})))
}()

select {
case <-done:
case <-ctx.Done():
r.Failf("test timed out", "got: %v, expected: %v, consumed: %s", gotEvents, expectedEventsCount, strBuilder.String())
}

got := strBuilder.String()

expected := strings.Repeat(`{"first":"1"}
{"message":"start continue"}
{"second":"2"}
{"third":"3"}
`,
messages)

r.Equal(len(expected), len(got))
r.Equal(expected, got)
r.Equal(expectedEventsCount, gotEvents)
}
7 changes: 7 additions & 0 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/e2e/split_join"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_host"
_ "github.com/ozontech/file.d/plugin/action/convert_date"
Expand All @@ -34,6 +35,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"
_ "github.com/ozontech/file.d/plugin/action/split"
_ "github.com/ozontech/file.d/plugin/action/throttle"
_ "github.com/ozontech/file.d/plugin/input/dmesg"
_ "github.com/ozontech/file.d/plugin/input/fake"
Expand Down Expand Up @@ -109,6 +111,11 @@ func TestE2EStabilityWorkCase(t *testing.T) {
},
cfgPath: "./join_throttle/config.yml",
},
{
name: "split_join",
e2eTest: &split_join.Config{},
cfgPath: "./split_join/config.yml",
},
{
name: "file_clickhouse",
e2eTest: &file_clickhouse.Config{},
Expand Down
6 changes: 6 additions & 0 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSO
logger.Infof("creating action with type %q for pipeline %q", t, p.Name)
info := f.plugins.GetActionByType(t)

doIfChecker, err := extractDoIfChecker(actionJSON.Get("do_if"))
if err != nil {
logger.Fatalf(`failed to extract "do_if" conditions for action %d/%s in pipeline %q: %s`, index, t, p.Name, err.Error())
}

matchMode := extractMatchMode(actionJSON)
if matchMode == pipeline.MatchModeUnknown {
logger.Fatalf("unknown match_mode value for action %d/%s in pipeline %q", index, t, p.Name)
Expand Down Expand Up @@ -191,6 +196,7 @@ func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSO
MetricLabels: metricLabels,
MetricSkipStatus: skipStatus,
MatchInvert: matchInvert,
DoIfChecker: doIfChecker,
})
}

Expand Down
Loading

0 comments on commit ea087c0

Please sign in to comment.