diff --git a/clientconn.go b/clientconn.go index 0721de7..1ef0c9e 100644 --- a/clientconn.go +++ b/clientconn.go @@ -21,35 +21,23 @@ type ClientConn struct { var _ grpc.ClientConnInterface = (*ClientConn)(nil) +// Method placeholder to distinguish forwarded raw RPC messages. +const Forward = "(forward)" + func (cc *ClientConn) Invoke( ctx context.Context, method string, - req interface{}, - reply interface{}, + req any, + reply any, callOpts ...grpc.CallOption, ) error { - reqMsg, err := proto.Marshal(req.(proto.Message)) - if err != nil { - return err - } - - cc.logger.With( - zap.String("method", method), - zap.String("requestType", fmt.Sprintf("%T", req)), - zap.String("replyType", fmt.Sprintf("%T", reply)), - ).Debug("invoking method") - - serviceName, methodName, err := parseQualifiedMethod(method) - if err != nil { - return err - } - - rpc := &RPC{ - ServiceName: serviceName, - MethodName: methodName, - Content: &RPC_Request{ - Request: reqMsg, - }, + var serviceName, methodName string + if method != Forward { + var err error + serviceName, methodName, err = parseQualifiedMethod(method) + if err != nil { + return err + } } md, ok := metadata.FromOutgoingContext(ctx) @@ -57,6 +45,35 @@ func (cc *ClientConn) Invoke( md = metadata.New(nil) } + lg := cc.logger.With( + zap.String("requestType", fmt.Sprintf("%T", req)), + zap.String("replyType", fmt.Sprintf("%T", reply)), + ) + var reqMsg []byte + switch req := req.(type) { + case *RPC: + serviceName = req.ServiceName + methodName = req.MethodName + lg.With( + zap.String("method", method), + ).Debug("forwarding rpc") + reqMsg = req.GetRequest() + if req.Metadata != nil { + md = metadata.Join(md, req.Metadata.ToMD()) + } + case proto.Message: + lg.With( + zap.String("method", method), + ).Debug("invoking method") + var err error + reqMsg, err = proto.Marshal(req) + if err != nil { + return err + } + default: + panic(fmt.Sprintf("[totem] unsupported request type: %T", req)) + } + name, attr := spanInfo(method, peerFromCtx(ctx)) attr = append(attr, attribute.String("func", "clientConn.Invoke")) ctx, span := cc.tracer.Start(ctx, name, @@ -65,7 +82,15 @@ func (cc *ClientConn) Invoke( ) defer span.End() otelgrpc.Inject(ctx, &md) - rpc.Metadata = FromMD(md) + + rpc := &RPC{ + ServiceName: serviceName, + MethodName: methodName, + Content: &RPC_Request{ + Request: reqMsg, + }, + Metadata: FromMD(md), + } future := cc.controller.Request(ctx, rpc) select { @@ -96,14 +121,24 @@ func (cc *ClientConn) Invoke( *opt.TrailerAddr = rpc.Metadata.ToMD() } } - if err := proto.Unmarshal(resp.GetResponse(), reply.(proto.Message)); err != nil { - cc.logger.With( - zap.Uint64("tag", rpc.Tag), - zap.String("method", method), - zap.Error(err), - ).Error("received malformed response message") - return fmt.Errorf("[totem] malformed response: %w", err) + switch reply := reply.(type) { + case *RPC: + reply.Content = &RPC_Response{ + Response: resp, + } + case proto.Message: + if err := proto.Unmarshal(resp.GetResponse(), reply); err != nil { + cc.logger.With( + zap.Uint64("tag", rpc.Tag), + zap.String("method", method), + zap.Error(err), + ).Error("received malformed response message") + + return fmt.Errorf("[totem] malformed response: %w", err) + } + default: + panic(fmt.Sprintf("[totem] unsupported request type: %T", req)) } return nil case <-ctx.Done(): diff --git a/go.mod b/go.mod index 420106c..ab3c5a4 100644 --- a/go.mod +++ b/go.mod @@ -4,35 +4,36 @@ go 1.18 require ( github.com/charmbracelet/lipgloss v0.6.0 - github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 - github.com/jhump/protoreflect v1.13.0 + github.com/jhump/protoreflect v1.14.0 github.com/kralicky/gpkg v0.0.0-20220311205216-0d8ea9557555 - github.com/kralicky/ragu v1.0.0-rc3 + github.com/kralicky/ragu v1.0.0-rc4 github.com/magefile/mage v1.14.0 - github.com/onsi/ginkgo/v2 v2.2.0 - github.com/onsi/gomega v1.20.2 - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.0 - go.opentelemetry.io/contrib/propagators/autoprop v0.36.0 - go.opentelemetry.io/otel v1.10.0 - go.opentelemetry.io/otel/exporters/jaeger v1.10.0 - go.opentelemetry.io/otel/sdk v1.10.0 - go.opentelemetry.io/otel/trace v1.10.0 + github.com/onsi/ginkgo/v2 v2.6.1 + github.com/onsi/gomega v1.24.2 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 + go.opentelemetry.io/contrib/propagators/autoprop v0.37.0 + go.opentelemetry.io/otel v1.11.2 + go.opentelemetry.io/otel/exporters/jaeger v1.11.2 + go.opentelemetry.io/otel/sdk v1.11.2 + go.opentelemetry.io/otel/trace v1.11.2 go.uber.org/atomic v1.10.0 - go.uber.org/zap v1.23.0 - golang.org/x/exp v0.0.0-20220927162542-c76eaa363f9d - golang.org/x/net v0.0.0-20220923203811-8be639271d50 - google.golang.org/genproto v0.0.0-20220923205249-dd2d53f1fffc - google.golang.org/grpc v1.49.0 + go.uber.org/zap v1.24.0 + golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 + golang.org/x/net v0.4.0 + google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37 + google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 ) require ( + cloud.google.com/go/compute/metadata v0.2.3 // indirect github.com/bmatcuk/doublestar v1.3.4 // indirect github.com/flosch/pongo2/v6 v6.0.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/glog v1.0.0 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/iancoleman/strcase v0.2.0 // indirect github.com/kralicky/grpc-gateway/v2 v2.11.3 // indirect @@ -43,15 +44,14 @@ require ( github.com/muesli/termenv v0.11.1-0.20220204035834-5ac8409525e0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/samber/lo v1.28.2 // indirect - go.opentelemetry.io/contrib/propagators/aws v1.10.0 // indirect - go.opentelemetry.io/contrib/propagators/b3 v1.10.0 // indirect - go.opentelemetry.io/contrib/propagators/jaeger v1.10.0 // indirect - go.opentelemetry.io/contrib/propagators/ot v1.10.0 // indirect + go.opentelemetry.io/contrib/propagators/aws v1.12.0 // indirect + go.opentelemetry.io/contrib/propagators/b3 v1.12.0 // indirect + go.opentelemetry.io/contrib/propagators/jaeger v1.12.0 // indirect + go.opentelemetry.io/contrib/propagators/ot v1.12.0 // indirect + go.opentelemetry.io/otel/metric v0.34.0 // indirect go.uber.org/multierr v1.8.0 // indirect - golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect - golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/mod v0.7.0 // indirect + golang.org/x/sys v0.3.0 // indirect + golang.org/x/text v0.5.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/kralicky/ragu => ../ragu diff --git a/go.sum b/go.sum index 479c391..6b2ac0f 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg= +cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y= +cloud.google.com/go/compute v1.14.0 h1:hfm2+FfxVmnRlh6LpB7cg1ZNU+5edAHmW679JePztk0= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/bmatcuk/doublestar v1.3.4 h1:gPypJ5xD31uhX6Tf54sDPUOBXTqKH4c9aPY66CyQrS0= @@ -56,8 +59,8 @@ github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSl github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= -github.com/jhump/protoreflect v1.13.0 h1:zrrZqa7JAc2YGgPSzZZkmUXJ5G6NRPdxOg/9t7ISImA= -github.com/jhump/protoreflect v1.13.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= +github.com/jhump/protoreflect v1.14.0 h1:MBbQK392K3u8NTLbKOCIi3XdI+y+c6yt5oMq0X3xviw= +github.com/jhump/protoreflect v1.14.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -65,10 +68,10 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kralicky/gpkg v0.0.0-20220311205216-0d8ea9557555 h1:w/v9aYd9gdL0pEofCiOM7MWNroB9+HWxHT29Wj2NMYc= github.com/kralicky/gpkg v0.0.0-20220311205216-0d8ea9557555/go.mod h1:EJrGSfmocDg2CBjHDm3zy9oxNKCSGhf+MNTiN1DRbKA= -github.com/kralicky/grpc-gateway/v2 v2.7.3-0.20221006212615-4590601753ba h1:uZx9D7xh4rjncUZCOMb2+K9NPa0a2+XEG8R1EPLWuNo= -github.com/kralicky/grpc-gateway/v2 v2.7.3-0.20221006212615-4590601753ba/go.mod h1:REy8s1IlkKMQFqp5QJaO0qvcfY5gRjXetqwqfb0iiMY= github.com/kralicky/grpc-gateway/v2 v2.11.3 h1:vLLe/VPWWJmtQwuQ0rlL2cU0L/VpL5UKfk6IkM3Zak0= github.com/kralicky/grpc-gateway/v2 v2.11.3/go.mod h1:REy8s1IlkKMQFqp5QJaO0qvcfY5gRjXetqwqfb0iiMY= +github.com/kralicky/ragu v1.0.0-rc4 h1:SukSVmUDs3h2Axs1TJHsoI5gtVgCG1TgRmdw/9Onscs= +github.com/kralicky/ragu v1.0.0-rc4/go.mod h1:upcZm+aSNq1ngZDshp85T4tDG8rhfO9Ak66sY/4bI2g= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= @@ -82,10 +85,10 @@ github.com/muesli/reflow v0.2.1-0.20210115123740-9e1d0d53df68 h1:y1p/ycavWjGT9Fn github.com/muesli/reflow v0.2.1-0.20210115123740-9e1d0d53df68/go.mod h1:Xk+z4oIWdQqJzsxyjgl3P22oYZnHdZ8FFTHAQQt5BMQ= github.com/muesli/termenv v0.11.1-0.20220204035834-5ac8409525e0 h1:STjmj0uFfRryL9fzRA/OupNppeAID6QJYPMavTL7jtY= github.com/muesli/termenv v0.11.1-0.20220204035834-5ac8409525e0/go.mod h1:Bd5NYQ7pd+SrtBSrSNoBBmXlcY8+Xj4BMJgh8qcZrvs= -github.com/onsi/ginkgo/v2 v2.2.0 h1:3ZNA3L1c5FYDFTTxbFeVGGD8jYvjYauHD30YgLxVsNI= -github.com/onsi/ginkgo/v2 v2.2.0/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7AG4VIk= -github.com/onsi/gomega v1.20.2 h1:8uQq0zMgLEfa0vRrrBgaJF2gyW9Da9BmfGV+OyUzfkY= -github.com/onsi/gomega v1.20.2/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc= +github.com/onsi/ginkgo/v2 v2.6.1 h1:1xQPCjcqYw/J5LchOcp4/2q/jzJFjiAOc25chhnDw+Q= +github.com/onsi/ginkgo/v2 v2.6.1/go.mod h1:yjiuMwPokqY1XauOgju45q3sJt6VzQ/Fict1LFVcsAo= +github.com/onsi/gomega v1.24.2 h1:J/tulyYK6JwBldPViHJReihxxZ+22FHs0piGjQAvoUE= +github.com/onsi/gomega v1.24.2/go.mod h1:gs3J10IS7Z7r7eXRoNJIrNqU4ToQukCJhFtKrWgHWnk= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -95,59 +98,61 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/samber/lo v1.28.2 h1:f1gctelJ5YQk336wCN+Elr90FyhZ6ArhelD5kjhNTz4= github.com/samber/lo v1.28.2/go.mod h1:it33p9UtPMS7z72fP4gw/EIfQB2eI8ke7GR2wc6+Rhg= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.0 h1:+jrwcA4gF8tIZmdKWgTUysKtYW2VIzywjkfgd/5OPEM= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.0/go.mod h1:h8TWwRAhQpOd0aM5nYsRD8+flnkj+526GEIVlarH7eY= -go.opentelemetry.io/contrib/propagators/autoprop v0.36.0 h1:BCr2H/NIhiyrv12cRbsqA5zxErkpWtPi2NNj/Ar5lrI= -go.opentelemetry.io/contrib/propagators/autoprop v0.36.0/go.mod h1:YO90FHfpwzkzUrGZiRI67Mj/1lKwXcQDvHfPzB6Klrc= -go.opentelemetry.io/contrib/propagators/aws v1.10.0 h1:EEQ6YK48gtT2e6DtnFAEEFMiakN7WW0I4KK6Sc1NyEc= -go.opentelemetry.io/contrib/propagators/aws v1.10.0/go.mod h1:YCy6JRD/MdPJzUQJuwQTW+X6F/5C/NsWZnYS91+k7fE= -go.opentelemetry.io/contrib/propagators/b3 v1.10.0 h1:6AD2VV8edRdEYNaD8cNckpzgdMLU2kbV9OYyxt2kvCg= -go.opentelemetry.io/contrib/propagators/b3 v1.10.0/go.mod h1:oxvamQ/mTDFQVugml/uFS59+aEUnFLhmd1wsG+n5MOE= -go.opentelemetry.io/contrib/propagators/jaeger v1.10.0 h1:BemHdERnBHu4VHPgZAMCJmWrtkPHZ63P+eaZLa7Phzc= -go.opentelemetry.io/contrib/propagators/jaeger v1.10.0/go.mod h1:j8BPU1bBdUcOksJylVZ2XG6Qugsc/WF6Gx0ELeMLvL8= -go.opentelemetry.io/contrib/propagators/ot v1.10.0 h1:l2L3A2wj97MnDDTFnA/wZ0767e3lxdUmxaHb3rt8y3I= -go.opentelemetry.io/contrib/propagators/ot v1.10.0/go.mod h1:GzJoe4tBD1K/Aba958n2K0JpTvzKCFziagzHHAEx994= -go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4= -go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ= -go.opentelemetry.io/otel/exporters/jaeger v1.10.0 h1:7W3aVVjEYayu/GOqOVF4mbTvnCuxF1wWu3eRxFGQXvw= -go.opentelemetry.io/otel/exporters/jaeger v1.10.0/go.mod h1:n9IGyx0fgyXXZ/i0foLHNxtET9CzXHzZeKCucvRBFgA= -go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY= -go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE= -go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E= -go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 h1:+uFejS4DCfNH6d3xODVIGsdhzgzhh45p9gpbHQMbdZI= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0/go.mod h1:HSmzQvagH8pS2/xrK7ScWsk0vAMtRTGbMFgInXCi8Tc= +go.opentelemetry.io/contrib/propagators/autoprop v0.37.0 h1:7BO8yg+5TE8QWzA6apuj8nPjoIdQUw8OhsFSYU6DPeo= +go.opentelemetry.io/contrib/propagators/autoprop v0.37.0/go.mod h1:bV3y2INa2Fe69LVRzszdM/knfKRBtJmI8VbHQi9ySpM= +go.opentelemetry.io/contrib/propagators/aws v1.12.0 h1:n3lNyZs2ytVEfFhcn2QO5Z80lgrMXyP1Ncx0C7rUU8A= +go.opentelemetry.io/contrib/propagators/aws v1.12.0/go.mod h1:xZSOQIixr40Cq3NHn/YsvkDOzpVaR0j19WJRZnOKwbk= +go.opentelemetry.io/contrib/propagators/b3 v1.12.0 h1:OtfTF8bneN8qTeo/j92kcvc0iDDm4bm/c3RzaUJfiu0= +go.opentelemetry.io/contrib/propagators/b3 v1.12.0/go.mod h1:0JDB4elfPUWGsCH/qhaMkDzP1l8nB0ANVx8zXuAYEwg= +go.opentelemetry.io/contrib/propagators/jaeger v1.12.0 h1:fQBEhLiGQihBAAmiozZihHvO0t/+NFZMOLx80bmAi+s= +go.opentelemetry.io/contrib/propagators/jaeger v1.12.0/go.mod h1:hryAK4MKIBKRaUh8n0/vHWuu4fzhR0XB1Q8B4wz3qhw= +go.opentelemetry.io/contrib/propagators/ot v1.12.0 h1:6yNXAAQW08uIwgQKQvGC/0/5KkUCZmOcwSjj8zYg1ZU= +go.opentelemetry.io/contrib/propagators/ot v1.12.0/go.mod h1:t2n1RTxGm14/AEMSELd0jJo3NBjeEHnDtCRYXKTl0Ak= +go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0= +go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI= +go.opentelemetry.io/otel/exporters/jaeger v1.11.2 h1:ES8/j2+aB+3/BUw51ioxa50V9btN1eew/2J7N7n1tsE= +go.opentelemetry.io/otel/exporters/jaeger v1.11.2/go.mod h1:nwcF/DK4Hk0auZ/a5vw20uMsaJSXbzeeimhN5f9d0Lc= +go.opentelemetry.io/otel/metric v0.34.0 h1:MCPoQxcg/26EuuJwpYN1mZTeCYAUGx8ABxfW07YkjP8= +go.opentelemetry.io/otel/metric v0.34.0/go.mod h1:ZFuI4yQGNCupurTXCwkeD/zHBt+C2bR7bw5JqUm/AP8= +go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNXUiU= +go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU= +go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0= +go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= -go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20220927162542-c76eaa363f9d h1:3wgmvnqHUJ8SxiNWwea5NCzTwAVfhTtuV+0ClVFlClc= -golang.org/x/exp v0.0.0-20220927162542-c76eaa363f9d/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 h1:5oN1Pz/eDhCpbMbLstvIPa0b/BEQo6g6nwV3pLjfM6w= +golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= +golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20220923203811-8be639271d50 h1:vKyz8L3zkd+xrMeIaBsQ/MNVPVFSffdaU3ZyYlBGFnI= -golang.org/x/net v0.0.0-20220923203811-8be639271d50/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= +golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094 h1:2o1E+E8TpNLklK9nHiPiK1uzIYrIHt+cQx3ynCwq9V8= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -159,12 +164,12 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= -golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= +golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= +golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -177,15 +182,15 @@ google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20220923205249-dd2d53f1fffc h1:saaNe2+SBQxandnzcD/qB1JEBQ2Pqew+KlFLLdA/XcM= -google.golang.org/genproto v0.0.0-20220923205249-dd2d53f1fffc/go.mod h1:yEEpwVWKMZZzo81NwRgyEJnA2fQvpXAYPVisv8EgDVs= +google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37 h1:jmIfw8+gSvXcZSgaFAGyInDXeWzUhvYH57G/5GKMn70= +google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= -google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= +google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U= +google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/totem_suite_test.go b/totem_suite_test.go index fa7d3fb..3c38f9c 100644 --- a/totem_suite_test.go +++ b/totem_suite_test.go @@ -97,7 +97,7 @@ func (tc *testCase) Run(until ...chan struct{}) { testCase: tc, wg: sync.WaitGroup{}, } - srv.wg.Add(2) + srv.wg.Add(1) grpcServer := grpc.NewServer(tc.ServerOptions...) RegisterTestServer(grpcServer, &srv) go grpcServer.Serve(tc.listener) @@ -108,23 +108,24 @@ func (tc *testCase) Run(until ...chan struct{}) { stream, err := client.TestStream(ctx) Expect(err).NotTo(HaveOccurred()) + done := make(chan struct{}) go func() { - defer GinkgoRecover() - defer srv.wg.Done() - err := tc.ClientHandler(stream) - Expect(err).To(Or(BeNil(), WithTransform(status.Code, Equal(codes.Canceled)))) + defer close(done) + if len(until) > 0 { + for _, c := range until { + <-c + } + ca() + srv.wg.Wait() + } else { + srv.wg.Wait() + ca() + } }() - if len(until) > 0 { - for _, c := range until { - <-c - } - ca() - srv.wg.Wait() - } else { - srv.wg.Wait() - ca() - } + err = tc.ClientHandler(stream) + Expect(status.Code(err)).To(Or(Equal(codes.Canceled), Equal(codes.OK)), "err: %v", err) + <-done } func (tc *testCase) RunServerOnly() { diff --git a/totem_test.go b/totem_test.go index 178a8ad..3e95350 100644 --- a/totem_test.go +++ b/totem_test.go @@ -16,6 +16,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" ) @@ -173,6 +174,116 @@ var _ = Describe("Test", func() { tc.Run(a, b) }) + It("should forward raw RPC messages", func() { + done := make(chan struct{}) + tc := testCase{ + ServerHandler: func(stream test.Test_TestStreamServer) error { + ts, err := totem.NewServer(stream) + if err != nil { + return err + } + incSrv := incrementServer{} + test.RegisterIncrementServer(ts, &incSrv) + _, errC := ts.Serve() + + return <-errC + }, + ClientHandler: func(stream test.Test_TestStreamClient) error { + ts, err := totem.NewServer(stream) + if err != nil { + return err + } + + cc, errC := ts.Serve() + + reqBytes, _ := proto.Marshal(&test.Number{ + Value: 1234, + }) + req := &totem.RPC{ + ServiceName: "test.Increment", + MethodName: "Inc", + Content: &totem.RPC_Request{ + Request: reqBytes, + }, + } + reply := &totem.RPC{} + + ctx, ca := context.WithTimeout(context.Background(), timeout) + defer ca() + err = cc.Invoke(ctx, totem.Forward, req, reply) + if err != nil { + return err + } + + respValue := &test.Number{} + err = proto.Unmarshal(reply.GetResponse().GetResponse(), respValue) + + if err != nil { + return err + } + if respValue.GetValue() != 1235 { + return fmt.Errorf("expected 1235, got %d", respValue.GetValue()) + } + + close(done) + return <-errC + }, + } + tc.Run(done) + }) + + It("should forward raw RPCs and receive regular proto messages", func() { + done := make(chan struct{}) + tc := testCase{ + ServerHandler: func(stream test.Test_TestStreamServer) error { + ts, err := totem.NewServer(stream) + if err != nil { + return err + } + incSrv := incrementServer{} + test.RegisterIncrementServer(ts, &incSrv) + _, errC := ts.Serve() + + return <-errC + }, + ClientHandler: func(stream test.Test_TestStreamClient) error { + ts, err := totem.NewServer(stream) + if err != nil { + return err + } + + cc, errC := ts.Serve() + + reqBytes, _ := proto.Marshal(&test.Number{ + Value: 1234, + }) + req := &totem.RPC{ + ServiceName: "test.Increment", + MethodName: "Inc", + Content: &totem.RPC_Request{ + Request: reqBytes, + }, + } + reply := &test.Number{} + + ctx, ca := context.WithTimeout(context.Background(), timeout) + defer ca() + err = cc.Invoke(ctx, totem.Forward, req, reply) + if err != nil { + return err + } + + if reply.GetValue() != 1235 { + return fmt.Errorf("expected 1235, got %d", reply.GetValue()) + } + + close(done) + return <-errC + }, + } + tc.Run(done) + }) + It("should correctly splice streams", func() { s1 := testCase{ ServerHandler: func(stream test.Test_TestStreamServer) error {