Skip to content

Commit

Permalink
enhancement(5423): added logic to replaces scheduler with long-wait s…
Browse files Browse the repository at this point in the history
…cheduler in case of exceeded unauth response limit (#6619)

* enhancement(5423): added logic to replaces scheduler with long-wait scheduler in case of exceeded unauth response limit

* enhancement(5423): removed default case from type switch, added unit tests

* enhancement(5423): added blackbox functional tests for gateway Run

* enhancement(5423): added changelog

* enhancement(5423): remove tryReplaceScheduler, update tests

* enhancement(5423): added SetDuration function, added mock scheduler to tests, simplified scheduler usage
  • Loading branch information
kaanyalti authored Feb 13, 2025
1 parent 37be67f commit 99696b8
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Updated the fleet gateway so that when the number of unauthorized fleet responses exceeds the set limit, instead of unenrolling, the gateway starts checking in less frequently.

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: "elastic-agent"
# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6619
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/5428
39 changes: 21 additions & 18 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,29 @@ import (
const maxUnauthCounter int = 6

// Consts for states at fleet checkin
const fleetStateDegraded = "DEGRADED"
const fleetStateOnline = "online"
const fleetStateError = "error"
const fleetStateStarting = "starting"
const (
fleetStateDegraded = "DEGRADED"
fleetStateOnline = "online"
fleetStateError = "error"
fleetStateStarting = "starting"
)

// Default Configuration for the Fleet Gateway.
var defaultGatewaySettings = &fleetGatewaySettings{
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
ErrConsecutiveUnauthDuration: 1 * time.Hour, // time between calls when the agent exceeds unauthorized response limit
Backoff: backoffSettings{ // time after a failed call
Init: 60 * time.Second,
Max: 10 * time.Minute,
},
}

type fleetGatewaySettings struct {
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
ErrConsecutiveUnauthDuration time.Duration
}

type backoffSettings struct {
Expand Down Expand Up @@ -90,7 +94,6 @@ func New(
stateFetcher func() coordinator.State,
stateStore stateStore,
) (*FleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
return newFleetGatewayWithScheduler(
log,
Expand Down Expand Up @@ -356,17 +359,17 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
resp, took, err := cmd.Execute(ctx, req)
if isUnauth(err) {
f.unauthCounter++

if f.shouldUnenroll() {
f.log.Warnf("retrieved an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
return &fleetapi.CheckinResponse{
Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
}, took, nil
if f.shouldUseLongSched() {
f.log.Warnf("retrieved an invalid api key error '%d' times. will use long scheduler", f.unauthCounter)
f.scheduler.SetDuration(defaultGatewaySettings.ErrConsecutiveUnauthDuration)
return &fleetapi.CheckinResponse{}, took, nil
}

return nil, took, err
}

f.scheduler.SetDuration(defaultGatewaySettings.Duration)

f.unauthCounter = 0
if err != nil {
return nil, took, err
Expand All @@ -384,8 +387,8 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
return resp, took, nil
}

// shouldUnenroll checks if the max number of trying an invalid key is reached
func (f *FleetGateway) shouldUnenroll() bool {
// shouldUseLongSched checks if the max number of trying an invalid key is reached
func (f *FleetGateway) shouldUseLongSched() bool {
return f.unauthCounter > maxUnauthCounter
}

Expand Down
102 changes: 102 additions & 0 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -564,3 +565,104 @@ func TestAgentStateToString(t *testing.T) {
})
}
}

type MockScheduler struct {
Duration time.Duration
Ticker *time.Ticker
}

func (m *MockScheduler) WaitTick() <-chan time.Time {
return m.Ticker.C
}

func (m *MockScheduler) SetDuration(d time.Duration) {
m.Duration = d
}

func (m *MockScheduler) Stop() {
m.Ticker.Stop()
}

func TestFleetGatewaySchedulerSwitch(t *testing.T) {
agentInfo := &testAgentInfo{}
settings := &fleetGatewaySettings{
Duration: 1 * time.Second,
Backoff: backoffSettings{Init: 1 * time.Millisecond, Max: 2 * time.Millisecond},
}

tempSet := *defaultGatewaySettings
defaultGatewaySettings.Duration = 500 * time.Millisecond
defaultGatewaySettings.ErrConsecutiveUnauthDuration = 700 * time.Millisecond
defer func() {
*defaultGatewaySettings = tempSet
}()

t.Run("if unauthorized responses exceed the set limit, the scheduler should be switched to the long-wait scheduler", withGateway(agentInfo, settings, func(
t *testing.T,
gateway coordinator.FleetGateway,
c *testingClient,
sch *scheduler.Stepper,
) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

unauth := func(_ http.Header, _ io.Reader) (*http.Response, error) {
return nil, client.ErrInvalidAPIKey
}

clientWaitFn := c.Answer(unauth)
g, ok := gateway.(*FleetGateway)
require.True(t, ok)

ms := &MockScheduler{
Duration: defaultGatewaySettings.Duration,
Ticker: time.NewTicker(defaultGatewaySettings.Duration),
}
g.scheduler = ms
errCh := runFleetGateway(ctx, gateway)

for i := 0; i <= maxUnauthCounter; i++ {
<-clientWaitFn
}

cancel()
err := <-errCh
require.NoError(t, err)

require.Equal(t, ms.Duration, defaultGatewaySettings.ErrConsecutiveUnauthDuration)
}))

t.Run("should switch back to short-wait scheduler if the a successful response is received", withGateway(agentInfo, settings, func(
t *testing.T,
gateway coordinator.FleetGateway,
c *testingClient,
sch *scheduler.Stepper,
) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

unauth := func(_ http.Header, _ io.Reader) (*http.Response, error) {
resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}

clientWaitFn := c.Answer(unauth)
g, ok := gateway.(*FleetGateway)
require.True(t, ok)

ms := &MockScheduler{
Duration: defaultGatewaySettings.ErrConsecutiveUnauthDuration,
Ticker: time.NewTicker(defaultGatewaySettings.ErrConsecutiveUnauthDuration),
}
g.scheduler = ms
errCh := runFleetGateway(ctx, gateway)

<-clientWaitFn

cancel()
err := <-errCh
require.NoError(t, err)

require.Equal(t, ms.Duration, defaultGatewaySettings.Duration)
}))
}
12 changes: 12 additions & 0 deletions internal/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type Scheduler interface {
WaitTick() <-chan time.Time
Stop()
SetDuration(time.Duration)
}

// Stepper is a scheduler where each Tick is manually triggered, this is useful in scenario
Expand All @@ -32,6 +33,9 @@ func (s *Stepper) WaitTick() <-chan time.Time {
return s.C
}

// Sets the wait duration for the scheduler. Noop for stepper scheduler
func (s *Stepper) SetDuration(_ time.Duration) {}

// Stop is stopping the scheduler, in the case of the Stepper scheduler nothing is done.
func (s *Stepper) Stop() {}

Expand Down Expand Up @@ -68,6 +72,10 @@ func (p *Periodic) WaitTick() <-chan time.Time {
return rC
}

func (p *Periodic) SetDuration(d time.Duration) {
p.Ticker = time.NewTicker(d)
}

// Stop stops the internal Ticker.
// Note this will not close the internal channel is up to the developer to unblock the goroutine
// using another mechanism.
Expand Down Expand Up @@ -123,6 +131,10 @@ func (p *PeriodicJitter) WaitTick() <-chan time.Time {
return p.C
}

func (p *PeriodicJitter) SetDuration(d time.Duration) {
p.d = d
}

// Stop stops the PeriodicJitter scheduler.
func (p *PeriodicJitter) Stop() {
close(p.done)
Expand Down

0 comments on commit 99696b8

Please sign in to comment.