Add test for handling processing errors while publishing events #37491
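Adds a regression test to the libbeat publisher pipeline client verifying that an error returned by a processor does not put publishing into an infinite loop: the event that fails processing is dropped, while the surrounding events are still processed, published to the queue, and acked on shutdown.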

Merged: 1 commit, Jan 10, 2024
169 changes: 153 additions & 16 deletions libbeat/publisher/pipeline/client_test.go
@@ -19,6 +19,8 @@ package pipeline

import (
"context"
"errors"
"io"
"sync"
"testing"
"time"
@@ -28,33 +30,33 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/beats/v7/libbeat/tests/resources"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func TestClient(t *testing.T) {
	makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
		p, err := New(beat.Info{},
			Monitors{},
			conf.Namespace{},
			outputs.Group{},
			settings,
		)
		if err != nil {
			panic(err)
		}
		// Inject a test queue so the outputController doesn't create one
		p.outputController.queue = qu
func makePipeline(t *testing.T, settings Settings, qu queue.Queue) *Pipeline {
	p, err := New(beat.Info{},
		Monitors{},
		conf.Namespace{},
		outputs.Group{},
		settings,
	)
	require.NoError(t, err)
	// Inject a test queue so the outputController doesn't create one
	p.outputController.queue = qu

		return p
	}
	return p
}

func TestClient(t *testing.T) {
t.Run("client close", func(t *testing.T) {
// Note: no asserts. If closing fails we have a deadlock, because Publish
// would block forever
@@ -90,7 +92,7 @@ func TestClient(t *testing.T) {
				routinesChecker := resources.NewGoroutinesChecker()
				defer routinesChecker.Check(t)

				pipeline := makePipeline(Settings{}, makeTestQueue())
				pipeline := makePipeline(t, Settings{}, makeTestQueue())
				defer pipeline.Close()

				var ctx context.Context
@@ -119,6 +121,105 @@ func TestClient(t *testing.T) {
			})
		}
	})

t.Run("no infinite loop when processing fails", func(t *testing.T) {
logp.TestingSetup()
l := logp.L()

// a small in-memory queue with a very short flush interval
q := memqueue.NewQueue(l, nil, memqueue.Settings{
Events: 5,
FlushMinEvents: 1,
FlushTimeout: time.Millisecond,
}, 5)

		// a test processor that we can toggle to start producing errors
		p := &testProcessor{}
		ps := testProcessorSupporter{Processor: p}

		// create a pipeline that makes sure that all events are acked
		// while shutting down
		pipeline := makePipeline(t, Settings{
			WaitClose:     100 * time.Millisecond,
			WaitCloseMode: WaitOnPipelineClose,
			Processors:    ps,
		}, q)
		client, err := pipeline.Connect()
		require.NoError(t, err)
		defer client.Close()

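		// the pipeline was built with an empty outputs.Group{}, so no output
		// consumes the queue; the goroutine below drains it instead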
		// consuming all the published events
		var received []beat.Event
		done := make(chan struct{})
		go func() {
			for {
				batch, err := q.Get(2)
				if errors.Is(err, io.EOF) {
					break
				}
				assert.NoError(t, err)
				if batch == nil {
					continue
				}
				for i := 0; i < batch.Count(); i++ {
					e := batch.Entry(i).(publisher.Event)
					received = append(received, e.Content)
				}
				batch.Done()
			}
			close(done)
		}()

		sent := []beat.Event{
			{
				Fields: mapstr.M{"number": 1},
			},
			{
				Fields: mapstr.M{"number": 2},
			},
			{
				Fields: mapstr.M{"number": 3},
			},
			{
				Fields: mapstr.M{"number": 4},
			},
		}

		expected := []beat.Event{
			{
				Fields: mapstr.M{"number": 1, "test": "value"},
			},
			{
				Fields: mapstr.M{"number": 2, "test": "value"},
			},
			// {
			// 	// this event must be excluded due to the processor error
			// 	Fields: mapstr.M{"number": 3},
			// },
			{
				Fields: mapstr.M{"number": 4, "test": "value"},
			},
		}

		client.PublishAll(sent[:2]) // first 2

		// this causes our processor to malfunction and produce errors for all events
		p.ErrorSwitch()

		client.PublishAll(sent[2:3]) // number 3

		// back to normal
		p.ErrorSwitch()

		client.PublishAll(sent[3:]) // number 4

		client.Close()
		pipeline.Close()

		// waiting for all events to be consumed from the queue
		<-done
		require.Equal(t, expected, received)
	})
}

func TestClientWaitClose(t *testing.T) {
@@ -258,3 +359,39 @@ func TestMonitoring(t *testing.T) {
	assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
	assert.Equal(t, int64(numClients), telemetrySnapshot.Ints["output.clients"])
}

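// testProcessor is a minimal beat.Processor used by the test above: it adds a
// "test" field to every event it sees and can be toggled into a failing state
// via ErrorSwitch.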
type testProcessor struct{ error bool }

func (p *testProcessor) String() string {
return "testProcessor"
}
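// Run adds a "test" field to the event, or returns an error while the
// processor is in its failing state.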
func (p *testProcessor) Run(in *beat.Event) (event *beat.Event, err error) {
	if p.error {
		return nil, errors.New("test error")
	}
	_, err = in.Fields.Put("test", "value")
	return in, err
}

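// ErrorSwitch toggles whether Run fails or succeeds.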
func (p *testProcessor) ErrorSwitch() {
	p.error = !p.error
}

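// testProcessorSupporter is a minimal processing.Supporter that always hands
// the wrapped processor to the pipeline, regardless of the client config.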
type testProcessorSupporter struct {
	beat.Processor
}

// Create a running processor interface based on the given config
func (p testProcessorSupporter) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) {
	return p.Processor, nil
}

// Processors returns a list of config strings for the given processor, for debug purposes
func (p testProcessorSupporter) Processors() []string {
	return []string{p.Processor.String()}
}

// Close the processor supporter
func (p testProcessorSupporter) Close() error {
	return processors.Close(p.Processor)
}