From bf4ee7294643558a64194ee00322a601210877c0 Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Mon, 5 Feb 2024 14:22:23 +0100 Subject: [PATCH 01/11] only retry when the status is 5xx --- .../wrappers/monitorstate/esloader.go | 28 +++++++-- .../wrappers/monitorstate/esloader_test.go | 59 ++++++++++++++++++- .../wrappers/monitorstate/testutil.go | 4 +- .../monitors/wrappers/monitorstate/tracker.go | 4 ++ .../wrappers/monitorstate/tracker_test.go | 34 +++++++++++ 5 files changed, 121 insertions(+), 8 deletions(-) diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader.go b/heartbeat/monitors/wrappers/monitorstate/esloader.go index d63be5aada5b..4f60a59206a1 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader.go @@ -32,13 +32,21 @@ import ( var DefaultDataStreams = "synthetics-*,heartbeat-*" +type LoaderError struct { + Message string + Retry bool +} + +func (e LoaderError) Error() string { + return e.Message +} + func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader { if indexPattern == "" { // Should never happen, but if we ever make a coding error... logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES") return NilStateLoader } - return func(sf stdfields.StdMonitorFields) (*State, error) { var runFromID string if sf.RunFrom != nil { @@ -74,10 +82,11 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation }, }, } - - status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody) + status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "search", "?size=1"}, ""), "", nil, reqBody) if err != nil || status > 299 { - return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err) + errMsg := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err).Error() + retry := shouldRetry(status) + return nil, LoaderError{Message: errMsg, Retry: retry} } type stateHits struct { @@ -94,7 +103,8 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation sh := stateHits{} err = json.Unmarshal(body, &sh) if err != nil { - return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err) + errMsg := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err).Error() + return nil, LoaderError{Message: errMsg, Retry: true} } if len(sh.Hits.Hits) == 0 { @@ -107,3 +117,11 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation return state, nil } } + +func shouldRetry(status int) bool { + if status > 200 && status <= 499 { + return false + } + + return true +} diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go index 42b1c6c31c31..de31fdf346fc 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go @@ -21,6 +21,9 @@ package monitorstate import ( "fmt" + "io" + "net/http" + "strings" "testing" "time" @@ -33,6 +36,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/config" "github.com/elastic/beats/v7/heartbeat/esutil" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/processors/util" ) @@ -89,8 +93,59 @@ func TestStatesESLoader(t *testing.T) { } } +func TestMakeESLoaderError(t *testing.T) { + tests := []struct { + name string + statusCode int + expected bool + }{ + { + name: "should return a retryable error", + statusCode: http.StatusInternalServerError, + expected: true, + }, + { + name: "should not return a retryable error", + statusCode: http.StatusNotFound, + expected: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Arrange + etc := newESTestContext(t) + etc.ec.HTTP = fakeHTTPClient{respStatus: test.statusCode} + loader := MakeESLoader(etc.ec, "fakeIndexPattern", etc.location) + + // Act + _, err := loader(stdfields.StdMonitorFields{}) + + // Assert + var loaderError LoaderError + require.ErrorAs(t, err, &loaderError) + require.Equal(t, loaderError.Retry, test.expected) + }) + } +} + +type fakeHTTPClient struct { + respStatus int +} + +func (fc fakeHTTPClient) Do(req *http.Request) (resp *http.Response, err error) { + return &http.Response{ + StatusCode: fc.respStatus, + Body: io.NopCloser(strings.NewReader("test response")), + }, nil +} + +func (fc fakeHTTPClient) CloseIdleConnections() { + // noop +} + type esTestContext struct { namespace string + ec *eslegclient.Connection esc *elasticsearch.Client loader StateLoader tracker *Tracker @@ -106,10 +161,12 @@ func newESTestContext(t *testing.T) *esTestContext { } namespace, _ := uuid.NewV4() esc := IntegApiClient(t) + ec := IntegES(t) etc := &esTestContext{ namespace: namespace.String(), esc: esc, - loader: IntegESLoader(t, fmt.Sprintf("synthetics-*-%s", namespace.String()), location), + ec: ec, + loader: IntegESLoader(t, ec, fmt.Sprintf("synthetics-*-%s", namespace.String()), location), location: location, } diff --git a/heartbeat/monitors/wrappers/monitorstate/testutil.go b/heartbeat/monitors/wrappers/monitorstate/testutil.go index 540983097587..28a6c2606557 100644 --- a/heartbeat/monitors/wrappers/monitorstate/testutil.go +++ b/heartbeat/monitors/wrappers/monitorstate/testutil.go @@ -33,8 +33,8 @@ import ( // Helpers for tests here and elsewhere -func IntegESLoader(t *testing.T, indexPattern string, location *config.LocationWithID) StateLoader { - return MakeESLoader(IntegES(t), indexPattern, location) +func IntegESLoader(t *testing.T, esc *eslegclient.Connection, indexPattern string, location *config.LocationWithID) StateLoader { + return MakeESLoader(esc, indexPattern, location) } func IntegES(t *testing.T) (esc *eslegclient.Connection) { diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index e350294e46e8..af6c7f2893c0 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -98,6 +98,10 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) } break } + var loaderError LoaderError + if errors.As(err, &loaderError) && !loaderError.Retry { + break + } sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond) logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err) diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go index ec1217b86150..0b836e5d9700 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go @@ -131,3 +131,37 @@ func TestDeferredStateLoader(t *testing.T) { resState, _ = dsl(stdfields.StdMonitorFields{}) require.Equal(t, stateA, resState) } + +func TestStateLoaderRetry(t *testing.T) { + tests := []struct { + name string + retryable bool + expectedCalls int + }{ + { + "should retry 3 times when fails with retryable error", + true, + 3, + }, + { + "should not retry when fails with non-retryable error", + false, + 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + calls := 0 + errorStateLoader := func(_ stdfields.StdMonitorFields) (*State, error) { + calls += 1 + return nil, LoaderError{Message: "test error", Retry: tt.retryable} + } + + mst := NewTracker(errorStateLoader, true) + mst.GetCurrentState(stdfields.StdMonitorFields{}) + + require.Equal(t, calls, tt.expectedCalls) + }) + } +} From 568be03777ad21c537b675e6576b7e384c6e385a Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Mon, 12 Feb 2024 16:34:23 +0100 Subject: [PATCH 02/11] remove test AAA comments --- heartbeat/monitors/wrappers/monitorstate/esloader_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go index de31fdf346fc..7e1b6e14120f 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go @@ -112,15 +112,12 @@ func TestMakeESLoaderError(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - // Arrange etc := newESTestContext(t) etc.ec.HTTP = fakeHTTPClient{respStatus: test.statusCode} loader := MakeESLoader(etc.ec, "fakeIndexPattern", etc.location) - // Act _, err := loader(stdfields.StdMonitorFields{}) - // Assert var loaderError LoaderError require.ErrorAs(t, err, &loaderError) require.Equal(t, loaderError.Retry, test.expected) From aa2c02b65bf06a6ca926a67a944ebad26ce00ea7 Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Mon, 12 Feb 2024 16:38:52 +0100 Subject: [PATCH 03/11] add changelog --- CHANGELOG.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 46a86a51ecdc..172d185cc5e6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -32,6 +32,8 @@ fields added to events containing the Beats version. {pull}37553[37553] *Heartbeat* +- Adjust State loader to only retry for failed requests and not for 4xx. {pull}37981[37981] + *Metricbeat* From 7c074d271b2e309cfd55beea6cded26f411d46ff Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Mon, 12 Feb 2024 18:19:39 +0100 Subject: [PATCH 04/11] correct changelog modification --- CHANGELOG.next.asciidoc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 172d185cc5e6..8c1c92bd6f79 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -32,8 +32,6 @@ fields added to events containing the Beats version. {pull}37553[37553] *Heartbeat* -- Adjust State loader to only retry for failed requests and not for 4xx. {pull}37981[37981] - *Metricbeat* @@ -102,6 +100,9 @@ fields added to events containing the Beats version. {pull}37553[37553] *Heartbeat* +- Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702] +- Fix setuid root when running under cgroups v2. {pull}37794[37794] +- Adjust State loader to only retry for failed requests and not for 4xx. {pull}37981[37981] *Metricbeat* From dd757c035ca77be4ea5d62d661e229f647c3bceb Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Wed, 21 Feb 2024 16:10:15 +0100 Subject: [PATCH 05/11] fix ES query --- heartbeat/monitors/wrappers/monitorstate/esloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader.go b/heartbeat/monitors/wrappers/monitorstate/esloader.go index 4f60a59206a1..871e0d089803 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader.go @@ -82,7 +82,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation }, }, } - status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "search", "?size=1"}, ""), "", nil, reqBody) + status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody) if err != nil || status > 299 { errMsg := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err).Error() retry := shouldRetry(status) From b3e419df03ad7ea19a833d2b8d6d5f9325e8719a Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Wed, 21 Feb 2024 17:56:34 +0100 Subject: [PATCH 06/11] change error handling strategy --- .../wrappers/monitorstate/esloader.go | 20 ++++++++----------- .../monitors/wrappers/monitorstate/tracker.go | 1 + .../wrappers/monitorstate/tracker_test.go | 3 ++- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader.go b/heartbeat/monitors/wrappers/monitorstate/esloader.go index 871e0d089803..8867c9710238 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader.go @@ -33,12 +33,12 @@ import ( var DefaultDataStreams = "synthetics-*,heartbeat-*" type LoaderError struct { - Message string - Retry bool + err error + Retry bool } func (e LoaderError) Error() string { - return e.Message + return e.err.Error() } func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader { @@ -84,9 +84,9 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation } status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody) if err != nil || status > 299 { - errMsg := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err).Error() + sErr := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err) retry := shouldRetry(status) - return nil, LoaderError{Message: errMsg, Retry: retry} + return nil, LoaderError{err: sErr, Retry: retry} } type stateHits struct { @@ -103,8 +103,8 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation sh := stateHits{} err = json.Unmarshal(body, &sh) if err != nil { - errMsg := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err).Error() - return nil, LoaderError{Message: errMsg, Retry: true} + sErr := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err) + return nil, LoaderError{err: sErr, Retry: true} } if len(sh.Hits.Hits) == 0 { @@ -119,9 +119,5 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation } func shouldRetry(status int) bool { - if status > 200 && status <= 499 { - return false - } - - return true + return status >= 500 } diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index af6c7f2893c0..d82622521c15 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -100,6 +100,7 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) } var loaderError LoaderError if errors.As(err, &loaderError) && !loaderError.Retry { + logp.L().Warnf("could not load last externally recorded state: %w", err) break } diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go index 0b836e5d9700..44a012f7849e 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go @@ -18,6 +18,7 @@ package monitorstate import ( + "errors" "testing" "time" @@ -155,7 +156,7 @@ func TestStateLoaderRetry(t *testing.T) { calls := 0 errorStateLoader := func(_ stdfields.StdMonitorFields) (*State, error) { calls += 1 - return nil, LoaderError{Message: "test error", Retry: tt.retryable} + return nil, LoaderError{err: errors.New("test error"), Retry: tt.retryable} } mst := NewTracker(errorStateLoader, true) From ff6f4cd82924d0d66b754c896c2b3e4dda3238ee Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Thu, 22 Feb 2024 13:33:46 +0100 Subject: [PATCH 07/11] do not retry when there is malformed data --- heartbeat/monitors/wrappers/monitorstate/esloader.go | 2 +- heartbeat/monitors/wrappers/monitorstate/esloader_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader.go b/heartbeat/monitors/wrappers/monitorstate/esloader.go index 8867c9710238..5705ec4b1468 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader.go @@ -104,7 +104,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation err = json.Unmarshal(body, &sh) if err != nil { sErr := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err) - return nil, LoaderError{err: sErr, Retry: true} + return nil, LoaderError{err: sErr, Retry: false} } if len(sh.Hits.Hits) == 0 { diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go index 7e1b6e14120f..532dbee9bb50 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go @@ -109,6 +109,11 @@ func TestMakeESLoaderError(t *testing.T) { statusCode: http.StatusNotFound, expected: false, }, + { + name: "should not return a retryable error when handling malformed data", + statusCode: http.StatusOK, + expected: false, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { From c40730504f4d2182c178f95efad745a19fe78016 Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Thu, 22 Feb 2024 14:27:55 +0100 Subject: [PATCH 08/11] improve retry mechanism --- .../wrappers/monitorstate/esloader_test.go | 2 +- .../monitors/wrappers/monitorstate/tracker.go | 26 ++++++++++++++----- .../wrappers/monitorstate/tracker_test.go | 16 +++++++++++- 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go index 532dbee9bb50..db70c1cc72e7 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go @@ -55,7 +55,7 @@ func TestStatesESLoader(t *testing.T) { monID := etc.createTestMonitorStateInES(t, testStatus) // Since we've continued this state it should register the initial state - ms := etc.tracker.GetCurrentState(monID) + ms := etc.tracker.GetCurrentState(monID, RetryConfig{}) require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off") requireMSStatusCount(t, ms, testStatus, 1) diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index d82622521c15..505e97c53ecc 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta t.mtx.Lock() defer t.mtx.Unlock() - state := t.GetCurrentState(sf) + state := t.GetCurrentState(sf, RetryConfig{}) if state == nil { state = newMonitorState(sf, newStatus, 0, t.flappingEnabled) logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String()) @@ -75,22 +75,32 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta } func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus { - s := t.GetCurrentState(sf) + s := t.GetCurrentState(sf, RetryConfig{}) if s == nil { return StatusEmpty } return s.Status } -func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) { +type RetryConfig struct { + attempts int + waitFn func() time.Duration +} + +func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) (state *State) { if state, ok := t.states[sf.ID]; ok { return state } - tries := 3 + // Default number of attempts + attempts := 3 + if rc.attempts != 0 { + attempts = rc.attempts + } + var loadedState *State var err error - for i := 0; i < tries; i++ { + for i := 0; i < attempts; i++ { loadedState, err = t.stateLoader(sf) if err == nil { if loadedState != nil { @@ -104,12 +114,16 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) break } + // Default sleep time sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond) + if rc.waitFn != nil { + sleepFor = rc.waitFn() + } logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err) time.Sleep(sleepFor) } if err != nil { - logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID) + logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", attempts, sf.ID) } if loadedState != nil { diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go index 44a012f7849e..fd34371ce810 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go @@ -134,21 +134,35 @@ func TestDeferredStateLoader(t *testing.T) { } func TestStateLoaderRetry(t *testing.T) { + // While testing the sleep time between retries should be negligible + waitFn := func() time.Duration { + return time.Microsecond + } + tests := []struct { name string retryable bool + rc RetryConfig expectedCalls int }{ { "should retry 3 times when fails with retryable error", true, + RetryConfig{waitFn: waitFn}, 3, }, { "should not retry when fails with non-retryable error", false, + RetryConfig{waitFn: waitFn}, 1, }, + { + "should honour the configured number of attempts when fails with retryable error", + true, + RetryConfig{attempts: 5, waitFn: waitFn}, + 5, + }, } for _, tt := range tests { @@ -160,7 +174,7 @@ func TestStateLoaderRetry(t *testing.T) { } mst := NewTracker(errorStateLoader, true) - mst.GetCurrentState(stdfields.StdMonitorFields{}) + mst.GetCurrentState(stdfields.StdMonitorFields{}, tt.rc) require.Equal(t, calls, tt.expectedCalls) }) From 639cb207a2c0046da05502ef5306551bf9417392 Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Thu, 22 Feb 2024 18:23:35 +0100 Subject: [PATCH 09/11] improve log message --- heartbeat/monitors/wrappers/monitorstate/tracker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index 505e97c53ecc..7bc8e98afa3d 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -100,7 +100,8 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) var loadedState *State var err error - for i := 0; i < attempts; i++ { + var i int + for i = 0; i < attempts; i++ { loadedState, err = t.stateLoader(sf) if err == nil { if loadedState != nil { @@ -123,7 +124,7 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) time.Sleep(sleepFor) } if err != nil { - logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", attempts, sf.ID) + logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", i+1, sf.ID) } if loadedState != nil { From d305b8672ebd659da1342459024e28bb06a5ee0a Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Thu, 22 Feb 2024 19:04:04 +0100 Subject: [PATCH 10/11] improve changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8c1c92bd6f79..eef3e2304167 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -102,7 +102,7 @@ fields added to events containing the Beats version. {pull}37553[37553] - Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702] - Fix setuid root when running under cgroups v2. {pull}37794[37794] -- Adjust State loader to only retry for failed requests and not for 4xx. {pull}37981[37981] +- Adjust State loader to only retry when response code status is 5xx {pull}37981[37981] *Metricbeat* From 0b8a9023e75e97e13db66f64fffee478851cb9de Mon Sep 17 00:00:00 2001 From: Alberto Delgado Roda Date: Fri, 23 Feb 2024 11:49:26 +0100 Subject: [PATCH 11/11] fix log format --- heartbeat/monitors/wrappers/monitorstate/tracker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index 7bc8e98afa3d..40a4e8f2ded1 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -111,7 +111,7 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) } var loaderError LoaderError if errors.As(err, &loaderError) && !loaderError.Retry { - logp.L().Warnf("could not load last externally recorded state: %w", err) + logp.L().Warnf("could not load last externally recorded state: %v", loaderError) break } @@ -120,7 +120,7 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) if rc.waitFn != nil { sleepFor = rc.waitFn() } - logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err) + logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %v", sleepFor.Milliseconds(), err) time.Sleep(sleepFor) } if err != nil {