🚧 Fix consumer recover alarm logic
Cairry committed Feb 22, 2025
1 parent cf89a65 commit be3b32a
Showing 2 changed files with 55 additions and 30 deletions.
70 changes: 43 additions & 27 deletions alert/consumer/consumer.go
@@ -54,48 +54,39 @@ func (ag *AlertGroups) AddAlert(alert *models.AlertCurEvent, noticeGroup []map[s
 	ag.lock.Lock()
 	defer ag.lock.Unlock()

 	// Find the Rule position
-	rulePos := sort.Search(len(ag.Rules), func(i int) bool {
-		return ag.Rules[i].RuleID >= alert.RuleId
-	})
+	rulePos := ag.getRuleNodePos(alert.RuleId)

 	// Rule exists: locate the matching rule group
 	if rulePos < len(ag.Rules) && ag.Rules[rulePos].RuleID == alert.RuleId {
-		groups := &ag.Rules[rulePos].Groups
-		// Find the Group position
-		groupPos := sort.Search(len(*groups), func(i int) bool {
-			return (*groups)[i].ID >= groupID
-		})
-
-		if groupPos < len(*groups) && (*groups)[groupPos].ID == groupID {
+		rule := &ag.Rules[rulePos]
+
+		// Find the Group position
+		groupPos := ag.getGroupNodePos(rule, groupID)
+
+		if groupPos < len(rule.Groups) && (rule.Groups)[groupPos].ID == groupID {
 			// Append the event
-			(*groups)[groupPos].Events = append((*groups)[groupPos].Events, alert)
+			(rule.Groups)[groupPos].Events = append((rule.Groups)[groupPos].Events, alert)
 		} else {
-			// Insert a new Group: append an empty element to grow the slice,
-			*groups = append(*groups, EventsGroup{})
-			// shift the elements after the insertion point one slot back,
-			copy((*groups)[groupPos+1:], (*groups)[groupPos:])
-			// then assign the data into the new empty slot
-			(*groups)[groupPos] = EventsGroup{
+			// Insert the new data
+			rule.Groups = append(rule.Groups, EventsGroup{
 				ID:     groupID,
 				Events: []*models.AlertCurEvent{alert},
-			}
+			})
 		}
 		return
 	}

-	// Insert a new Rule
-	ag.Rules = append(ag.Rules, RulesGroup{})
-	copy(ag.Rules[rulePos+1:], ag.Rules[rulePos:])
-	ag.Rules[rulePos] = RulesGroup{
+	ag.Rules = append(ag.Rules, RulesGroup{
 		RuleID: alert.RuleId,
 		Groups: []EventsGroup{
 			{
 				ID:     groupID,
 				Events: []*models.AlertCurEvent{alert},
 			},
 		},
-	}
+	})
 }

 // generateGroupID generates the group ID. Each rule may have several groups (for grouped notifications); the default is "default", and when a group matches, a HASH computed from the key/value pair is used as the ID.
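
The comment above describes the scheme, but generateGroupID's body is elided from this diff. A hypothetical, self-contained sketch of that key/value hashing, purely for orientation — the helper name, matching rule, and FNV hash choice are all assumptions, not the repository's actual implementation:

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// groupIDFor is a hypothetical stand-in for generateGroupID: it returns
// "default" when no notice-group key/value matches the alert's labels,
// otherwise a hash of the matched pairs so identical matches share an ID.
func groupIDFor(labels map[string]string, noticeGroup []map[string]string) string {
	var matched []string
	for _, kv := range noticeGroup {
		for k, v := range kv {
			if labels[k] == v {
				matched = append(matched, k+"="+v)
			}
		}
	}
	if len(matched) == 0 {
		return "default"
	}
	sort.Strings(matched) // stable order -> stable hash
	h := fnv.New64a()
	h.Write([]byte(fmt.Sprint(matched)))
	return fmt.Sprintf("%x", h.Sum64())
}

func main() {
	labels := map[string]string{"severity": "critical", "team": "infra"}
	fmt.Println(groupIDFor(labels, []map[string]string{{"team": "infra"}})) // hash of team=infra
	fmt.Println(groupIDFor(labels, nil))                                    // "default"
}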
@@ -114,6 +105,31 @@ func (ag *AlertGroups) generateGroupID(alert *models.AlertCurEvent, noticeGroupM
 	return groupId
 }

+// getRuleNodePos returns the Rule node position
+func (ag *AlertGroups) getRuleNodePos(ruleId string) int {
+	// Sort the Rules slice
+	sort.Slice(ag.Rules, func(i, j int) bool {
+		return ag.Rules[i].RuleID < ag.Rules[j].RuleID
+	})
+
+	// Find the Rule position
+	return sort.Search(len(ag.Rules), func(i int) bool {
+		return ag.Rules[i].RuleID >= ruleId
+	})
+}
+
+func (ag *AlertGroups) getGroupNodePos(rule *RulesGroup, groupId string) int {
+	// Sort the Groups slice
+	sort.Slice(rule.Groups, func(i, j int) bool {
+		return rule.Groups[i].ID < rule.Groups[j].ID
+	})
+
+	// Find the Group position
+	return sort.Search(len(rule.Groups), func(i int) bool {
+		return (rule.Groups)[i].ID >= groupId
+	})
+}
+
 func NewConsumerWork(ctx *ctx.Context) ConsumeInterface {
 	return &Consume{
 		ctx: ctx,
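
Both helpers re-sort their slice with sort.Slice before every sort.Search, which is what lets AddAlert above replace the old grow-copy-assign insertion with a plain append: ordering is restored on the next lookup anyway. A minimal, runnable sketch of that sort-then-binary-search pattern, using simplified types rather than the repository's:

package main

import (
	"fmt"
	"sort"
)

type node struct{ ID string }

// posOf mirrors getRuleNodePos/getGroupNodePos: sort first, then
// binary-search. sort.Search returns the smallest index whose element
// satisfies the predicate — the element's position when present,
// otherwise the index where it would be inserted.
func posOf(nodes []node, id string) int {
	sort.Slice(nodes, func(i, j int) bool { return nodes[i].ID < nodes[j].ID })
	return sort.Search(len(nodes), func(i int) bool { return nodes[i].ID >= id })
}

func main() {
	nodes := []node{{"rule-3"}, {"rule-1"}}

	// Hit: after sorting, pos points at rule-1.
	pos := posOf(nodes, "rule-1")
	fmt.Println(pos, nodes[pos].ID == "rule-1") // 0 true

	// Miss: a plain append suffices, since the next call re-sorts.
	pos = posOf(nodes, "rule-2")
	fmt.Println(pos < len(nodes) && nodes[pos].ID == "rule-2") // false
	nodes = append(nodes, node{"rule-2"})
	fmt.Println(posOf(nodes, "rule-2")) // 1
}

The trade-off: re-sorting on every lookup costs O(n log n) where the removed positional insert kept the slice always sorted, but for the handful of rules and groups a fault center holds, the simpler code seems a reasonable exchange.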
@@ -145,9 +161,9 @@ func (c *Consume) Watch(ctx context.Context, faultCenter models.FaultCenter) {
 	timer := time.NewTicker(time.Second * time.Duration(1))
 	defer func() {
 		timer.Stop()
-		if r := recover(); r != nil {
-			logc.Error(c.ctx.Ctx, fmt.Sprintf("Recovered from consumer watch goroutine panic: %s, FaultCenterName: %s, Id: %s", r, faultCenter.Name, faultCenter.ID))
-		}
+		//if r := recover(); r != nil {
+		//	logc.Error(c.ctx.Ctx, fmt.Sprintf("Recovered from consumer watch goroutine panic: %s, FaultCenterName: %s, Id: %s", r, faultCenter.Name, faultCenter.ID))
+		//}
 	}()

 	for {
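
With the recover block commented out, a panic anywhere in this Watch goroutine now unwinds past the deferred function and terminates the entire process — Go never confines a panic to the goroutine that raised it. For reference, the guard being disabled follows the standard deferred-recover pattern, shown here as a stand-alone sketch:

package main

import "log"

func watch() {
	defer func() {
		// recover returns non-nil only while a panic is unwinding,
		// and only stops the unwind when called from a deferred func.
		if r := recover(); r != nil {
			log.Printf("recovered from watch panic: %v", r)
		}
	}()
	panic("simulated consumer failure")
}

func main() {
	watch()
	log.Println("process still alive") // reached because watch recovered
}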
@@ -182,6 +198,7 @@ func (c *Consume) executeTask(faultCenter models.FaultCenter, taskChan chan stru
 	// Group the events
 	var alertGroups AlertGroups
 	c.alarmGrouping(faultCenter, &alertGroups, filterEvents)
+	fmt.Println("alertGroups->", tools.JsonMarshal(alertGroups.Rules))
 	// Aggregate the events
 	aggEvents := c.alarmAggregation(faultCenter, &alertGroups)
 	// Send the events
@@ -250,7 +267,6 @@ func (c *Consume) alarmGrouping(faultCenter models.FaultCenter, alertGroups *Ale
 	}

 	alertGroups.AddAlert(alert, faultCenter.NoticeGroup)
-	//c.addAlertToGroup(alert, faultCenter.NoticeGroup)
 	if alert.IsRecovered {
 		c.removeAlertFromCache(alert)
 		if err := process.RecordAlertHisEvent(c.ctx, *alert); err != nil {
15 changes: 12 additions & 3 deletions alert/eval/eval.go
@@ -21,7 +21,7 @@ type (
 		Submit(rule models.AlertRule)
 		Stop(ruleId string)
 		Eval(ctx context.Context, rule models.AlertRule)
-		Recover(faultCenterKey string, faultCenterInfoKey string, curKeys []string)
+		Recover(ruleId, faultCenterKey string, faultCenterInfoKey string, curKeys []string)
 		GC(rule models.AlertRule, curFiringKeys []string)
 		RestartAllEvals()
 	}
@@ -109,7 +109,7 @@ func (t *AlertRule) Eval(ctx context.Context, rule models.AlertRule) {
 			curFingerprints = append(curFingerprints, fingerprints...)
 		}
 		logc.Infof(t.ctx.Ctx, fmt.Sprintf("Rule evaluation -> %v", tools.JsonMarshal(rule)))
-		t.Recover(models.BuildCacheEventKey(rule.TenantId, rule.FaultCenterId), models.BuildCacheInfoKey(rule.TenantId, rule.FaultCenterId), curFingerprints)
+		t.Recover(rule.RuleId, models.BuildCacheEventKey(rule.TenantId, rule.FaultCenterId), models.BuildCacheInfoKey(rule.TenantId, rule.FaultCenterId), curFingerprints)
 		t.GC(rule, curFingerprints)

 	case <-ctx.Done():
@@ -120,13 +120,22 @@
 	}
 }

-func (t *AlertRule) Recover(faultCenterKey string, faultCenterInfoKey string, curFingerprints []string) {
+func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey string, curFingerprints []string) {
 	// Fetch all alert events for the fault center
 	events, err := t.ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenterKey)
 	if err != nil {
 		return
 	}

+	// Keep only the current rule's events
+	var currentRuleEvents = make(map[string]models.AlertCurEvent)
+	for fingerprint, event := range events {
+		if event.RuleId == RuleId {
+			currentRuleEvents[fingerprint] = event
+		}
+	}
+	events = currentRuleEvents
+
 	// Extract the alert fingerprints from the events
 	fingerprints := make([]string, 0)
 	for fingerprint := range events {
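
This is the fix the commit title refers to. Recover treats every cached event whose fingerprint is absent from curFingerprints as recovered, but curFingerprints holds only the evaluating rule's fingerprints — so without the added filter, evaluating one rule could wrongly mark another rule's still-firing alarms in the same fault center as recovered. A minimal sketch of the corrected flow, with simplified types:

package main

import "fmt"

type event struct{ RuleId string }

// recoverable mirrors the fixed Recover logic: restrict the fault
// center's cached events to the evaluating rule, then treat anything
// missing from the current fingerprint set as recovered.
func recoverable(events map[string]event, ruleId string, curFingerprints []string) []string {
	// Keep only the current rule's events (the added filter).
	current := make(map[string]event)
	for fp, ev := range events {
		if ev.RuleId == ruleId {
			current[fp] = ev
		}
	}

	firing := make(map[string]struct{}, len(curFingerprints))
	for _, fp := range curFingerprints {
		firing[fp] = struct{}{}
	}

	var recovered []string
	for fp := range current {
		if _, ok := firing[fp]; !ok {
			recovered = append(recovered, fp)
		}
	}
	return recovered
}

func main() {
	cache := map[string]event{
		"fp-a1": {RuleId: "rule-a"},
		"fp-b1": {RuleId: "rule-b"}, // owned by another rule, still firing
	}
	// rule-a's evaluation reports no firing fingerprints:
	fmt.Println(recoverable(cache, "rule-a", nil)) // [fp-a1]
}

Without the RuleId filter, fp-b1 would also land in the recovered set, since it never appears in rule-a's curFingerprints.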
