From be3b32a6a97f5b642e8e2587496f29124ccd1789 Mon Sep 17 00:00:00 2001 From: Cairry <115769353+Cairry@users.noreply.github.com> Date: Sat, 22 Feb 2025 10:52:27 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20Fix=20consumer=20recover=20alarm?= =?UTF-8?q?=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- alert/consumer/consumer.go | 70 +++++++++++++++++++++++--------------- alert/eval/eval.go | 15 ++++++-- 2 files changed, 55 insertions(+), 30 deletions(-) diff --git a/alert/consumer/consumer.go b/alert/consumer/consumer.go index 0477f56..15904d4 100644 --- a/alert/consumer/consumer.go +++ b/alert/consumer/consumer.go @@ -54,40 +54,31 @@ func (ag *AlertGroups) AddAlert(alert *models.AlertCurEvent, noticeGroup []map[s ag.lock.Lock() defer ag.lock.Unlock() - // 查找Rule位置 - rulePos := sort.Search(len(ag.Rules), func(i int) bool { - return ag.Rules[i].RuleID >= alert.RuleId - }) + // 查找 Rule 位置 + rulePos := ag.getRuleNodePos(alert.RuleId) - // Rule存在时的处理,找到对应的规则组 + // Rule 存在时的处理,找到对应的规则组 if rulePos < len(ag.Rules) && ag.Rules[rulePos].RuleID == alert.RuleId { - groups := &ag.Rules[rulePos].Groups - // 查找Group位置 - groupPos := sort.Search(len(*groups), func(i int) bool { - return (*groups)[i].ID >= groupID - }) + rule := &ag.Rules[rulePos] - if groupPos < len(*groups) && (*groups)[groupPos].ID == groupID { + // 查找 Group 位置 + groupPos := ag.getGroupNodePos(rule, groupID) + + if groupPos < len(rule.Groups) && (rule.Groups)[groupPos].ID == groupID { // 追加事件 - (*groups)[groupPos].Events = append((*groups)[groupPos].Events, alert) + (rule.Groups)[groupPos].Events = append((rule.Groups)[groupPos].Events, alert) } else { - // 插入新Group,新增空元素扩容切片 - *groups = append(*groups, EventsGroup{}) - // 将插入位置之后的元素后移, - copy((*groups)[groupPos+1:], (*groups)[groupPos:]) - // 将数据赋值到新的空元素上 - (*groups)[groupPos] = EventsGroup{ + // 插入新数据 + rule.Groups = append(rule.Groups, EventsGroup{ ID: groupID, Events: []*models.AlertCurEvent{alert}, - } + }) } return } // 插入新Rule - ag.Rules = append(ag.Rules, RulesGroup{}) - copy(ag.Rules[rulePos+1:], ag.Rules[rulePos:]) - ag.Rules[rulePos] = RulesGroup{ + ag.Rules = append(ag.Rules, RulesGroup{ RuleID: alert.RuleId, Groups: []EventsGroup{ { @@ -95,7 +86,7 @@ func (ag *AlertGroups) AddAlert(alert *models.AlertCurEvent, noticeGroup []map[s Events: []*models.AlertCurEvent{alert}, }, }, - } + }) } // generateGroupID 生成分组ID,每个规则可能会有多个分组(其分组通知),默认为 default,如果有匹配的分组则根据 key/value 计算一个 HASH值作为 ID。 @@ -114,6 +105,31 @@ func (ag *AlertGroups) generateGroupID(alert *models.AlertCurEvent, noticeGroupM return groupId } +// getRuleNodePos 获取 Rule 点位 +func (ag *AlertGroups) getRuleNodePos(ruleId string) int { + // Rules 切片排序 + sort.Slice(ag.Rules, func(i, j int) bool { + return ag.Rules[i].RuleID < ag.Rules[j].RuleID + }) + + // 查找Rule位置 + return sort.Search(len(ag.Rules), func(i int) bool { + return ag.Rules[i].RuleID >= ruleId + }) +} + +func (ag *AlertGroups) getGroupNodePos(rule *RulesGroup, groupId string) int { + // Groups 切片排序 + sort.Slice(rule.Groups, func(i, j int) bool { + return rule.Groups[i].ID < rule.Groups[j].ID + }) + + // 查找Group位置 + return sort.Search(len(rule.Groups), func(i int) bool { + return (rule.Groups)[i].ID >= groupId + }) +} + func NewConsumerWork(ctx *ctx.Context) ConsumeInterface { return &Consume{ ctx: ctx, @@ -145,9 +161,9 @@ func (c *Consume) Watch(ctx context.Context, faultCenter models.FaultCenter) { timer := time.NewTicker(time.Second * time.Duration(1)) defer func() { timer.Stop() - if r := recover(); r != nil { - logc.Error(c.ctx.Ctx, fmt.Sprintf("Recovered from consumer watch goroutine panic: %s, FaultCenterName: %s, Id: %s", r, faultCenter.Name, faultCenter.ID)) - } + //if r := recover(); r != nil { + // logc.Error(c.ctx.Ctx, fmt.Sprintf("Recovered from consumer watch goroutine panic: %s, FaultCenterName: %s, Id: %s", r, faultCenter.Name, faultCenter.ID)) + //} }() for { @@ -182,6 +198,7 @@ func (c *Consume) executeTask(faultCenter models.FaultCenter, taskChan chan stru // 事件分组 var alertGroups AlertGroups c.alarmGrouping(faultCenter, &alertGroups, filterEvents) + fmt.Println("alertGroups->", tools.JsonMarshal(alertGroups.Rules)) // 事件聚合 aggEvents := c.alarmAggregation(faultCenter, &alertGroups) // 发送事件 @@ -250,7 +267,6 @@ func (c *Consume) alarmGrouping(faultCenter models.FaultCenter, alertGroups *Ale } alertGroups.AddAlert(alert, faultCenter.NoticeGroup) - //c.addAlertToGroup(alert, faultCenter.NoticeGroup) if alert.IsRecovered { c.removeAlertFromCache(alert) if err := process.RecordAlertHisEvent(c.ctx, *alert); err != nil { diff --git a/alert/eval/eval.go b/alert/eval/eval.go index b999ab7..5b2161a 100644 --- a/alert/eval/eval.go +++ b/alert/eval/eval.go @@ -21,7 +21,7 @@ type ( Submit(rule models.AlertRule) Stop(ruleId string) Eval(ctx context.Context, rule models.AlertRule) - Recover(faultCenterKey string, faultCenterInfoKey string, curKeys []string) + Recover(ruleId, faultCenterKey string, faultCenterInfoKey string, curKeys []string) GC(rule models.AlertRule, curFiringKeys []string) RestartAllEvals() } @@ -109,7 +109,7 @@ func (t *AlertRule) Eval(ctx context.Context, rule models.AlertRule) { curFingerprints = append(curFingerprints, fingerprints...) } logc.Infof(t.ctx.Ctx, fmt.Sprintf("规则评估 -> %v", tools.JsonMarshal(rule))) - t.Recover(models.BuildCacheEventKey(rule.TenantId, rule.FaultCenterId), models.BuildCacheInfoKey(rule.TenantId, rule.FaultCenterId), curFingerprints) + t.Recover(rule.RuleId, models.BuildCacheEventKey(rule.TenantId, rule.FaultCenterId), models.BuildCacheInfoKey(rule.TenantId, rule.FaultCenterId), curFingerprints) t.GC(rule, curFingerprints) case <-ctx.Done(): @@ -120,13 +120,22 @@ func (t *AlertRule) Eval(ctx context.Context, rule models.AlertRule) { } } -func (t *AlertRule) Recover(faultCenterKey string, faultCenterInfoKey string, curFingerprints []string) { +func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey string, curFingerprints []string) { // 获取所有的故障中心的告警事件 events, err := t.ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenterKey) if err != nil { return } + // 只获取当前规则的事件 + var currentRuleEvents = make(map[string]models.AlertCurEvent) + for fingerprint, event := range events { + if event.RuleId == RuleId { + currentRuleEvents[fingerprint] = event + } + } + events = currentRuleEvents + // 提取事件中的告警指纹 fingerprints := make([]string, 0) for fingerprint := range events {