Skip to content

Commit

Permalink
Add kubernetes event monitor (#64)
Browse files Browse the repository at this point in the history
* Add kubernetes event monitor
  • Loading branch information
Cairry authored Aug 3, 2024
1 parent 35e4e5b commit e5c86b8
Show file tree
Hide file tree
Showing 17 changed files with 696 additions and 16 deletions.
14 changes: 7 additions & 7 deletions alert/process/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@ func EvalCondition(ctx *ctx.Context, f func() models.AlertCurEvent, value float6
switch ec.Operator {
case ">":
if value > ec.Value {
processEvent(ctx, f())
SaveAlertEvent(ctx, f())
}
case ">=":
if value >= ec.Value {
processEvent(ctx, f())
SaveAlertEvent(ctx, f())
}
case "<":
if value < ec.Value {
processEvent(ctx, f())
SaveAlertEvent(ctx, f())
}
case "<=":
if value <= ec.Value {
processEvent(ctx, f())
SaveAlertEvent(ctx, f())
}
case "==":
if value == ec.Value {
processEvent(ctx, f())
SaveAlertEvent(ctx, f())
}
case "!=":
if value != ec.Value {
processEvent(ctx, f())
SaveAlertEvent(ctx, f())
}
default:
global.Logger.Sugar().Error("无效的评估条件", ec.Type, ec.Operator, ec.Value)
Expand All @@ -45,7 +45,7 @@ func EvalCondition(ctx *ctx.Context, f func() models.AlertCurEvent, value float6

}

func processEvent(ctx *ctx.Context, event models.AlertCurEvent) {
func SaveAlertEvent(ctx *ctx.Context, event models.AlertCurEvent) {
ok := ctx.DB.Rule().GetRuleIsExist(event.RuleId)
if ok {
SaveEventCache(ctx, event)
Expand Down
64 changes: 64 additions & 0 deletions alert/process/kubeevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package process

import (
"crypto/md5"
"encoding/hex"
v1 "k8s.io/api/core/v1"
"strings"
"watchAlert/pkg/ctx"
)

// KubernetesEvent pairs a Kubernetes core/v1 Event with the application
// context it was created with. Values are built by KubernetesAlertEvent.
type KubernetesEvent struct {
// ctx is the application context handed to KubernetesAlertEvent.
ctx *ctx.Context
// event is the wrapped Kubernetes event, held by value.
event v1.Event
}

// KubernetesAlertEvent builds a KubernetesEvent from the given application
// context and Kubernetes event. The event is stored by value inside the
// returned wrapper.
func KubernetesAlertEvent(ctx *ctx.Context, event v1.Event) KubernetesEvent {
	var wrapped KubernetesEvent
	wrapped.ctx = ctx
	wrapped.event = event
	return wrapped
}

// GetFingerprint returns the hex-encoded MD5 digest of the string
// "<namespace>-<reason>" taken from the wrapped event.
//
// Only the namespace and the reason feed the digest, so two events with the
// same reason in the same namespace yield the same fingerprint even when
// they concern different objects.
func (a KubernetesEvent) GetFingerprint() string {
	digest := md5.Sum([]byte(a.event.Namespace + "-" + a.event.Reason))
	return hex.EncodeToString(digest[:])
}

// GetMetrics returns the label set attached to an alert built from the
// wrapped event: its namespace, its reason and the name of the involved
// object.
//
// NOTE(review): the "resource" key carries the event Reason, and "podName"
// carries the involved object's name whatever its kind — confirm that this
// naming is what consumers of the metric map expect.
func (a KubernetesEvent) GetMetrics() map[string]interface{} {
	metrics := make(map[string]interface{}, 3)
	metrics["namespace"] = a.event.Namespace
	metrics["resource"] = a.event.Reason
	metrics["podName"] = a.event.InvolvedObject.Name
	return metrics
}

// EvalKubeEvent describes how Kubernetes events are evaluated.
// NOTE(review): this type is not referenced by the other code visible here;
// the field descriptions below are presumed from the names — confirm.
type EvalKubeEvent struct {
// Reason is presumably the event reason to match — TODO confirm.
Reason string
// Filter is presumably a list of resource-name substrings to exclude,
// in the form FilterKubeEvent consumes — TODO confirm.
Filter []string
}

// FilterKubeEvent drops the events whose involved-object name contains any
// of the substrings in filter; filter is an exclusion list.
//
// Parameters:
//   - event:  the event list to filter; it is never modified.
//   - filter: substrings to exclude. Empty strings are ignored, because
//     strings.Contains reports true for an empty substring and a single
//     blank entry would otherwise silently discard every event.
//
// Returns event itself when it is nil or when filter has no non-empty
// entry. Otherwise returns a new list holding only the surviving items;
// list metadata is not copied.
func FilterKubeEvent(event *v1.EventList, filter []string) *v1.EventList {
	// Keep only the patterns that can meaningfully exclude something.
	patterns := make([]string, 0, len(filter))
	for _, f := range filter {
		if f != "" {
			patterns = append(patterns, f)
		}
	}

	if event == nil || len(patterns) == 0 {
		return event
	}

	kept := &v1.EventList{}
	// Index the slice so the event struct is copied only when it is kept.
	for i := range event.Items {
		name := event.Items[i].InvolvedObject.Name

		excluded := false
		for _, p := range patterns {
			if strings.Contains(name, p) {
				excluded = true
				break
			}
		}

		if !excluded {
			kept.Items = append(kept.Items, event.Items[i])
		}
	}

	return kept
}
50 changes: 46 additions & 4 deletions alert/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func (rq *RuleQuery) Query(ctx *ctx.Context, rule models.AlertRule) {
rq.jaeger(dsId, rule)
case "CloudWatch":
rq.cloudWatch(dsId, rule)
case "KubernetesEvent":
rq.kubernetesEvent(dsId, rule)
}
}

Expand Down Expand Up @@ -402,10 +404,7 @@ func (rq *RuleQuery) jaeger(datasourceId string, rule models.AlertRule) {
key := rq.alertEvent.GetFiringAlertCacheKey()
curKeys = append(curKeys, key)

ok := rq.ctx.DB.Rule().GetRuleIsExist(event.RuleId)
if ok {
process.SaveEventCache(rq.ctx, event)
}
process.SaveAlertEvent(rq.ctx, event)
}

}
Expand Down Expand Up @@ -468,3 +467,46 @@ func (rq *RuleQuery) cloudWatch(datasourceId string, rule models.AlertRule) {
process.EvalCondition(rq.ctx, event, values[0], options)
}
}

// kubernetesEvent evaluates a KubernetesEvent rule against one datasource.
//
// It loads the datasource, builds a Kubernetes client from its kubeconfig,
// fetches the warning events for the rule's configured reason and scope,
// and, when at least KubernetesConfig.Value events came back, saves one
// alert event per event that survives the rule's name filter.
//
// Any error from the datasource lookup, the client construction or the
// event query ends the evaluation without reporting it; the deferred
// recovery step still runs in that case.
func (rq *RuleQuery) kubernetesEvent(datasourceId string, rule models.AlertRule) {
	// NOTE(review): nothing in this method appends to firingKeys, so the
	// recovery calls below always receive an empty key list — confirm this
	// is intended.
	var firingKeys []string
	defer func() {
		rq.alertRecover(rule, firingKeys)
		go process.GcRecoverWaitCache(rule, firingKeys)
	}()

	ds, err := rq.ctx.DB.Datasource().GetInstance(datasourceId)
	if err != nil {
		return
	}

	kubeClient, err := client.NewKubernetesClient(rq.ctx.Ctx, ds.KubeConfig)
	if err != nil {
		return
	}

	warnings, err := kubeClient.GetWarningEvent(rule.KubernetesConfig.Reason, rule.KubernetesConfig.Scope)
	if err != nil {
		return
	}

	// NOTE(review): the threshold is compared with the unfiltered event
	// count, so events removed by the filter below still count towards it —
	// confirm this is intended.
	if len(warnings.Items) < rule.KubernetesConfig.Value {
		return
	}

	messagesByResource := make(map[string][]string)
	filtered := process.FilterKubeEvent(warnings, rule.KubernetesConfig.Filter)
	for _, item := range filtered.Items {
		// One resource may have several different event messages; collect
		// them so each annotation lists everything seen so far for it.
		resourceName := item.InvolvedObject.Name
		messagesByResource[resourceName] = append(messagesByResource[resourceName], "\n"+item.Message)

		wrapped := process.KubernetesAlertEvent(rq.ctx, item)

		alertEvent := process.ParserDefaultEvent(rule)
		alertEvent.DatasourceId = datasourceId
		alertEvent.Fingerprint = wrapped.GetFingerprint()
		alertEvent.Metric = wrapped.GetMetrics()
		alertEvent.Annotations = fmt.Sprintf(
			"\n- 环境: %s\n- 命名空间: %s\n- 资源类型: %s\n- 资源名称: %s\n- 事件类型: %s\n- 事件详情: %s\n",
			ds.Name,
			item.Namespace,
			item.InvolvedObject.Kind,
			resourceName,
			item.Reason,
			messagesByResource[resourceName],
		)

		process.SaveAlertEvent(rq.ctx, alertEvent)
	}
}
1 change: 1 addition & 0 deletions api/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ApiGroup struct {
AWSCloudWatchController
AWSCloudWatchRDSController
SettingsController
KubernetesTypesController
}

var ApiGroupApp = new(ApiGroup)
Expand Down
38 changes: 38 additions & 0 deletions api/kubernetes_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package api

import (
"github.com/gin-gonic/gin"
"watchAlert/internal/middleware"
"watchAlert/internal/models"
"watchAlert/internal/types"
)

// KubernetesTypesController groups the HTTP handlers registered under the
// "kubernetes" route group. It carries no state.
type KubernetesTypesController struct{}

// API registers the controller's routes on the given router group, under
// the "kubernetes" sub-group. Every route in the sub-group runs the Auth,
// Permission and ParseTenant middleware, in that order, before its handler.
//
// Routes:
//
//	GET kubernetes/getResourceList
//	GET kubernetes/getReasonList
func (ktc KubernetesTypesController) API(gin *gin.RouterGroup) {
	routes := gin.Group("kubernetes")
	routes.Use(middleware.Auth(), middleware.Permission(), middleware.ParseTenant())

	routes.GET("getResourceList", ktc.getResourceList)
	routes.GET("getReasonList", ktc.getReasonList)
}

// getResourceList responds with types.EventResourceTypeList via the shared
// Service helper. It reads nothing from the request.
func (ktc KubernetesTypesController) getResourceList(ctx *gin.Context) {
	listResources := func() (interface{}, interface{}) {
		return types.EventResourceTypeList, nil
	}
	Service(ctx, listResources)
}

// getReasonList binds the query string into a models.RequestEventTypes and
// responds, via the shared Service helper, with the entry of
// types.EventReasonLMapping keyed by the requested Resource.
//
// NOTE(review): the outcome of BindQuery is not inspected here; if binding
// fails the lookup presumably runs with an empty Resource — confirm how
// BindQuery reports errors.
func (ktc KubernetesTypesController) getReasonList(ctx *gin.Context) {
	var req models.RequestEventTypes
	BindQuery(ctx, &req)

	Service(ctx, func() (interface{}, interface{}) {
		reasons := types.EventReasonLMapping[req.Resource]
		return reasons, nil
	})
}
8 changes: 4 additions & 4 deletions api/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
type RuleController struct{}

/*
告警规则 API
/api/w8t/rule
告警规则 API
/api/w8t/rule
*/
func (rc RuleController) API(gin *gin.RouterGroup) {
ruleA := gin.Group("rule")
Expand Down Expand Up @@ -87,8 +87,8 @@ func (rc RuleController) Delete(ctx *gin.Context) {
}

func (rc RuleController) Search(ctx *gin.Context) {
r := new(models.AlertRule)
BindJson(ctx, r)
r := new(models.AlertRuleQuery)
BindQuery(ctx, r)

tid, _ := ctx.Get("TenantID")
r.TenantId = tid.(string)
Expand Down
21 changes: 20 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/google/uuid v1.6.0
github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.40.0
github.com/rs/xid v1.5.0
Expand All @@ -23,6 +24,8 @@ require (
go.uber.org/zap v1.24.0
gorm.io/driver/mysql v1.5.4
gorm.io/gorm v1.25.7
k8s.io/apimachinery v0.20.4
k8s.io/client-go v0.20.0
)

require (
Expand Down Expand Up @@ -54,20 +57,26 @@ require (
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/clbanning/mxj/v2 v2.5.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.18.0 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.5 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
Expand Down Expand Up @@ -96,8 +105,18 @@ require (
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.20.4 // indirect
k8s.io/klog/v2 v2.80.0 // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
Loading

0 comments on commit e5c86b8

Please sign in to comment.