Skip to content

Commit

Permalink
prometheus: Add ip labels to server metrics
Browse files Browse the repository at this point in the history
Add two labels grpc_server_ip and grpc_client_ip to the server metrics
when option WithServerIPLabelsEnabled() is set. These labels are not
added by default for compatibility.

Signed-off-by: diabloneo <[email protected]>
  • Loading branch information
diabloneo committed Nov 6, 2024
1 parent ba6f8b9 commit 219f211
Show file tree
Hide file tree
Showing 5 changed files with 374 additions and 44 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ Additional great feature of interceptors is the fact we can chain those. For exa

```go mdox-exec="sed -n '136,151p' examples/server/main.go"
grpcSrv := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainUnaryInterceptor(
// Order matters e.g. tracing interceptor have to create span first for the later exemplars to work.
otelgrpc.UnaryServerInterceptor(),
srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)),
logging.UnaryServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)),
selector.UnaryServerInterceptor(auth.UnaryServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc.ChainStreamInterceptor(
otelgrpc.StreamServerInterceptor(),
srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)),
logging.StreamServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)),
selector.StreamServerInterceptor(auth.StreamServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)),
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
)
t := &testpb.TestPingService{}
```

This pattern offers clean and explicit shared functionality for all your gRPC methods. Full, buildable examples can be found in [examples](examples) directory.
Expand Down
81 changes: 62 additions & 19 deletions providers/prometheus/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ package prometheus

import (
"context"
"net/netip"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
"github.com/prometheus/client_golang/prometheus"
grpcpeer "google.golang.org/grpc/peer"
)

type reporter struct {
clientMetrics *ClientMetrics
serverMetrics *ServerMetrics
clientMetrics *ClientMetrics
serverMetrics *ServerMetrics

grpcServerIP string
grpcClientIP string
typ interceptors.GRPCType
service, method string
kind Kind
Expand All @@ -28,9 +33,17 @@ func (r *reporter) PostCall(err error, rpcDuration time.Duration) {
// perform handling of metrics from code
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverHandledCounter, string(r.typ), r.service, r.method, code.String())
lvals := []string{string(r.typ), r.service, r.method, code.String()}
if r.serverMetrics.ipLabelsEnabled {
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.incrementWithExemplar(r.serverMetrics.serverHandledCounter, lvals...)
if r.serverMetrics.serverHandledHistogram != nil {
r.observeWithExemplar(r.serverMetrics.serverHandledHistogram, rpcDuration.Seconds(), string(r.typ), r.service, r.method)
lvals = []string{string(r.typ), r.service, r.method}
if r.serverMetrics.ipLabelsEnabled {
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.observeWithExemplar(r.serverMetrics.serverHandledHistogram, rpcDuration.Seconds(), lvals...)
}

case KindClient:
Expand All @@ -44,7 +57,11 @@ func (r *reporter) PostCall(err error, rpcDuration time.Duration) {
func (r *reporter) PostMsgSend(_ any, _ error, sendDuration time.Duration) {
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgSent, string(r.typ), r.service, r.method)
lvals := []string{string(r.typ), r.service, r.method}
if r.serverMetrics.ipLabelsEnabled {
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgSent, lvals...)
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStreamMsgSent, string(r.typ), r.service, r.method)
if r.clientMetrics.clientStreamSendHistogram != nil {
Expand All @@ -56,7 +73,11 @@ func (r *reporter) PostMsgSend(_ any, _ error, sendDuration time.Duration) {
func (r *reporter) PostMsgReceive(_ any, _ error, recvDuration time.Duration) {
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgReceived, string(r.typ), r.service, r.method)
lvals := []string{string(r.typ), r.service, r.method}
if r.serverMetrics.ipLabelsEnabled {
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgReceived, lvals...)
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStreamMsgReceived, string(r.typ), r.service, r.method)
if r.clientMetrics.clientStreamRecvHistogram != nil {
Expand All @@ -65,6 +86,14 @@ func (r *reporter) PostMsgReceive(_ any, _ error, recvDuration time.Duration) {
}
}

func (r *reporter) incrementWithExemplar(c *prometheus.CounterVec, lvals ...string) {
c.WithLabelValues(lvals...).(prometheus.ExemplarAdder).AddWithExemplar(1, r.exemplar)
}

func (r *reporter) observeWithExemplar(h *prometheus.HistogramVec, value float64, lvals ...string) {
h.WithLabelValues(lvals...).(prometheus.ExemplarObserver).ObserveWithExemplar(value, r.exemplar)
}

type reportable struct {
clientMetrics *ClientMetrics
serverMetrics *ServerMetrics
Expand All @@ -86,10 +115,11 @@ func (rep *reportable) reporter(ctx context.Context, sm *ServerMetrics, cm *Clie
r := &reporter{
clientMetrics: cm,
serverMetrics: sm,
typ: meta.Typ,
service: meta.Service,
method: meta.Method,
kind: kind,

typ: meta.Typ,
service: meta.Service,
method: meta.Method,
kind: kind,
}
if c.exemplarFn != nil {
r.exemplar = c.exemplarFn(ctx)
Expand All @@ -99,15 +129,28 @@ func (rep *reportable) reporter(ctx context.Context, sm *ServerMetrics, cm *Clie
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStartedCounter, string(r.typ), r.service, r.method)
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStartedCounter, string(r.typ), r.service, r.method)

lvals := []string{string(r.typ), r.service, r.method}
if r.serverMetrics.ipLabelsEnabled {
if peer, ok := grpcpeer.FromContext(ctx); ok {
// Fallback to net.Addr.String() when ParseAddrPort failed, because it already contains
// necessary information to be added to the label and we

// This is server side, so LocalAddr is server's address.
if addrPort, e := netip.ParseAddrPort(peer.LocalAddr.String()); e != nil {
r.grpcServerIP = peer.LocalAddr.String()
} else {
r.grpcServerIP = addrPort.Addr().String()
}
if addrPort, e := netip.ParseAddrPort(peer.Addr.String()); e != nil {
r.grpcClientIP = peer.Addr.String()
} else {
r.grpcClientIP = addrPort.Addr().String()
}
}
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.incrementWithExemplar(r.serverMetrics.serverStartedCounter, lvals...)
}
return r, ctx
}

func (r *reporter) incrementWithExemplar(c *prometheus.CounterVec, lvals ...string) {
c.WithLabelValues(lvals...).(prometheus.ExemplarAdder).AddWithExemplar(1, r.exemplar)
}

func (r *reporter) observeWithExemplar(h *prometheus.HistogramVec, value float64, lvals ...string) {
h.WithLabelValues(lvals...).(prometheus.ExemplarObserver).ObserveWithExemplar(value, r.exemplar)
}
49 changes: 38 additions & 11 deletions providers/prometheus/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
// ServerMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC server.
type ServerMetrics struct {
ipLabelsEnabled bool
serverStartedCounter *prometheus.CounterVec
serverHandledCounter *prometheus.CounterVec
serverStreamMsgReceived *prometheus.CounterVec
Expand All @@ -27,29 +28,43 @@ type ServerMetrics struct {
func NewServerMetrics(opts ...ServerMetricsOption) *ServerMetrics {
var config serverMetricsConfig
config.apply(opts)
return &ServerMetrics{

addIPLables := func(orig []string) []string {
if config.ipLabelsEnabled {
return serverMetricAddIPLabel(orig)
}
return orig
}

sm := &ServerMetrics{
ipLabelsEnabled: config.ipLabelsEnabled,
serverStartedCounter: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_started_total",
Help: "Total number of RPCs started on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
}), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})),
serverHandledCounter: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_handled_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
}), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"})),
serverStreamMsgReceived: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_msg_received_total",
Help: "Total number of RPC stream messages received on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
}), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})),
serverStreamMsgSent: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledHistogram: config.serverHandledHistogram,
}), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})),
}
if config.serverHandledHistogramEnabled {
sm.serverHandledHistogram = newServerHandlingTimeHistogram(
config.ipLabelsEnabled, config.serverHandledHistogramOptions)
}

return sm
}

// Describe sends the super-set of all possible descriptors of metrics
Expand Down Expand Up @@ -95,15 +110,27 @@ func (m *ServerMetrics) InitializeMetrics(server reflection.ServiceInfoProvider)
func (m *ServerMetrics) preRegisterMethod(serviceName string, mInfo *grpc.MethodInfo) {
methodName := mInfo.Name
methodType := string(typeFromMethodInfo(mInfo))

lvals := []string{methodType, serviceName, methodName}
if m.ipLabelsEnabled {
// Because netip.Addr.String() returns "invalid IP" for zero Addr,
// we use this value with grpc_server and grpc_client.
lvals = append(lvals, "invalid IP", "invalid IP")
}
// These are just references (no increments), as just referencing will create the labels but not set values.
_, _ = m.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverStartedCounter.GetMetricWithLabelValues(lvals...)
_, _ = m.serverStreamMsgReceived.GetMetricWithLabelValues(lvals...)
_, _ = m.serverStreamMsgSent.GetMetricWithLabelValues(lvals...)
if m.serverHandledHistogram != nil {
_, _ = m.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverHandledHistogram.GetMetricWithLabelValues(lvals...)
}

for _, code := range interceptors.AllCodes {
_, _ = m.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
lvals = []string{methodType, serviceName, methodName, code.String()}
if m.ipLabelsEnabled {
lvals = append(lvals, "invalid IP", "invalid IP")
}
_, _ = m.serverHandledCounter.GetMetricWithLabelValues(lvals...)
}
}

Expand Down
45 changes: 35 additions & 10 deletions providers/prometheus/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,23 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

func serverMetricAddIPLabel(orig []string) []string {
return append(orig, "grpc_server_ip", "grpc_client_ip")
}

type exemplarFromCtxFn func(ctx context.Context) prometheus.Labels

type serverMetricsConfig struct {
// ipLabelsEnabled control whether to add grpc_server and grpc_client labels to metrics.
ipLabelsEnabled bool

counterOpts counterOptions
// serverHandledHistogram can be nil.
serverHandledHistogram *prometheus.HistogramVec

serverHandledHistogramEnabled bool
serverHandledHistogramOptions []HistogramOption
}

// ServerMetricsOption configures how we set up the server metrics.
type ServerMetricsOption func(*serverMetricsConfig)

func (c *serverMetricsConfig) apply(opts []ServerMetricsOption) {
Expand All @@ -32,17 +41,33 @@ func WithServerCounterOptions(opts ...CounterOption) ServerMetricsOption {
}
}

func newServerHandlingTimeHistogram(ipLabelsEnabled bool, opts []HistogramOption) *prometheus.HistogramVec {
labels := []string{"grpc_type", "grpc_service", "grpc_method"}
if ipLabelsEnabled {
labels = serverMetricAddIPLabel(labels)
}
return prometheus.NewHistogramVec(
histogramOptions(opts).apply(prometheus.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prometheus.DefBuckets,
}),
labels,
)
}

// WithServerHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func WithServerHandlingTimeHistogram(opts ...HistogramOption) ServerMetricsOption {
return func(o *serverMetricsConfig) {
o.serverHandledHistogram = prometheus.NewHistogramVec(
histogramOptions(opts).apply(prometheus.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prometheus.DefBuckets,
}),
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
o.serverHandledHistogramEnabled = true
o.serverHandledHistogramOptions = opts
}
}

// WithServerIPLabelsEnabled enables adding grpc_server and grpc_client labels to metrics.
func WithServerIPLabelsEnabled() ServerMetricsOption {
return func(o *serverMetricsConfig) {
o.ipLabelsEnabled = true
}
}
Loading

0 comments on commit 219f211

Please sign in to comment.