From 5a0cf94b4f725fd3e3008b16727aa615f913503e Mon Sep 17 00:00:00 2001
From: Alberto Delgado Roda
Date: Thu, 22 Feb 2024 14:27:55 +0100
Subject: [PATCH] improve retry mechanism

---
 .../wrappers/monitorstate/esloader_test.go    |  2 +-
 .../monitors/wrappers/monitorstate/tracker.go | 26 ++++++++++++++-----
 .../wrappers/monitorstate/tracker_test.go     | 16 +++++++++++-
 3 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go
index 532dbee9bb50..db70c1cc72e7 100644
--- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go
+++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go
@@ -55,7 +55,7 @@ func TestStatesESLoader(t *testing.T) {
 		monID := etc.createTestMonitorStateInES(t, testStatus)
 
 		// Since we've continued this state it should register the initial state
-		ms := etc.tracker.GetCurrentState(monID)
+		ms := etc.tracker.GetCurrentState(monID, RetryConfig{})
 		require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off")
 		requireMSStatusCount(t, ms, testStatus, 1)
 
diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go
index d82622521c15..505e97c53ecc 100644
--- a/heartbeat/monitors/wrappers/monitorstate/tracker.go
+++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go
@@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
 	t.mtx.Lock()
 	defer t.mtx.Unlock()
 
-	state := t.GetCurrentState(sf)
+	state := t.GetCurrentState(sf, RetryConfig{})
 	if state == nil {
 		state = newMonitorState(sf, newStatus, 0, t.flappingEnabled)
 		logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String())
@@ -75,22 +75,32 @@ func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus {
-	s := t.GetCurrentState(sf)
+	s := t.GetCurrentState(sf, RetryConfig{})
 	if s == nil {
 		return StatusEmpty
 	}
 	return s.Status
 }
 
-func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) {
+type RetryConfig struct {
+	attempts int
+	waitFn   func() time.Duration
+}
+
+func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) (state *State) {
 	if state, ok := t.states[sf.ID]; ok {
 		return state
 	}
 
-	tries := 3
+	// Default number of attempts
+	attempts := 3
+	if rc.attempts != 0 {
+		attempts = rc.attempts
+	}
+
 	var loadedState *State
 	var err error
-	for i := 0; i < tries; i++ {
+	for i := 0; i < attempts; i++ {
 		loadedState, err = t.stateLoader(sf)
 		if err == nil {
 			if loadedState != nil {
@@ -104,12 +114,16 @@ func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State)
 			break
 		}
 
+		// Default sleep time
 		sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond)
+		if rc.waitFn != nil {
+			sleepFor = rc.waitFn()
+		}
 		logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err)
 		time.Sleep(sleepFor)
 	}
 	if err != nil {
-		logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID)
+		logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", attempts, sf.ID)
 	}
 
 	if loadedState != nil {
diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go
index 44a012f7849e..fd34371ce810 100644
--- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go
+++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go
@@ -134,21 +134,35 @@ func TestDeferredStateLoader(t *testing.T) {
 }
 
 func TestStateLoaderRetry(t *testing.T) {
+	// While testing the sleep time between retries should be negligible
+	waitFn := func() time.Duration {
+		return time.Microsecond
+	}
+
 	tests := []struct {
 		name          string
 		retryable     bool
+		rc            RetryConfig
 		expectedCalls int
 	}{
 		{
 			"should retry 3 times when fails with retryable error",
 			true,
+			RetryConfig{waitFn: waitFn},
 			3,
 		},
 		{
 			"should not retry when fails with non-retryable error",
 			false,
+			RetryConfig{waitFn: waitFn},
 			1,
 		},
+		{
+			"should honour the configured number of attempts when fails with retryable error",
+			true,
+			RetryConfig{attempts: 5, waitFn: waitFn},
+			5,
+		},
 	}
 
 	for _, tt := range tests {
@@ -160,7 +174,7 @@ func TestStateLoaderRetry(t *testing.T) {
 			}
 
 			mst := NewTracker(errorStateLoader, true)
-			mst.GetCurrentState(stdfields.StdMonitorFields{})
+			mst.GetCurrentState(stdfields.StdMonitorFields{}, tt.rc)
 
 			require.Equal(t, calls, tt.expectedCalls)
 		})
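
For reviewers, a minimal sketch of how the two new knobs compose. The helper below is hypothetical (it is not part of this patch) and assumes it lives inside the monitorstate package: because RetryConfig's attempts and waitFn fields are unexported, only package-internal code (such as the tests above) can set them, while external callers pass RetryConfig{} and keep the previous behaviour of three attempts with the default quadratic-plus-jitter sleep.

	package monitorstate

	import (
		"math/rand"
		"time"
	)

	// backoffRetryConfig is a hypothetical helper showing how package-internal
	// callers could combine a custom attempt budget with a jittered,
	// quadratically growing wait that mirrors the tracker's default sleep.
	// Note that when waitFn is set it fully replaces the default sleep
	// computation, so the closure must track its own attempt counter.
	func backoffRetryConfig(attempts int) RetryConfig {
		i := 0
		return RetryConfig{
			attempts: attempts,
			waitFn: func() time.Duration {
				// 0s, 1s, 4s, 9s, ... plus up to 500ms of jitter.
				d := time.Duration(i*i)*time.Second +
					time.Duration(rand.Intn(500))*time.Millisecond
				i++
				return d
			},
		}
	}

A package-internal caller would then write, for example, state := t.GetCurrentState(sf, backoffRetryConfig(5)), while the zero-value RetryConfig{} used throughout the patch leaves the defaults in place.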