From 219f21157e85267383292f698567e72839e19efb Mon Sep 17 00:00:00 2001 From: diabloneo Date: Wed, 6 Nov 2024 14:55:06 +0800 Subject: [PATCH] prometheus: Add ip labels to server metrics Add two labels grpc_server_ip and grpc_client_ip to the server metrics when option WithServerIPLabelsEnabled() is set. These labels are not added by default for compatibility. Signed-off-by: diabloneo --- README.md | 6 +- providers/prometheus/reporter.go | 81 +++++++-- providers/prometheus/server_metrics.go | 49 +++-- providers/prometheus/server_options.go | 45 +++-- providers/prometheus/server_test.go | 237 ++++++++++++++++++++++++- 5 files changed, 374 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 8c4ac16bc..dcc84281e 100644 --- a/README.md +++ b/README.md @@ -18,21 +18,21 @@ Additional great feature of interceptors is the fact we can chain those. For exa ```go mdox-exec="sed -n '136,151p' examples/server/main.go" grpcSrv := grpc.NewServer( + grpc.StatsHandler(otelgrpc.NewServerHandler()), grpc.ChainUnaryInterceptor( - // Order matters e.g. tracing interceptor have to create span first for the later exemplars to work. - otelgrpc.UnaryServerInterceptor(), srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)), logging.UnaryServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)), selector.UnaryServerInterceptor(auth.UnaryServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)), recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), ), grpc.ChainStreamInterceptor( - otelgrpc.StreamServerInterceptor(), srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)), logging.StreamServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)), selector.StreamServerInterceptor(auth.StreamServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)), recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), ), + ) + t := &testpb.TestPingService{} ``` This pattern offers clean and explicit shared functionality for all your gRPC methods. Full, buildable examples can be found in [examples](examples) directory. diff --git a/providers/prometheus/reporter.go b/providers/prometheus/reporter.go index 96c49ad93..74a8e1580 100644 --- a/providers/prometheus/reporter.go +++ b/providers/prometheus/reporter.go @@ -5,15 +5,20 @@ package prometheus import ( "context" + "net/netip" "time" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors" "github.com/prometheus/client_golang/prometheus" + grpcpeer "google.golang.org/grpc/peer" ) type reporter struct { - clientMetrics *ClientMetrics - serverMetrics *ServerMetrics + clientMetrics *ClientMetrics + serverMetrics *ServerMetrics + + grpcServerIP string + grpcClientIP string typ interceptors.GRPCType service, method string kind Kind @@ -28,9 +33,17 @@ func (r *reporter) PostCall(err error, rpcDuration time.Duration) { // perform handling of metrics from code switch r.kind { case KindServer: - r.incrementWithExemplar(r.serverMetrics.serverHandledCounter, string(r.typ), r.service, r.method, code.String()) + lvals := []string{string(r.typ), r.service, r.method, code.String()} + if r.serverMetrics.ipLabelsEnabled { + lvals = append(lvals, r.grpcServerIP, r.grpcClientIP) + } + r.incrementWithExemplar(r.serverMetrics.serverHandledCounter, lvals...) if r.serverMetrics.serverHandledHistogram != nil { - r.observeWithExemplar(r.serverMetrics.serverHandledHistogram, rpcDuration.Seconds(), string(r.typ), r.service, r.method) + lvals = []string{string(r.typ), r.service, r.method} + if r.serverMetrics.ipLabelsEnabled { + lvals = append(lvals, r.grpcServerIP, r.grpcClientIP) + } + r.observeWithExemplar(r.serverMetrics.serverHandledHistogram, rpcDuration.Seconds(), lvals...) } case KindClient: @@ -44,7 +57,11 @@ func (r *reporter) PostCall(err error, rpcDuration time.Duration) { func (r *reporter) PostMsgSend(_ any, _ error, sendDuration time.Duration) { switch r.kind { case KindServer: - r.incrementWithExemplar(r.serverMetrics.serverStreamMsgSent, string(r.typ), r.service, r.method) + lvals := []string{string(r.typ), r.service, r.method} + if r.serverMetrics.ipLabelsEnabled { + lvals = append(lvals, r.grpcServerIP, r.grpcClientIP) + } + r.incrementWithExemplar(r.serverMetrics.serverStreamMsgSent, lvals...) case KindClient: r.incrementWithExemplar(r.clientMetrics.clientStreamMsgSent, string(r.typ), r.service, r.method) if r.clientMetrics.clientStreamSendHistogram != nil { @@ -56,7 +73,11 @@ func (r *reporter) PostMsgSend(_ any, _ error, sendDuration time.Duration) { func (r *reporter) PostMsgReceive(_ any, _ error, recvDuration time.Duration) { switch r.kind { case KindServer: - r.incrementWithExemplar(r.serverMetrics.serverStreamMsgReceived, string(r.typ), r.service, r.method) + lvals := []string{string(r.typ), r.service, r.method} + if r.serverMetrics.ipLabelsEnabled { + lvals = append(lvals, r.grpcServerIP, r.grpcClientIP) + } + r.incrementWithExemplar(r.serverMetrics.serverStreamMsgReceived, lvals...) case KindClient: r.incrementWithExemplar(r.clientMetrics.clientStreamMsgReceived, string(r.typ), r.service, r.method) if r.clientMetrics.clientStreamRecvHistogram != nil { @@ -65,6 +86,14 @@ func (r *reporter) PostMsgReceive(_ any, _ error, recvDuration time.Duration) { } } +func (r *reporter) incrementWithExemplar(c *prometheus.CounterVec, lvals ...string) { + c.WithLabelValues(lvals...).(prometheus.ExemplarAdder).AddWithExemplar(1, r.exemplar) +} + +func (r *reporter) observeWithExemplar(h *prometheus.HistogramVec, value float64, lvals ...string) { + h.WithLabelValues(lvals...).(prometheus.ExemplarObserver).ObserveWithExemplar(value, r.exemplar) +} + type reportable struct { clientMetrics *ClientMetrics serverMetrics *ServerMetrics @@ -86,10 +115,11 @@ func (rep *reportable) reporter(ctx context.Context, sm *ServerMetrics, cm *Clie r := &reporter{ clientMetrics: cm, serverMetrics: sm, - typ: meta.Typ, - service: meta.Service, - method: meta.Method, - kind: kind, + + typ: meta.Typ, + service: meta.Service, + method: meta.Method, + kind: kind, } if c.exemplarFn != nil { r.exemplar = c.exemplarFn(ctx) @@ -99,15 +129,28 @@ func (rep *reportable) reporter(ctx context.Context, sm *ServerMetrics, cm *Clie case KindClient: r.incrementWithExemplar(r.clientMetrics.clientStartedCounter, string(r.typ), r.service, r.method) case KindServer: - r.incrementWithExemplar(r.serverMetrics.serverStartedCounter, string(r.typ), r.service, r.method) + + lvals := []string{string(r.typ), r.service, r.method} + if r.serverMetrics.ipLabelsEnabled { + if peer, ok := grpcpeer.FromContext(ctx); ok { + // Fallback to net.Addr.String() when ParseAddrPort failed, because it already contains + // necessary information to be added to the label and we + + // This is server side, so LocalAddr is server's address. + if addrPort, e := netip.ParseAddrPort(peer.LocalAddr.String()); e != nil { + r.grpcServerIP = peer.LocalAddr.String() + } else { + r.grpcServerIP = addrPort.Addr().String() + } + if addrPort, e := netip.ParseAddrPort(peer.Addr.String()); e != nil { + r.grpcClientIP = peer.Addr.String() + } else { + r.grpcClientIP = addrPort.Addr().String() + } + } + lvals = append(lvals, r.grpcServerIP, r.grpcClientIP) + } + r.incrementWithExemplar(r.serverMetrics.serverStartedCounter, lvals...) } return r, ctx } - -func (r *reporter) incrementWithExemplar(c *prometheus.CounterVec, lvals ...string) { - c.WithLabelValues(lvals...).(prometheus.ExemplarAdder).AddWithExemplar(1, r.exemplar) -} - -func (r *reporter) observeWithExemplar(h *prometheus.HistogramVec, value float64, lvals ...string) { - h.WithLabelValues(lvals...).(prometheus.ExemplarObserver).ObserveWithExemplar(value, r.exemplar) -} diff --git a/providers/prometheus/server_metrics.go b/providers/prometheus/server_metrics.go index def21e5bf..65ac83482 100644 --- a/providers/prometheus/server_metrics.go +++ b/providers/prometheus/server_metrics.go @@ -13,6 +13,7 @@ import ( // ServerMetrics represents a collection of metrics to be registered on a // Prometheus metrics registry for a gRPC server. type ServerMetrics struct { + ipLabelsEnabled bool serverStartedCounter *prometheus.CounterVec serverHandledCounter *prometheus.CounterVec serverStreamMsgReceived *prometheus.CounterVec @@ -27,29 +28,43 @@ type ServerMetrics struct { func NewServerMetrics(opts ...ServerMetricsOption) *ServerMetrics { var config serverMetricsConfig config.apply(opts) - return &ServerMetrics{ + + addIPLables := func(orig []string) []string { + if config.ipLabelsEnabled { + return serverMetricAddIPLabel(orig) + } + return orig + } + + sm := &ServerMetrics{ + ipLabelsEnabled: config.ipLabelsEnabled, serverStartedCounter: prometheus.NewCounterVec( config.counterOpts.apply(prometheus.CounterOpts{ Name: "grpc_server_started_total", Help: "Total number of RPCs started on the server.", - }), []string{"grpc_type", "grpc_service", "grpc_method"}), + }), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})), serverHandledCounter: prometheus.NewCounterVec( config.counterOpts.apply(prometheus.CounterOpts{ Name: "grpc_server_handled_total", Help: "Total number of RPCs completed on the server, regardless of success or failure.", - }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}), + }), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"})), serverStreamMsgReceived: prometheus.NewCounterVec( config.counterOpts.apply(prometheus.CounterOpts{ Name: "grpc_server_msg_received_total", Help: "Total number of RPC stream messages received on the server.", - }), []string{"grpc_type", "grpc_service", "grpc_method"}), + }), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})), serverStreamMsgSent: prometheus.NewCounterVec( config.counterOpts.apply(prometheus.CounterOpts{ Name: "grpc_server_msg_sent_total", Help: "Total number of gRPC stream messages sent by the server.", - }), []string{"grpc_type", "grpc_service", "grpc_method"}), - serverHandledHistogram: config.serverHandledHistogram, + }), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})), + } + if config.serverHandledHistogramEnabled { + sm.serverHandledHistogram = newServerHandlingTimeHistogram( + config.ipLabelsEnabled, config.serverHandledHistogramOptions) } + + return sm } // Describe sends the super-set of all possible descriptors of metrics @@ -95,15 +110,27 @@ func (m *ServerMetrics) InitializeMetrics(server reflection.ServiceInfoProvider) func (m *ServerMetrics) preRegisterMethod(serviceName string, mInfo *grpc.MethodInfo) { methodName := mInfo.Name methodType := string(typeFromMethodInfo(mInfo)) + + lvals := []string{methodType, serviceName, methodName} + if m.ipLabelsEnabled { + // Because netip.Addr.String() returns "invalid IP" for zero Addr, + // we use this value with grpc_server and grpc_client. + lvals = append(lvals, "invalid IP", "invalid IP") + } // These are just references (no increments), as just referencing will create the labels but not set values. - _, _ = m.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName) - _, _ = m.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName) - _, _ = m.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName) + _, _ = m.serverStartedCounter.GetMetricWithLabelValues(lvals...) + _, _ = m.serverStreamMsgReceived.GetMetricWithLabelValues(lvals...) + _, _ = m.serverStreamMsgSent.GetMetricWithLabelValues(lvals...) if m.serverHandledHistogram != nil { - _, _ = m.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName) + _, _ = m.serverHandledHistogram.GetMetricWithLabelValues(lvals...) } + for _, code := range interceptors.AllCodes { - _, _ = m.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String()) + lvals = []string{methodType, serviceName, methodName, code.String()} + if m.ipLabelsEnabled { + lvals = append(lvals, "invalid IP", "invalid IP") + } + _, _ = m.serverHandledCounter.GetMetricWithLabelValues(lvals...) } } diff --git a/providers/prometheus/server_options.go b/providers/prometheus/server_options.go index 39d422042..ed0e584a5 100644 --- a/providers/prometheus/server_options.go +++ b/providers/prometheus/server_options.go @@ -9,14 +9,23 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +func serverMetricAddIPLabel(orig []string) []string { + return append(orig, "grpc_server_ip", "grpc_client_ip") +} + type exemplarFromCtxFn func(ctx context.Context) prometheus.Labels type serverMetricsConfig struct { + // ipLabelsEnabled control whether to add grpc_server and grpc_client labels to metrics. + ipLabelsEnabled bool + counterOpts counterOptions - // serverHandledHistogram can be nil. - serverHandledHistogram *prometheus.HistogramVec + + serverHandledHistogramEnabled bool + serverHandledHistogramOptions []HistogramOption } +// ServerMetricsOption configures how we set up the server metrics. type ServerMetricsOption func(*serverMetricsConfig) func (c *serverMetricsConfig) apply(opts []ServerMetricsOption) { @@ -32,17 +41,33 @@ func WithServerCounterOptions(opts ...CounterOption) ServerMetricsOption { } } +func newServerHandlingTimeHistogram(ipLabelsEnabled bool, opts []HistogramOption) *prometheus.HistogramVec { + labels := []string{"grpc_type", "grpc_service", "grpc_method"} + if ipLabelsEnabled { + labels = serverMetricAddIPLabel(labels) + } + return prometheus.NewHistogramVec( + histogramOptions(opts).apply(prometheus.HistogramOpts{ + Name: "grpc_server_handling_seconds", + Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.", + Buckets: prometheus.DefBuckets, + }), + labels, + ) +} + // WithServerHandlingTimeHistogram turns on recording of handling time of RPCs. // Histogram metrics can be very expensive for Prometheus to retain and query. func WithServerHandlingTimeHistogram(opts ...HistogramOption) ServerMetricsOption { return func(o *serverMetricsConfig) { - o.serverHandledHistogram = prometheus.NewHistogramVec( - histogramOptions(opts).apply(prometheus.HistogramOpts{ - Name: "grpc_server_handling_seconds", - Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.", - Buckets: prometheus.DefBuckets, - }), - []string{"grpc_type", "grpc_service", "grpc_method"}, - ) + o.serverHandledHistogramEnabled = true + o.serverHandledHistogramOptions = opts + } +} + +// WithServerIPLabelsEnabled enables adding grpc_server and grpc_client labels to metrics. +func WithServerIPLabelsEnabled() ServerMetricsOption { + return func(o *serverMetricsConfig) { + o.ipLabelsEnabled = true } } diff --git a/providers/prometheus/server_test.go b/providers/prometheus/server_test.go index 038d390fb..7156dc17f 100644 --- a/providers/prometheus/server_test.go +++ b/providers/prometheus/server_test.go @@ -170,6 +170,236 @@ func (s *ServerInterceptorTestSuite) TestContextCancelledTreatedAsStatus() { s.serverMetrics.serverHandledCounter.WithLabelValues("bidi_stream", testpb.TestServiceFullName, "PingStream", "Canceled")) } +func TestServerInterceptorWithIPLabelsSuite(t *testing.T) { + s := NewServerMetrics(WithServerHandlingTimeHistogram(), WithServerIPLabelsEnabled()) + suite.Run(t, &serverInterceptorWithIPLabelsSuite{ + InterceptorTestSuite: &testpb.InterceptorTestSuite{ + TestService: &testpb.TestPingService{}, + ServerOpts: []grpc.ServerOption{ + grpc.StreamInterceptor(s.StreamServerInterceptor()), + grpc.UnaryInterceptor(s.UnaryServerInterceptor()), + }, + }, + serverMetrics: s, + serviceName: testpb.TestServiceFullName, + }) +} + +type serverInterceptorWithIPLabelsSuite struct { + *testpb.InterceptorTestSuite + r *require.Assertions + + serverMetrics *ServerMetrics + serviceName string +} + +func (s *serverInterceptorWithIPLabelsSuite) SetupTest() { + s.r = s.Require() + + s.serverMetrics.serverStartedCounter.Reset() + s.serverMetrics.serverHandledCounter.Reset() + s.serverMetrics.serverHandledHistogram.Reset() + s.serverMetrics.serverStreamMsgReceived.Reset() + s.serverMetrics.serverStreamMsgSent.Reset() + s.serverMetrics.InitializeMetrics(s.Server) +} + +func (s *serverInterceptorWithIPLabelsSuite) grpcType(value string) string { + return fmt.Sprintf(`grpc_type="%s"`, value) +} + +func (s *serverInterceptorWithIPLabelsSuite) grpcServiceName() string { + return fmt.Sprintf(`grpc_service="%s"`, s.serviceName) +} + +func (s *serverInterceptorWithIPLabelsSuite) grpcMethod(value string) string { + return fmt.Sprintf(`grpc_method="%s"`, value) +} + +func (s *serverInterceptorWithIPLabelsSuite) grpcCode(value string) string { + return fmt.Sprintf(`grpc_code="%s"`, value) +} + +func (s *serverInterceptorWithIPLabelsSuite) grpcServerIP(value string) string { + return fmt.Sprintf(`grpc_server_ip="%s"`, value) +} + +func (s *serverInterceptorWithIPLabelsSuite) grpcClientIP(value string) string { + return fmt.Sprintf(`grpc_client_ip="%s"`, value) +} + +func (s *serverInterceptorWithIPLabelsSuite) TestRegisterPresetsStuff() { + registry := prometheus.NewPedanticRegistry() + s.r.NoError(registry.Register(s.serverMetrics)) + + allLabels := func(extras ...string) []string { + return append( + []string{s.grpcServiceName(), s.grpcServerIP("invalid IP"), s.grpcClientIP("invalid IP")}, + extras..., + ) + } + + for testID, testCase := range []struct { + metricName string + existingLabels []string + }{ + // Order of label is irrelevant. + { + "grpc_server_started_total", + allLabels(s.grpcMethod("PingEmpty"), s.grpcType("unary")), + }, + { + "grpc_server_started_total", + allLabels(s.grpcMethod("PingList"), s.grpcType("server_stream")), + }, + { + "grpc_server_msg_received_total", + allLabels(s.grpcMethod("PingList"), s.grpcType("server_stream")), + }, + { + "grpc_server_msg_sent_total", + allLabels(s.grpcMethod("PingEmpty"), s.grpcType("unary")), + }, + { + "grpc_server_handling_seconds_sum", + allLabels(s.grpcMethod("PingEmpty"), s.grpcType("unary")), + }, + { + "grpc_server_handling_seconds_count", + allLabels(s.grpcMethod("PingList"), s.grpcType("server_stream")), + }, + { + "grpc_server_handled_total", + allLabels(s.grpcMethod("PingList"), s.grpcType("server_stream"), s.grpcCode("OutOfRange")), + }, + { + "grpc_server_handled_total", + allLabels(s.grpcMethod("PingList"), s.grpcType("server_stream"), s.grpcCode("Aborted")), + }, + { + "grpc_server_handled_total", + allLabels(s.grpcMethod("PingEmpty"), s.grpcType("unary"), s.grpcCode("FailedPrecondition")), + }, + { + "grpc_server_handled_total", + allLabels(s.grpcMethod("PingEmpty"), s.grpcType("unary"), s.grpcCode("ResourceExhausted")), + }, + } { + lineCount := len(fetchPrometheusLines(s.T(), registry, testCase.metricName, testCase.existingLabels...)) + s.NotZero(lineCount, "metrics must exist for test case %d", testID) + } +} + +func (s *serverInterceptorWithIPLabelsSuite) TestUnaryIncrementsMetrics() { + t := s.T() + + _, err := s.Client.PingEmpty(s.SimpleCtx(), &testpb.PingEmptyRequest{}) + s.r.NoError(err) + + serverIP := "127.0.0.1" + clientIP := "127.0.0.1" + requireValue(t, 1, s.serverMetrics.serverStartedCounter.WithLabelValues( + "unary", s.serviceName, "PingEmpty", serverIP, clientIP)) + requireValue(t, 1, s.serverMetrics.serverHandledCounter.WithLabelValues( + "unary", s.serviceName, "PingEmpty", "OK", serverIP, clientIP)) + requireValueHistCount(t, 1, s.serverMetrics.serverHandledHistogram.WithLabelValues( + "unary", s.serviceName, "PingEmpty", serverIP, clientIP)) + + _, err = s.Client.PingError(s.SimpleCtx(), + &testpb.PingErrorRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) + s.r.Error(err) + requireValue(t, 1, s.serverMetrics.serverStartedCounter.WithLabelValues( + "unary", s.serviceName, "PingError", serverIP, clientIP)) + requireValue(t, 1, s.serverMetrics.serverHandledCounter.WithLabelValues( + "unary", s.serviceName, "PingError", "FailedPrecondition", serverIP, clientIP)) + requireValueHistCount(t, 1, s.serverMetrics.serverHandledHistogram.WithLabelValues( + "unary", s.serviceName, "PingError", serverIP, clientIP)) +} + +func (s *serverInterceptorWithIPLabelsSuite) TestStartedStreamingIncrementsStarted() { + t := s.T() + + _, err := s.Client.PingList(s.SimpleCtx(), &testpb.PingListRequest{}) + s.r.NoError(err) + + serverIP := "127.0.0.1" + clientIP := "127.0.0.1" + requireValueWithRetry(s.SimpleCtx(), t, 1, + s.serverMetrics.serverStartedCounter.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", serverIP, clientIP)) + + _, err = s.Client.PingList(s.SimpleCtx(), &testpb.PingListRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) + s.r.NoError(err, "PingList must not fail immediately") + requireValueWithRetry(s.SimpleCtx(), t, 2, + s.serverMetrics.serverStartedCounter.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", serverIP, clientIP)) +} + +func (s *serverInterceptorWithIPLabelsSuite) TestStreamingIncrementsMetrics() { + t := s.T() + ss, _ := s.Client.PingList(s.SimpleCtx(), &testpb.PingListRequest{}) + // Do a read, just for kicks. + count := 0 + for { + _, err := ss.Recv() + if err == io.EOF { + break + } + s.NoError(err, "reading pingList shouldn't fail") + count++ + } + s.r.EqualValues(testpb.ListResponseCount, count, "Number of received msg on the wire must match") + + serverIP := "127.0.0.1" + clientIP := "127.0.0.1" + requireValueWithRetry(s.SimpleCtx(), t, 1, + s.serverMetrics.serverStartedCounter.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", serverIP, clientIP)) + requireValueWithRetry(s.SimpleCtx(), t, 1, + s.serverMetrics.serverHandledCounter.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", "OK", serverIP, clientIP)) + requireValueWithRetry(s.SimpleCtx(), t, testpb.ListResponseCount, + s.serverMetrics.serverStreamMsgSent.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", serverIP, clientIP)) + requireValueWithRetry(s.SimpleCtx(), t, 1, + s.serverMetrics.serverStreamMsgReceived.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", serverIP, clientIP)) + requireValueWithRetryHistCount(s.SimpleCtx(), t, 1, + s.serverMetrics.serverHandledHistogram.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", serverIP, clientIP)) + + // should return with code=FailedPrecondition + _, err := s.Client.PingList(s.SimpleCtx(), &testpb.PingListRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) + s.r.NoError(err, "PingList must not fail immediately") + + requireValueWithRetry(s.SimpleCtx(), t, 2, + s.serverMetrics.serverStartedCounter.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", serverIP, clientIP)) + requireValueWithRetry(s.SimpleCtx(), t, 1, + s.serverMetrics.serverHandledCounter.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", "FailedPrecondition", serverIP, clientIP)) + requireValueWithRetryHistCount(s.SimpleCtx(), t, 2, + s.serverMetrics.serverHandledHistogram.WithLabelValues( + "server_stream", testpb.TestServiceFullName, "PingList", serverIP, clientIP)) +} + +func (s *serverInterceptorWithIPLabelsSuite) TestContextCancelledTreatedAsStatus() { + t := s.T() + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + stream, _ := s.Client.PingStream(ctx) + err := stream.Send(&testpb.PingStreamRequest{}) + s.r.NoError(err) + cancel() + + serverIP := "127.0.0.1" + clientIP := "127.0.0.1" + requireValueWithRetry(s.SimpleCtx(), t, 1, + s.serverMetrics.serverHandledCounter.WithLabelValues( + "bidi_stream", testpb.TestServiceFullName, "PingStream", "Canceled", serverIP, clientIP)) +} + // fetchPrometheusLines does mocked HTTP GET request against real prometheus handler to get the same view that Prometheus // would have while scraping this endpoint. // Order of matching label vales does not matter. @@ -194,7 +424,12 @@ func fetchPrometheusLines(t *testing.T, reg prometheus.Gatherer, metricName stri } matches := true for _, labelValue := range matchingLabelValues { - if !strings.Contains(line, `"`+labelValue+`"`) { + // TODO: Force to use label="labelValue" format. + expected := labelValue + if !strings.Contains(labelValue, "=") { + expected = `"` + labelValue + `"` + } + if !strings.Contains(line, expected) { matches = false } }