Skip to content

Commit

Permalink
Stream metrics instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky committed Mar 21, 2023
1 parent 35be186 commit 4d92b2c
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 17 deletions.
7 changes: 7 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package totem
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -19,6 +20,7 @@ type ClientConn struct {
interceptor grpc.UnaryClientInterceptor
tracer trace.Tracer
logger *zap.Logger
metrics *MetricsExporter
}

var _ grpc.ClientConnInterface = (*ClientConn)(nil)
Expand Down Expand Up @@ -110,6 +112,8 @@ func (cc *ClientConn) invoke(
defer span.End()
otelgrpc.Inject(ctx, &md)

cc.metrics.TrackTxBytes(serviceName, methodName, int64(len(reqMsg)))

rpc := &RPC{
ServiceName: serviceName,
MethodName: methodName,
Expand All @@ -119,6 +123,7 @@ func (cc *ClientConn) invoke(
Metadata: FromMD(md),
}

startTime := time.Now()
future := cc.controller.Request(ctx, rpc)
select {
case rpc := <-future:
Expand All @@ -139,6 +144,8 @@ func (cc *ClientConn) invoke(
zap.String("method", method),
).Debug("received reply")
recordSuccess(span)
cc.metrics.TrackSvcTxLatency(serviceName, methodName, time.Since(startTime))
cc.metrics.TrackRxBytes(serviceName, methodName, int64(len(resp.Response)))

for _, callOpt := range callOpts {
switch opt := callOpt.(type) {
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ require (
go.opentelemetry.io/contrib/propagators/autoprop v0.40.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/exporters/jaeger v1.14.0
go.opentelemetry.io/otel/metric v0.37.0
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/otel/sdk/metric v0.37.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0
Expand Down Expand Up @@ -45,13 +47,13 @@ require (
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/muesli/reflow v0.3.0 // indirect
github.com/muesli/termenv v0.15.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/samber/lo v1.37.0 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.15.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.15.0 // indirect
go.opentelemetry.io/contrib/propagators/jaeger v1.15.0 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.15.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.8.0 // indirect
Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ github.com/onsi/ginkgo/v2 v2.9.1/go.mod h1:FEcmzVcCHl+4o9bQZVab+4dC9+j+91t2FHSzm
github.com/onsi/gomega v1.27.4 h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E=
github.com/onsi/gomega v1.27.4/go.mod h1:riYq/GJKh8hhoM01HN6Vmuy93AarCXCBGpvFDK3q3fQ=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down Expand Up @@ -138,6 +139,8 @@ go.opentelemetry.io/otel/metric v0.37.0 h1:pHDQuLQOZwYD+Km0eb657A25NaRzy0a+eLyKf
go.opentelemetry.io/otel/metric v0.37.0/go.mod h1:DmdaHfGt54iV6UKxsV9slj2bBRJcKC1B1uvDLIioc1s=
go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvxGzY=
go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM=
go.opentelemetry.io/otel/sdk/metric v0.37.0 h1:haYBBtZZxiI3ROwSmkZnI+d0+AVzBWeviuYQDeBWosU=
go.opentelemetry.io/otel/sdk/metric v0.37.0/go.mod h1:mO2WV1AZKKwhwHTV3AKOoIEb9LbUaENZDuGUQd+j4A0=
go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M=
go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
Expand Down
34 changes: 27 additions & 7 deletions invokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package totem

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand All @@ -25,13 +26,15 @@ type localServiceInvoker struct {
methods map[string]grpc.MethodDesc
logger *zap.Logger
interceptor grpc.UnaryServerInterceptor
metrics *MetricsExporter
}

func newLocalServiceInvoker(
serviceImpl interface{},
service *grpc.ServiceDesc,
logger *zap.Logger,
interceptor grpc.UnaryServerInterceptor,
metrics *MetricsExporter,
) *localServiceInvoker {
handlers := make(map[string]grpc.MethodDesc)
for _, method := range service.Methods {
Expand All @@ -43,13 +46,17 @@ func newLocalServiceInvoker(
methods: handlers,
logger: logger,
interceptor: interceptor,
metrics: metrics,
}
}

func (l *localServiceInvoker) Invoke(ctx context.Context, req *RPC) ([]byte, error) {
serviceName := req.GetServiceName()
methodName := req.GetMethodName()

l.logger.With(
zap.String("service", req.GetServiceName()),
zap.String("method", req.GetMethodName()),
zap.String("service", serviceName),
zap.String("method", methodName),
zap.Uint64("tag", req.GetTag()),
zap.Strings("md", req.GetMetadata().Keys()),
).Debug("invoking method using local service")
Expand All @@ -64,15 +71,25 @@ func (l *localServiceInvoker) Invoke(ctx context.Context, req *RPC) ([]byte, err
defer span.End()

if m, ok := l.methods[req.MethodName]; ok {
startTime := time.Now()
resp, err := m.Handler(l.serviceImpl, addTotemToContext(ctx), func(v any) error {
return proto.Unmarshal(req.GetRequest(), protoimpl.X.ProtoMessageV2Of(v))
reqBytes := req.GetRequest()
l.metrics.TrackRxBytes(serviceName, methodName, int64(len(reqBytes)))
return proto.Unmarshal(reqBytes, protoimpl.X.ProtoMessageV2Of(v))
}, l.interceptor)
if err != nil {
recordError(span, err)
return nil, err
}
respBytes, err := proto.Marshal(protoimpl.X.ProtoMessageV2Of(resp))
if err != nil {
recordError(span, err)
return nil, err
}
recordSuccess(span)
return proto.Marshal(protoimpl.X.ProtoMessageV2Of(resp))
l.metrics.TrackSvcRxLatency(serviceName, methodName, time.Since(startTime))
l.metrics.TrackTxBytes(serviceName, methodName, int64(len(respBytes)))
return respBytes, nil
} else {
err := status.Errorf(codes.Unimplemented, "unknown method %s", req.MethodName)
recordError(span, err)
Expand All @@ -97,9 +114,11 @@ func newStreamControllerInvoker(ctrl *StreamController, logger *zap.Logger) *str
}

func (r *streamControllerInvoker) Invoke(ctx context.Context, req *RPC) ([]byte, error) {
serviceName := req.GetServiceName()
methodName := req.GetMethodName()
r.logger.With(
zap.String("service", req.GetServiceName()),
zap.String("method", req.GetMethodName()),
zap.String("service", serviceName),
zap.String("method", methodName),
zap.Uint64("tag", req.GetTag()),
zap.Strings("md", req.GetMetadata().Keys()),
).Debug("invoking method using stream controller")
Expand All @@ -119,6 +138,7 @@ func (r *streamControllerInvoker) Invoke(ctx context.Context, req *RPC) ([]byte,
trace.WithAttributes(attrs...))
defer span.End()

r.controller.metrics.TrackTxBytes(serviceName, methodName, int64(len(req.GetRequest())))
rc := r.controller.Request(ctx, req)
select {
case rpc := <-rc:
Expand All @@ -129,7 +149,7 @@ func (r *streamControllerInvoker) Invoke(ctx context.Context, req *RPC) ([]byte,
return nil, err
}
recordSuccess(span)

r.controller.metrics.TrackRxBytes(serviceName, methodName, int64(len(resp.Response)))
return resp.GetResponse(), nil
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
142 changes: 142 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package totem

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/sdk/metric"
)

type MetricsExporter struct {
rxBytesCollector instrument.Int64Counter
txBytesCollector instrument.Int64Counter

rxRpcCollector instrument.Int64Counter
txRpcCollector instrument.Int64Counter

svcRxLatencyCollector instrument.Int64Histogram
svcTxLatencyCollector instrument.Int64Histogram

staticAttrs []attribute.KeyValue
}

// func (m *MetricsExporter) Inverted() *MetricsExporter {
// if m == nil {
// return nil
// }
// return &MetricsExporter{
// rxBytesCollector: m.txBytesCollector,
// txBytesCollector: m.rxBytesCollector,

// rxRpcCollector: m.txRpcCollector,
// txRpcCollector: m.rxRpcCollector,

// svcRxLatencyCollector: m.svcRxLatencyCollector,
// svcTxLatencyCollector: m.svcTxLatencyCollector,

// staticAttrs: m.staticAttrs,
// }
// }

func NewMetricsExporter(reader metric.Reader, staticAttrs ...attribute.KeyValue) *MetricsExporter {
provider := metric.NewMeterProvider(metric.WithReader(reader))
meter := provider.Meter("github.com/kralicky/totem/metrics")

rxBytes, err := meter.Int64Counter("stream_receive_bytes",
instrument.WithDescription("Total number of bytes received on a stream"))
if err != nil {
panic(err)
}

txBytes, err := meter.Int64Counter("stream_transmit_bytes",
instrument.WithDescription("Total number of bytes transmitted on a stream"))
if err != nil {
panic(err)
}

rxRpc, err := meter.Int64Counter("stream_receive_rpcs",
instrument.WithDescription("Total number of requests and replies received on a stream"))
if err != nil {
panic(err)
}

txRpc, err := meter.Int64Counter("stream_transmit_rpcs",
instrument.WithDescription("Total number of requests and replies transmitted on a stream"))
if err != nil {
panic(err)
}

svcRxLatency, err := meter.Int64Histogram("stream_local_service_latency",
instrument.WithDescription("Incoming RPC request-response latency for services handled locally on a stream"),
instrument.WithUnit("μs"),
)
if err != nil {
panic(err)
}

svcTxLatency, err := meter.Int64Histogram("stream_remote_service_latency",
instrument.WithDescription("Outgoing RPC request-response latency for services handled remotely on a stream"),
instrument.WithUnit("μs"),
)
if err != nil {
panic(err)
}

return &MetricsExporter{
rxBytesCollector: rxBytes,
txBytesCollector: txBytes,
rxRpcCollector: rxRpc,
txRpcCollector: txRpc,
svcRxLatencyCollector: svcRxLatency,
svcTxLatencyCollector: svcTxLatency,
staticAttrs: staticAttrs,
}
}

func (m *MetricsExporter) TrackRxBytes(service, method string, count int64) {
if m == nil {
return
}
attrs := append(m.staticAttrs,
attribute.Key("service").String(service),
attribute.Key("method").String(method),
)
m.rxRpcCollector.Add(context.Background(), 1, attrs...)
m.rxBytesCollector.Add(context.Background(), count, attrs...)
}

func (m *MetricsExporter) TrackTxBytes(service, method string, count int64) {
if m == nil {
return
}
attrs := append(m.staticAttrs,
attribute.Key("service").String(service),
attribute.Key("method").String(method),
)
m.txRpcCollector.Add(context.Background(), 1, attrs...)
m.txBytesCollector.Add(context.Background(), count, attrs...)
}

func (m *MetricsExporter) TrackSvcRxLatency(service, method string, latency time.Duration) {
if m == nil {
return
}
attrs := append(m.staticAttrs,
attribute.Key("service").String(service),
attribute.Key("method").String(method),
)
m.svcRxLatencyCollector.Record(context.Background(), latency.Microseconds(), attrs...)
}

func (m *MetricsExporter) TrackSvcTxLatency(service, method string, latency time.Duration) {
if m == nil {
return
}
attrs := append(m.staticAttrs,
attribute.Key("service").String(service),
attribute.Key("method").String(method),
)
m.svcTxLatencyCollector.Record(context.Background(), latency.Microseconds(), attrs...)
}
13 changes: 11 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -29,6 +30,7 @@ type Server struct {
type ServerOptions struct {
name string
interceptors InterceptorConfig
metrics *MetricsExporter
}

type InterceptorConfig struct {
Expand All @@ -53,6 +55,12 @@ func WithInterceptors(config InterceptorConfig) ServerOption {
}
}

func WithMetrics(reader metric.Reader, staticAttrs ...attribute.KeyValue) ServerOption {
return func(o *ServerOptions) {
o.metrics = NewMetricsExporter(reader, staticAttrs...)
}
}

type ServerOption func(*ServerOptions)

func (o *ServerOptions) apply(opts ...ServerOption) {
Expand All @@ -73,7 +81,7 @@ func NewServer(stream Stream, opts ...ServerOption) (*Server, error) {

lg := Log.Named(options.name)

ctrl := NewStreamController(stream, WithStreamName(options.name))
ctrl := NewStreamController(stream, WithStreamName(options.name), withMetricsExporter(options.metrics))

srv := &Server{
ServerOptions: options,
Expand Down Expand Up @@ -191,7 +199,7 @@ func (r *Server) register(serviceDesc *grpc.ServiceDesc, impl interface{}) {
}

r.controller.RegisterServiceHandler(NewDefaultServiceHandler(r.Context(), reflectionDesc,
newLocalServiceInvoker(impl, serviceDesc, r.logger, r.interceptors.Incoming)))
newLocalServiceInvoker(impl, serviceDesc, r.logger, r.interceptors.Incoming, r.metrics)))
}

// Serve starts the totem server, which takes control of the stream and begins
Expand Down Expand Up @@ -266,6 +274,7 @@ func (r *Server) Serve() (grpc.ClientConnInterface, <-chan error) {
interceptor: r.interceptors.Outgoing,
tracer: Tracer(),
logger: r.logger.Named("cc"),
metrics: r.metrics,
}, ch
}

Expand Down
Loading

0 comments on commit 4d92b2c

Please sign in to comment.