diff --git a/changelog/fragments/1738199968-update-scheduler-when-received-too-many-unathorized-responses.yaml b/changelog/fragments/1738199968-update-scheduler-when-received-too-many-unathorized-responses.yaml
new file mode 100644
index 00000000000..182e4fa62da
--- /dev/null
+++ b/changelog/fragments/1738199968-update-scheduler-when-received-too-many-unathorized-responses.yaml
@@ -0,0 +1,30 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: enhancement
+
+# Change summary; a 80ish characters long description of the change.
+summary: Updated the fleet gateway so that when the number of unauthorized fleet responses exceeds the set limit, instead of unenrolling, the gateway starts checking in less frequently.
+
+# Long description; in case the summary is not enough to describe the change
+# this field accommodate a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+#description:
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: "elastic-agent"
+# PR URL; optional; the PR number that added the changeset.
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+pr: https://github.com/elastic/elastic-agent/pull/6619
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present is automatically filled by the tooling with the issue linked to the PR number.
+issue: https://github.com/elastic/elastic-agent/issues/5428
diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
index a369282fd68..a8980a3e08e 100644
--- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
+++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
@@ -30,15 +30,18 @@ import (
 const maxUnauthCounter int = 6
 
 // Consts for states at fleet checkin
-const fleetStateDegraded = "DEGRADED"
-const fleetStateOnline = "online"
-const fleetStateError = "error"
-const fleetStateStarting = "starting"
+const (
+	fleetStateDegraded = "DEGRADED"
+	fleetStateOnline   = "online"
+	fleetStateError    = "error"
+	fleetStateStarting = "starting"
+)
 
 // Default Configuration for the Fleet Gateway.
 var defaultGatewaySettings = &fleetGatewaySettings{
-	Duration: 1 * time.Second,        // time between successful calls
-	Jitter:   500 * time.Millisecond, // used as a jitter for duration
+	Duration:                     1 * time.Second,        // time between successful calls
+	Jitter:                       500 * time.Millisecond, // used as a jitter for duration
+	ErrConsecutiveUnauthDuration: 1 * time.Hour,          // time between calls when the agent exceeds unauthorized response limit
 	Backoff: backoffSettings{ // time after a failed call
 		Init: 60 * time.Second,
 		Max:  10 * time.Minute,
@@ -46,9 +49,10 @@ var defaultGatewaySettings = &fleetGatewaySettings{
 }
 
 type fleetGatewaySettings struct {
-	Duration time.Duration   `config:"checkin_frequency"`
-	Jitter   time.Duration   `config:"jitter"`
-	Backoff  backoffSettings `config:"backoff"`
+	Duration                     time.Duration   `config:"checkin_frequency"`
+	Jitter                       time.Duration   `config:"jitter"`
+	Backoff                      backoffSettings `config:"backoff"`
+	ErrConsecutiveUnauthDuration time.Duration
 }
 
 type backoffSettings struct {
@@ -90,7 +94,6 @@ func New(
 	stateFetcher func() coordinator.State,
 	stateStore stateStore,
 ) (*FleetGateway, error) {
-
 	scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
 	return newFleetGatewayWithScheduler(
 		log,
@@ -356,17 +359,18 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 	resp, took, err := cmd.Execute(ctx, req)
 	if isUnauth(err) {
 		f.unauthCounter++
-
-		if f.shouldUnenroll() {
-			f.log.Warnf("retrieved an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
-			return &fleetapi.CheckinResponse{
-				Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
-			}, took, nil
+		if f.shouldUseLongSched() {
+			f.log.Warnf("retrieved an invalid api key error '%d' times. will use long scheduler", f.unauthCounter)
+			f.scheduler.SetDuration(defaultGatewaySettings.ErrConsecutiveUnauthDuration)
+			return &fleetapi.CheckinResponse{}, took, nil
 		}
 
 		return nil, took, err
 	}
 
+	// Not an unauthorized response: restore the regular check-in interval.
+	f.scheduler.SetDuration(defaultGatewaySettings.Duration)
+
 	f.unauthCounter = 0
 	if err != nil {
 		return nil, took, err
@@ -384,8 +388,8 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 	return resp, took, nil
 }
 
-// shouldUnenroll checks if the max number of trying an invalid key is reached
-func (f *FleetGateway) shouldUnenroll() bool {
+// shouldUseLongSched reports whether the count of consecutive unauthorized responses exceeds maxUnauthCounter.
+func (f *FleetGateway) shouldUseLongSched() bool {
 	return f.unauthCounter > maxUnauthCounter
 }
 
diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
index 4fcbc51a231..c5221134007 100644
--- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
+++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
 	"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
 	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
+	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
 	"github.com/elastic/elastic-agent/internal/pkg/scheduler"
 	agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
 	"github.com/elastic/elastic-agent/pkg/core/logger"
@@ -564,3 +565,106 @@ func TestAgentStateToString(t *testing.T) {
 		})
 	}
 }
+
+// MockScheduler is a test scheduler that ticks from Ticker and records the last duration passed to SetDuration.
+type MockScheduler struct {
+	Duration time.Duration
+	Ticker   *time.Ticker
+}
+
+func (m *MockScheduler) WaitTick() <-chan time.Time {
+	return m.Ticker.C
+}
+
+func (m *MockScheduler) SetDuration(d time.Duration) {
+	m.Duration = d
+}
+
+func (m *MockScheduler) Stop() {
+	m.Ticker.Stop()
+}
+
+func TestFleetGatewaySchedulerSwitch(t *testing.T) {
+	agentInfo := &testAgentInfo{}
+	settings := &fleetGatewaySettings{
+		Duration: 1 * time.Second,
+		Backoff:  backoffSettings{Init: 1 * time.Millisecond, Max: 2 * time.Millisecond},
+	}
+
+	// execute reads the package-level defaults, so shorten them for the test and restore them on exit.
+	tempSet := *defaultGatewaySettings
+	defaultGatewaySettings.Duration = 500 * time.Millisecond
+	defaultGatewaySettings.ErrConsecutiveUnauthDuration = 700 * time.Millisecond
+	defer func() {
+		*defaultGatewaySettings = tempSet
+	}()
+
+	t.Run("if unauthorized responses exceed the set limit, the scheduler should be switched to the long-wait scheduler", withGateway(agentInfo, settings, func(
+		t *testing.T,
+		gateway coordinator.FleetGateway,
+		c *testingClient,
+		sch *scheduler.Stepper,
+	) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		unauth := func(_ http.Header, _ io.Reader) (*http.Response, error) {
+			return nil, client.ErrInvalidAPIKey
+		}
+
+		clientWaitFn := c.Answer(unauth)
+		g, ok := gateway.(*FleetGateway)
+		require.True(t, ok)
+
+		ms := &MockScheduler{
+			Duration: defaultGatewaySettings.Duration,
+			Ticker:   time.NewTicker(defaultGatewaySettings.Duration),
+		}
+		g.scheduler = ms
+		errCh := runFleetGateway(ctx, gateway)
+
+		for i := 0; i <= maxUnauthCounter; i++ {
+			<-clientWaitFn
+		}
+
+		cancel()
+		err := <-errCh
+		require.NoError(t, err)
+
+		require.Equal(t, defaultGatewaySettings.ErrConsecutiveUnauthDuration, ms.Duration)
+	}))
+
+	t.Run("should switch back to short-wait scheduler if a successful response is received", withGateway(agentInfo, settings, func(
+		t *testing.T,
+		gateway coordinator.FleetGateway,
+		c *testingClient,
+		sch *scheduler.Stepper,
+	) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		success := func(_ http.Header, _ io.Reader) (*http.Response, error) {
+			resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
+			return resp, nil
+		}
+
+		clientWaitFn := c.Answer(success)
+		g, ok := gateway.(*FleetGateway)
+		require.True(t, ok)
+
+		ms := &MockScheduler{
+			Duration: defaultGatewaySettings.ErrConsecutiveUnauthDuration,
+			Ticker:   time.NewTicker(defaultGatewaySettings.ErrConsecutiveUnauthDuration),
+		}
+		g.scheduler = ms
+		errCh := runFleetGateway(ctx, gateway)
+
+		<-clientWaitFn
+
+		cancel()
+		err := <-errCh
+		require.NoError(t, err)
+
+		require.Equal(t, defaultGatewaySettings.Duration, ms.Duration)
+	}))
+}
diff --git a/internal/pkg/scheduler/scheduler.go b/internal/pkg/scheduler/scheduler.go
index f2b05476c02..65be4b1244d 100644
--- a/internal/pkg/scheduler/scheduler.go
+++ b/internal/pkg/scheduler/scheduler.go
@@ -14,6 +14,7 @@ import (
 type Scheduler interface {
 	WaitTick() <-chan time.Time
 	Stop()
+	SetDuration(time.Duration)
 }
 
 // Stepper is a scheduler where each Tick is manually triggered, this is useful in scenario
@@ -32,6 +33,9 @@ func (s *Stepper) WaitTick() <-chan time.Time {
 	return s.C
 }
 
+// SetDuration is a no-op for the Stepper scheduler, whose ticks are triggered manually.
+func (s *Stepper) SetDuration(_ time.Duration) {}
+
 // Stop is stopping the scheduler, in the case of the Stepper scheduler nothing is done.
 func (s *Stepper) Stop() {}
 
@@ -68,6 +72,12 @@ func (p *Periodic) WaitTick() <-chan time.Time {
 	return rC
 }
 
+// SetDuration changes the tick interval of the internal Ticker.
+func (p *Periodic) SetDuration(d time.Duration) {
+	// Reset re-arms the existing ticker instead of replacing it, so the old ticker is not leaked.
+	p.Ticker.Reset(d)
+}
+
 // Stop stops the internal Ticker.
 // Note this will not close the internal channel is up to the developer to unblock the goroutine
 // using another mechanism.
@@ -123,6 +133,12 @@ func (p *PeriodicJitter) WaitTick() <-chan time.Time {
 	return p.C
 }
 
+// SetDuration sets the wait duration stored in p.d.
+// NOTE(review): p.d is assigned without synchronization; confirm it is not read concurrently.
+func (p *PeriodicJitter) SetDuration(d time.Duration) {
+	p.d = d
+}
+
 // Stop stops the PeriodicJitter scheduler.
 func (p *PeriodicJitter) Stop() {
 	close(p.done)