diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 46a86a51ecdc..77e78868b6a8 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -96,10 +96,15 @@ fields added to events containing the Beats version. {pull}37553[37553]
 - Update github.com/lestrrat-go/jwx dependency. {pull}37799[37799]
 - [threatintel] MISP pagination fixes {pull}37898[37898]
 - Fix file handle leak when handling errors in filestream {pull}37973[37973]
+- Fix a race condition that could crash Filebeat with a "negative WaitGroup counter" error {pull}38094[38094]
 - Prevent HTTPJSON holding response bodies between executions. {issue}35219[35219] {pull}38116[38116]
+- Fix "failed processing S3 event for object key" error on aws-s3 input when key contains the "+" character {issue}38012[38012] {pull}38125[38125]
 
 *Heartbeat*
 
+- Fix panics when dereferencing an invalid parsed URL. {pull}34702[34702]
+- Fix setuid root when running under cgroups v2. {pull}37794[37794]
+- Adjust the State loader to only retry when the response status code is 5xx. {pull}37981[37981]
 
 *Metricbeat*
 
diff --git a/catalog-info.yaml b/catalog-info.yaml
index 955c316d5aae..5e0f94fd2df0 100644
--- a/catalog-info.yaml
+++ b/catalog-info.yaml
@@ -637,7 +637,7 @@ spec:
     spec:
       # branch_configuration: "7.17" #TODO: uncomment after tests
       pipeline_file: ".buildkite/x-pack/pipeline.xpack.metricbeat.yml"
-      maximum_timeout_in_minutes: 240
+      maximum_timeout_in_minutes: 480
       provider_settings:
         trigger_mode: none # don't trigger jobs from github activity
         build_pull_request_forks: false
diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader.go b/heartbeat/monitors/wrappers/monitorstate/esloader.go
index d63be5aada5b..5705ec4b1468 100644
--- a/heartbeat/monitors/wrappers/monitorstate/esloader.go
+++ b/heartbeat/monitors/wrappers/monitorstate/esloader.go
@@ -32,13 +32,21 @@ import (
 
 var DefaultDataStreams = "synthetics-*,heartbeat-*"
 
+type LoaderError struct {
+    err   error
+    Retry bool
+}
+
+func (e LoaderError) Error() string {
+    return e.err.Error()
+}
+
 func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader {
     if indexPattern == "" {
         // Should never happen, but if we ever make a coding error...
         logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES")
         return NilStateLoader
     }
-
     return func(sf stdfields.StdMonitorFields) (*State, error) {
         var runFromID string
         if sf.RunFrom != nil {
@@ -74,10 +82,11 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
                 },
             },
         }
-
         status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody)
         if err != nil || status > 299 {
-            return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
+            sErr := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
+            retry := shouldRetry(status)
+            return nil, LoaderError{err: sErr, Retry: retry}
         }
 
         type stateHits struct {
@@ -94,7 +103,8 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
         sh := stateHits{}
         err = json.Unmarshal(body, &sh)
         if err != nil {
-            return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
+            sErr := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
+            return nil, LoaderError{err: sErr, Retry: false}
         }
 
         if len(sh.Hits.Hits) == 0 {
@@ -107,3 +117,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
         return state, nil
     }
 }
+
+func shouldRetry(status int) bool {
+    return status >= 500
+}
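(Not part of the patch: a minimal, self-contained sketch of the retryable-error classification the esloader.go hunk above introduces. `LoaderError` and `shouldRetry` echo the patch; `classify` and the `main` driver are names invented for this sketch.)

```go
package main

import (
	"errors"
	"fmt"
)

// LoaderError mirrors the type added in esloader.go: it wraps the
// underlying failure and records whether the caller may retry it.
type LoaderError struct {
	err   error
	Retry bool
}

func (e LoaderError) Error() string { return e.err.Error() }

// shouldRetry mirrors the patch: only server-side (5xx) failures
// are considered transient.
func shouldRetry(status int) bool { return status >= 500 }

// classify is a hypothetical helper showing how a status code is
// folded into a LoaderError.
func classify(status int) error {
	return LoaderError{
		err:   fmt.Errorf("state search failed with status %d", status),
		Retry: shouldRetry(status),
	}
}

func main() {
	for _, status := range []int{404, 503} {
		err := classify(status)
		var le LoaderError
		if errors.As(err, &le) && le.Retry {
			fmt.Printf("%d: transient, retry (%v)\n", status, le)
		} else {
			fmt.Printf("%d: permanent, give up (%v)\n", status, le)
		}
	}
}
```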
logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES") return NilStateLoader } - return func(sf stdfields.StdMonitorFields) (*State, error) { var runFromID string if sf.RunFrom != nil { @@ -74,10 +82,11 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation }, }, } - status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody) if err != nil || status > 299 { - return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err) + sErr := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err) + retry := shouldRetry(status) + return nil, LoaderError{err: sErr, Retry: retry} } type stateHits struct { @@ -94,7 +103,8 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation sh := stateHits{} err = json.Unmarshal(body, &sh) if err != nil { - return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err) + sErr := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err) + return nil, LoaderError{err: sErr, Retry: false} } if len(sh.Hits.Hits) == 0 { @@ -107,3 +117,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation return state, nil } } + +func shouldRetry(status int) bool { + return status >= 500 +} diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go index 42b1c6c31c31..db70c1cc72e7 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go @@ -21,6 +21,9 @@ package monitorstate import ( "fmt" + "io" + "net/http" + "strings" "testing" "time" @@ -33,6 +36,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/config" "github.com/elastic/beats/v7/heartbeat/esutil" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/processors/util" ) @@ -51,7 +55,7 @@ func TestStatesESLoader(t *testing.T) { monID := etc.createTestMonitorStateInES(t, testStatus) // Since we've continued this state it should register the initial state - ms := etc.tracker.GetCurrentState(monID) + ms := etc.tracker.GetCurrentState(monID, RetryConfig{}) require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off") requireMSStatusCount(t, ms, testStatus, 1) @@ -89,8 +93,61 @@ func TestStatesESLoader(t *testing.T) { } } +func TestMakeESLoaderError(t *testing.T) { + tests := []struct { + name string + statusCode int + expected bool + }{ + { + name: "should return a retryable error", + statusCode: http.StatusInternalServerError, + expected: true, + }, + { + name: "should not return a retryable error", + statusCode: http.StatusNotFound, + expected: false, + }, + { + name: "should not return a retryable error when handling malformed data", + statusCode: http.StatusOK, + expected: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + etc := newESTestContext(t) + etc.ec.HTTP = fakeHTTPClient{respStatus: test.statusCode} + loader := MakeESLoader(etc.ec, "fakeIndexPattern", etc.location) + + _, err := loader(stdfields.StdMonitorFields{}) + + var loaderError LoaderError + require.ErrorAs(t, err, &loaderError) + require.Equal(t, loaderError.Retry, test.expected) + }) + } +} + +type fakeHTTPClient struct { + respStatus int +} + +func 
(fc fakeHTTPClient) Do(req *http.Request) (resp *http.Response, err error) { + return &http.Response{ + StatusCode: fc.respStatus, + Body: io.NopCloser(strings.NewReader("test response")), + }, nil +} + +func (fc fakeHTTPClient) CloseIdleConnections() { + // noop +} + type esTestContext struct { namespace string + ec *eslegclient.Connection esc *elasticsearch.Client loader StateLoader tracker *Tracker @@ -106,10 +163,12 @@ func newESTestContext(t *testing.T) *esTestContext { } namespace, _ := uuid.NewV4() esc := IntegApiClient(t) + ec := IntegES(t) etc := &esTestContext{ namespace: namespace.String(), esc: esc, - loader: IntegESLoader(t, fmt.Sprintf("synthetics-*-%s", namespace.String()), location), + ec: ec, + loader: IntegESLoader(t, ec, fmt.Sprintf("synthetics-*-%s", namespace.String()), location), location: location, } diff --git a/heartbeat/monitors/wrappers/monitorstate/testutil.go b/heartbeat/monitors/wrappers/monitorstate/testutil.go index 540983097587..28a6c2606557 100644 --- a/heartbeat/monitors/wrappers/monitorstate/testutil.go +++ b/heartbeat/monitors/wrappers/monitorstate/testutil.go @@ -33,8 +33,8 @@ import ( // Helpers for tests here and elsewhere -func IntegESLoader(t *testing.T, indexPattern string, location *config.LocationWithID) StateLoader { - return MakeESLoader(IntegES(t), indexPattern, location) +func IntegESLoader(t *testing.T, esc *eslegclient.Connection, indexPattern string, location *config.LocationWithID) StateLoader { + return MakeESLoader(esc, indexPattern, location) } func IntegES(t *testing.T) (esc *eslegclient.Connection) { diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index e350294e46e8..40a4e8f2ded1 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta t.mtx.Lock() defer t.mtx.Unlock() - state := t.GetCurrentState(sf) + state := t.GetCurrentState(sf, RetryConfig{}) if state == nil { state = newMonitorState(sf, newStatus, 0, t.flappingEnabled) logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String()) @@ -75,22 +75,33 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta } func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus { - s := t.GetCurrentState(sf) + s := t.GetCurrentState(sf, RetryConfig{}) if s == nil { return StatusEmpty } return s.Status } -func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) { +type RetryConfig struct { + attempts int + waitFn func() time.Duration +} + +func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) (state *State) { if state, ok := t.states[sf.ID]; ok { return state } - tries := 3 + // Default number of attempts + attempts := 3 + if rc.attempts != 0 { + attempts = rc.attempts + } + var loadedState *State var err error - for i := 0; i < tries; i++ { + var i int + for i = 0; i < attempts; i++ { loadedState, err = t.stateLoader(sf) if err == nil { if loadedState != nil { @@ -98,13 +109,22 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) } break } + var loaderError LoaderError + if errors.As(err, &loaderError) && !loaderError.Retry { + logp.L().Warnf("could not load last externally recorded state: %v", loaderError) + break + } + // Default sleep time sleepFor := (time.Duration(i*i) * time.Second) + 
(time.Duration(rand.Intn(500)) * time.Millisecond) - logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err) + if rc.waitFn != nil { + sleepFor = rc.waitFn() + } + logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %v", sleepFor.Milliseconds(), err) time.Sleep(sleepFor) } if err != nil { - logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID) + logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", i+1, sf.ID) } if loadedState != nil { diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go index ec1217b86150..fd34371ce810 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go @@ -18,6 +18,7 @@ package monitorstate import ( + "errors" "testing" "time" @@ -131,3 +132,51 @@ func TestDeferredStateLoader(t *testing.T) { resState, _ = dsl(stdfields.StdMonitorFields{}) require.Equal(t, stateA, resState) } + +func TestStateLoaderRetry(t *testing.T) { + // While testing the sleep time between retries should be negligible + waitFn := func() time.Duration { + return time.Microsecond + } + + tests := []struct { + name string + retryable bool + rc RetryConfig + expectedCalls int + }{ + { + "should retry 3 times when fails with retryable error", + true, + RetryConfig{waitFn: waitFn}, + 3, + }, + { + "should not retry when fails with non-retryable error", + false, + RetryConfig{waitFn: waitFn}, + 1, + }, + { + "should honour the configured number of attempts when fails with retryable error", + true, + RetryConfig{attempts: 5, waitFn: waitFn}, + 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + calls := 0 + errorStateLoader := func(_ stdfields.StdMonitorFields) (*State, error) { + calls += 1 + return nil, LoaderError{err: errors.New("test error"), Retry: tt.retryable} + } + + mst := NewTracker(errorStateLoader, true) + mst.GetCurrentState(stdfields.StdMonitorFields{}, tt.rc) + + require.Equal(t, calls, tt.expectedCalls) + }) + } +} diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 954ea055f4a4..5ddea468e4c6 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -36,9 +36,10 @@ type ackProducer struct { } type openState struct { - log *logp.Logger - done chan struct{} - events chan pushRequest + log *logp.Logger + done chan struct{} + queueDone <-chan struct{} + events chan pushRequest } // producerID stores the order of events within a single producer, so multiple @@ -58,9 +59,10 @@ type ackHandler func(count int) func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer { openState := openState{ - log: b.logger, - done: make(chan struct{}), - events: b.pushChan, + log: b.logger, + done: make(chan struct{}), + queueDone: b.ctx.Done(), + events: b.pushChan, } if cb != nil { @@ -143,27 +145,40 @@ func (st *openState) Close() { func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - // If the output is blocked and the queue is full, `req` is written - // to `st.events`, however the queue never writes back to `req.resp`, - // which effectively blocks for 
ever. So we also need to select on the - // done channel to ensure we don't miss the shutdown signal. + // The events channel is buffered, which means we may successfully + // write to it even if the queue is shutting down. To avoid blocking + // forever during shutdown, we also have to wait on the queue's + // shutdown channel. select { case resp := <-req.resp: return resp, true - case <-st.done: + case <-st.queueDone: st.events = nil return 0, false } case <-st.done: st.events = nil return 0, false + case <-st.queueDone: + st.events = nil + return 0, false } } func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - return <-req.resp, true + // The events channel is buffered, which means we may successfully + // write to it even if the queue is shutting down. To avoid blocking + // forever during shutdown, we also have to wait on the queue's + // shutdown channel. + select { + case resp := <-req.resp: + return resp, true + case <-st.queueDone: + st.events = nil + return 0, false + } case <-st.done: st.events = nil return 0, false diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 141514483f33..53f8da4b77c6 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -27,8 +27,7 @@ import ( "testing" "time" - "gotest.tools/assert" - + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/publisher/queue" @@ -77,17 +76,17 @@ func TestProduceConsumer(t *testing.T) { t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond))) } -// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish -// does not block indefinitely. +// TestProducerDoesNotBlockWhenQueueClosed ensures the producer Publish +// does not block indefinitely during queue shutdown. // -// Once we get a producer `p` from the queue we want to ensure +// Once we get a producer `p` from the queue `q` we want to ensure // that if p.Publish is called and blocks it will unblock once -// p.Cancel is called. +// `q.Close` is called. // // For this test we start a queue with size 2 and try to add more -// than 2 events to it, p.Publish will block, once we call p.Cancel, +// than 2 events to it, p.Publish will block, once we call q.Close, // we ensure the 3rd event was not successfully published. -func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { +func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { q := NewQueue(nil, nil, Settings{ Events: 2, // Queue size @@ -138,8 +137,12 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { time.Millisecond, "the first two events were not successfully published") - // Cancel the producer, this should unblock its Publish method - p.Cancel() + // Close the queue, this should unblock the pending Publish call. + // It's not enough to just cancel the producer: once the producer + // has successfully sent a request to the queue, it must wait for + // the response unless the queue shuts down, otherwise the pipeline + // event totals will be wrong. 
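(Not part of the patch: the tracker.go changes above boil down to a bounded retry loop with quadratic backoff plus up to 500ms of jitter, an injectable wait function for tests, and an early break on non-retryable errors. A standalone sketch of that shape; `loadWithRetry`, `retryConfig`, and `retryable` are illustrative stand-ins, not names from the codebase.)

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

type retryConfig struct {
	attempts int                  // 0 means "use the default of 3"
	waitFn   func() time.Duration // overrides the backoff, e.g. in tests
}

func loadWithRetry(rc retryConfig, load func() error, retryable func(error) bool) error {
	attempts := 3 // default, as in the patch
	if rc.attempts != 0 {
		attempts = rc.attempts
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = load(); err == nil {
			return nil
		}
		if !retryable(err) {
			return err // non-retryable: fail fast instead of sleeping
		}
		// Quadratic backoff with jitter, mirroring the tracker loop.
		sleepFor := time.Duration(i*i)*time.Second + time.Duration(rand.Intn(500))*time.Millisecond
		if rc.waitFn != nil {
			sleepFor = rc.waitFn()
		}
		time.Sleep(sleepFor)
	}
	return err
}

func main() {
	err := loadWithRetry(
		retryConfig{attempts: 2, waitFn: func() time.Duration { return time.Microsecond }},
		func() error { return errors.New("503 from ES") },
		func(error) bool { return true },
	)
	fmt.Println("final error:", err) // still failing after 2 attempts
}
```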
diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go
index ec1217b86150..fd34371ce810 100644
--- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go
+++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go
@@ -18,6 +18,7 @@ package monitorstate
 
 import (
+    "errors"
     "testing"
     "time"
 
@@ -131,3 +132,51 @@ func TestDeferredStateLoader(t *testing.T) {
     resState, _ = dsl(stdfields.StdMonitorFields{})
     require.Equal(t, stateA, resState)
 }
+
+func TestStateLoaderRetry(t *testing.T) {
+    // While testing, the sleep time between retries should be negligible
+    waitFn := func() time.Duration {
+        return time.Microsecond
+    }
+
+    tests := []struct {
+        name          string
+        retryable     bool
+        rc            RetryConfig
+        expectedCalls int
+    }{
+        {
+            "should retry 3 times when failing with a retryable error",
+            true,
+            RetryConfig{waitFn: waitFn},
+            3,
+        },
+        {
+            "should not retry when failing with a non-retryable error",
+            false,
+            RetryConfig{waitFn: waitFn},
+            1,
+        },
+        {
+            "should honour the configured number of attempts when failing with a retryable error",
+            true,
+            RetryConfig{attempts: 5, waitFn: waitFn},
+            5,
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            calls := 0
+            errorStateLoader := func(_ stdfields.StdMonitorFields) (*State, error) {
+                calls += 1
+                return nil, LoaderError{err: errors.New("test error"), Retry: tt.retryable}
+            }
+
+            mst := NewTracker(errorStateLoader, true)
+            mst.GetCurrentState(stdfields.StdMonitorFields{}, tt.rc)
+
+            require.Equal(t, tt.expectedCalls, calls)
+        })
+    }
+}
diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go
index 954ea055f4a4..5ddea468e4c6 100644
--- a/libbeat/publisher/queue/memqueue/produce.go
+++ b/libbeat/publisher/queue/memqueue/produce.go
@@ -36,9 +36,10 @@ type ackProducer struct {
 }
 
 type openState struct {
-    log    *logp.Logger
-    done   chan struct{}
-    events chan pushRequest
+    log       *logp.Logger
+    done      chan struct{}
+    queueDone <-chan struct{}
+    events    chan pushRequest
 }
 
 // producerID stores the order of events within a single producer, so multiple
@@ -58,9 +59,10 @@ type ackHandler func(count int)
 
 func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer {
     openState := openState{
-        log:    b.logger,
-        done:   make(chan struct{}),
-        events: b.pushChan,
+        log:       b.logger,
+        done:      make(chan struct{}),
+        queueDone: b.ctx.Done(),
+        events:    b.pushChan,
     }
 
     if cb != nil {
@@ -143,27 +145,40 @@ func (st *openState) Close() {
 func (st *openState) publish(req pushRequest) (queue.EntryID, bool) {
     select {
     case st.events <- req:
-        // If the output is blocked and the queue is full, `req` is written
-        // to `st.events`, however the queue never writes back to `req.resp`,
-        // which effectively blocks for ever. So we also need to select on the
-        // done channel to ensure we don't miss the shutdown signal.
+        // The events channel is buffered, which means we may successfully
+        // write to it even if the queue is shutting down. To avoid blocking
+        // forever during shutdown, we also have to wait on the queue's
+        // shutdown channel.
         select {
         case resp := <-req.resp:
             return resp, true
-        case <-st.done:
+        case <-st.queueDone:
             st.events = nil
             return 0, false
         }
     case <-st.done:
         st.events = nil
         return 0, false
+    case <-st.queueDone:
+        st.events = nil
+        return 0, false
     }
 }
 
 func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) {
     select {
     case st.events <- req:
-        return <-req.resp, true
+        // The events channel is buffered, which means we may successfully
+        // write to it even if the queue is shutting down. To avoid blocking
+        // forever during shutdown, we also have to wait on the queue's
+        // shutdown channel.
+        select {
+        case resp := <-req.resp:
+            return resp, true
+        case <-st.queueDone:
+            st.events = nil
+            return 0, false
+        }
     case <-st.done:
         st.events = nil
         return 0, false
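(Not part of the patch: the publish/tryPublish comments above describe a two-stage select: first race the send against both shutdown signals, then, once the request has been accepted, wait only for the response or for queue shutdown. Producer cancellation alone must not abandon an accepted request, or the pipeline's event totals would drift. A sketch with plain channels; the real code uses `pushRequest` and `queue.EntryID`.)

```go
package main

import "fmt"

// publish sketches the select structure from openState.publish.
func publish(events chan int, resp chan int, done, queueDone chan struct{}) (int, bool) {
	select {
	case events <- 1: // request accepted by the (buffered) queue channel
		select {
		case r := <-resp:
			return r, true
		case <-queueDone: // queue gone: the response will never arrive
			return 0, false
		}
	case <-done: // producer closed before the request was accepted
		return 0, false
	case <-queueDone:
		return 0, false
	}
}

func main() {
	events := make(chan int, 1) // buffered, like the broker's push channel
	resp := make(chan int, 1)
	resp <- 42 // pretend the queue already answered
	id, ok := publish(events, resp, make(chan struct{}), make(chan struct{}))
	fmt.Println(id, ok) // 42 true
}
```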
diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go
index 141514483f33..53f8da4b77c6 100644
--- a/libbeat/publisher/queue/memqueue/queue_test.go
+++ b/libbeat/publisher/queue/memqueue/queue_test.go
@@ -27,8 +27,7 @@ import (
     "testing"
     "time"
 
-    "gotest.tools/assert"
-
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
     "github.com/elastic/beats/v7/libbeat/publisher/queue"
@@ -77,17 +76,17 @@ func TestProduceConsumer(t *testing.T) {
     t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond)))
 }
 
-// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish
-// does not block indefinitely.
+// TestProducerDoesNotBlockWhenQueueClosed ensures the producer Publish
+// does not block indefinitely during queue shutdown.
 //
-// Once we get a producer `p` from the queue we want to ensure
+// Once we get a producer `p` from the queue `q` we want to ensure
 // that if p.Publish is called and blocks it will unblock once
-// p.Cancel is called.
+// `q.Close` is called.
 //
 // For this test we start a queue with size 2 and try to add more
-// than 2 events to it, p.Publish will block, once we call p.Cancel,
+// than 2 events to it, p.Publish will block, once we call q.Close,
 // we ensure the 3rd event was not successfully published.
-func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
+func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {
     q := NewQueue(nil, nil,
         Settings{
             Events: 2, // Queue size
@@ -138,8 +137,12 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
         time.Millisecond,
         "the first two events were not successfully published")
 
-    // Cancel the producer, this should unblock its Publish method
-    p.Cancel()
+    // Close the queue, this should unblock the pending Publish call.
+    // It's not enough to just cancel the producer: once the producer
+    // has successfully sent a request to the queue, it must wait for
+    // the response unless the queue shuts down, otherwise the pipeline
+    // event totals will be wrong.
+    q.Close()
 
     require.Eventually(
         t,
@@ -149,6 +152,88 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
         "test not flagged as successful, p.Publish likely blocked indefinitely")
 }
 
+func TestProducerClosePreservesEventCount(t *testing.T) {
+    // Check for https://github.com/elastic/beats/issues/37702, a problem
+    // where canceling a producer while it was waiting on a response
+    // to an insert request could lead to inaccurate event totals.
+
+    var activeEvents atomic.Int64
+
+    q := NewQueue(nil, nil,
+        Settings{
+            Events:        3, // Queue size
+            MaxGetRequest: 2,
+            FlushTimeout:  10 * time.Millisecond,
+        }, 1)
+
+    p := q.Producer(queue.ProducerConfig{
+        ACK: func(count int) {
+            activeEvents.Add(-int64(count))
+        },
+        OnDrop: func(e interface{}) {
+            //activeEvents.Add(-1)
+        },
+        DropOnCancel: false,
+    })
+
+    // Asynchronously, send 4 events to the queue.
+    // Three will be enqueued, and one will be buffered,
+    // until we start reading from the queue.
+    // This needs to run in a goroutine because the buffered
+    // event will block until the queue handles it.
+    var wgProducer sync.WaitGroup
+    wgProducer.Add(1)
+    go func() {
+        for i := 0; i < 4; i++ {
+            event := i
+            // For proper navigation of the race conditions inherent to this
+            // test: increment active events before the publish attempt, then
+            // decrement afterwards if it failed (otherwise the event count
+            // could become negative even under correct queue operation).
+            activeEvents.Add(1)
+            _, ok := p.Publish(event)
+            if !ok {
+                activeEvents.Add(-1)
+            }
+        }
+        wgProducer.Done()
+    }()
+
+    // This sleep is regrettable, but there's no deterministic way to know when
+    // the producer code has buffered an event in the queue's channel.
+    // However, the test is written to produce false negatives only:
+    // - If this test fails, it _always_ indicates a bug.
+    // - If there is a bug, this test will _often_ fail.
+    time.Sleep(20 * time.Millisecond)
+
+    // Cancel the producer, then read and acknowledge two batches. If the
+    // Publish calls and the queue code are working, activeEvents should
+    // _usually_ end up as 0, but _always_ end up non-negative.
+    p.Cancel()
+
+    // The queue reads also need to be done in a goroutine, in case the
+    // producer cancellation signal went through before the Publish
+    // requests -- if only 2 events entered the queue, then the second
+    // Get call will block until the queue itself is cancelled.
+    go func() {
+        for i := 0; i < 2; i++ {
+            batch, err := q.Get(2)
+            // Only error to worry about is queue closing, which isn't
+            // a test failure.
+            if err == nil {
+                batch.Done()
+            }
+        }
+    }()
+
+    // One last sleep to let things percolate, then we close the queue
+    // to unblock any helpers and verify that the final active event
+    // count isn't negative.
+    time.Sleep(10 * time.Millisecond)
+    q.Close()
+    assert.False(t, activeEvents.Load() < 0, "active event count should never be negative")
+}
+
 func TestQueueMetricsDirect(t *testing.T) {
     eventsToTest := 5
     maxEvents := 10
@@ -190,7 +275,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te
     // Read events, don't yet ack them
     batch, err := testQueue.Get(eventsToTest)
-    assert.NilError(t, err, "error in Get")
+    assert.NoError(t, err, "error in Get")
     t.Logf("Got batch of %d events", batch.Count())
 
     queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))
@@ -206,7 +291,7 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup
     // wait briefly to avoid races across all the queue channels
     time.Sleep(time.Millisecond * 100)
     testMetrics, err := q.Metrics()
-    assert.NilError(t, err, "error calling metrics for test %s", test)
+    assert.NoError(t, err, "error calling metrics for test %s", test)
     assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test)
     assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test)
     assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test)
@@ -266,18 +351,18 @@ func TestEntryIDs(t *testing.T) {
 
         for i := 0; i < entryCount; i++ {
             batch, err := q.Get(1)
-            assert.NilError(t, err, "Queue read should succeed")
+            assert.NoError(t, err, "Queue read should succeed")
             assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry")
 
             metrics, err := q.Metrics()
-            assert.NilError(t, err, "Queue metrics call should succeed")
+            assert.NoError(t, err, "Queue metrics call should succeed")
             assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i),
                 fmt.Sprintf("Oldest entry ID before ACKing event %v should be %v", i, i))
 
             batch.Done()
             waiter.waitForEvents(1)
             metrics, err = q.Metrics()
-            assert.NilError(t, err, "Queue metrics call should succeed")
+            assert.NoError(t, err, "Queue metrics call should succeed")
             assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i+1),
                 fmt.Sprintf("Oldest entry ID after ACKing event %v should be %v", i, i+1))
 
@@ -297,7 +382,7 @@ func TestEntryIDs(t *testing.T) {
 
         for i := 0; i < entryCount; i++ {
             batch, err := q.Get(1)
-            assert.NilError(t, err, "Queue read should succeed")
+            assert.NoError(t, err, "Queue read should succeed")
             assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry")
             batches = append(batches, batch)
         }
@@ -318,7 +403,7 @@ func TestEntryIDs(t *testing.T) {
             // the slight nondeterminism.
             time.Sleep(1 * time.Millisecond)
             metrics, err := q.Metrics()
-            assert.NilError(t, err, "Queue metrics call should succeed")
+            assert.NoError(t, err, "Queue metrics call should succeed")
             assert.Equal(t, metrics.OldestEntryID, queue.EntryID(0),
                 fmt.Sprintf("Oldest entry ID after ACKing event %v should be 0", i))
         }
@@ -326,7 +411,7 @@
         batches[0].Done()
         waiter.waitForEvents(100)
         metrics, err := q.Metrics()
-        assert.NilError(t, err, "Queue metrics call should succeed")
+        assert.NoError(t, err, "Queue metrics call should succeed")
         assert.Equal(t, metrics.OldestEntryID, queue.EntryID(100),
             fmt.Sprintf("Oldest entry ID after ACKing event 0 should be %v", queue.EntryID(entryCount)))
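(Not part of the patch: the accounting rule spelled out in the test comments above, increment before the publish attempt and roll back only on failure, in isolation. `publish` here is a stand-in that fails half the time.)

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var active atomic.Int64
	publish := func(i int) bool { return i%2 == 0 } // stand-in: odd publishes fail

	for i := 0; i < 4; i++ {
		active.Add(1) // count the event before publishing...
		if !publish(i) {
			active.Add(-1) // ...and roll back only if the publish failed
		}
	}
	// Incrementing after a successful publish instead would race with the
	// ACK callback and could drive the counter negative even when the
	// queue behaves correctly.
	fmt.Println("active events:", active.Load()) // 2
}
```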
diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go
index 96be746c160a..5aa8d31e95de 100644
--- a/x-pack/filebeat/input/awss3/s3.go
+++ b/x-pack/filebeat/input/awss3/s3.go
@@ -8,7 +8,6 @@ import (
     "context"
     "errors"
     "fmt"
-    "net/url"
     "sync"
     "time"
 
@@ -208,14 +207,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
         // Metrics
         p.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects))
         for _, object := range page.Contents {
-            // Unescape s3 key name. For example, convert "%3D" back to "=".
-            filename, err := url.QueryUnescape(*object.Key)
-            if err != nil {
-                p.log.Errorw("Error when unescaping object key, skipping.", "error", err, "s3_object", *object.Key)
-                continue
-            }
-
-            state := newState(bucketName, filename, *object.ETag, p.listPrefix, *object.LastModified)
+            state := newState(bucketName, *object.Key, *object.ETag, p.listPrefix, *object.LastModified)
             if p.states.MustSkip(state, p.store) {
                 p.log.Debugw("skipping state.", "state", state)
                 continue
@@ -240,7 +232,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
                 s3ObjectHandler: s3Processor,
                 s3ObjectInfo: s3ObjectInfo{
                     name:         bucketName,
-                    key:          filename,
+                    key:          *object.Key,
                     etag:         *object.ETag,
                     lastModified: *object.LastModified,
                     listingID:    listingID.String(),
diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go
index 6f075a2f8541..b94ba7cfb09b 100644
--- a/x-pack/filebeat/input/awss3/s3_test.go
+++ b/x-pack/filebeat/input/awss3/s3_test.go
@@ -93,6 +93,11 @@ func TestS3Poller(t *testing.T) {
                     Key:          aws.String("key5"),
                     LastModified: aws.Time(time.Now()),
                 },
+                {
+                    ETag:         aws.String("etag6"),
+                    Key:          aws.String("2024-02-08T08:35:00+00:02.json.gz"),
+                    LastModified: aws.Time(time.Now()),
+                },
             },
         }, nil
     })
@@ -124,6 +129,10 @@ func TestS3Poller(t *testing.T) {
         GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")).
         Return(nil, errFakeConnectivityFailure)
 
+    mockAPI.EXPECT().
+        GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("2024-02-08T08:35:00+00:02.json.gz")).
+        Return(nil, errFakeConnectivityFailure)
+
     s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}, numberOfWorkers)
     receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval)
     require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx))
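(Not part of the patch: why the url.QueryUnescape call was removed. QueryUnescape treats "+" as an encoded space, so an S3 object key like the timestamped one added to the test above gets rewritten, and the input would later ask S3 for a key that does not exist.)

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	key := "2024-02-08T08:35:00+00:02.json.gz"
	unescaped, err := url.QueryUnescape(key)
	// Prints "2024-02-08T08:35:00 00:02.json.gz <nil>": the "+" in the
	// timezone offset silently became a space, corrupting the key.
	fmt.Println(unescaped, err)
}
```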