Skip to content

Commit

Permalink
Merge pull request #134 from opsre/fix/consumer
Browse files Browse the repository at this point in the history
🚧 Fix alarm aggregation and optimize alarm grouping
  • Loading branch information
Cairry authored Feb 19, 2025
2 parents c92b0b1 + 8ec5907 commit 96a5133
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 141 deletions.
62 changes: 22 additions & 40 deletions .github/release-drafter.yml
Original file line number Diff line number Diff line change
@@ -1,41 +1,23 @@
# Configuration for Release Drafter: https://github.com/toolmantim/release-drafter
name-template: 'v$NEXT_PATCH_VERSION 🌈'
tag-template: 'v$NEXT_PATCH_VERSION'
version-template: $MAJOR.$MINOR.$PATCH
# Emoji reference: https://gitmoji.carloscuesta.me/
categories:
- title: '🚀 Features'
changelog:
exclude:
labels:
- 'feature'
- 'enhancement'
- 'kind/feature'
- title: '🐛 Bug Fixes'
labels:
- 'fix'
- 'bugfix'
- 'bug'
- 'regression'
- 'kind/bug'
- title: 📝 Documentation updates
labels:
- documentation
- 'kind/doc'
- title: 👻 Maintenance
labels:
- chore
- dependencies
- 'kind/chore'
- 'kind/dep'
- title: 🚦 Tests
labels:
- test
- tests
exclude-labels:
- reverted
- no-changelog
- skip-changelog
- invalid
change-template: '* $TITLE (#$NUMBER) @$AUTHOR'
template: |
## What’s Changed
$CHANGES
- ignore-for-release
categories:
- title: '🚀 Features'
labels:
- 'feature'
- title: '🐛 Bug Fixes'
labels:
- 'bug'
- title: 📝 Documentation updates
labels:
- documentation
- title: 👻 Maintenance
labels:
- dependencies
- title: 🚦 Tests
labels:
- tests
- title: Other Changes
labels:
- "*"
190 changes: 134 additions & 56 deletions alert/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/zeromicro/go-zero/core/logc"
"golang.org/x/sync/errgroup"
"sort"
"strings"
"sync"
"time"
Expand All @@ -29,14 +30,93 @@ type (
// Consume holds the consumer's state: the shared project context plus an
// embedded read/write lock that its methods take around alert bookkeeping.
Consume struct {
// ctx is the project context; methods in this file use it to reach the DB,
// Redis and the logging context.
ctx *ctx.Context
// Embedded lock; Lock/RLock are called directly on the receiver.
sync.RWMutex
// preStoreAlertEvents buffers alerts by group key.
preStoreAlertEvents map[string][]*models.AlertCurEvent
}

// EventsGroup is one notification group inside a rule: the alerts that share
// the same group ID.
EventsGroup struct {
ID string // event group ID
Events []*models.AlertCurEvent
}

// RulesGroup collects every event group that belongs to a single alert rule.
RulesGroup struct {
RuleID string // rule group ID
Groups []EventsGroup
}

// AlertGroups is the top-level container of grouped alerts, populated through
// AddAlert.
AlertGroups struct {
Rules []RulesGroup // alert events, partitioned by rule; AddAlert keeps this sorted by RuleID
lock sync.Mutex // held by AddAlert while Rules is searched and modified
}
)

// AddAlert files alert under its rule and notification group.
//
// Both ag.Rules (ordered by RuleID) and each rule's Groups (ordered by ID) are
// kept sorted, so the target position is located with a binary search and a
// missing entry is inserted in place. The group ID is computed before the lock
// is taken; everything that touches ag.Rules runs under ag.lock.
func (ag *AlertGroups) AddAlert(alert *models.AlertCurEvent, noticeGroup []map[string]string) {
	id := ag.generateGroupID(alert, noticeGroup)

	ag.lock.Lock()
	defer ag.lock.Unlock()

	// Index of the first rule whose ID is not smaller than the alert's rule ID.
	ri := sort.Search(len(ag.Rules), func(i int) bool {
		return ag.Rules[i].RuleID >= alert.RuleId
	})

	// Unknown rule: open a gap at ri and drop a fresh rule with one group into it.
	if ri == len(ag.Rules) || ag.Rules[ri].RuleID != alert.RuleId {
		entry := RulesGroup{
			RuleID: alert.RuleId,
			Groups: []EventsGroup{
				{ID: id, Events: []*models.AlertCurEvent{alert}},
			},
		}
		ag.Rules = append(ag.Rules, RulesGroup{})
		copy(ag.Rules[ri+1:], ag.Rules[ri:])
		ag.Rules[ri] = entry
		return
	}

	// Known rule: locate the group the same way.
	rule := &ag.Rules[ri]
	gi := sort.Search(len(rule.Groups), func(i int) bool {
		return rule.Groups[i].ID >= id
	})

	// Existing group: just append the event.
	if gi < len(rule.Groups) && rule.Groups[gi].ID == id {
		rule.Groups[gi].Events = append(rule.Groups[gi].Events, alert)
		return
	}

	// New group: grow by one, shift the tail right, fill the gap.
	rule.Groups = append(rule.Groups, EventsGroup{})
	copy(rule.Groups[gi+1:], rule.Groups[gi:])
	rule.Groups[gi] = EventsGroup{
		ID:     id,
		Events: []*models.AlertCurEvent{alert},
	}
}

// generateGroupID returns the notification-group ID for alert.
//
// A rule may be split into several notification groups. Each entry of
// noticeGroupMap describes one group through its "key" and "value" fields. The
// first entry, in configured order, whose key is present in alert.Metric with
// an equal string value decides the ID, which is a hash of that key/value pair.
// When nothing matches, or noticeGroupMap is empty, the ID is "default".
//
// Walking the configured groups rather than ranging over the metric map makes
// the result deterministic when several labels match. Label values that are
// not strings are skipped instead of triggering a type-assertion panic.
func (ag *AlertGroups) generateGroupID(alert *models.AlertCurEvent, noticeGroupMap []map[string]string) string {
	for _, noticeGroup := range noticeGroupMap {
		key := noticeGroup["key"]

		raw, found := alert.Metric[key]
		if !found {
			continue
		}

		// Comma-ok assertion: a non-string label value cannot match a string
		// notice-group value, so it is treated as "no match".
		value, isString := raw.(string)
		if !isString || value != noticeGroup["value"] {
			continue
		}

		return tools.WithKVCalculateHash(key, value)
	}
	return "default"
}

// NewConsumerWork builds a Consume around ctx and returns it as a
// ConsumeInterface.
func NewConsumerWork(ctx *ctx.Context) ConsumeInterface {
return &Consume{
ctx: ctx,
// The alert buffer starts out as an empty, non-nil map.
preStoreAlertEvents: make(map[string][]*models.AlertCurEvent),
ctx: ctx,
}
}

Expand Down Expand Up @@ -79,14 +159,36 @@ func (c *Consume) Watch(ctx context.Context, faultCenter models.FaultCenter) {
logc.Error(c.ctx.Ctx, fmt.Sprintf("从 Redis 中获取事件信息错误, faultCenterKey: %s, err: %s", faultCenter.GetFaultCenterKey(), err.Error()))
return
}
c.fireAlertEvent(faultCenter, c.filterAlertEvents(faultCenter, data))
c.clear()
// 事件过滤
filterEvents := c.filterAlertEvents(faultCenter, data)
// 事件分组
var alertGroups AlertGroups
c.alarmGrouping(faultCenter, &alertGroups, filterEvents)
// 事件聚合
aggEvents := c.alarmAggregation(faultCenter, &alertGroups)
// 发送事件
c.sendAlerts(faultCenter, aggEvents)
case <-ctx.Done():
return
}
}
}

// wait ticks once per second and, on every tick, compares the span
// endAt-startAt against 60.
//
// If the span is at most 60 the function returns on the first tick. Otherwise
// it sleeps a further 200ms and sends a token on sem, then waits for the next
// tick. startAt and endAt never change inside the loop, so in that case the
// function keeps signalling and does not return on its own; each send blocks
// until sem can accept it.
//
// The ticker is stopped on return so it is not left running after the
// function exits.
func (c *Consume) wait(startAt, endAt int64, sem chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for range ticker.C {
		if endAt-startAt <= 60 {
			return
		}
		time.Sleep(200 * time.Millisecond)
		sem <- struct{}{}
	}
}

// filterAlertEvents 过滤告警事件
func (c *Consume) filterAlertEvents(faultCenter models.FaultCenter, alerts map[string]string) []*models.AlertCurEvent {
var newEvents []*models.AlertCurEvent
Expand Down Expand Up @@ -132,62 +234,50 @@ func (c *Consume) validateEvent(event *models.AlertCurEvent, faultCenter models.
event.LastEvalTime >= event.LastSendTime+faultCenter.RepeatNoticeInterval*60
}

// addAlertToGroup appends alert to c.preStoreAlertEvents under its group key.
//
// The key defaults to the alert's rule ID. If an entry of noticeGroupMap (taken
// in configured order) names a "key" that exists in alert.Metric with a string
// value equal to the entry's "value", the key becomes
// "<hash of key/value>_<rule ID>" instead, and the first such entry wins.
//
// Walking the configured groups rather than ranging over the metric map makes
// the key deterministic when several labels match. Label values that are not
// strings are skipped instead of triggering a type-assertion panic. The whole
// operation runs under the write lock.
func (c *Consume) addAlertToGroup(alert *models.AlertCurEvent, noticeGroupMap []map[string]string) {
	c.Lock()
	defer c.Unlock()

	groupId := alert.RuleId
	for _, noticeGroup := range noticeGroupMap {
		key := noticeGroup["key"]

		raw, found := alert.Metric[key]
		if !found {
			continue
		}

		// Comma-ok assertion: a non-string label value is treated as "no match".
		value, isString := raw.(string)
		if !isString || value != noticeGroup["value"] {
			continue
		}

		groupId = tools.WithKVCalculateHash(key, value) + "_" + alert.RuleId
		break
	}

	c.preStoreAlertEvents[groupId] = append(c.preStoreAlertEvents[groupId], alert)
}

// fireAlertEvent triggers alert events.
func (c *Consume) fireAlertEvent(faultCenter models.FaultCenter, alerts []*models.AlertCurEvent) {
// alarmGrouping files every alert into alertGroups using the fault center's
// NoticeGroup settings. Alerts flagged IsRecovered are additionally passed to
// removeAlertFromCache and recorded as history; a failure to record is logged
// and does not stop the loop.
func (c *Consume) alarmGrouping(faultCenter models.FaultCenter, alertGroups *AlertGroups, alerts []*models.AlertCurEvent) {
// Nothing to group.
if len(alerts) == 0 {
return
}

for _, alert := range alerts {
c.addAlertToGroup(alert, faultCenter.NoticeGroup)
alertGroups.AddAlert(alert, faultCenter.NoticeGroup)
//c.addAlertToGroup(alert, faultCenter.NoticeGroup)
if alert.IsRecovered {
c.removeAlertFromCache(alert)
// The history record receives a copy of the event (*alert), not the pointer.
if err := process.RecordAlertHisEvent(c.ctx, *alert); err != nil {
logc.Error(c.ctx.Ctx, fmt.Sprintf("Failed to record alert history: %v", err))
}
}
}
}

// alarmAggregation applies the fault center's aggregation type to the grouped
// alerts. For type "Rule" each group's Events slice is replaced in place by the
// result of withRuleGroupByAlerts; any other type leaves the groups untouched.
// The returned pointer is the same alertGroups that was passed in.
func (c *Consume) alarmAggregation(faultCenter models.FaultCenter, alertGroups *AlertGroups) *AlertGroups {
curTime := time.Now().Unix()
// Pointer copy only: newAlertMapping aliases alertGroups.
newAlertMapping := alertGroups
switch faultCenter.GetAlarmAggregationType() {
case "Rule":
for ri, rule := range newAlertMapping.Rules {
for ei, events := range rule.Groups {
// Write back through the indices; rule and events are per-iteration copies.
newAlertMapping.Rules[ri].Groups[ei].Events = c.withRuleGroupByAlerts(curTime, events.Events)
}
}
default:
}

// NOTE(review): this call passes c.preStoreAlertEvents rather than the
// aggregated groups — confirm it belongs in this function.
c.sendAlerts(faultCenter, c.preStoreAlertEvents)
return newAlertMapping
}

// sendAlerts sends alerts: while holding the read lock it hands each group's
// events to processAlertGroup.
func (c *Consume) sendAlerts(faultCenter models.FaultCenter, alertMapping map[string][]*models.AlertCurEvent) {
func (c *Consume) sendAlerts(faultCenter models.FaultCenter, aggEvents *AlertGroups) {
c.RLock()
defer c.RUnlock()

for key, alerts := range alertMapping {
// A key containing "_" is split on it and the second piece is used as the rule ID.
ruleId := key
if strings.Contains(key, "_") {
ruleId = strings.Split(key, "_")[1]
for _, rule := range aggEvents.Rules {
for _, groups := range rule.Groups {
c.processAlertGroup(faultCenter, groups.Events)
}

// Skip keys whose rule lookup comes back with an empty RuleId, and empty groups.
rule := c.ctx.DB.Rule().GetRuleObject(ruleId)
if rule.RuleId == "" || len(alerts) == 0 {
continue
}

c.processAlertGroup(faultCenter, alerts)
}
}

Expand Down Expand Up @@ -228,12 +318,6 @@ func (c *Consume) handleSubscribe(faultCenter models.FaultCenter, alerts []*mode
// handleAlert 处理告警逻辑
func (c *Consume) handleAlert(faultCenter models.FaultCenter, alerts []*models.AlertCurEvent) error {
curTime := time.Now().Unix()
switch faultCenter.GetAlarmAggregationType() {
case "Rule":
alerts = c.withRuleGroupByAlerts(curTime, alerts)
default:
}

g := new(errgroup.Group)
for _, alert := range alerts {
g.Go(func() error {
Expand Down Expand Up @@ -304,7 +388,9 @@ func (c *Consume) withRuleGroupByAlerts(timeInt int64, alerts []*models.AlertCur
var aggregatedAlert *models.AlertCurEvent
for i := range alerts {
alert := alerts[i]
alert.Annotations += fmt.Sprintf("\n聚合 %d 条告警\n", len(alerts))
if !strings.Contains(alert.Annotations, "聚合") {
alert.Annotations += fmt.Sprintf("\n聚合 %d 条告警\n", len(alerts))
}
aggregatedAlert = alert

if !alert.IsRecovered {
Expand All @@ -321,14 +407,6 @@ func (c *Consume) removeAlertFromCache(alert *models.AlertCurEvent) {
c.ctx.Redis.Event().RemoveEventFromFaultCenter(alert.TenantId, alert.FaultCenterId, alert.Fingerprint)
}

// clear drops every buffered alert by swapping the local cache for a fresh,
// empty map while holding the write lock.
func (c *Consume) clear() {
	fresh := make(map[string][]*models.AlertCurEvent)

	c.Lock()
	c.preStoreAlertEvents = fresh
	c.Unlock()
}

// getNoticeData 获取 Notice 数据
func (c *Consume) getNoticeData(tenantId, noticeId string) (models.AlertNotice, error) {
return c.ctx.DB.Notice().Get(models.NoticeQuery{
Expand Down
Loading

0 comments on commit 96a5133

Please sign in to comment.