diff --git a/api/prometheus/prometheus.go b/api/prometheus/prometheus.go
index 0c436f451..510b45044 100644
--- a/api/prometheus/prometheus.go
+++ b/api/prometheus/prometheus.go
@@ -18,13 +18,14 @@ package prometheus
 import (
 	"context"
 	"fmt"
-	"github.com/go-openapi/errors"
-	"github.com/loxilb-io/loxilb/options"
 	"net/http"
 	"strings"
 	"sync"
 	"time"
 
+	"github.com/go-openapi/errors"
+	"github.com/loxilb-io/loxilb/options"
+
 	cmn "github.com/loxilb-io/loxilb/common"
 	tk "github.com/loxilb-io/loxilib"
 	"github.com/prometheus/client_golang/prometheus"
@@ -106,42 +107,71 @@ var (
 			Help: "The number of LCUs used by the load balancer.",
 		},
 	)
-	newFlowCount = promauto.NewGauge(
-		prometheus.GaugeOpts{
+	newFlowCount = promauto.NewCounter(
+		prometheus.CounterOpts{
 			Name: "new_flow_count",
 			Help: "The number of new TCP connections from clients to targets.",
 		},
 	)
-	processedBytes = promauto.NewGauge(
-		prometheus.GaugeOpts{
+	processedBytes = promauto.NewCounter(
+		prometheus.CounterOpts{
 			Name: "processed_bytes",
 			Help: "The total number of bytes processed by the load balancer, including TCP/IP headers.",
 		},
 	)
-	processedTCPBytes = promauto.NewGauge(
-		prometheus.GaugeOpts{
+	processedTCPBytes = promauto.NewCounter(
+		prometheus.CounterOpts{
 			Name: "processed_tcp_bytes",
 			Help: "The total number of bytes processed by the load balancer, including TCP/IP headers.",
 		},
 	)
-	processedUDPBytes = promauto.NewGauge(
-		prometheus.GaugeOpts{
+	processedUDPBytes = promauto.NewCounter(
+		prometheus.CounterOpts{
 			Name: "processed_udp_bytes",
 			Help: "The total number of bytes processed by the load balancer, including TCP/IP headers.",
 		},
 	)
-	processedSCTPBytes = promauto.NewGauge(
-		prometheus.GaugeOpts{
+	processedSCTPBytes = promauto.NewCounter(
+		prometheus.CounterOpts{
 			Name: "processed_sctp_bytes",
 			Help: "The total number of bytes processed by the load balancer, including TCP/IP headers.",
 		},
 	)
-	processedPackets = promauto.NewGauge(
-		prometheus.GaugeOpts{
+	processedPackets = promauto.NewCounter(
+		prometheus.CounterOpts{
 			Name: "processed_packets",
 			Help: "The total number of packets processed by the load balancer.",
 		},
 	)
+	lbRuleProcessedBytes = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "lb_rule_processed_bytes",
+			Help: "The total number of bytes processed by the load balancer for each rule, including TCP/IP headers.",
+		},
+		[]string{"rule"},
+	)
+	lbRuleProcessedPackets = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "lb_rule_processed_packets",
+			Help: "The total number of packets processed by the load balancer for each rule.",
+		},
+		[]string{"rule"},
+	)
+	hostProcessedBytes = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "host_processed_bytes",
+			Help: "The total number of bytes processed by the load balancer for each host, including TCP/IP headers.",
+		},
+		[]string{"host"},
+	)
+	hostProcessedPackets = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "host_processed_packets",
+			Help: "The total number of packets processed by the load balancer for each host.",
+		},
+		[]string{"host"},
+	)
+	prevConntrackStats = make(map[ConntrackKey]Stats)
 )
 
 func PrometheusRegister(hook cmn.NetHookInterface) {
@@ -154,6 +184,7 @@ func Init() {
 	// Make Conntrack Statistic map
 	ConntrackStats = make(map[ConntrackKey]Stats)
 	mutex = &sync.Mutex{}
+
 	go RunGetConntrack(prometheusCtx)
 	go RunGetEndpoint(prometheusCtx)
 	go RunActiveConntrackCount(prometheusCtx)
@@ -184,7 +215,8 @@ func TurnOn() error {
 }
 
 func MakeConntrackKey(c cmn.CtInfo) (key ConntrackKey) {
-	return ConntrackKey(fmt.Sprintf("%s|%05d|%s|%05d|%v", c.Sip, c.Sport, c.Dip, c.Dport, c.Proto))
+	return ConntrackKey(fmt.Sprintf("%s|%05d|%s|%05d|%v|%s",
+		c.Sip, c.Sport, c.Dip, c.Dport, c.Proto, c.ServiceName))
 }
 
 func RunResetCounts(ctx context.Context) {
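A note on the hunks above, outside the patch itself: the processed_* metrics switch from Gauge to Counter because they are monotonic totals (so PromQL `rate()`/`increase()` work on them), and the conntrack key gains a sixth `|`-separated field, `ServiceName`. The sketch below shows the delta-feeding discipline those Counters now require; the metric name and helper are illustrative, not part of the patch:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// demoProcessedBytes stands in for the processed_* Counters in the patch.
var demoProcessedBytes = promauto.NewCounter(prometheus.CounterOpts{
	Name: "demo_processed_bytes",
	Help: "Total bytes processed (monotonic).",
})

// publishDelta feeds only the per-cycle delta into the Counter. Counter.Add
// panics on negative values, so an upstream reset (e.g. a flushed conntrack
// table) is handled by treating the current reading as the whole delta,
// mirroring the diffBytes/diffPackets clamping later in this patch.
func publishDelta(curr, prev uint64) {
	if curr >= prev {
		demoProcessedBytes.Add(float64(curr - prev))
	} else {
		demoProcessedBytes.Add(float64(curr))
	}
}

func main() {
	publishDelta(1500, 0)    // first scrape: whole reading counts
	publishDelta(2300, 1500) // steady state: +800
	publishDelta(100, 2300)  // upstream reset: +100, not -2200
}
```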
@@ -208,33 +240,26 @@ func RunGetConntrack(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		default:
-			mutex.Lock()
-			ConntrackInfo, err = hooks.NetCtInfoGet()
-			if err != nil {
-				tk.LogIt(tk.LogDebug, "[Prometheus] Error occur : %v\n", err)
-			}
-
-			for _, ct := range ConntrackInfo {
-				k := MakeConntrackKey(ct)
-				var tmpStats Stats
-				_, ok := ConntrackStats[k]
-				if ok {
-					tmpStats = Stats{
-						Bytes:   ConntrackStats[k].Bytes + ct.Bytes,
-						Packets: ConntrackStats[k].Packets + ct.Pkts,
-					}
-				} else {
-					tmpStats = Stats{
-						Bytes:   ct.Bytes,
-						Packets: ct.Pkts,
-					}
-				}
-
-				ConntrackStats[k] = tmpStats
-			}
-			mutex.Unlock()
 		}
+
+		ConntrackInfo, err = hooks.NetCtInfoGet()
+		if err != nil {
+			tk.LogIt(tk.LogDebug, "[Prometheus] Error occur : %v\n", err)
+			continue
+		}
+		localStats := make(map[ConntrackKey]Stats, len(ConntrackInfo))
+		for _, ct := range ConntrackInfo {
+			key := MakeConntrackKey(ct)
+			localStats[key] = Stats{
+				Bytes:   ct.Bytes,
+				Packets: ct.Pkts,
+			}
+		}
+
+		mutex.Lock()
+		ConntrackStats = localStats
+		mutex.Unlock()
 
 		time.Sleep(PromethusDefaultPeriod)
 	}
 }
@@ -245,11 +270,14 @@ func RunGetEndpoint(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		default:
-			mutex.Lock()
-			EndPointInfo, err = hooks.NetEpHostGet()
+			info, err := hooks.NetEpHostGet()
 			if err != nil {
 				tk.LogIt(tk.LogDebug, "[Prometheus] Error occur : %v\n", err)
+				continue
 			}
+
+			mutex.Lock()
+			EndPointInfo = info
 			mutex.Unlock()
 		}
 
@@ -263,14 +291,20 @@ func RunGetLBRule(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		default:
-			mutex.Lock()
-			LBRuleInfo, err = hooks.NetLbRuleGet()
-			if err != nil {
-				tk.LogIt(tk.LogDebug, "[Prometheus] Error occur : %v\n", err)
-			}
-			ruleCount.Set(float64(len(LBRuleInfo)))
-			mutex.Unlock()
 		}
+
+		info, err := hooks.NetLbRuleGet()
+		if err != nil {
+			tk.LogIt(tk.LogDebug, "[Prometheus] Error occur : %v\n", err)
+			continue
+		}
+
+		mutex.Lock()
+		LBRuleInfo = info
+		mutex.Unlock()
+
+		ruleCount.Set(float64(len(info)))
+
 		time.Sleep(PromethusDefaultPeriod)
 	}
 }
@@ -282,36 +316,30 @@ func RunActiveConntrackCount(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		default:
 			mutex.Lock()
-			// init Counts
-			activeFlowCountTcp.Set(0)
-			activeFlowCountUdp.Set(0)
-			activeFlowCountSctp.Set(0)
-			inActiveFlowCount.Set(0)
-
-			// Total flow count
-			activeConntrackCount.Set(float64(len(ConntrackInfo)))
-
-			for _, ct := range ConntrackInfo {
-				// TCP flow count
-				if ct.Proto == "tcp" {
-					activeFlowCountTcp.Inc()
-				}
-				// UDP flow count
-				if ct.Proto == "udp" {
-					activeFlowCountUdp.Inc()
-				}
-				// SCTP flow count
-				if ct.Proto == "sctp" {
-					activeFlowCountSctp.Inc()
+			info := make([]cmn.CtInfo, len(ConntrackInfo))
+			copy(info, ConntrackInfo)
+			mutex.Unlock()
+
+			tcpCount, udpCount, sctpCount, closedCount := 0, 0, 0, 0
+			for _, ct := range info {
+				switch ct.Proto {
+				case "tcp":
+					tcpCount++
+				case "udp":
+					udpCount++
+				case "sctp":
+					sctpCount++
 				}
-				// Closed flow count
 				if ct.CState == "closed" {
-					inActiveFlowCount.Inc()
+					closedCount++
 				}
 			}
-			mutex.Unlock()
+			activeConntrackCount.Set(float64(len(info)))
+			activeFlowCountTcp.Set(float64(tcpCount))
+			activeFlowCountUdp.Set(float64(udpCount))
+			activeFlowCountSctp.Set(float64(sctpCount))
+			inActiveFlowCount.Set(float64(closedCount))
 		}
-
-		time.Sleep(PromethusDefaultPeriod)
 	}
 }
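The reworked collection loops above all share one shape: snapshot shared state while holding the mutex, release it, then do the counting and metric publishing on the local copy. Here is that pattern in isolation, with simplified types that are stand-ins rather than code from this repo:

```go
package main

import (
	"fmt"
	"sync"
)

// snapshotProtoCounts sketches the copy-then-release pattern the reworked
// goroutines use: hold the mutex only long enough to copy shared state,
// then classify (and publish metrics) on the local copy without the lock.
func snapshotProtoCounts(mu *sync.Mutex, shared []string) map[string]int {
	mu.Lock()
	local := make([]string, len(shared))
	copy(local, shared)
	mu.Unlock()

	counts := make(map[string]int, len(local))
	for _, proto := range local {
		counts[proto]++ // no lock held during this work
	}
	return counts
}

func main() {
	var mu sync.Mutex
	flows := []string{"tcp", "tcp", "udp", "sctp"}
	fmt.Println(snapshotProtoCounts(&mu, flows)) // map[sctp:1 tcp:2 udp:1]
}
```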
@@ -322,24 +350,35 @@ func RunHostCount(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		default:
-			mutex.Lock()
-			healthyHostCount.Set(0)
-			unHealthyHostCount.Set(0)
-			for _, ep := range EndPointInfo {
-				if ep.CurrState == "ok" {
-					healthyHostCount.Inc()
-				}
-				if ep.CurrState == "nok" {
-					unHealthyHostCount.Inc()
-				}
-			}
-			mutex.Unlock()
 		}
+
+		mutex.Lock()
+		localEndPointInfo := EndPointInfo
+		mutex.Unlock()
+
+		healthyHostCount.Set(0)
+		unHealthyHostCount.Set(0)
+
+		for _, ep := range localEndPointInfo {
+			if ep.CurrState == "ok" {
+				healthyHostCount.Inc()
+			} else if ep.CurrState == "nok" {
+				unHealthyHostCount.Inc()
+			}
+		}
 
 		time.Sleep(PromethusDefaultPeriod)
 	}
 }
 
+func parseConntrackKey(key ConntrackKey) (sip, sport, dip, dport, proto, serviceName string) {
+	parts := strings.Split(string(key), "|")
+	if len(parts) == 6 {
+		return parts[0], parts[1], parts[2], parts[3], parts[4], parts[5]
+	}
+	return "", "", "", "", "", ""
+}
+
 func RunProcessedStatistic(ctx context.Context) {
 	for {
 		select {
@@ -347,25 +386,51 @@ func RunProcessedStatistic(ctx context.Context) {
 			return
 		default:
 			mutex.Lock()
-			// Init Stats
-			processedPackets.Set(0)
-			processedBytes.Set(0)
-			processedTCPBytes.Set(0)
-			processedUDPBytes.Set(0)
-			processedSCTPBytes.Set(0)
+			localPrevConntrackStats := make(map[ConntrackKey]Stats, len(ConntrackStats))
 			for k, ct := range ConntrackStats {
-				if strings.Contains(string(k), "tcp") {
-					processedTCPBytes.Add(float64(ct.Bytes))
+				localPrevConntrackStats[k] = ct
+			}
+			mutex.Unlock()
+
+			for k, ct := range localPrevConntrackStats {
+				prevStats, exists := prevConntrackStats[k]
+				if !exists {
+					prevStats = Stats{Bytes: 0, Packets: 0}
 				}
-				if strings.Contains(string(k), "udp") {
-					processedUDPBytes.Add(float64(ct.Bytes))
+				diffBytes := ct.Bytes - prevStats.Bytes
+				diffPackets := ct.Packets - prevStats.Packets
+
+				if diffBytes < 0 {
+					diffBytes = ct.Bytes
 				}
-				if strings.Contains(string(k), "sctp") {
-					processedSCTPBytes.Add(float64(ct.Bytes))
+				if diffPackets < 0 {
+					diffPackets = ct.Packets
+				}
+
+				if diffBytes > 0 || diffPackets > 0 {
+					if strings.Contains(string(k), "tcp") {
+						processedTCPBytes.Add(float64(diffBytes))
+					} else if strings.Contains(string(k), "udp") {
+						processedUDPBytes.Add(float64(diffBytes))
+					} else if strings.Contains(string(k), "sctp") {
+						processedSCTPBytes.Add(float64(diffBytes))
+					}
+					processedPackets.Add(float64(diffPackets))
+					processedBytes.Add(float64(diffBytes))
+
+					// Update per-rule and per-endpoint metrics
+					_, _, dip, _, _, serviceName := parseConntrackKey(k)
+					lbRuleProcessedBytes.WithLabelValues(serviceName).Add(float64(diffBytes))
+					lbRuleProcessedPackets.WithLabelValues(serviceName).Add(float64(diffPackets))
+
+					hostProcessedBytes.WithLabelValues(dip).Add(float64(diffBytes))
+					hostProcessedPackets.WithLabelValues(dip).Add(float64(diffPackets))
 				}
-				processedPackets.Add(float64(ct.Packets))
-				processedBytes.Add(float64(ct.Bytes))
 			}
+
+			mutex.Lock()
+			prevConntrackStats = localPrevConntrackStats
 			mutex.Unlock()
 		}
 
@@ -381,16 +446,14 @@ func RunNewFlowCount(ctx context.Context) {
 			return
 		default:
 			mutex.Lock()
-			// Total new flow count
 			CurrentFlowCounts := len(ConntrackInfo)
+			mutex.Unlock()
+
 			diff := CurrentFlowCounts - PreFlowCounts
 			if diff > 0 {
-				newFlowCount.Set(float64(diff))
-			} else {
-				newFlowCount.Set(0)
+				newFlowCount.Add(float64(diff))
 			}
 			PreFlowCounts = CurrentFlowCounts
-			mutex.Unlock()
 		}
 
 		time.Sleep(PromethusDefaultPeriod)
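RunProcessedStatistic leans on the six-field key format introduced earlier: MakeConntrackKey encodes `sip|sport|dip|dport|proto|service`, and parseConntrackKey inverts it to recover the `host` and `rule` label values for the new CounterVecs. A quick round-trip sketch, with made-up sample values:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Encode the way MakeConntrackKey does after this patch: six
	// "|"-separated fields, ports zero-padded to five digits.
	key := fmt.Sprintf("%s|%05d|%s|%05d|%v|%s",
		"10.0.0.1", 34567, "20.0.0.2", 80, "tcp", "svc-web")

	// Decode the way parseConntrackKey does: the dip and service fields
	// become the "host" and "rule" label values on the new CounterVecs.
	parts := strings.Split(key, "|")
	if len(parts) == 6 {
		fmt.Println(parts[2], parts[5]) // 20.0.0.2 svc-web
	}
}
```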
@@ -403,11 +466,12 @@ func RunLcusCalculator(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		default:
-			mutex.Lock()
 			var LCUNewFlowCount = &dto.Metric{}
 			var LCUActiveFlowCount = &dto.Metric{}
 			var LCURuleCount = &dto.Metric{}
 			var LCUProcessedBytes = &dto.Metric{}
+
+			mutex.Lock()
 			if err := newFlowCount.Write(LCUNewFlowCount); err != nil {
 				tk.LogIt(tk.LogDebug, "[Prometheus] Error occur : %v\n", err)
 			}
@@ -420,16 +484,17 @@ func RunLcusCalculator(ctx context.Context) {
 			if err := processedBytes.Write(LCUProcessedBytes); err != nil {
 				tk.LogIt(tk.LogDebug, "[Prometheus] Error occur : %v\n", err)
 			}
+			localConntrackStatsLen := len(ConntrackStats)
+			mutex.Unlock()
+
 			// LCU of accumulated Flow count = Flowcount / 2160000
 			// LCU of Rule = ruleCount/1000
 			// LCU of Byte = processedBytes(Gb)/1h
-			if LCURuleCount.Gauge.Value != nil && LCUProcessedBytes.Gauge.Value != nil {
-				consumedLcus.Set(float64(len(ConntrackStats))/2160000 +
+			if LCURuleCount.Gauge != nil && LCURuleCount.Gauge.Value != nil && LCUProcessedBytes.Gauge != nil && LCUProcessedBytes.Gauge.Value != nil {
+				consumedLcus.Set(float64(localConntrackStatsLen)/2160000 +
 					*LCURuleCount.Gauge.Value/1000 +
 					(*LCUProcessedBytes.Gauge.Value*8)/360000000000) // (byte * 8)/ (60*60*1G)/10
 			}
-
-			mutex.Unlock()
 		}
 		time.Sleep(PromethusDefaultPeriod)
 	}
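For reference, the arithmetic the last hunk guards combines three terms: accumulated flows rated against 2,160,000 per LCU, rules against 1,000 per LCU, and processed bytes converted to bits and rated against 3.6e11 per hour. A back-of-envelope check with illustrative numbers (this helper is a sketch, not patch code):

```go
package main

import "fmt"

// lcus mirrors the formula in RunLcusCalculator:
//   flows/2160000 + rules/1000 + (bytes*8)/360000000000
func lcus(flows, rules int, processedBytes float64) float64 {
	return float64(flows)/2160000 +
		float64(rules)/1000 +
		(processedBytes*8)/360000000000
}

func main() {
	// e.g. 1M tracked flows, 200 rules, 45 GB moved in the window:
	// 0.463 + 0.2 + 1.0 ≈ 1.66 LCUs
	fmt.Printf("%.2f\n", lcus(1_000_000, 200, 45e9))
}
```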