Skip to content

Commit

Permalink
enhancement(5423): added logic to replace scheduler with long-wait s…
Browse files Browse the repository at this point in the history
…cheduler in case of exceeded unauth response limit
  • Loading branch information
kaanyalti committed Jan 28, 2025
1 parent ebf49f1 commit eecd874
Showing 1 changed file with 56 additions and 18 deletions.
74 changes: 56 additions & 18 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package fleet

import (
"context"
"fmt"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
Expand All @@ -30,25 +31,29 @@ import (
const maxUnauthCounter int = 6

// Consts for states at fleet checkin
const fleetStateDegraded = "DEGRADED"
const fleetStateOnline = "online"
const fleetStateError = "error"
const fleetStateStarting = "starting"
const (
fleetStateDegraded = "DEGRADED"
fleetStateOnline = "online"
fleetStateError = "error"
fleetStateStarting = "starting"
)

// Default Configuration for the Fleet Gateway.
var defaultGatewaySettings = &fleetGatewaySettings{
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
ErrDuration: 1 * time.Hour, // time between calls when the agent exceeds unauthorized response limit
Backoff: backoffSettings{ // time after a failed call
Init: 60 * time.Second,
Max: 10 * time.Minute,
},
}

type fleetGatewaySettings struct {
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
ErrDuration time.Duration
}

type backoffSettings struct {
Expand All @@ -70,6 +75,7 @@ type FleetGateway struct {
log *logger.Logger
client client.Sender
scheduler scheduler.Scheduler
isLongSched bool
settings *fleetGatewaySettings
agentInfo agentInfo
acker acker.Acker
Expand All @@ -90,7 +96,6 @@ func New(
stateFetcher func() coordinator.State,
stateStore stateStore,
) (*FleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
return newFleetGatewayWithScheduler(
log,
Expand Down Expand Up @@ -132,6 +137,33 @@ func (f *FleetGateway) Actions() <-chan []fleetapi.Action {
return f.actionCh
}

// Replaces scheduler depending on whether the fleetgateway needs to wait longer
// or shorter between checkins. Regular Periodic scheduler is used for long wait
// times as the jitter is not significant compared to the wait time, and
// PeriodicJitter is used otherwise
func (f *FleetGateway) tryReplaceScheduler() error {
switch v := f.scheduler.(type) {
case *scheduler.Periodic:
if !f.isLongSched {
// If the current scheduler is Periodic and the wait time needs to be
// short, switch to using PeriodicJitter with short duration
f.scheduler = scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
}
return nil
case *scheduler.PeriodicJitter:
if f.isLongSched {
// If the current scheduler is PeriodicJitter and the wait time needs to
// be long, switch to using Periodic with long duration
f.scheduler = scheduler.NewPeriodic(defaultGatewaySettings.ErrDuration)
}
return nil
default:
// This case should not be executed under normal circumstances, added here
// to be extra cautious
return fmt.Errorf("unexpected scheduler type received: %T\n", v)

Check failure on line 163 in internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not end with punctuation or newlines (stylecheck)
}
}

func (f *FleetGateway) Run(ctx context.Context) error {
backoff := backoff.NewEqualJitterBackoff(
ctx.Done(),
Expand All @@ -156,6 +188,11 @@ func (f *FleetGateway) Run(ctx context.Context) error {
continue
}

err = f.tryReplaceScheduler()
if err != nil {
f.errCh <- coordinator.NewWarningError(fmt.Sprintf("error replacing scheduler: %s", err.Error()))
}

actions := make([]fleetapi.Action, len(resp.Actions))
copy(actions, resp.Actions)
if len(actions) > 0 {
Expand Down Expand Up @@ -356,17 +393,18 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
resp, took, err := cmd.Execute(ctx, req)
if isUnauth(err) {
f.unauthCounter++

if f.shouldUnenroll() {
f.log.Warnf("retrieved an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
return &fleetapi.CheckinResponse{
Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
}, took, nil
if f.shouldUseLongSched() {
// If we exceed the unauth response limit, flag the gateway to use the long scheduler
f.isLongSched = true
f.log.Warnf("retrieved an invalid api key error '%d' times. will use long scheduler", f.unauthCounter)
return &fleetapi.CheckinResponse{}, took, nil
}

return nil, took, err
}

f.isLongSched = false

f.unauthCounter = 0
if err != nil {
return nil, took, err
Expand All @@ -384,8 +422,8 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
return resp, took, nil
}

// shouldUnenroll checks if the max number of trying an invalid key is reached
func (f *FleetGateway) shouldUnenroll() bool {
// shouldUseLongSched reports whether the count of unauthorized (invalid API
// key) responses tracked in f.unauthCounter has gone past maxUnauthCounter.
func (f *FleetGateway) shouldUseLongSched() bool {
	limitExceeded := maxUnauthCounter < f.unauthCounter
	return limitExceeded
}

Expand Down

0 comments on commit eecd874

Please sign in to comment.