From 2f522e4e829c5dad36b0658ac6279fea8eae6fbc Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 21 Feb 2024 15:59:48 -0500 Subject: [PATCH 1/6] Writes in progress should cancel on queue closed, not producer closed --- libbeat/publisher/queue/memqueue/produce.go | 39 +++++++++++++------ .../publisher/queue/memqueue/queue_test.go | 4 +- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 954ea055f4a4..5ddea468e4c6 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -36,9 +36,10 @@ type ackProducer struct { } type openState struct { - log *logp.Logger - done chan struct{} - events chan pushRequest + log *logp.Logger + done chan struct{} + queueDone <-chan struct{} + events chan pushRequest } // producerID stores the order of events within a single producer, so multiple @@ -58,9 +59,10 @@ type ackHandler func(count int) func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer { openState := openState{ - log: b.logger, - done: make(chan struct{}), - events: b.pushChan, + log: b.logger, + done: make(chan struct{}), + queueDone: b.ctx.Done(), + events: b.pushChan, } if cb != nil { @@ -143,27 +145,40 @@ func (st *openState) Close() { func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - // If the output is blocked and the queue is full, `req` is written - // to `st.events`, however the queue never writes back to `req.resp`, - // which effectively blocks for ever. So we also need to select on the - // done channel to ensure we don't miss the shutdown signal. + // The events channel is buffered, which means we may successfully + // write to it even if the queue is shutting down. To avoid blocking + // forever during shutdown, we also have to wait on the queue's + // shutdown channel. select { case resp := <-req.resp: return resp, true - case <-st.done: + case <-st.queueDone: st.events = nil return 0, false } case <-st.done: st.events = nil return 0, false + case <-st.queueDone: + st.events = nil + return 0, false } } func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - return <-req.resp, true + // The events channel is buffered, which means we may successfully + // write to it even if the queue is shutting down. To avoid blocking + // forever during shutdown, we also have to wait on the queue's + // shutdown channel. + select { + case resp := <-req.resp: + return resp, true + case <-st.queueDone: + st.events = nil + return 0, false + } case <-st.done: st.events = nil return 0, false diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 141514483f33..8245e1fe1df5 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -138,8 +138,8 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { time.Millisecond, "the first two events were not successfully published") - // Cancel the producer, this should unblock its Publish method - p.Cancel() + // Close the queue, this should unblock the pending Publish call + q.Close() require.Eventually( t, From 8c1010c482f33a7b9c763f56dd1aec943726d9d1 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 21 Feb 2024 16:24:04 -0500 Subject: [PATCH 2/6] update changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 14a24c00aabd..d7adc969f37f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -95,6 +95,7 @@ fields added to events containing the Beats version. {pull}37553[37553] - Update github.com/lestrrat-go/jwx dependency. {pull}37799[37799] - [threatintel] MISP pagination fixes {pull}37898[37898] - Fix file handle leak when handling errors in filestream {pull}37973[37973] +- Fix a race condition that could crash Filebeat with a "negative WaitGroup counter" error {pull}38094[38094] *Heartbeat* From 61ff43065b3b95b01d6bf2b182cf8fe9045b2b64 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 21 Feb 2024 16:29:26 -0500 Subject: [PATCH 3/6] update test name / comments --- libbeat/publisher/queue/memqueue/queue_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 8245e1fe1df5..7f60d490adc3 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -77,17 +77,17 @@ func TestProduceConsumer(t *testing.T) { t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond))) } -// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish -// does not block indefinitely. +// TestProducerDoesNotBlockWhenQueueClosed ensures the producer Publish +// does not block indefinitely during queue shutdown. // -// Once we get a producer `p` from the queue we want to ensure +// Once we get a producer `p` from the queue `q` we want to ensure // that if p.Publish is called and blocks it will unblock once -// p.Cancel is called. +// `q.Close` is called. // // For this test we start a queue with size 2 and try to add more -// than 2 events to it, p.Publish will block, once we call p.Cancel, +// than 2 events to it, p.Publish will block, once we call q.Close, // we ensure the 3rd event was not successfully published. -func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { +func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { q := NewQueue(nil, nil, Settings{ Events: 2, // Queue size From 461f8e12ea637a609e76143cd9324c026a43027e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 29 Feb 2024 15:29:30 -0500 Subject: [PATCH 4/6] candidate test for wait group panic --- .../publisher/queue/memqueue/queue_test.go | 101 ++++++++++++++++-- 1 file changed, 91 insertions(+), 10 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 7f60d490adc3..46260d4f97d4 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -27,8 +27,7 @@ import ( "testing" "time" - "gotest.tools/assert" - + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/publisher/queue" @@ -149,6 +148,88 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { "test not flagged as successful, p.Publish likely blocked indefinitely") } +func TestProducerClosePreservesEventCount(t *testing.T) { + // Check for https://github.com/elastic/beats/issues/37702, a problem + // where canceling a producer while it was waiting on a response + // to an insert request could lead to inaccurate event totals. + + var activeEvents atomic.Int64 + + q := NewQueue(nil, nil, + Settings{ + Events: 4, // Queue size + MaxGetRequest: 2, + FlushTimeout: 10 * time.Millisecond, + }, 1) + + p := q.Producer(queue.ProducerConfig{ + ACK: func(count int) { + activeEvents.Add(-int64(count)) + }, + OnDrop: func(e interface{}) { + //activeEvents.Add(-1) + }, + DropOnCancel: false, + }) + + // Asynchronously, send 4 events to the queue. + // Three will be enqueued, and one will be buffered, + // until we start reading from the queue. + // This needs to run in a goroutine because the buffered + // event will block until the queue handles it. + var wgProducer sync.WaitGroup + wgProducer.Add(1) + go func() { + for i := 0; i < 4; i++ { + event := i + // For proper navigation of the race conditions inherent to this + // test: increment active events before the publish attempt, then + // decrement afterwards if it failed (otherwise the event count + // could become negative even under correct queue operation). + activeEvents.Add(1) + _, ok := p.Publish(event) + if !ok { + activeEvents.Add(-1) + } + } + wgProducer.Done() + }() + + // This sleep is regrettable, but there's no deterministic way to know when + // the producer code has buffered an event in the queue's channel. + // However, the test is written to produce false negatives only: + // - If this test fails, it _always_ indicates a bug. + // - If there is a bug, this test will _often_ fail. + time.Sleep(10 * time.Millisecond) + + // Cancel the producer, then read and acknowledge two batches. If the + // Publish calls and the queue code are working, activeEvents should + // _usually_ end up as 0, but _always_ end up non-negative. + p.Cancel() + + // The queue reads also need to be done in a goroutine, in case the + // producer cancellation signal went through before the Publish + // requests -- if only 2 events entered the queue, then the second + // Get call will block until the queue itself is cancelled. + go func() { + for i := 0; i < 2; i++ { + batch, err := q.Get(2) + // Only error to worry about is queue closing, which isn't + // a test failure. + if err == nil { + batch.Done() + } + } + }() + + // One last sleep to let things percolate, then we close the queue + // to unblock any helpers and verify that the final active event + // count isn't negative. + time.Sleep(10 * time.Millisecond) + q.Close() + assert.False(t, activeEvents.Load() < 0, "active event count should never be negative") +} + func TestQueueMetricsDirect(t *testing.T) { eventsToTest := 5 maxEvents := 10 @@ -190,7 +271,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te // Read events, don't yet ack them batch, err := testQueue.Get(eventsToTest) - assert.NilError(t, err, "error in Get") + assert.NoError(t, err, "error in Get") t.Logf("Got batch of %d events", batch.Count()) queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) @@ -206,7 +287,7 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup // wait briefly to avoid races across all the queue channels time.Sleep(time.Millisecond * 100) testMetrics, err := q.Metrics() - assert.NilError(t, err, "error calling metrics for test %s", test) + assert.NoError(t, err, "error calling metrics for test %s", test) assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test) assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test) assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test) @@ -266,18 +347,18 @@ func TestEntryIDs(t *testing.T) { for i := 0; i < entryCount; i++ { batch, err := q.Get(1) - assert.NilError(t, err, "Queue read should succeed") + assert.NoError(t, err, "Queue read should succeed") assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry") metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i), fmt.Sprintf("Oldest entry ID before ACKing event %v should be %v", i, i)) batch.Done() waiter.waitForEvents(1) metrics, err = q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i+1), fmt.Sprintf("Oldest entry ID after ACKing event %v should be %v", i, i+1)) @@ -297,7 +378,7 @@ func TestEntryIDs(t *testing.T) { for i := 0; i < entryCount; i++ { batch, err := q.Get(1) - assert.NilError(t, err, "Queue read should succeed") + assert.NoError(t, err, "Queue read should succeed") assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry") batches = append(batches, batch) } @@ -318,7 +399,7 @@ func TestEntryIDs(t *testing.T) { // the slight nondeterminism. time.Sleep(1 * time.Millisecond) metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(0), fmt.Sprintf("Oldest entry ID after ACKing event %v should be 0", i)) } @@ -326,7 +407,7 @@ func TestEntryIDs(t *testing.T) { batches[0].Done() waiter.waitForEvents(100) metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(100), fmt.Sprintf("Oldest entry ID after ACKing event 0 should be %v", queue.EntryID(entryCount))) From 166b6bd5f9d1afa9c97c528fc9b7aff12dca0a46 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 29 Feb 2024 15:38:54 -0500 Subject: [PATCH 5/6] fix wg panic test --- libbeat/publisher/queue/memqueue/queue_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 46260d4f97d4..bbb9e2e4366e 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -157,7 +157,7 @@ func TestProducerClosePreservesEventCount(t *testing.T) { q := NewQueue(nil, nil, Settings{ - Events: 4, // Queue size + Events: 3, // Queue size MaxGetRequest: 2, FlushTimeout: 10 * time.Millisecond, }, 1) @@ -200,7 +200,7 @@ func TestProducerClosePreservesEventCount(t *testing.T) { // However, the test is written to produce false negatives only: // - If this test fails, it _always_ indicates a bug. // - If there is a bug, this test will _often_ fail. - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) // Cancel the producer, then read and acknowledge two batches. If the // Publish calls and the queue code are working, activeEvents should From d6d19ceb6d610e371114d1b4647a646743f309ad Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 29 Feb 2024 16:43:48 -0500 Subject: [PATCH 6/6] add more explanatory comments --- libbeat/publisher/queue/memqueue/queue_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index bbb9e2e4366e..53f8da4b77c6 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -137,7 +137,11 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { time.Millisecond, "the first two events were not successfully published") - // Close the queue, this should unblock the pending Publish call + // Close the queue, this should unblock the pending Publish call. + // It's not enough to just cancel the producer: once the producer + // has successfully sent a request to the queue, it must wait for + // the response unless the queue shuts down, otherwise the pipeline + // event totals will be wrong. q.Close() require.Eventually(