Skip to content

Commit

Permalink
Adding some more meaningful metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
blaubaer committed Jun 29, 2020
1 parent fe115a6 commit 4ea617e
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 17 deletions.
5 changes: 4 additions & 1 deletion context/metrics.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package context

type MetricsCollector interface {
Collect(*Context)
CollectContext(*Context)

CollectClientStarted() func()
CollectUpstreamStarted() func()
}
74 changes: 63 additions & 11 deletions lingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net"
"net/http"
"strings"
"sync/atomic"
"time"
)

Expand All @@ -39,6 +40,12 @@ type Lingress struct {

type accessLogEntry map[string]interface{}

type ConnectionStates struct {
New uint64
Active uint64
Idle uint64
}

func New(fps support.FileProviders) (*Lingress, error) {
if r, err := rules.NewRepository(); err != nil {
return nil, err
Expand All @@ -49,38 +56,40 @@ func New(fps support.FileProviders) (*Lingress, error) {
} else if m, err := management.New(r); err != nil {
return nil, err
} else {
result := Lingress{
result := &Lingress{
RulesRepository: r,
Proxy: p,
Fallback: f,
Management: m,
http: http.Server{
Handler: p,
ErrorLog: support.StdLog(log.Fields{
"context": "server.http",
}, log.DebugLevel),
},
https: http.Server{
Handler: p,
ErrorLog: support.StdLog(log.Fields{
"context": "server.https",
}, log.DebugLevel),
},
HttpListenAddr: ":8080",
HttpsListenAddr: ":8443",
MaxHeaderBytes: 2 << 20, // 2MB,
ReadHeaderTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,

HttpListenAddr: ":8080",
HttpsListenAddr: ":8443",
MaxHeaderBytes: 2 << 20, // 2MB,
ReadHeaderTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
AccessLogQueueSize: 5000,
}

result.http.Handler = result
result.https.Handler = result
result.http.ConnState = result.onConnState
result.https.ConnState = result.onConnState

p.ResultHandler = result.onResult
p.AccessLogger = result.onAccessLog
p.MetricsCollector = result.Management

return &result, nil
return result, nil
}
}

Expand Down Expand Up @@ -129,6 +138,48 @@ func (instance *Lingress) RegisterFlag(fe support.FlagEnabled, appPrefix string)
return nil
}

func (instance *Lingress) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
finalize := instance.Management.CollectClientStarted()
defer finalize()

instance.Proxy.ServeHTTP(resp, req)
}

func (instance *Lingress) onConnState(conn net.Conn, state http.ConnState) {
previous := SetConnState(conn, state)
if previous == state {
return
}

source := instance.Management.Metrics.Client.Connections.Source

switch previous {
case http.StateNew:
atomic.AddUint64(&source.New, ^uint64(0))
case http.StateActive:
atomic.AddUint64(&source.Active, ^uint64(0))
case http.StateIdle:
atomic.AddUint64(&source.Idle, ^uint64(0))
case -1:
// Ignore
default:
return
}

switch state {
case http.StateNew:
atomic.AddUint64(&source.New, 1)
atomic.AddUint64(&source.Current, 1)
atomic.AddUint64(&source.Total, 1)
case http.StateActive:
atomic.AddUint64(&source.Active, 1)
case http.StateIdle:
atomic.AddUint64(&source.Idle, 1)
default:
atomic.AddUint64(&source.Current, ^uint64(0))
}
}

func (instance *Lingress) Init(stop support.Channel) error {
if err := instance.RulesRepository.Init(stop); err != nil {
return err
Expand Down Expand Up @@ -178,6 +229,7 @@ func (instance *Lingress) serve(target *http.Server, addr string, tlsConfig *tls
return err
}
ln = tcpKeepAliveListener{ln.(*net.TCPListener)}
ln = stateTrackingListener{ln}

serve := func() error {
return target.Serve(ln)
Expand Down
12 changes: 10 additions & 2 deletions management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,16 @@ func (instance *Management) RegisterFlag(fe support.FlagEnabled, appPrefix strin
return nil
}

func (instance *Management) Collect(ctx *lctx.Context) {
instance.Metrics.Collect(ctx)
func (instance *Management) CollectContext(ctx *lctx.Context) {
instance.Metrics.CollectContext(ctx)
}

func (instance *Management) CollectClientStarted() func() {
return instance.Metrics.CollectClientStarted()
}

func (instance *Management) CollectUpstreamStarted() func() {
return instance.Metrics.CollectUpstreamStarted()
}

func (instance *Management) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
Expand Down
117 changes: 115 additions & 2 deletions management/metics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"strconv"
"sync/atomic"
)

var (
Expand All @@ -30,7 +31,8 @@ type Metrics struct {
}

type ClientMetrics struct {
Request *RequestMetrics
Request *RequestMetrics
Connections *ConnectionMetrics
}

type UpstreamMetrics struct {
Expand All @@ -47,10 +49,40 @@ type RulesMetrics struct {
type RequestMetrics struct {
DurationSeconds *prometheus.HistogramVec
Total *prometheus.CounterVec

Current prometheus.GaugeFunc

Source *RequestStates
}

type RequestStates struct {
Current uint64
}

type ConnectionMetrics struct {
New prometheus.GaugeFunc
Active prometheus.GaugeFunc
Idle prometheus.GaugeFunc

Current prometheus.GaugeFunc
Total prometheus.GaugeFunc

Source *ConnectionStates
}

type ConnectionStates struct {
New uint64
Active uint64
Idle uint64

Current uint64
Total uint64
}

func NewMetrics(rulesRepository rules.Repository) *Metrics {
registry := prometheus.NewRegistry()
//registry.MustRegister(prometheus.NewGoCollector())
//registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))

return &Metrics{
Client: NewClientMetrics(registry),
Expand All @@ -63,23 +95,87 @@ func NewMetrics(rulesRepository rules.Repository) *Metrics {
}

func NewRequestMetrics(registerer prometheus.Registerer, variant string, buckets []float64) *RequestMetrics {
source := &RequestStates{}

loadValue := func(of *uint64) func() float64 {
return func() float64 {
return float64(atomic.LoadUint64(of))
}
}

return &RequestMetrics{
Source: source,

DurationSeconds: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "lingress",
Subsystem: variant + "_requests",
Name: "duration_seconds",
Help: "Duration in seconds per request of " + variant + "s.",
Buckets: buckets,
}, MetricsLabelNames),

Total: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "lingress",
Subsystem: variant + "_requests",
Name: "total",
Help: "Amount of requests of " + variant + "s.",
}, MetricsLabelNames),

Current: promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "lingress",
Subsystem: variant + "_requests",
Name: "current",
Help: "Amount of current connected requests of " + variant + "s.",
}, loadValue(&source.Current)),
}
}

func NewConnectionMetrics(registerer prometheus.Registerer, variant string) *ConnectionMetrics {
result := &ConnectionMetrics{
Source: &ConnectionStates{},
}

loadValue := func(of *uint64) func() float64 {
return func() float64 {
return float64(atomic.LoadUint64(of))
}
}

result.New = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "lingress",
Subsystem: variant + "_connections",
Name: "new",
Help: "Amount of new connections of " + variant + "s.",
}, loadValue(&result.Source.New))
result.Active = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "lingress",
Subsystem: variant + "_connections",
Name: "active",
Help: "Amount of active connections of " + variant + "s.",
}, loadValue(&result.Source.Active))
result.Idle = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "lingress",
Subsystem: variant + "_connections",
Name: "idle",
Help: "Amount of idle connections of " + variant + "s.",
}, loadValue(&result.Source.Idle))

result.Current = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "lingress",
Subsystem: variant + "_connections",
Name: "current",
Help: "Amount of current connected connections of " + variant + "s.",
}, loadValue(&result.Source.Current))
result.Total = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "lingress",
Subsystem: variant + "_connections",
Name: "total",
Help: "Amount of total ever connections of " + variant + "s.",
}, loadValue(&result.Source.Total))

return result
}

func NewClientMetrics(registerer prometheus.Registerer) *ClientMetrics {
return &ClientMetrics{
Request: NewRequestMetrics(registerer, "client", []float64{
Expand All @@ -89,6 +185,7 @@ func NewClientMetrics(registerer prometheus.Registerer) *ClientMetrics {
1,
10,
}),
Connections: NewConnectionMetrics(registerer, "client"),
}
}

Expand Down Expand Up @@ -130,7 +227,7 @@ func (instance *Metrics) ServeHTTP(resp http.ResponseWriter, req *http.Request)
instance.Handler.ServeHTTP(resp, req)
}

func (instance *Metrics) Collect(ctx *context.Context) {
func (instance *Metrics) CollectContext(ctx *context.Context) {
labels := instance.labelsFor(ctx)
if v := ctx.Client.Duration; v > -1 {
instance.Client.Request.DurationSeconds.With(labels).Observe(v.Seconds())
Expand All @@ -143,6 +240,22 @@ func (instance *Metrics) Collect(ctx *context.Context) {
}
}

func (instance *Metrics) CollectClientStarted() func() {
source := instance.Client.Request.Source
atomic.AddUint64(&source.Current, 1)
return func() {
atomic.AddUint64(&source.Current, ^uint64(0))
}
}

func (instance *Metrics) CollectUpstreamStarted() func() {
source := instance.Upstream.Request.Source
atomic.AddUint64(&source.Current, 1)
return func() {
atomic.AddUint64(&source.Current, ^uint64(0))
}
}

func (instance *Metrics) labelsFor(ctx *context.Context) prometheus.Labels {
result := prometheus.Labels{
"client_status": "none",
Expand Down
7 changes: 6 additions & 1 deletion proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (instance *Proxy) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
r.Statistics().MarkUsed(ctx.Client.Duration)
}
if mc := instance.MetricsCollector; mc != nil {
mc.Collect(ctx)
mc.CollectContext(ctx)
}
_, _ = instance.switchStageAndCallInterceptors(lctx.StageDone, ctx)
if al := instance.AccessLogger; al != nil {
Expand Down Expand Up @@ -296,6 +296,11 @@ func (instance *Proxy) createBackendRequestFor(ctx *lctx.Context, r rules.Rule)
}

func (instance *Proxy) execute(ctx *lctx.Context) error {
if mc := instance.MetricsCollector; mc != nil {
finalize := mc.CollectUpstreamStarted()
defer finalize()
}

ctx.Client.Status = 0
ctx.Upstream.Status = 0
ctx.Upstream.Started = time.Now()
Expand Down
Loading

0 comments on commit 4ea617e

Please sign in to comment.