Skip to content

Commit

Permalink
Merge pull request #55 from Cairry/master
Browse files Browse the repository at this point in the history
Add VictoriaMetrics datasource and prometheus query api
  • Loading branch information
Cairry authored Jul 22, 2024
2 parents e054224 + 137ea17 commit 671a9cd
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 139 deletions.
53 changes: 53 additions & 0 deletions alert/process/eval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package process

import (
"watchAlert/internal/global"
"watchAlert/internal/models"
"watchAlert/pkg/ctx"
)

// EvalCondition evaluates an alert condition: when the sampled value
// satisfies the configured operator against the threshold in ec, the
// event built by f is handed to processEvent (which caches it if its
// rule still exists). f is only invoked when the condition matches,
// so callers may do event-construction work lazily inside it.
//
// Supported ec.Type values: "count", "metric".
// Supported ec.Operator values: > >= < <= == !=.
// Any other type or operator is logged and ignored.
func EvalCondition(ctx *ctx.Context, f func() models.AlertCurEvent, value float64, ec models.EvalCondition) {
	switch ec.Type {
	case "count", "metric":
		matched, ok := compareValue(value, ec.Operator, ec.Value)
		if !ok {
			// Unknown operator: same log message as before, no event fired.
			global.Logger.Sugar().Error("无效的评估条件", ec.Type, ec.Operator, ec.Value)
			return
		}
		if matched {
			processEvent(ctx, f())
		}
	default:
		global.Logger.Sugar().Error("无效的评估类型", ec.Type)
	}
}

// compareValue reports whether "value op threshold" holds. The second
// result is false when op is not one of the six recognized operators.
// NOTE(review): == / != compare float64 exactly, matching the original
// behavior; thresholds parsed from rule expressions are assumed exact.
func compareValue(value float64, op string, threshold float64) (matched, ok bool) {
	switch op {
	case ">":
		return value > threshold, true
	case ">=":
		return value >= threshold, true
	case "<":
		return value < threshold, true
	case "<=":
		return value <= threshold, true
	case "==":
		return value == threshold, true
	case "!=":
		return value != threshold, true
	}
	return false, false
}

// processEvent caches the alert event unless its rule no longer exists
// in the database (e.g. the rule was deleted mid-evaluation).
func processEvent(ctx *ctx.Context, event models.AlertCurEvent) {
	if !ctx.DB.Rule().GetRuleIsExist(event.RuleId) {
		return
	}
	SaveEventCache(ctx, event)
}
47 changes: 4 additions & 43 deletions alert/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,50 +116,11 @@ func ParserDuration(curTime time.Time, logScope int, timeType string) time.Time

}

// EvalCondition 评估告警条件
func EvalCondition(f func(), value int, ec models.EvalCondition) {

switch ec.Type {
case "count", "value":
switch ec.Operator {
case ">":
if value > ec.Value {
f()
}
case ">=":
if value >= ec.Value {
f()
}
case "<":
if value < ec.Value {
f()
}
case "<=":
if value <= ec.Value {
f()
}
case "==":
if value == ec.Value {
f()
}
case "!=":
if value != ec.Value {
f()
}
default:
global.Logger.Sugar().Error("无效的评估条件", ec.Type, ec.Operator, ec.Value)
}
default:
global.Logger.Sugar().Error("无效的评估类型", ec.Type)
}

}

/*
GcPendingCache
清理 Pending 数据的缓存.
场景: 第一次查询到有异常的指标会写入 Pending 缓存, 当该指标持续 Pending 到达持续时间后才会写入 Firing 缓存,
那么未到达持续时间并且该指标恢复正常, 那么就需要清理该指标的 Pending 数据.
GcPendingCache
清理 Pending 数据的缓存.
场景: 第一次查询到有异常的指标会写入 Pending 缓存, 当该指标持续 Pending 到达持续时间后才会写入 Firing 缓存,
那么未到达持续时间并且该指标恢复正常, 那么就需要清理该指标的 Pending 数据.
*/
func GcPendingCache(ctx *ctx.Context, rule models.AlertRule, curKeys []string) {
pendingKeys, err := ctx.Redis.Rule().GetAlertPendingCacheKeys(models.AlertRuleQuery{
Expand Down
58 changes: 0 additions & 58 deletions alert/process/prom.go

This file was deleted.

137 changes: 114 additions & 23 deletions alert/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"watchAlert/pkg/community/aws/cloudwatch"
"watchAlert/pkg/community/aws/cloudwatch/types"
"watchAlert/pkg/ctx"
"watchAlert/pkg/utils/cmd"
)

type RuleQuery struct {
Expand All @@ -27,6 +28,8 @@ func (rq *RuleQuery) Query(ctx *ctx.Context, rule models.AlertRule) {
switch rule.DatasourceType {
case "Prometheus":
rq.prometheus(dsId, rule)
case "VictoriaMetrics":
rq.victoriametrics(dsId, rule)
case "AliCloudSLS":
rq.aliCloudSLS(dsId, rule)
case "Loki":
Expand Down Expand Up @@ -88,14 +91,14 @@ func (rq *RuleQuery) alertRecover(rule models.AlertRule, curKeys []string) {
// Prometheus 数据源
func (rq *RuleQuery) prometheus(datasourceId string, rule models.AlertRule) {
var (
curFiringKeys = &[]string{}
curPendingKeys = &[]string{}
curFiringKeys []string
curPendingKeys []string
)

defer func() {
go process.GcPendingCache(rq.ctx, rule, *curPendingKeys)
rq.alertRecover(rule, *curFiringKeys)
go process.GcRecoverWaitCache(rule, *curFiringKeys)
go process.GcPendingCache(rq.ctx, rule, curPendingKeys)
rq.alertRecover(rule, curFiringKeys)
go process.GcRecoverWaitCache(rule, curFiringKeys)
}()

r := models.DatasourceQuery{
Expand Down Expand Up @@ -123,7 +126,101 @@ func (rq *RuleQuery) prometheus(datasourceId string, rule models.AlertRule) {
re := regexp.MustCompile(`([^\d]+)(\d+)`)
matches := re.FindStringSubmatch(ruleExpr.Expr)
t, _ := strconv.ParseFloat(matches[2], 64)
process.CalIndicatorValue(rq.ctx, matches[1], t, rule, v, datasourceId, curFiringKeys, curPendingKeys, ruleExpr.Severity)

f := func() models.AlertCurEvent {
event := process.ParserDefaultEvent(rule)
event.DatasourceId = datasourceId
event.Fingerprint = v.GetFingerprint()
event.Metric = v.GetMetric()
event.Metric["severity"] = ruleExpr.Severity
event.Severity = ruleExpr.Severity
event.Annotations = cmd.ParserVariables(rule.PrometheusConfig.Annotations, event.Metric)

firingKey := event.GetFiringAlertCacheKey()
pendingKey := event.GetPendingAlertCacheKey()

curFiringKeys = append(curFiringKeys, firingKey)
curPendingKeys = append(curPendingKeys, pendingKey)

return event
}

option := models.EvalCondition{
Type: "metric",
Operator: matches[1],
Value: t,
}

process.EvalCondition(rq.ctx, f, v.Value, option)
}
}

}

// VictoriaMetrics 数据源
func (rq *RuleQuery) victoriametrics(datasourceId string, rule models.AlertRule) {
var (
curFiringKeys []string
curPendingKeys []string
)

defer func() {
go process.GcPendingCache(rq.ctx, rule, curPendingKeys)
rq.alertRecover(rule, curFiringKeys)
go process.GcRecoverWaitCache(rule, curFiringKeys)
}()

r := models.DatasourceQuery{
TenantId: rule.TenantId,
Id: datasourceId,
Type: "VictoriaMetrics",
}
datasourceInfo, err := rq.ctx.DB.Datasource().Get(r)
if err != nil {
return
}

cmCli := client.NewVictoriaMetricsClient(datasourceInfo)
resQuery, err := cmCli.Query(rule.PrometheusConfig.PromQL)
if err != nil {
return
}

if resQuery == nil {
return
}

for _, v := range resQuery {
for _, ruleExpr := range rule.PrometheusConfig.Rules {
re := regexp.MustCompile(`([^\d]+)(\d+)`)
matches := re.FindStringSubmatch(ruleExpr.Expr)
t, _ := strconv.ParseFloat(matches[2], 64)

f := func() models.AlertCurEvent {
event := process.ParserDefaultEvent(rule)
event.DatasourceId = datasourceId
event.Fingerprint = v.GetFingerprint()
event.Metric = v.GetMetric()
event.Metric["severity"] = ruleExpr.Severity
event.Severity = ruleExpr.Severity
event.Annotations = cmd.ParserVariables(rule.PrometheusConfig.Annotations, event.Metric)

firingKey := event.GetFiringAlertCacheKey()
pendingKey := event.GetPendingAlertCacheKey()

curFiringKeys = append(curFiringKeys, firingKey)
curPendingKeys = append(curPendingKeys, pendingKey)

return event
}

option := models.EvalCondition{
Type: "metric",
Operator: matches[1],
Value: t,
}

process.EvalCondition(rq.ctx, f, v.Value, option)
}
}

Expand Down Expand Up @@ -170,7 +267,7 @@ func (rq *RuleQuery) aliCloudSLS(datasourceId string, rule models.AlertRule) {

for _, body := range bodyList.MetricList {

event := func() {
event := func() models.AlertCurEvent {
event := process.ParserDefaultEvent(rule)
event.DatasourceId = datasourceId
event.Fingerprint = body.GetFingerprint()
Expand All @@ -180,10 +277,7 @@ func (rq *RuleQuery) aliCloudSLS(datasourceId string, rule models.AlertRule) {
key := event.GetFiringAlertCacheKey()
curKeys = append(curKeys, key)

ok := rq.ctx.DB.Rule().GetRuleIsExist(event.RuleId)
if ok {
process.SaveEventCache(rq.ctx, event)
}
return event
}

options := models.EvalCondition{
Expand All @@ -197,7 +291,7 @@ func (rq *RuleQuery) aliCloudSLS(datasourceId string, rule models.AlertRule) {
}

// 评估告警条件
process.EvalCondition(event, count, options)
process.EvalCondition(rq.ctx, event, float64(count), options)
}

}
Expand Down Expand Up @@ -239,7 +333,7 @@ func (rq *RuleQuery) loki(datasourceId string, rule models.AlertRule) {
continue
}

event := func() {
event := func() models.AlertCurEvent {
event := process.ParserDefaultEvent(rule)
event.DatasourceId = datasourceId
event.Fingerprint = v.GetFingerprint()
Expand All @@ -249,10 +343,7 @@ func (rq *RuleQuery) loki(datasourceId string, rule models.AlertRule) {
key := event.GetPendingAlertCacheKey()
curKeys = append(curKeys, key)

ok := rq.ctx.DB.Rule().GetRuleIsExist(event.RuleId)
if ok {
process.SaveEventCache(rq.ctx, event)
}
return event
}

options := models.EvalCondition{
Expand All @@ -262,7 +353,7 @@ func (rq *RuleQuery) loki(datasourceId string, rule models.AlertRule) {
}

// 评估告警条件
process.EvalCondition(event, count, options)
process.EvalCondition(rq.ctx, event, float64(count), options)

}

Expand Down Expand Up @@ -358,22 +449,22 @@ func (rq *RuleQuery) cloudWatch(datasourceId string, rule models.AlertRule) {
return
}

event := func() {
event := func() models.AlertCurEvent {
event := process.ParserDefaultEvent(rule)
event.DatasourceId = datasourceId
event.Fingerprint = query.GetFingerprint()
event.Metric = query.GetMetrics()
event.Annotations = fmt.Sprintf("%s %s %s %s %d", query.Namespace, query.MetricName, query.Statistic, rule.CloudWatchConfig.Expr, rule.CloudWatchConfig.Threshold)

process.SaveEventCache(rq.ctx, event)
return event
}

options := models.EvalCondition{
Type: "value",
Type: "metric",
Operator: rule.CloudWatchConfig.Expr,
Value: rule.CloudWatchConfig.Threshold,
Value: float64(rule.CloudWatchConfig.Threshold),
}

process.EvalCondition(event, int(values[0]), options)
process.EvalCondition(rq.ctx, event, values[0], options)
}
}
Loading

0 comments on commit 671a9cd

Please sign in to comment.