From b54f5c67a4f451e3384d1c108cfaeceaeebb18a7 Mon Sep 17 00:00:00 2001 From: Alexander Rickardsson Date: Thu, 8 Dec 2022 11:54:02 -0500 Subject: [PATCH] dispatch: Fix initial alerts not honoring group_wait At initial startup of Alertmanager, old alerts will be sent to the receivers immediately as the start time for those alerts could be several days old in some cases (and in any case much older than the group_wait time). This is problematic for alerts that are supposed to be inhibited. If the old inhibited alert gets processed before the alert that is supposed to inhibit it, it will get sent to the receiver and cause unwanted noise. One approach to combat this is to always wait at least the group_wait duration for a new alert group, even if the alert is very old. This should make things a bit more stable as it gives all alerts a fighting chance to come in before we send out notifications. Signed-off-by: Alexander Rickardsson --- config/config.go | 1 + config/config_test.go | 13 +++++- config/testdata/conf.wait-on-startup.yml | 8 ++++ dispatch/dispatch.go | 55 ++++++++++++++++-------- dispatch/dispatch_test.go | 28 +++++++++++- dispatch/route.go | 7 +++ 6 files changed, 91 insertions(+), 21 deletions(-) create mode 100644 config/testdata/conf.wait-on-startup.yml diff --git a/config/config.go b/config/config.go index 7f3602e066..c178bef500 100644 --- a/config/config.go +++ b/config/config.go @@ -794,6 +794,7 @@ type Route struct { GroupWait *model.Duration `yaml:"group_wait,omitempty" json:"group_wait,omitempty"` GroupInterval *model.Duration `yaml:"group_interval,omitempty" json:"group_interval,omitempty"` RepeatInterval *model.Duration `yaml:"repeat_interval,omitempty" json:"repeat_interval,omitempty"` + WaitOnStartup bool `yaml:"wait_on_startup" json:"wait_on_startup,omitempty"` } // UnmarshalYAML implements the yaml.Unmarshaler interface for Route. 
diff --git a/config/config_test.go b/config/config_test.go index 7631d37cb7..f558054f62 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -200,7 +200,7 @@ receivers: func TestTimeIntervalHasName(t *testing.T) { in := ` time_intervals: -- name: +- name: time_intervals: - times: - start_time: '09:00' @@ -1010,6 +1010,17 @@ func TestGroupByAll(t *testing.T) { } } +func TestWaitOnStartup(t *testing.T) { + c, err := LoadFile("testdata/conf.wait-on-startup.yml") + if err != nil { + t.Fatalf("Error parsing %s: %s", "testdata/conf.wait-on-startup.yml", err) + } + + if !c.Route.WaitOnStartup { + t.Errorf("Invalid wait on startup param: expected to be true") + } +} + func TestVictorOpsDefaultAPIKey(t *testing.T) { conf, err := LoadFile("testdata/conf.victorops-default-apikey.yml") if err != nil { diff --git a/config/testdata/conf.wait-on-startup.yml b/config/testdata/conf.wait-on-startup.yml new file mode 100644 index 0000000000..7a4f3b520d --- /dev/null +++ b/config/testdata/conf.wait-on-startup.yml @@ -0,0 +1,8 @@ +route: + group_wait: 30s + group_interval: 5m + repeat_interval: 3h + receiver: team-X + wait_on_startup: True +receivers: + - name: 'team-X' diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 640b22abe2..527e9eed0e 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -91,7 +91,8 @@ type Dispatcher struct { ctx context.Context cancel func() - logger log.Logger + logger log.Logger + startTime time.Time } // Limits describes limits used by Dispatcher. 
@@ -118,13 +119,14 @@ func NewDispatcher( } disp := &Dispatcher{ - alerts: ap, - stage: s, - route: r, - timeout: to, - logger: log.With(l, "component", "dispatcher"), - metrics: m, - limits: lim, + alerts: ap, + stage: s, + route: r, + timeout: to, + logger: log.With(l, "component", "dispatcher"), + metrics: m, + limits: lim, + startTime: time.Now(), } return disp } @@ -330,7 +332,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { return } - ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger) + ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger, d.startTime) routeGroups[fp] = ag d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() @@ -385,20 +387,22 @@ type aggrGroup struct { mtx sync.RWMutex hasFlushed bool + startTime time.Time } // newAggrGroup returns a new aggregation group. -func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger) *aggrGroup { +func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger, startTime time.Time) *aggrGroup { if to == nil { to = func(d time.Duration) time.Duration { return d } } ag := &aggrGroup{ - labels: labels, - routeKey: r.Key(), - opts: &r.RouteOpts, - timeout: to, - alerts: store.NewAlerts(), - done: make(chan struct{}), + labels: labels, + routeKey: r.Key(), + opts: &r.RouteOpts, + timeout: to, + alerts: store.NewAlerts(), + done: make(chan struct{}), + startTime: startTime, } ag.ctx, ag.cancel = context.WithCancel(ctx) @@ -473,17 +477,32 @@ func (ag *aggrGroup) stop() { <-ag.done } +// check if we want to wait on initial startup before sending notification +func (ag *aggrGroup) shouldWaitOnStartup() bool { + now := time.Now() + return !ag.opts.WaitOnStartup || ag.startTime.Add(ag.opts.GroupWait).Before(now) +} + +func (ag *aggrGroup) shouldWaitForGroup(alert *types.Alert) bool { + now := time.Now() + return 
alert.StartsAt.Add(ag.opts.GroupWait).Before(now) +} + +// check if we want alertgroup timer to reset +func (ag *aggrGroup) shouldReset(alert *types.Alert) bool { + return !ag.hasFlushed && ag.shouldWaitForGroup(alert) && ag.shouldWaitOnStartup() +} + // insert inserts the alert into the aggregation group. func (ag *aggrGroup) insert(alert *types.Alert) { if err := ag.alerts.Set(alert); err != nil { level.Error(ag.logger).Log("msg", "error on set alert", "err", err) } - // Immediately trigger a flush if the wait duration for this // alert is already over. ag.mtx.Lock() defer ag.mtx.Unlock() - if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) { + if ag.shouldReset(alert) { ag.next.Reset(0) } } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 85bd62dc4d..5ed10fb955 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -138,7 +138,7 @@ func TestAggrGroup(t *testing.T) { } // Test regular situation where we wait for group_wait to send out alerts. - ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger()) + ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now()) go ag.run(ntfy) ag.insert(a1) @@ -192,7 +192,7 @@ func TestAggrGroup(t *testing.T) { // immediate flushing. // Finally, set all alerts to be resolved. After successful notify the aggregation group // should empty itself. 
- ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger()) + ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now()) go ag.run(ntfy) ag.insert(a1) @@ -267,6 +267,30 @@ func TestAggrGroup(t *testing.T) { } ag.stop() + + // Ensure WaitOnStartup is being honored + opts.WaitOnStartup = true + route = &Route{ + RouteOpts: *opts, + } + ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now()) + go ag.run(ntfy) + + ag.insert(a1) + + select { + case <-time.After(opts.GroupWait * 2): + t.Fatalf("Expected alert to be dealt with after group_wait but it has not been handled yet") + + case batch := <-alertsCh: + exp := removeEndsAt(types.AlertSlice{a1}) + sort.Sort(batch) + if !reflect.DeepEqual(batch, exp) { + t.Fatalf("expected alert %v but got %v", exp, batch) + } + } + + ag.stop() } func TestGroupLabels(t *testing.T) { diff --git a/dispatch/route.go b/dispatch/route.go index 5ada178dab..3e18d2dd35 100644 --- a/dispatch/route.go +++ b/dispatch/route.go @@ -35,6 +35,7 @@ var DefaultRouteOpts = RouteOpts{ GroupBy: map[model.LabelName]struct{}{}, GroupByAll: false, MuteTimeIntervals: []string{}, + WaitOnStartup: false, } // A Route is a node that contains definitions of how to handle alerts. @@ -88,6 +89,7 @@ func NewRoute(cr *config.Route, parent *Route) *Route { if cr.RepeatInterval != nil { opts.RepeatInterval = time.Duration(*cr.RepeatInterval) } + opts.WaitOnStartup = cr.WaitOnStartup // Build matchers. var matchers labels.Matchers @@ -234,6 +236,9 @@ type RouteOpts struct { // A list of time intervals for which the route is active. 
ActiveTimeIntervals []string + + // Honor the group_wait on initial startup even if incoming alerts are old + WaitOnStartup bool } func (ro *RouteOpts) String() string { @@ -254,12 +259,14 @@ func (ro *RouteOpts) MarshalJSON() ([]byte, error) { GroupWait time.Duration `json:"groupWait"` GroupInterval time.Duration `json:"groupInterval"` RepeatInterval time.Duration `json:"repeatInterval"` + WaitOnStartup bool `json:"waitOnStartup"` }{ Receiver: ro.Receiver, GroupByAll: ro.GroupByAll, GroupWait: ro.GroupWait, GroupInterval: ro.GroupInterval, RepeatInterval: ro.RepeatInterval, + WaitOnStartup: ro.WaitOnStartup, } for ln := range ro.GroupBy { v.GroupBy = append(v.GroupBy, ln)