From eecd874999f5dd5819be159f5ab3bdcd19377d0f Mon Sep 17 00:00:00 2001
From: kaanyalti
Date: Tue, 28 Jan 2025 14:17:58 -0500
Subject: [PATCH] enhancement(5423): added logic to replace scheduler with
 long-wait scheduler when the unauth response limit is exceeded

---
 .../gateway/fleet/fleet_gateway.go | 74 ++++++++++++++-----
 1 file changed, 56 insertions(+), 18 deletions(-)

diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
index a369282fd68..0232c8ed147 100644
--- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
+++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
@@ -6,6 +6,7 @@ package fleet
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
@@ -30,15 +31,18 @@ import (
 const maxUnauthCounter int = 6
 
 // Consts for states at fleet checkin
-const fleetStateDegraded = "DEGRADED"
-const fleetStateOnline = "online"
-const fleetStateError = "error"
-const fleetStateStarting = "starting"
+const (
+	fleetStateDegraded = "DEGRADED"
+	fleetStateOnline   = "online"
+	fleetStateError    = "error"
+	fleetStateStarting = "starting"
+)
 
 // Default Configuration for the Fleet Gateway.
 var defaultGatewaySettings = &fleetGatewaySettings{
-	Duration: 1 * time.Second,        // time between successful calls
-	Jitter:   500 * time.Millisecond, // used as a jitter for duration
+	Duration:    1 * time.Second,        // time between successful calls
+	Jitter:      500 * time.Millisecond, // used as a jitter for duration
+	ErrDuration: 1 * time.Hour,          // time between calls when the agent exceeds the unauthorized response limit
 	Backoff: backoffSettings{ // time after a failed call
 		Init: 60 * time.Second,
 		Max:  10 * time.Minute,
@@ -46,9 +50,10 @@ var defaultGatewaySettings = &fleetGatewaySettings{
 }
 
 type fleetGatewaySettings struct {
-	Duration time.Duration   `config:"checkin_frequency"`
-	Jitter   time.Duration   `config:"jitter"`
-	Backoff  backoffSettings `config:"backoff"`
+	Duration    time.Duration   `config:"checkin_frequency"`
+	Jitter      time.Duration   `config:"jitter"`
+	Backoff     backoffSettings `config:"backoff"`
+	ErrDuration time.Duration
 }
 
 type backoffSettings struct {
@@ -70,6 +75,7 @@ type FleetGateway struct {
 	log         *logger.Logger
 	client      client.Sender
 	scheduler   scheduler.Scheduler
+	isLongSched bool
 	settings    *fleetGatewaySettings
 	agentInfo   agentInfo
 	acker       acker.Acker
@@ -90,7 +96,6 @@ func New(
 	stateFetcher func() coordinator.State,
 	stateStore stateStore,
 ) (*FleetGateway, error) {
-
 	scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
 	return newFleetGatewayWithScheduler(
 		log,
@@ -132,6 +137,33 @@ func (f *FleetGateway) Actions() <-chan []fleetapi.Action {
 	return f.actionCh
 }
 
+// tryReplaceScheduler replaces the scheduler depending on whether the fleet
+// gateway needs a longer or shorter wait between checkins. The regular
+// Periodic scheduler is used for long waits, since the jitter is insignificant
+// relative to the wait time; PeriodicJitter is used otherwise.
+func (f *FleetGateway) tryReplaceScheduler() error {
+	switch v := f.scheduler.(type) {
+	case *scheduler.Periodic:
+		if !f.isLongSched {
+			// If the current scheduler is Periodic and the wait time needs to be
+			// short, switch to PeriodicJitter with the short duration.
+			f.scheduler = scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
+		}
+		return nil
+	case *scheduler.PeriodicJitter:
+		if f.isLongSched {
+			// If the current scheduler is PeriodicJitter and the wait time needs to
+			// be long, switch to Periodic with the long error duration.
+			f.scheduler = scheduler.NewPeriodic(defaultGatewaySettings.ErrDuration)
+		}
+		return nil
+	default:
+		// This case should not be reached under normal circumstances; it is
+		// handled here to be extra cautious.
+		return fmt.Errorf("unexpected scheduler type received: %T", v)
+	}
+}
+
 func (f *FleetGateway) Run(ctx context.Context) error {
 	backoff := backoff.NewEqualJitterBackoff(
 		ctx.Done(),
@@ -156,6 +188,11 @@ func (f *FleetGateway) Run(ctx context.Context) error {
 			continue
 		}
 
+		err = f.tryReplaceScheduler()
+		if err != nil {
+			f.errCh <- coordinator.NewWarningError(fmt.Sprintf("error replacing scheduler: %s", err.Error()))
+		}
+
 		actions := make([]fleetapi.Action, len(resp.Actions))
 		copy(actions, resp.Actions)
 		if len(actions) > 0 {
@@ -356,17 +393,18 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 	resp, took, err := cmd.Execute(ctx, req)
 	if isUnauth(err) {
 		f.unauthCounter++
-
-		if f.shouldUnenroll() {
-			f.log.Warnf("retrieved an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
-			return &fleetapi.CheckinResponse{
-				Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
-			}, took, nil
+		if f.shouldUseLongSched() {
+			// If we exceed the unauthorized response limit, switch to the long-wait scheduler.
+			f.isLongSched = true
+			f.log.Warnf("retrieved an invalid api key error '%d' times. Will use the long-wait scheduler.", f.unauthCounter)
+			return &fleetapi.CheckinResponse{}, took, nil
 		}
 
 		return nil, took, err
 	}
+	f.isLongSched = false
 
 	f.unauthCounter = 0
 	if err != nil {
@@ -384,8 +422,8 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 	return resp, took, nil
 }
 
-// shouldUnenroll checks if the max number of trying an invalid key is reached
-func (f *FleetGateway) shouldUnenroll() bool {
+// shouldUseLongSched checks whether the maximum number of invalid API key responses has been reached
+func (f *FleetGateway) shouldUseLongSched() bool {
 	return f.unauthCounter > maxUnauthCounter
 }
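For review purposes, here is a minimal, self-contained sketch of the scheduler-swap pattern this patch introduces. The `Scheduler` interface and the `Periodic`/`PeriodicJitter` types below are local stand-ins for the ones in `internal/pkg/scheduler` (their real signatures may differ, and jitter handling is omitted); only the type switch and the `isLongSched` flag mirror the patch:

```go
package main

import (
	"fmt"
	"time"
)

// Scheduler is a local stand-in for the agent's scheduler interface.
type Scheduler interface {
	WaitTick() <-chan time.Time
	Stop()
}

// Periodic ticks at a fixed interval; the patch uses it for the long
// wait after the unauthorized response limit is exceeded.
type Periodic struct{ d time.Duration }

func NewPeriodic(d time.Duration) *Periodic    { return &Periodic{d: d} }
func (p *Periodic) WaitTick() <-chan time.Time { return time.After(p.d) }
func (p *Periodic) Stop()                      {}

// PeriodicJitter adds jitter on top of the base duration; the patch uses
// it for normal checkins. Jitter is omitted here for brevity.
type PeriodicJitter struct{ d, jitter time.Duration }

func NewPeriodicJitter(d, jitter time.Duration) *PeriodicJitter {
	return &PeriodicJitter{d: d, jitter: jitter}
}
func (p *PeriodicJitter) WaitTick() <-chan time.Time { return time.After(p.d) }
func (p *PeriodicJitter) Stop()                      {}

// gateway mirrors the two FleetGateway fields the patch relies on.
type gateway struct {
	scheduler   Scheduler
	isLongSched bool
}

// tryReplaceScheduler mirrors the patch: switch on the concrete scheduler
// type and allocate a new scheduler only when the desired mode differs.
func (g *gateway) tryReplaceScheduler() error {
	switch v := g.scheduler.(type) {
	case *Periodic:
		if !g.isLongSched {
			g.scheduler = NewPeriodicJitter(time.Second, 500*time.Millisecond)
		}
		return nil
	case *PeriodicJitter:
		if g.isLongSched {
			g.scheduler = NewPeriodic(time.Hour)
		}
		return nil
	default:
		return fmt.Errorf("unexpected scheduler type received: %T", v)
	}
}

func main() {
	g := &gateway{scheduler: NewPeriodicJitter(time.Second, 500*time.Millisecond)}

	g.isLongSched = true // simulate exceeding the unauthorized response limit
	if err := g.tryReplaceScheduler(); err != nil {
		panic(err)
	}
	fmt.Printf("after unauth limit: %T\n", g.scheduler) // *main.Periodic

	g.isLongSched = false // simulate a successful checkin
	if err := g.tryReplaceScheduler(); err != nil {
		panic(err)
	}
	fmt.Printf("after recovery: %T\n", g.scheduler) // *main.PeriodicJitter
}
```

Note the no-op path: each branch returns nil and reallocation happens only on a mode mismatch, so repeated calls from the checkin loop are cheap, and the default branch only fires if a third scheduler implementation is introduced without updating the switch.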