Skip to content

Commit

Permalink
🎸 Optimized user alarm subscribe logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Cairry committed Feb 22, 2025
1 parent d6f602a commit 9a48d22
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 45 deletions.
9 changes: 2 additions & 7 deletions alert/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,8 @@ func (c *Consume) handleSubscribe(faultCenter models.FaultCenter, alerts []*mode
for _, event := range alerts {
event := event
g.Go(func() error {
noticeId := process.GetNoticeGroupId(event, faultCenter)
noticeData, err := c.getNoticeData(event.TenantId, noticeId)
if err != nil {
return fmt.Errorf("failed to get notice data: %v", err)
}

if err := processSubscribe(c.ctx, event, noticeData); err != nil {
event.FaultCenter = faultCenter
if err := processSubscribe(c.ctx, event); err != nil {
return fmt.Errorf("failed to process subscribe: %v", err)
}

Expand Down
106 changes: 68 additions & 38 deletions alert/consumer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consumer

import (
"fmt"
"github.com/zeromicro/go-zero/core/logc"
"strings"
"watchAlert/internal/models"
"watchAlert/pkg/ctx"
Expand All @@ -17,54 +18,83 @@ type toUser struct {
}

// 向已订阅的用户中发送告警消息
func processSubscribe(ctx *ctx.Context, alert *models.AlertCurEvent, notice models.AlertNotice) error {
list, err := ctx.DB.Subscribe().List(models.AlertSubscribeQuery{
STenantId: alert.TenantId,
Query: alert.RuleId,
})
func processSubscribe(ctx *ctx.Context, alert *models.AlertCurEvent) error {
var toUsers []toUser

// 获取所有用户订阅列表
subscribes, err := getSubscribes(alert)
if err != nil {
return fmt.Errorf("获取订阅用户失败, err: %s", err.Error())
return err
}

notice.NoticeType = "Email"
var toUsers []toUser
for _, s := range list {
var foundSeverity, foundFilter bool
for _, severity := range s.SRuleSeverity {
if severity == alert.Severity {
foundSeverity = true
break
}
fmt.Println("subscribes->", subscribes)

for _, subscribe := range subscribes {
// Severity检查
severitySet := make(map[string]struct{})
for _, sev := range subscribe.SRuleSeverity {
severitySet[sev] = struct{}{}
}
if _, exists := severitySet[alert.Severity]; !exists {
continue
}

if foundSeverity {
if len(s.SFilter) > 0 {
for _, f := range s.SFilter {
if strings.Contains(tools.JsonMarshal(alert.Metric), f) || strings.Contains(alert.Annotations, f) {
foundFilter = true
break
}
// 过滤器检查
if len(subscribe.SFilter) > 0 {
allMatched := true
for _, f := range subscribe.SFilter {
if !strings.Contains(tools.JsonMarshal(alert.Metric), f) && !strings.Contains(alert.Annotations, f) {
allMatched = false
break
}
} else {
foundFilter = true
}

if foundFilter {
toUsers = append(toUsers, toUser{
Email: s.SUserEmail,
NoticeSubject: s.SNoticeSubject,
NoticeTemplateId: s.SNoticeTemplateId,
})
if !allMatched {
continue
}
}

toUsers = append(toUsers, toUser{
Email: subscribe.SUserEmail,
NoticeSubject: subscribe.SNoticeSubject,
NoticeTemplateId: subscribe.SNoticeTemplateId,
})
fmt.Println("toUsers->", toUsers)
}

if len(toUsers) > 0 {
for _, u := range toUsers {
notice.NoticeTmplId = u.NoticeTemplateId
emailTemp := templates.NewTemplate(ctx, *alert, notice)
return sendToSubscribeUser(ctx, *alert, toUsers)
}

err = sender.NewEmailSender().Send(sender.SendParams{
func getSubscribes(alert *models.AlertCurEvent) ([]models.AlertSubscribe, error) {
// Firing_a-csu3be5vi7ns2tmpdnhg
ruleId := strings.Split(alert.RuleId, "_")
list, err := ctx.DB.Subscribe().List(models.AlertSubscribeQuery{
STenantId: alert.TenantId,
SRuleId: ruleId[1],
})
if err != nil {
return nil, fmt.Errorf("获取订阅用户失败, err: %s", err.Error())
}

return list, nil
}

func sendToSubscribeUser(ctx *ctx.Context, alert models.AlertCurEvent, toUsers []toUser) error {
if len(toUsers) <= 0 {
return nil
}

var sem = make(chan struct{}, 10)
for _, user := range toUsers {
u := user
// 插入信号量,超过 10 则阻塞协程启动
sem <- struct{}{}
go func(u toUser, sem chan struct{}) {
defer func() {
// 释放信号量
<-sem
}()
emailTemp := templates.NewTemplate(ctx, alert, models.AlertNotice{NoticeType: "Email", NoticeTmplId: u.NoticeTemplateId})
err := sender.NewEmailSender().Send(sender.SendParams{
IsRecovered: alert.IsRecovered,
Email: models.Email{
Subject: u.NoticeSubject,
Expand All @@ -74,9 +104,9 @@ func processSubscribe(ctx *ctx.Context, alert *models.AlertCurEvent, notice mode
Content: emailTemp.CardContentMsg,
})
if err != nil {
return fmt.Errorf("邮件发送失败, err: %s", err.Error())
logc.Errorf(ctx.Ctx, fmt.Sprintf("Email: %s, 邮件发送失败, err: %s", u.Email, err.Error()))
}
}
}(u, sem)
}

return nil
Expand Down
3 changes: 3 additions & 0 deletions internal/repo/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func (s subscribeRepo) List(r models.AlertSubscribeQuery) ([]models.AlertSubscri
)

db.Where("s_tenant_id = ?", r.STenantId)
if r.SRuleId != "" {
db.Where("s_rule_id = ?", r.SRuleId)
}
if r.Query != "" {
db.Where("s_rule_id LIKE ? or s_rule_name LIKE ? or s_rule_type LIKE ?", "%"+r.Query+"%", "%"+r.Query+"%", "%"+r.Query+"%")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/client/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (a EmailClient) Send(to, cc []string, subject string, msg []byte) error {
a.Email.HTML = msg
a.Email.Subject = subject
port := strconv.FormatInt(int64(a.Port), 10)
//err := a.Email.SendWithTLS(a.ServerAddr+":"+port, a.Auth, &tls.Config{ServerName: a.ServerAddr})
err := a.Email.Send(a.ServerAddr+":"+port, a.Auth)
if err != nil {
return err
Expand Down

0 comments on commit 9a48d22

Please sign in to comment.