Skip to content

Commit

Permalink
Allow forwarding totem.RPC messages directly via clientconn.Invoke
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky committed Dec 20, 2022
1 parent acb5d4c commit f41b945
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 119 deletions.
99 changes: 67 additions & 32 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,59 @@ type ClientConn struct {

var _ grpc.ClientConnInterface = (*ClientConn)(nil)

// Method placeholder to distinguish forwarded raw RPC messages.
const Forward = "(forward)"

func (cc *ClientConn) Invoke(
ctx context.Context,
method string,
req interface{},
reply interface{},
req any,
reply any,
callOpts ...grpc.CallOption,
) error {
reqMsg, err := proto.Marshal(req.(proto.Message))
if err != nil {
return err
}

cc.logger.With(
zap.String("method", method),
zap.String("requestType", fmt.Sprintf("%T", req)),
zap.String("replyType", fmt.Sprintf("%T", reply)),
).Debug("invoking method")

serviceName, methodName, err := parseQualifiedMethod(method)
if err != nil {
return err
}

rpc := &RPC{
ServiceName: serviceName,
MethodName: methodName,
Content: &RPC_Request{
Request: reqMsg,
},
var serviceName, methodName string
if method != Forward {
var err error
serviceName, methodName, err = parseQualifiedMethod(method)
if err != nil {
return err
}
}

md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
}

lg := cc.logger.With(
zap.String("requestType", fmt.Sprintf("%T", req)),
zap.String("replyType", fmt.Sprintf("%T", reply)),
)
var reqMsg []byte
switch req := req.(type) {
case *RPC:
serviceName = req.ServiceName
methodName = req.MethodName
lg.With(
zap.String("method", method),
).Debug("forwarding rpc")
reqMsg = req.GetRequest()
if req.Metadata != nil {
md = metadata.Join(md, req.Metadata.ToMD())
}
case proto.Message:
lg.With(
zap.String("method", method),
).Debug("invoking method")
var err error
reqMsg, err = proto.Marshal(req)
if err != nil {
return err
}
default:
panic(fmt.Sprintf("[totem] unsupported request type: %T", req))
}

name, attr := spanInfo(method, peerFromCtx(ctx))
attr = append(attr, attribute.String("func", "clientConn.Invoke"))
ctx, span := cc.tracer.Start(ctx, name,
Expand All @@ -65,7 +82,15 @@ func (cc *ClientConn) Invoke(
)
defer span.End()
otelgrpc.Inject(ctx, &md)
rpc.Metadata = FromMD(md)

rpc := &RPC{
ServiceName: serviceName,
MethodName: methodName,
Content: &RPC_Request{
Request: reqMsg,
},
Metadata: FromMD(md),
}

future := cc.controller.Request(ctx, rpc)
select {
Expand Down Expand Up @@ -96,14 +121,24 @@ func (cc *ClientConn) Invoke(
*opt.TrailerAddr = rpc.Metadata.ToMD()
}
}
if err := proto.Unmarshal(resp.GetResponse(), reply.(proto.Message)); err != nil {
cc.logger.With(
zap.Uint64("tag", rpc.Tag),
zap.String("method", method),
zap.Error(err),
).Error("received malformed response message")

return fmt.Errorf("[totem] malformed response: %w", err)
switch reply := reply.(type) {
case *RPC:
reply.Content = &RPC_Response{
Response: resp,
}
case proto.Message:
if err := proto.Unmarshal(resp.GetResponse(), reply); err != nil {
cc.logger.With(
zap.Uint64("tag", rpc.Tag),
zap.String("method", method),
zap.Error(err),
).Error("received malformed response message")

return fmt.Errorf("[totem] malformed response: %w", err)
}
default:
panic(fmt.Sprintf("[totem] unsupported request type: %T", req))
}
return nil
case <-ctx.Done():
Expand Down
50 changes: 25 additions & 25 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,36 @@ go 1.18

require (
github.com/charmbracelet/lipgloss v0.6.0
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
github.com/jhump/protoreflect v1.13.0
github.com/jhump/protoreflect v1.14.0
github.com/kralicky/gpkg v0.0.0-20220311205216-0d8ea9557555
github.com/kralicky/ragu v1.0.0-rc3
github.com/kralicky/ragu v1.0.0-rc4
github.com/magefile/mage v1.14.0
github.com/onsi/ginkgo/v2 v2.2.0
github.com/onsi/gomega v1.20.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.0
go.opentelemetry.io/contrib/propagators/autoprop v0.36.0
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/exporters/jaeger v1.10.0
go.opentelemetry.io/otel/sdk v1.10.0
go.opentelemetry.io/otel/trace v1.10.0
github.com/onsi/ginkgo/v2 v2.6.1
github.com/onsi/gomega v1.24.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/contrib/propagators/autoprop v0.37.0
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/exporters/jaeger v1.11.2
go.opentelemetry.io/otel/sdk v1.11.2
go.opentelemetry.io/otel/trace v1.11.2
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20220927162542-c76eaa363f9d
golang.org/x/net v0.0.0-20220923203811-8be639271d50
google.golang.org/genproto v0.0.0-20220923205249-dd2d53f1fffc
google.golang.org/grpc v1.49.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15
golang.org/x/net v0.4.0
google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
)

require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/bmatcuk/doublestar v1.3.4 // indirect
github.com/flosch/pongo2/v6 v6.0.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/kralicky/grpc-gateway/v2 v2.11.3 // indirect
Expand All @@ -43,15 +44,14 @@ require (
github.com/muesli/termenv v0.11.1-0.20220204035834-5ac8409525e0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/samber/lo v1.28.2 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.10.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.10.0 // indirect
go.opentelemetry.io/contrib/propagators/jaeger v1.10.0 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.10.0 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.12.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.12.0 // indirect
go.opentelemetry.io/contrib/propagators/jaeger v1.12.0 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.12.0 // indirect
go.opentelemetry.io/otel/metric v0.34.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/kralicky/ragu => ../ragu
Loading

0 comments on commit f41b945

Please sign in to comment.