Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Commit

Permalink
POC for logstash output (#210)
Browse files Browse the repository at this point in the history
* POC for logstash output

still to do:
- support async go-lumber client
- support bulk_max_size
- support windowing
- support load balancing
- support backoff
- determine if codecs are needed
- update default config

Closes #129
  • Loading branch information
leehinman authored Jan 17, 2023
1 parent 422349c commit 43c5f5a
Show file tree
Hide file tree
Showing 19 changed files with 952 additions and 225 deletions.
46 changes: 23 additions & 23 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,29 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearc
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-lumber
Version: v0.1.1
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LICENSE:

Copyright (c) 2012–2016 Elasticsearch <http://www.elastic.co>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-ucfg
Version: v0.8.6
Expand Down Expand Up @@ -21739,29 +21762,6 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lookslike@v0
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-lumber
Version: v0.1.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LICENSE:

Copyright (c) 2012–2016 Elasticsearch <http://www.elastic.co>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-perf
Version: v0.0.0-20191212140718-9c656876f595
Expand Down
4 changes: 4 additions & 0 deletions controller/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/elastic-agent-shipper/output"
"github.com/elastic/elastic-agent-shipper/output/elasticsearch"
"github.com/elastic/elastic-agent-shipper/output/kafka"
"github.com/elastic/elastic-agent-shipper/output/logstash"
"github.com/elastic/elastic-agent-shipper/queue"
"github.com/elastic/elastic-agent-shipper/server"

Expand Down Expand Up @@ -216,5 +217,8 @@ func outputFromConfig(config output.Config, queue *queue.Queue) (Output, error)
if config.Console != nil && config.Console.Enabled {
return output.NewConsole(queue), nil
}
if config.Logstash != nil && config.Logstash.Enabled {
return logstash.NewLogstash(config.Logstash, queue), nil
}
return nil, errors.New("no active output configuration")
}
4 changes: 4 additions & 0 deletions elastic-agent-shipper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,7 @@ output:
enabled: false
hosts: ["localhost:9092", "localhost:9093"]
topic: '%{[metricset][name]}'

logstash:
enabled: false
hosts: ["localhost:5044"]
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/elastic/elastic-agent-client/v7 v7.0.1
github.com/elastic/elastic-agent-shipper-client v0.4.0
github.com/elastic/go-elasticsearch/v8 v8.2.0
github.com/elastic/go-lumber v0.1.1
github.com/elastic/go-ucfg v0.8.6
github.com/gofrs/uuid v4.2.0+incompatible
github.com/magefile/mage v1.14.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ github.com/elastic/go-licenser v0.4.1 h1:1xDURsc8pL5zYT9R29425J3vkHdt4RT5TNEMeRN
github.com/elastic/go-licenser v0.4.1/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-lookslike v0.3.0/go.mod h1:AhH+rdJux5RlVjs+6ej4jkvYyoNRkj2crxmqeHlj3hA=
github.com/elastic/go-lumber v0.1.0/go.mod h1:8YvjMIRYypWuPvpxx7WoijBYdbB7XIh/9FqSYQZTtxQ=
github.com/elastic/go-lumber v0.1.1 h1:aae5rSBnwBvdB0aShJ7AbOYPyvP1/wS/JIOC1A4D1DM=
github.com/elastic/go-lumber v0.1.1/go.mod h1:DMVoFv7YM71enE9X5vWJWWv7wvQNtzXh7bPeKukDccY=
github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595/go.mod h1:s09U1b4P1ZxnKx2OsqY7KlHdCesqZWIhyq0Gs/QC/Us=
github.com/elastic/go-seccomp-bpf v1.2.0/go.mod h1:l+89Vy5BzjVcaX8USZRMOwmwwDScE+vxCFzzvQwN7T8=
github.com/elastic/go-structform v0.0.9/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4=
Expand Down Expand Up @@ -1056,6 +1058,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
Expand Down
214 changes: 214 additions & 0 deletions integration/console_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
//go:build integration

package integration

import (
"testing"

"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
"github.com/stretchr/testify/require"
)

func TestServerStarts(t *testing.T) {
config := `
server:
strict_mode: false
port: 50052
tls: false
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
`
env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "gRPC server is ready and is listening on")

if !found {
env.Fatalf("Test executable failed to start.")
}
}

func TestServerFailsToStart(t *testing.T) {
config := `
server:
strict_mode: false
port: 50052
tls: not_boolean
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
`

env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "error unpacking shipper config")

if !found {
env.Fatalf("didn't error on bad config")
}
}

func TestPublishMessage(t *testing.T) {
config := `
server:
strict_mode: false
port: 50052
tls: false
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
`

env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "gRPC server is ready and is listening on")

if !found {
env.Fatalf("Test executable failed to start")
}

found = env.Contains("stderr", "Initializing disk queue at path")
if found {
env.Fatalf("Memory queue configured but disk queue started")
}

client := env.NewClient("localhost:50052")
unique := "UniqueStringToLookForInOutput"
events, err := createEvents([]string{unique})
require.NoErrorf(t, err, "error creating events: %s\n", err)

_, err = client.PublishEvents(env.ctx, &messages.PublishRequest{
Events: events,
})
require.NoErrorf(t, err, "Error publishing event: %s", err)

found = env.WaitUntil("stdout", unique)

if !found {
env.Fatalf("Event wasn't published")
}
}

func TestPublishDiskQueue(t *testing.T) {
queue_path := t.TempDir()

config := `
server:
strict_mode: false
port: 50052
tls: false
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
queue:
disk:
path: ` + queue_path + `
max_size: 10G
`

env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "gRPC server is ready and is listening on")

if !found {
env.Fatalf("Test executable failed to start")
}

found = env.Contains("stderr", "Initializing disk queue at path")
if !found {
env.Fatalf("Disk queue configured but not started")
}

client := env.NewClient("localhost:50052")
unique := "UniqueStringToLookForInOutput"
events, err := createEvents([]string{unique})
require.NoErrorf(t, err, "error creating events: %s\n", err)

_, err = client.PublishEvents(env.ctx, &messages.PublishRequest{
Events: events,
})
require.NoErrorf(t, err, "Error publishing event: %s", err)

found = env.WaitUntil("stdout", unique)

if !found {
env.Fatalf("Event wasn't published")
}
}

func TestPublishCompressEncryptedDiskQueue(t *testing.T) {
queue_path := t.TempDir()

config := `
server:
strict_mode: false
port: 50052
tls: false
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
queue:
disk:
path: ` + queue_path + `
max_size: 10G
use_compression: true
encryption_password: secret
`

env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "gRPC server is ready and is listening on")

if !found {
env.Fatalf("Test executable failed to start")
}

found = env.Contains("stderr", "Initializing disk queue at path")
if !found {
env.Fatalf("Disk queue configured but not started")
}

client := env.NewClient("localhost:50052")
unique := "UniqueStringToLookForInOutput"
events, err := createEvents([]string{unique})
require.NoErrorf(t, err, "error creating events: %s\n", err)

_, err = client.PublishEvents(env.ctx, &messages.PublishRequest{
Events: events,
})
require.NoErrorf(t, err, "error publishing event: %s", err)

found = env.WaitUntil("stdout", unique)

if !found {
env.Fatalf("Event wasn't published")
}
}
Loading

0 comments on commit 43c5f5a

Please sign in to comment.