Skip to content

Commit

Permalink
Add ssl monitor (#67)
Browse files Browse the repository at this point in the history
* Add ssl monitor
  • Loading branch information
Cairry authored Aug 11, 2024
1 parent efce567 commit 79966dc
Show file tree
Hide file tree
Showing 19 changed files with 693 additions and 9 deletions.
11 changes: 9 additions & 2 deletions alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@ package alert
import (
"watchAlert/alert/consumer"
"watchAlert/alert/eval"
"watchAlert/alert/task"
"watchAlert/internal/global"
"watchAlert/pkg/ctx"
)

func Initialize(ctx *ctx.Context) {
var (
MonEvalTask task.MonitorSSLEval
MonConsumerTask consumer.MonitorSslConsumer
)

func Initialize(ctx *ctx.Context) {
consumer.NewInterEvalConsumeWork(ctx).Run()
eval.NewInterAlertRuleWork(ctx).Run()
initAlarmConfig(ctx)

MonConsumerTask = consumer.NewMonitorSslConsumer(ctx)
MonEvalTask = task.NewMonitorSSLEval()
MonEvalTask.RePushTask(ctx, &MonConsumerTask)
}

func initAlarmConfig(ctx *ctx.Context) {
Expand Down
103 changes: 103 additions & 0 deletions alert/consumer/monitor_ssl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package consumer

import (
"context"
"fmt"
"sync"
"time"
"watchAlert/alert/process"
"watchAlert/alert/sender"
"watchAlert/internal/global"
"watchAlert/internal/models"
"watchAlert/pkg/ctx"
)

type MonitorSslConsumer struct {
l sync.RWMutex
consumerPool map[string]context.CancelFunc
ctx *ctx.Context
}

func NewMonitorSslConsumer(ctx *ctx.Context) MonitorSslConsumer {
return MonitorSslConsumer{
ctx: ctx,
consumerPool: make(map[string]context.CancelFunc),
}
}

func (m *MonitorSslConsumer) Add(r models.MonitorSSLRule) {
m.l.Lock()
m.l.Unlock()

c, cancel := context.WithCancel(context.Background())
m.consumerPool[r.ID] = cancel

ticker := time.Tick(time.Second)
go func(ctx context.Context, r models.MonitorSSLRule) {
for {
select {
case <-ticker:
key := fmt.Sprintf("%s:%s%s--", r.TenantId, models.FiringAlertCachePrefix, r.ID)
result := m.ctx.Redis.Event().GetCache(key)
handleAlert(m.ctx, result)
case <-ctx.Done():
return
}
}
}(c, r)
}

func (m *MonitorSslConsumer) Stop(id string) {
m.l.Lock()
m.l.Unlock()

if cancel, exists := m.consumerPool[id]; exists {
cancel()
}
}

func filterEvent(ctx *ctx.Context, alert models.AlertCurEvent) bool {
var pass bool
if !alert.IsRecovered {
if alert.LastSendTime == 0 || alert.LastEvalTime >= alert.LastSendTime+alert.RepeatNoticeInterval*60 {
alert.LastSendTime = time.Now().Unix()
ctx.Redis.Event().SetCache("Firing", alert, 0)
return true
}
} else {
removeAlertFromCache(ctx, alert)
err := process.RecordAlertHisEvent(ctx, alert)
if err != nil {
global.Logger.Sugar().Error(err.Error())
}
return true
}
return pass
}

// 删除缓存
func removeAlertFromCache(ctx *ctx.Context, alert models.AlertCurEvent) {
key := fmt.Sprintf("%s:%s%s--", alert.TenantId, models.FiringAlertCachePrefix, alert.RuleId)
ctx.Redis.Event().DelCache(key)
}

// 推送告警
func handleAlert(ctx *ctx.Context, alert models.AlertCurEvent) {
if alert.RuleId == "" {
return
}

if filterEvent(ctx, alert) {
r := models.NoticeQuery{
TenantId: alert.TenantId,
Uuid: alert.NoticeId,
}
noticeData, _ := ctx.DB.Notice().Get(r)
alert.DutyUser = process.GetDutyUser(ctx, noticeData)
err := sender.Sender(ctx, alert, noticeData)
if err != nil {
global.Logger.Sugar().Errorf(err.Error())
return
}
}
}
47 changes: 41 additions & 6 deletions alert/eval/alert_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ package eval

import (
"context"
"fmt"
"sync"
"time"
"watchAlert/alert/query"
"watchAlert/alert/queue"
"watchAlert/internal/global"
models "watchAlert/internal/models"
"watchAlert/internal/services"
"watchAlert/pkg/ctx"
)

type AlertRuleWork struct {
sync.RWMutex
query.RuleQuery
ctx *ctx.Context
services.InterAlertService
ctx *ctx.Context
rule chan *models.AlertRule
alertEvent models.AlertCurEvent
}
Expand Down Expand Up @@ -49,9 +48,7 @@ func (arw *AlertRuleWork) Run() {
}
}()

// 重启服务后将历史 Rule 重新推到队列中
services.AlertService.RePushRule(arw.ctx, arw.rule)

rePushRule(arw.ctx, arw.rule)
}

func (arw *AlertRuleWork) worker(rule models.AlertRule, ctx context.Context) {
Expand All @@ -75,3 +72,41 @@ func (arw *AlertRuleWork) worker(rule models.AlertRule, ctx context.Context) {
}

}

func rePushRule(ctx *ctx.Context, alertRule chan *models.AlertRule) {

var (
ruleList []models.AlertRule
// 创建一个通道用于接收处理结果
resultCh = make(chan error)
// 使用 WaitGroup 来等待所有规则的处理完成
wg sync.WaitGroup
)
ctx.DB.DB().Where("enabled = ?", "1").Find(&ruleList)

// 并发处理规则
for _, rule := range ruleList {
wg.Add(1)
go func(rule models.AlertRule) {
defer wg.Done()

alertRule <- &rule

resultCh <- nil
}(rule)
}

// 等待所有规则的处理完成
go func() {
wg.Wait()
close(resultCh)
}()

// 处理结果
for result := range resultCh {
if result != nil {
fmt.Println("Error:", result)
}
}

}
15 changes: 15 additions & 0 deletions alert/task/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package task

import (
"watchAlert/internal/models"
"watchAlert/pkg/ctx"
)

func SaveMonitorCacheEvent(ctx *ctx.Context, event models.AlertCurEvent) {
firingKey := event.GetFiringAlertCacheKey()
resFiring := ctx.Redis.Event().GetCache(firingKey)
event.FirstTriggerTime = ctx.Redis.Event().GetFirstTime(firingKey)
event.LastEvalTime = ctx.Redis.Event().GetLastEvalTime(firingKey)
event.LastSendTime = resFiring.LastSendTime
ctx.Redis.Event().SetCache("Firing", event, 0)
}
161 changes: 161 additions & 0 deletions alert/task/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package task

import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"sync"
"time"
"watchAlert/alert/consumer"
"watchAlert/internal/global"
"watchAlert/internal/models"
"watchAlert/pkg/ctx"
"watchAlert/pkg/utils/http"
)

type MonitorSSLEval struct {
l sync.RWMutex
WatchCtxMap map[string]context.CancelFunc
}

func NewMonitorSSLEval() MonitorSSLEval {
return MonitorSSLEval{
WatchCtxMap: make(map[string]context.CancelFunc),
}
}

func (t *MonitorSSLEval) Submit(ctx *ctx.Context, rule models.MonitorSSLRule) {
t.l.Lock()
defer t.l.Unlock()

c, cancel := context.WithCancel(context.Background())
t.WatchCtxMap[rule.ID] = cancel
go t.Eval(c, ctx, rule)
}

func (t *MonitorSSLEval) Stop(id string) {
t.l.Lock()
defer t.l.Unlock()

if cancel, exists := t.WatchCtxMap[id]; exists {
cancel()
delete(t.WatchCtxMap, id)
}
}

func (t *MonitorSSLEval) Eval(ctx context.Context, w8tCtx *ctx.Context, rule models.MonitorSSLRule) {
timer := time.NewTicker(time.Hour * time.Duration(rule.EvalInterval))
defer timer.Stop()
t.worker(w8tCtx, rule)

for {
select {
case <-timer.C:
t.worker(w8tCtx, rule)
case <-ctx.Done():
return
}
}
}

func (t *MonitorSSLEval) worker(w8tCtx *ctx.Context, rule models.MonitorSSLRule) {
// 记录开始时间
startTime := time.Now()

resp, err := http.Get("https://" + rule.Domain)
if err != nil {
return
}
defer resp.Body.Close()

// 证书为空, 跳过检测
if resp.TLS == nil {
t.Stop(rule.ID)
return
}

// 获取证书信息
certs := resp.TLS.PeerCertificates[0]
certTime := certs.NotAfter.Unix()
currentTime := time.Now().Unix()

// 计算剩余有效期时间
TimeRemaining := (certTime - currentTime) / 86400

// 创建事件
event := t.processDefaultEvent(rule)
event.TimeRemaining = TimeRemaining
event.ResponseTime = fmt.Sprintf("%dms", time.Since(startTime).Milliseconds())
event.Metric = rule.GetMetrics()
event.Annotations = fmt.Sprintf("域名: %s, SSL证书即将到期, 剩余: %d天", rule.Domain, TimeRemaining)

// 更新规则信息
rule.TimeRemaining = event.TimeRemaining
rule.ResponseTime = event.ResponseTime
if err := w8tCtx.DB.MonitorSSL().Update(rule); err != nil {
global.Logger.Sugar().Error(err.Error())
return
}

// 根据剩余时间与期望时间判断是否触发告警或恢复
if TimeRemaining <= rule.ExpectTime {
SaveMonitorCacheEvent(w8tCtx, event)
} else {
t.processRecover(w8tCtx, event)
}
}

func (t *MonitorSSLEval) processDefaultEvent(rule models.MonitorSSLRule) models.AlertCurEvent {
return models.AlertCurEvent{
TenantId: rule.TenantId,
RuleId: rule.ID,
RuleName: rule.Name,
EvalInterval: rule.EvalInterval,
NoticeId: rule.NoticeId,
IsRecovered: false,
RepeatNoticeInterval: rule.RepeatNoticeInterval,
DutyUser: "暂无", // 默认暂无值班人员, 渲染模版时会实际判断 Notice 是否存在值班人员
RecoverNotify: rule.RecoverNotify,
}
}

func (t *MonitorSSLEval) processRecover(ctx *ctx.Context, event models.AlertCurEvent) {
key := fmt.Sprintf("%s:%s%s--", event.TenantId, models.FiringAlertCachePrefix, event.RuleId)
cache := ctx.Redis.Event().GetCache(key)

// 提前返回,如果缓存中没有相关数据或已经恢复
if cache.RuleId == "" || cache.IsRecovered {
return
}

// 检查事件的剩余时间是否大于或等于缓存中的剩余时间
if event.TimeRemaining >= cache.TimeRemaining {
event.FirstTriggerTime = ctx.Redis.Event().GetFirstTime(key)
event.IsRecovered = true
event.RecoverTime = time.Now().Unix()
event.LastSendTime = 0
ctx.Redis.Event().SetCache("Firing", event, 0)
}
}

func (t *MonitorSSLEval) RePushTask(ctx *ctx.Context, consumer *consumer.MonitorSslConsumer) {
var ruleList []models.MonitorSSLRule
if err := ctx.DB.DB().Where("enabled = ?", "1").Find(&ruleList).Error; err != nil {
global.Logger.Sugar().Error(err.Error())
return
}

g := new(errgroup.Group)
for _, rule := range ruleList {
rule := rule
g.Go(func() error {
t.Submit(ctx, rule)
consumer.Add(rule)
return nil
})
}

if err := g.Wait(); err != nil {
global.Logger.Sugar().Error(err.Error())
}
}
1 change: 1 addition & 0 deletions api/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ApiGroup struct {
AWSCloudWatchRDSController
SettingsController
KubernetesTypesController
MonitorSSLController
}

var ApiGroupApp = new(ApiGroup)
Expand Down
Loading

0 comments on commit 79966dc

Please sign in to comment.