Skip to content

Commit

Permalink
candidate test for wait group panic
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Feb 29, 2024
1 parent 61ff430 commit 461f8e1
Showing 1 changed file with 91 additions and 10 deletions.
101 changes: 91 additions & 10 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
"testing"
"time"

"gotest.tools/assert"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
Expand Down Expand Up @@ -149,6 +148,88 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {
"test not flagged as successful, p.Publish likely blocked indefinitely")
}

func TestProducerClosePreservesEventCount(t *testing.T) {
// Check for https://github.com/elastic/beats/issues/37702, a problem
// where canceling a producer while it was waiting on a response
// to an insert request could lead to inaccurate event totals.

var activeEvents atomic.Int64

q := NewQueue(nil, nil,
Settings{
Events: 4, // Queue size
MaxGetRequest: 2,
FlushTimeout: 10 * time.Millisecond,
}, 1)

p := q.Producer(queue.ProducerConfig{
ACK: func(count int) {
activeEvents.Add(-int64(count))
},
OnDrop: func(e interface{}) {
//activeEvents.Add(-1)
},
DropOnCancel: false,
})

// Asynchronously, send 4 events to the queue.
// Three will be enqueued, and one will be buffered,
// until we start reading from the queue.
// This needs to run in a goroutine because the buffered
// event will block until the queue handles it.
var wgProducer sync.WaitGroup
wgProducer.Add(1)
go func() {
for i := 0; i < 4; i++ {
event := i
// For proper navigation of the race conditions inherent to this
// test: increment active events before the publish attempt, then
// decrement afterwards if it failed (otherwise the event count
// could become negative even under correct queue operation).
activeEvents.Add(1)
_, ok := p.Publish(event)
if !ok {
activeEvents.Add(-1)
}
}
wgProducer.Done()
}()

// This sleep is regrettable, but there's no deterministic way to know when
// the producer code has buffered an event in the queue's channel.
// However, the test is written to produce false negatives only:
// - If this test fails, it _always_ indicates a bug.
// - If there is a bug, this test will _often_ fail.
time.Sleep(10 * time.Millisecond)

// Cancel the producer, then read and acknowledge two batches. If the
// Publish calls and the queue code are working, activeEvents should
// _usually_ end up as 0, but _always_ end up non-negative.
p.Cancel()

// The queue reads also need to be done in a goroutine, in case the
// producer cancellation signal went through before the Publish
// requests -- if only 2 events entered the queue, then the second
// Get call will block until the queue itself is cancelled.
go func() {
for i := 0; i < 2; i++ {
batch, err := q.Get(2)
// Only error to worry about is queue closing, which isn't
// a test failure.
if err == nil {
batch.Done()
}
}
}()

// One last sleep to let things percolate, then we close the queue
// to unblock any helpers and verify that the final active event
// count isn't negative.
time.Sleep(10 * time.Millisecond)
q.Close()
assert.False(t, activeEvents.Load() < 0, "active event count should never be negative")
}

func TestQueueMetricsDirect(t *testing.T) {
eventsToTest := 5
maxEvents := 10
Expand Down Expand Up @@ -190,7 +271,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te

// Read events, don't yet ack them
batch, err := testQueue.Get(eventsToTest)
assert.NilError(t, err, "error in Get")
assert.NoError(t, err, "error in Get")
t.Logf("Got batch of %d events", batch.Count())

queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))
Expand All @@ -206,7 +287,7 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup
// wait briefly to avoid races across all the queue channels
time.Sleep(time.Millisecond * 100)
testMetrics, err := q.Metrics()
assert.NilError(t, err, "error calling metrics for test %s", test)
assert.NoError(t, err, "error calling metrics for test %s", test)
assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test)
assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test)
assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test)
Expand Down Expand Up @@ -266,18 +347,18 @@ func TestEntryIDs(t *testing.T) {

for i := 0; i < entryCount; i++ {
batch, err := q.Get(1)
assert.NilError(t, err, "Queue read should succeed")
assert.NoError(t, err, "Queue read should succeed")
assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry")

metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i),
fmt.Sprintf("Oldest entry ID before ACKing event %v should be %v", i, i))

batch.Done()
waiter.waitForEvents(1)
metrics, err = q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i+1),
fmt.Sprintf("Oldest entry ID after ACKing event %v should be %v", i, i+1))

Expand All @@ -297,7 +378,7 @@ func TestEntryIDs(t *testing.T) {

for i := 0; i < entryCount; i++ {
batch, err := q.Get(1)
assert.NilError(t, err, "Queue read should succeed")
assert.NoError(t, err, "Queue read should succeed")
assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry")
batches = append(batches, batch)
}
Expand All @@ -318,15 +399,15 @@ func TestEntryIDs(t *testing.T) {
// the slight nondeterminism.
time.Sleep(1 * time.Millisecond)
metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(0),
fmt.Sprintf("Oldest entry ID after ACKing event %v should be 0", i))
}
// ACK the first batch, which should unblock all the later ones
batches[0].Done()
waiter.waitForEvents(100)
metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(100),
fmt.Sprintf("Oldest entry ID after ACKing event 0 should be %v", queue.EntryID(entryCount)))

Expand Down

0 comments on commit 461f8e1

Please sign in to comment.