From 27cde877d9cadc92ddf32113a266e4c5bcab62f8 Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Mon, 4 Mar 2024 11:09:50 +0100 Subject: [PATCH 1/4] [Heartbeat] Adjust State loader to only retry for failed requests and not for 4xx (#37981) * only retry when the status is 5xx * remove test AAA comments * add changelog * correct changelog modification * fix ES query * change error handling strategy * do not retry when there is malformed data * improve retry mechanism * improve log message * improve changelog * fix log format --- CHANGELOG.next.asciidoc | 3 + .../wrappers/monitorstate/esloader.go | 22 +++++-- .../wrappers/monitorstate/esloader_test.go | 63 ++++++++++++++++++- .../wrappers/monitorstate/testutil.go | 4 +- .../monitors/wrappers/monitorstate/tracker.go | 34 +++++++--- .../wrappers/monitorstate/tracker_test.go | 49 +++++++++++++++ 6 files changed, 160 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 46a86a51ecdc..eef3e2304167 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -100,6 +100,9 @@ fields added to events containing the Beats version. {pull}37553[37553] *Heartbeat* +- Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702] +- Fix setuid root when running under cgroups v2. {pull}37794[37794] +- Adjust State loader to only retry when response code status is 5xx {pull}37981[37981] *Metricbeat* diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader.go b/heartbeat/monitors/wrappers/monitorstate/esloader.go index d63be5aada5b..5705ec4b1468 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader.go @@ -32,13 +32,21 @@ import ( var DefaultDataStreams = "synthetics-*,heartbeat-*" +type LoaderError struct { + err error + Retry bool +} + +func (e LoaderError) Error() string { + return e.err.Error() +} + func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader { if indexPattern == "" { // Should never happen, but if we ever make a coding error... logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES") return NilStateLoader } - return func(sf stdfields.StdMonitorFields) (*State, error) { var runFromID string if sf.RunFrom != nil { @@ -74,10 +82,11 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation }, }, } - status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody) if err != nil || status > 299 { - return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err) + sErr := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err) + retry := shouldRetry(status) + return nil, LoaderError{err: sErr, Retry: retry} } type stateHits struct { @@ -94,7 +103,8 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation sh := stateHits{} err = json.Unmarshal(body, &sh) if err != nil { - return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err) + sErr := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err) + return nil, LoaderError{err: sErr, Retry: false} } if len(sh.Hits.Hits) == 0 { @@ -107,3 +117,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation return state, nil } } + +func shouldRetry(status int) bool { + return status >= 500 +} diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go index 42b1c6c31c31..db70c1cc72e7 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go @@ -21,6 +21,9 @@ package monitorstate import ( "fmt" + "io" + "net/http" + "strings" "testing" "time" @@ -33,6 +36,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/config" "github.com/elastic/beats/v7/heartbeat/esutil" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/processors/util" ) @@ -51,7 +55,7 @@ func TestStatesESLoader(t *testing.T) { monID := etc.createTestMonitorStateInES(t, testStatus) // Since we've continued this state it should register the initial state - ms := etc.tracker.GetCurrentState(monID) + ms := etc.tracker.GetCurrentState(monID, RetryConfig{}) require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off") requireMSStatusCount(t, ms, testStatus, 1) @@ -89,8 +93,61 @@ func TestStatesESLoader(t *testing.T) { } } +func TestMakeESLoaderError(t *testing.T) { + tests := []struct { + name string + statusCode int + expected bool + }{ + { + name: "should return a retryable error", + statusCode: http.StatusInternalServerError, + expected: true, + }, + { + name: "should not return a retryable error", + statusCode: http.StatusNotFound, + expected: false, + }, + { + name: "should not return a retryable error when handling malformed data", + statusCode: http.StatusOK, + expected: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + etc := newESTestContext(t) + etc.ec.HTTP = fakeHTTPClient{respStatus: test.statusCode} + loader := MakeESLoader(etc.ec, "fakeIndexPattern", etc.location) + + _, err := loader(stdfields.StdMonitorFields{}) + + var loaderError LoaderError + require.ErrorAs(t, err, &loaderError) + require.Equal(t, loaderError.Retry, test.expected) + }) + } +} + +type fakeHTTPClient struct { + respStatus int +} + +func (fc fakeHTTPClient) Do(req *http.Request) (resp *http.Response, err error) { + return &http.Response{ + StatusCode: fc.respStatus, + Body: io.NopCloser(strings.NewReader("test response")), + }, nil +} + +func (fc fakeHTTPClient) CloseIdleConnections() { + // noop +} + type esTestContext struct { namespace string + ec *eslegclient.Connection esc *elasticsearch.Client loader StateLoader tracker *Tracker @@ -106,10 +163,12 @@ func newESTestContext(t *testing.T) *esTestContext { } namespace, _ := uuid.NewV4() esc := IntegApiClient(t) + ec := IntegES(t) etc := &esTestContext{ namespace: namespace.String(), esc: esc, - loader: IntegESLoader(t, fmt.Sprintf("synthetics-*-%s", namespace.String()), location), + ec: ec, + loader: IntegESLoader(t, ec, fmt.Sprintf("synthetics-*-%s", namespace.String()), location), location: location, } diff --git a/heartbeat/monitors/wrappers/monitorstate/testutil.go b/heartbeat/monitors/wrappers/monitorstate/testutil.go index 540983097587..28a6c2606557 100644 --- a/heartbeat/monitors/wrappers/monitorstate/testutil.go +++ b/heartbeat/monitors/wrappers/monitorstate/testutil.go @@ -33,8 +33,8 @@ import ( // Helpers for tests here and elsewhere -func IntegESLoader(t *testing.T, indexPattern string, location *config.LocationWithID) StateLoader { - return MakeESLoader(IntegES(t), indexPattern, location) +func IntegESLoader(t *testing.T, esc *eslegclient.Connection, indexPattern string, location *config.LocationWithID) StateLoader { + return MakeESLoader(esc, indexPattern, location) } func IntegES(t *testing.T) (esc *eslegclient.Connection) { diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index e350294e46e8..40a4e8f2ded1 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta t.mtx.Lock() defer t.mtx.Unlock() - state := t.GetCurrentState(sf) + state := t.GetCurrentState(sf, RetryConfig{}) if state == nil { state = newMonitorState(sf, newStatus, 0, t.flappingEnabled) logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String()) @@ -75,22 +75,33 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta } func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus { - s := t.GetCurrentState(sf) + s := t.GetCurrentState(sf, RetryConfig{}) if s == nil { return StatusEmpty } return s.Status } -func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) { +type RetryConfig struct { + attempts int + waitFn func() time.Duration +} + +func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) (state *State) { if state, ok := t.states[sf.ID]; ok { return state } - tries := 3 + // Default number of attempts + attempts := 3 + if rc.attempts != 0 { + attempts = rc.attempts + } + var loadedState *State var err error - for i := 0; i < tries; i++ { + var i int + for i = 0; i < attempts; i++ { loadedState, err = t.stateLoader(sf) if err == nil { if loadedState != nil { @@ -98,13 +109,22 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) } break } + var loaderError LoaderError + if errors.As(err, &loaderError) && !loaderError.Retry { + logp.L().Warnf("could not load last externally recorded state: %v", loaderError) + break + } + // Default sleep time sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond) - logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err) + if rc.waitFn != nil { + sleepFor = rc.waitFn() + } + logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %v", sleepFor.Milliseconds(), err) time.Sleep(sleepFor) } if err != nil { - logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID) + logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", i+1, sf.ID) } if loadedState != nil { diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go index ec1217b86150..fd34371ce810 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go @@ -18,6 +18,7 @@ package monitorstate import ( + "errors" "testing" "time" @@ -131,3 +132,51 @@ func TestDeferredStateLoader(t *testing.T) { resState, _ = dsl(stdfields.StdMonitorFields{}) require.Equal(t, stateA, resState) } + +func TestStateLoaderRetry(t *testing.T) { + // While testing the sleep time between retries should be negligible + waitFn := func() time.Duration { + return time.Microsecond + } + + tests := []struct { + name string + retryable bool + rc RetryConfig + expectedCalls int + }{ + { + "should retry 3 times when fails with retryable error", + true, + RetryConfig{waitFn: waitFn}, + 3, + }, + { + "should not retry when fails with non-retryable error", + false, + RetryConfig{waitFn: waitFn}, + 1, + }, + { + "should honour the configured number of attempts when fails with retryable error", + true, + RetryConfig{attempts: 5, waitFn: waitFn}, + 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + calls := 0 + errorStateLoader := func(_ stdfields.StdMonitorFields) (*State, error) { + calls += 1 + return nil, LoaderError{err: errors.New("test error"), Retry: tt.retryable} + } + + mst := NewTracker(errorStateLoader, true) + mst.GetCurrentState(stdfields.StdMonitorFields{}, tt.rc) + + require.Equal(t, calls, tt.expectedCalls) + }) + } +} From 5f1e656e06fc07a0dee6a180bcb381853bd741dd Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 4 Mar 2024 13:45:20 +0100 Subject: [PATCH 2/4] [AWS] [S3] Remove url.QueryUnescape() from aws-s3 input in polling mode (#38125) * Remove url.QueryUnescape() We introduced [^1] the `url.QueryUnescape()` function to unescape object keys from S3 notification in SQS messages. However, the object keys in the S3 list object responses do not require [^2] unescape. We must remove the unescape to avoid unintended changes to the S3 object key. [^1]: https://github.com/elastic/beats/pull/18370 [^2]: https://github.com/elastic/beats/issues/38012#issuecomment-1946440453 --------- Co-authored-by: Andrea Spacca --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/awss3/s3.go | 12 ++---------- x-pack/filebeat/input/awss3/s3_test.go | 9 +++++++++ 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index eef3e2304167..7731d291ba49 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -97,6 +97,7 @@ fields added to events containing the Beats version. {pull}37553[37553] - [threatintel] MISP pagination fixes {pull}37898[37898] - Fix file handle leak when handling errors in filestream {pull}37973[37973] - Prevent HTTPJSON holding response bodies between executions. {issue}35219[35219] {pull}38116[38116] +- Fix "failed processing S3 event for object key" error on aws-s3 input when key contains the "+" character {issue}38012[38012] {pull}38125[38125] *Heartbeat* diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 96be746c160a..5aa8d31e95de 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -8,7 +8,6 @@ import ( "context" "errors" "fmt" - "net/url" "sync" "time" @@ -208,14 +207,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- // Metrics p.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects)) for _, object := range page.Contents { - // Unescape s3 key name. For example, convert "%3D" back to "=". - filename, err := url.QueryUnescape(*object.Key) - if err != nil { - p.log.Errorw("Error when unescaping object key, skipping.", "error", err, "s3_object", *object.Key) - continue - } - - state := newState(bucketName, filename, *object.ETag, p.listPrefix, *object.LastModified) + state := newState(bucketName, *object.Key, *object.ETag, p.listPrefix, *object.LastModified) if p.states.MustSkip(state, p.store) { p.log.Debugw("skipping state.", "state", state) continue @@ -240,7 +232,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- s3ObjectHandler: s3Processor, s3ObjectInfo: s3ObjectInfo{ name: bucketName, - key: filename, + key: *object.Key, etag: *object.ETag, lastModified: *object.LastModified, listingID: listingID.String(), diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 6f075a2f8541..b94ba7cfb09b 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -93,6 +93,11 @@ func TestS3Poller(t *testing.T) { Key: aws.String("key5"), LastModified: aws.Time(time.Now()), }, + { + ETag: aws.String("etag6"), + Key: aws.String("2024-02-08T08:35:00+00:02.json.gz"), + LastModified: aws.Time(time.Now()), + }, }, }, nil }) @@ -124,6 +129,10 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). Return(nil, errFakeConnectivityFailure) + mockAPI.EXPECT(). + GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("2024-02-08T08:35:00+00:02.json.gz")). + Return(nil, errFakeConnectivityFailure) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}, numberOfWorkers) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) From 4f46fcb888b7f12706d005b7e1d2d1dafcbe6fcd Mon Sep 17 00:00:00 2001 From: sharbuz <87968844+sharbuz@users.noreply.github.com> Date: Mon, 4 Mar 2024 17:05:07 +0200 Subject: [PATCH 3/4] increase the maximum timeout for x-pack-metricbeat (#38152) --- catalog-info.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalog-info.yaml b/catalog-info.yaml index 955c316d5aae..5e0f94fd2df0 100644 --- a/catalog-info.yaml +++ b/catalog-info.yaml @@ -637,7 +637,7 @@ spec: spec: # branch_configuration: "7.17" #TODO: uncomment after tests pipeline_file: ".buildkite/x-pack/pipeline.xpack.metricbeat.yml" - maximum_timeout_in_minutes: 240 + maximum_timeout_in_minutes: 480 provider_settings: trigger_mode: none # don't trigger jobs from github activity build_pull_request_forks: false From d23b4d33622137f7245b628d77d15f8ecfef3a0d Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 4 Mar 2024 12:51:37 -0500 Subject: [PATCH 4/4] Memory queue: cancel in-progress writes on queue closed, not producer closed (#38094) Fixes a race condition that could lead to incorrect event totals and occasional panics #37702. Once a producer sends a get request to the memory queue, it must wait on the response unless the queue itself is closed, otherwise it can return a false failure. The previous code mistakenly waited on the done signal for the current producer rather than the queue. This PR adds the queue's done signal to the producer struct, and waits on that once the insert request is sent. --- CHANGELOG.next.asciidoc | 1 + libbeat/publisher/queue/memqueue/produce.go | 39 ++++-- .../publisher/queue/memqueue/queue_test.go | 121 +++++++++++++++--- 3 files changed, 131 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7731d291ba49..77e78868b6a8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -96,6 +96,7 @@ fields added to events containing the Beats version. {pull}37553[37553] - Update github.com/lestrrat-go/jwx dependency. {pull}37799[37799] - [threatintel] MISP pagination fixes {pull}37898[37898] - Fix file handle leak when handling errors in filestream {pull}37973[37973] +- Fix a race condition that could crash Filebeat with a "negative WaitGroup counter" error {pull}38094[38094] - Prevent HTTPJSON holding response bodies between executions. {issue}35219[35219] {pull}38116[38116] - Fix "failed processing S3 event for object key" error on aws-s3 input when key contains the "+" character {issue}38012[38012] {pull}38125[38125] diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 954ea055f4a4..5ddea468e4c6 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -36,9 +36,10 @@ type ackProducer struct { } type openState struct { - log *logp.Logger - done chan struct{} - events chan pushRequest + log *logp.Logger + done chan struct{} + queueDone <-chan struct{} + events chan pushRequest } // producerID stores the order of events within a single producer, so multiple @@ -58,9 +59,10 @@ type ackHandler func(count int) func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer { openState := openState{ - log: b.logger, - done: make(chan struct{}), - events: b.pushChan, + log: b.logger, + done: make(chan struct{}), + queueDone: b.ctx.Done(), + events: b.pushChan, } if cb != nil { @@ -143,27 +145,40 @@ func (st *openState) Close() { func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - // If the output is blocked and the queue is full, `req` is written - // to `st.events`, however the queue never writes back to `req.resp`, - // which effectively blocks for ever. So we also need to select on the - // done channel to ensure we don't miss the shutdown signal. + // The events channel is buffered, which means we may successfully + // write to it even if the queue is shutting down. To avoid blocking + // forever during shutdown, we also have to wait on the queue's + // shutdown channel. select { case resp := <-req.resp: return resp, true - case <-st.done: + case <-st.queueDone: st.events = nil return 0, false } case <-st.done: st.events = nil return 0, false + case <-st.queueDone: + st.events = nil + return 0, false } } func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - return <-req.resp, true + // The events channel is buffered, which means we may successfully + // write to it even if the queue is shutting down. To avoid blocking + // forever during shutdown, we also have to wait on the queue's + // shutdown channel. + select { + case resp := <-req.resp: + return resp, true + case <-st.queueDone: + st.events = nil + return 0, false + } case <-st.done: st.events = nil return 0, false diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 141514483f33..53f8da4b77c6 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -27,8 +27,7 @@ import ( "testing" "time" - "gotest.tools/assert" - + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/publisher/queue" @@ -77,17 +76,17 @@ func TestProduceConsumer(t *testing.T) { t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond))) } -// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish -// does not block indefinitely. +// TestProducerDoesNotBlockWhenQueueClosed ensures the producer Publish +// does not block indefinitely during queue shutdown. // -// Once we get a producer `p` from the queue we want to ensure +// Once we get a producer `p` from the queue `q` we want to ensure // that if p.Publish is called and blocks it will unblock once -// p.Cancel is called. +// `q.Close` is called. // // For this test we start a queue with size 2 and try to add more -// than 2 events to it, p.Publish will block, once we call p.Cancel, +// than 2 events to it, p.Publish will block, once we call q.Close, // we ensure the 3rd event was not successfully published. -func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { +func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { q := NewQueue(nil, nil, Settings{ Events: 2, // Queue size @@ -138,8 +137,12 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { time.Millisecond, "the first two events were not successfully published") - // Cancel the producer, this should unblock its Publish method - p.Cancel() + // Close the queue, this should unblock the pending Publish call. + // It's not enough to just cancel the producer: once the producer + // has successfully sent a request to the queue, it must wait for + // the response unless the queue shuts down, otherwise the pipeline + // event totals will be wrong. + q.Close() require.Eventually( t, @@ -149,6 +152,88 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { "test not flagged as successful, p.Publish likely blocked indefinitely") } +func TestProducerClosePreservesEventCount(t *testing.T) { + // Check for https://github.com/elastic/beats/issues/37702, a problem + // where canceling a producer while it was waiting on a response + // to an insert request could lead to inaccurate event totals. + + var activeEvents atomic.Int64 + + q := NewQueue(nil, nil, + Settings{ + Events: 3, // Queue size + MaxGetRequest: 2, + FlushTimeout: 10 * time.Millisecond, + }, 1) + + p := q.Producer(queue.ProducerConfig{ + ACK: func(count int) { + activeEvents.Add(-int64(count)) + }, + OnDrop: func(e interface{}) { + //activeEvents.Add(-1) + }, + DropOnCancel: false, + }) + + // Asynchronously, send 4 events to the queue. + // Three will be enqueued, and one will be buffered, + // until we start reading from the queue. + // This needs to run in a goroutine because the buffered + // event will block until the queue handles it. + var wgProducer sync.WaitGroup + wgProducer.Add(1) + go func() { + for i := 0; i < 4; i++ { + event := i + // For proper navigation of the race conditions inherent to this + // test: increment active events before the publish attempt, then + // decrement afterwards if it failed (otherwise the event count + // could become negative even under correct queue operation). + activeEvents.Add(1) + _, ok := p.Publish(event) + if !ok { + activeEvents.Add(-1) + } + } + wgProducer.Done() + }() + + // This sleep is regrettable, but there's no deterministic way to know when + // the producer code has buffered an event in the queue's channel. + // However, the test is written to produce false negatives only: + // - If this test fails, it _always_ indicates a bug. + // - If there is a bug, this test will _often_ fail. + time.Sleep(20 * time.Millisecond) + + // Cancel the producer, then read and acknowledge two batches. If the + // Publish calls and the queue code are working, activeEvents should + // _usually_ end up as 0, but _always_ end up non-negative. + p.Cancel() + + // The queue reads also need to be done in a goroutine, in case the + // producer cancellation signal went through before the Publish + // requests -- if only 2 events entered the queue, then the second + // Get call will block until the queue itself is cancelled. + go func() { + for i := 0; i < 2; i++ { + batch, err := q.Get(2) + // Only error to worry about is queue closing, which isn't + // a test failure. + if err == nil { + batch.Done() + } + } + }() + + // One last sleep to let things percolate, then we close the queue + // to unblock any helpers and verify that the final active event + // count isn't negative. + time.Sleep(10 * time.Millisecond) + q.Close() + assert.False(t, activeEvents.Load() < 0, "active event count should never be negative") +} + func TestQueueMetricsDirect(t *testing.T) { eventsToTest := 5 maxEvents := 10 @@ -190,7 +275,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te // Read events, don't yet ack them batch, err := testQueue.Get(eventsToTest) - assert.NilError(t, err, "error in Get") + assert.NoError(t, err, "error in Get") t.Logf("Got batch of %d events", batch.Count()) queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) @@ -206,7 +291,7 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup // wait briefly to avoid races across all the queue channels time.Sleep(time.Millisecond * 100) testMetrics, err := q.Metrics() - assert.NilError(t, err, "error calling metrics for test %s", test) + assert.NoError(t, err, "error calling metrics for test %s", test) assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test) assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test) assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test) @@ -266,18 +351,18 @@ func TestEntryIDs(t *testing.T) { for i := 0; i < entryCount; i++ { batch, err := q.Get(1) - assert.NilError(t, err, "Queue read should succeed") + assert.NoError(t, err, "Queue read should succeed") assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry") metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i), fmt.Sprintf("Oldest entry ID before ACKing event %v should be %v", i, i)) batch.Done() waiter.waitForEvents(1) metrics, err = q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i+1), fmt.Sprintf("Oldest entry ID after ACKing event %v should be %v", i, i+1)) @@ -297,7 +382,7 @@ func TestEntryIDs(t *testing.T) { for i := 0; i < entryCount; i++ { batch, err := q.Get(1) - assert.NilError(t, err, "Queue read should succeed") + assert.NoError(t, err, "Queue read should succeed") assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry") batches = append(batches, batch) } @@ -318,7 +403,7 @@ func TestEntryIDs(t *testing.T) { // the slight nondeterminism. time.Sleep(1 * time.Millisecond) metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(0), fmt.Sprintf("Oldest entry ID after ACKing event %v should be 0", i)) } @@ -326,7 +411,7 @@ func TestEntryIDs(t *testing.T) { batches[0].Done() waiter.waitForEvents(100) metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(100), fmt.Sprintf("Oldest entry ID after ACKing event 0 should be %v", queue.EntryID(entryCount)))