From d2fb9ccd2371a85e7d7f84159f89914bf7ddc44f Mon Sep 17 00:00:00 2001 From: Cairry Date: Sat, 22 Feb 2025 21:31:35 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=B8=20Optimized=20user=20alarm=20subsc?= =?UTF-8?q?ribe=20logic=20(#138)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🎸 Optimized user alarm subscribe logic --- alert/consumer/consumer.go | 9 +-- alert/consumer/subscribe.go | 106 +++++++++++++++++++++++------------- internal/repo/subscribe.go | 3 + pkg/client/email.go | 1 + 4 files changed, 74 insertions(+), 45 deletions(-) diff --git a/alert/consumer/consumer.go b/alert/consumer/consumer.go index b8af6cf..341f04e 100644 --- a/alert/consumer/consumer.go +++ b/alert/consumer/consumer.go @@ -321,13 +321,8 @@ func (c *Consume) handleSubscribe(faultCenter models.FaultCenter, alerts []*mode for _, event := range alerts { event := event g.Go(func() error { - noticeId := process.GetNoticeGroupId(event, faultCenter) - noticeData, err := c.getNoticeData(event.TenantId, noticeId) - if err != nil { - return fmt.Errorf("failed to get notice data: %v", err) - } - - if err := processSubscribe(c.ctx, event, noticeData); err != nil { + event.FaultCenter = faultCenter + if err := processSubscribe(c.ctx, event); err != nil { return fmt.Errorf("failed to process subscribe: %v", err) } diff --git a/alert/consumer/subscribe.go b/alert/consumer/subscribe.go index 704f737..aa4b3c2 100644 --- a/alert/consumer/subscribe.go +++ b/alert/consumer/subscribe.go @@ -2,6 +2,7 @@ package consumer import ( "fmt" + "github.com/zeromicro/go-zero/core/logc" "strings" "watchAlert/internal/models" "watchAlert/pkg/ctx" @@ -17,54 +18,83 @@ type toUser struct { } // 向已订阅的用户中发送告警消息 -func processSubscribe(ctx *ctx.Context, alert *models.AlertCurEvent, notice models.AlertNotice) error { - list, err := ctx.DB.Subscribe().List(models.AlertSubscribeQuery{ - STenantId: alert.TenantId, - Query: alert.RuleId, - }) +func processSubscribe(ctx *ctx.Context, alert *models.AlertCurEvent) error { + var toUsers []toUser + + // 获取所有用户订阅列表 + subscribes, err := getSubscribes(alert) if err != nil { - return fmt.Errorf("获取订阅用户失败, err: %s", err.Error()) + return err } - notice.NoticeType = "Email" - var toUsers []toUser - for _, s := range list { - var foundSeverity, foundFilter bool - for _, severity := range s.SRuleSeverity { - if severity == alert.Severity { - foundSeverity = true - break - } + fmt.Println("subscribes->", subscribes) + + for _, subscribe := range subscribes { + // Severity检查 + severitySet := make(map[string]struct{}) + for _, sev := range subscribe.SRuleSeverity { + severitySet[sev] = struct{}{} + } + if _, exists := severitySet[alert.Severity]; !exists { + continue } - if foundSeverity { - if len(s.SFilter) > 0 { - for _, f := range s.SFilter { - if strings.Contains(tools.JsonMarshal(alert.Metric), f) || strings.Contains(alert.Annotations, f) { - foundFilter = true - break - } + // 过滤器检查 + if len(subscribe.SFilter) > 0 { + allMatched := true + for _, f := range subscribe.SFilter { + if !strings.Contains(tools.JsonMarshal(alert.Metric), f) && !strings.Contains(alert.Annotations, f) { + allMatched = false + break } - } else { - foundFilter = true } - - if foundFilter { - toUsers = append(toUsers, toUser{ - Email: s.SUserEmail, - NoticeSubject: s.SNoticeSubject, - NoticeTemplateId: s.SNoticeTemplateId, - }) + if !allMatched { + continue } } + + toUsers = append(toUsers, toUser{ + Email: subscribe.SUserEmail, + NoticeSubject: subscribe.SNoticeSubject, + NoticeTemplateId: subscribe.SNoticeTemplateId, + }) + fmt.Println("toUsers->", toUsers) } - if len(toUsers) > 0 { - for _, u := range toUsers { - notice.NoticeTmplId = u.NoticeTemplateId - emailTemp := templates.NewTemplate(ctx, *alert, notice) + return sendToSubscribeUser(ctx, *alert, toUsers) +} - err = sender.NewEmailSender().Send(sender.SendParams{ +func getSubscribes(alert *models.AlertCurEvent) ([]models.AlertSubscribe, error) { + // Firing_a-csu3be5vi7ns2tmpdnhg + ruleId := strings.Split(alert.RuleId, "_") + list, err := ctx.DB.Subscribe().List(models.AlertSubscribeQuery{ + STenantId: alert.TenantId, + SRuleId: ruleId[1], + }) + if err != nil { + return nil, fmt.Errorf("获取订阅用户失败, err: %s", err.Error()) + } + + return list, nil +} + +func sendToSubscribeUser(ctx *ctx.Context, alert models.AlertCurEvent, toUsers []toUser) error { + if len(toUsers) <= 0 { + return nil + } + + var sem = make(chan struct{}, 10) + for _, user := range toUsers { + u := user + // 插入信号量,超过 10 则阻塞协程启动 + sem <- struct{}{} + go func(u toUser, sem chan struct{}) { + defer func() { + // 释放信号量 + <-sem + }() + emailTemp := templates.NewTemplate(ctx, alert, models.AlertNotice{NoticeType: "Email", NoticeTmplId: u.NoticeTemplateId}) + err := sender.NewEmailSender().Send(sender.SendParams{ IsRecovered: alert.IsRecovered, Email: models.Email{ Subject: u.NoticeSubject, @@ -74,9 +104,9 @@ func processSubscribe(ctx *ctx.Context, alert *models.AlertCurEvent, notice mode Content: emailTemp.CardContentMsg, }) if err != nil { - return fmt.Errorf("邮件发送失败, err: %s", err.Error()) + logc.Errorf(ctx.Ctx, fmt.Sprintf("Email: %s, 邮件发送失败, err: %s", u.Email, err.Error())) } - } + }(u, sem) } return nil diff --git a/internal/repo/subscribe.go b/internal/repo/subscribe.go index 83481c3..2536462 100644 --- a/internal/repo/subscribe.go +++ b/internal/repo/subscribe.go @@ -34,6 +34,9 @@ func (s subscribeRepo) List(r models.AlertSubscribeQuery) ([]models.AlertSubscri ) db.Where("s_tenant_id = ?", r.STenantId) + if r.SRuleId != "" { + db.Where("s_rule_id = ?", r.SRuleId) + } if r.Query != "" { db.Where("s_rule_id LIKE ? or s_rule_name LIKE ? or s_rule_type LIKE ?", "%"+r.Query+"%", "%"+r.Query+"%", "%"+r.Query+"%") } diff --git a/pkg/client/email.go b/pkg/client/email.go index 58fc162..6505cac 100644 --- a/pkg/client/email.go +++ b/pkg/client/email.go @@ -33,6 +33,7 @@ func (a EmailClient) Send(to, cc []string, subject string, msg []byte) error { a.Email.HTML = msg a.Email.Subject = subject port := strconv.FormatInt(int64(a.Port), 10) + //err := a.Email.SendWithTLS(a.ServerAddr+":"+port, a.Auth, &tls.Config{ServerName: a.ServerAddr}) err := a.Email.Send(a.ServerAddr+":"+port, a.Auth) if err != nil { return err