Skip to content

Commit

Permalink
introduce rpcinfo.StreamingMode
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Sep 4, 2024
1 parent 461aecc commit cbdff65
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 27 deletions.
1 change: 1 addition & 0 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (kc *kClient) Stream(ctx context.Context, method string, request, response
ctx, ri, _ = kc.initRPCInfo(ctx, method, 0, nil)

rpcinfo.AsMutableRPCConfig(ri.Config()).SetInteractionMode(rpcinfo.Streaming)
rpcinfo.AsMutableRPCConfig(ri.Config()).SetStreamingMode(rpcinfo.StreamingMode(kc.getStreamingMode(ri)))
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)

ctx = kc.opt.TracerCtl.DoStart(ctx, ri)
Expand Down
30 changes: 8 additions & 22 deletions pkg/remote/codec/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type protobufV2MsgCodec interface {

type grpcCodec struct {
ThriftCodec remote.PayloadCodec
ServiceInfo *serviceinfo.ServiceInfo
}

type CodecOption func(c *grpcCodec)
Expand All @@ -63,12 +62,6 @@ func WithThriftCodec(t remote.PayloadCodec) CodecOption {
}
}

func WithServiceInfo(info *serviceinfo.ServiceInfo) CodecOption {
return func(c *grpcCodec) {
c.ServiceInfo = info
}
}

// NewGRPCCodec create grpc and protobuf codec
func NewGRPCCodec(opts ...CodecOption) remote.Codec {
codec := &grpcCodec{}
Expand Down Expand Up @@ -183,9 +176,15 @@ func (c *grpcCodec) Encode(ctx context.Context, message remote.Message, out remo
}

func (c *grpcCodec) Decode(ctx context.Context, message remote.Message, in remote.ByteBuffer) (err error) {
ri := rpcinfo.GetRPCInfo(ctx)
d, err := decodeGRPCFrame(ctx, in)
if needCheckTrailer(ri, c.ServiceInfo) && err == nil {
// For ClientStreaming, server may return an err(e.g. status) as trailer frame after calling SendAndClose.
// We need to receive this trailer frame.
if message.RPCInfo().Config().StreamingMode() == rpcinfo.ClientStreaming && message.RPCRole() == remote.Client && err == nil {
// Receive trailer frame
// If err == nil, wrong gRPC protocol implementation.
// If err == io.EOF, it means server returns nil, just ignore io.EOF.
// If err != io.EOF, it means server returns status err or BizStatusErr, or other gRPC transport error came out,
// we need to throw it to users.
_, err = decodeGRPCFrame(ctx, in)
if err == nil {
return errors.New("KITEX: grpc client streaming protocol violation: get <nil>, want <EOF>")
Expand Down Expand Up @@ -242,16 +241,3 @@ func (c *grpcCodec) Decode(ctx context.Context, message remote.Message, in remot
func (c *grpcCodec) Name() string {
return "grpc"
}

func needCheckTrailer(ri rpcinfo.RPCInfo, svcInfo *serviceinfo.ServiceInfo) bool {
// server-side
if svcInfo == nil {
return false
}
methodInfo := svcInfo.MethodInfo(ri.Invocation().MethodName())
// is there possibility that methodInfo is nil?
if methodInfo == nil {
return false
}
return methodInfo.StreamingMode() == serviceinfo.StreamingClient
}
7 changes: 2 additions & 5 deletions pkg/remote/trans/nphttp2/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,8 @@ func (f *cliTransHandlerFactory) NewTransHandler(opt *remote.ClientOption) (remo

func newCliTransHandler(opt *remote.ClientOption) (*cliTransHandler, error) {
return &cliTransHandler{
opt: opt,
codec: grpc.NewGRPCCodec(
grpc.WithThriftCodec(opt.PayloadCodec),
grpc.WithServiceInfo(opt.SvcInfo),
),
opt: opt,
codec: grpc.NewGRPCCodec(grpc.WithThriftCodec(opt.PayloadCodec)),
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/rpcinfo/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type RPCConfig interface {
TransportProtocol() transport.Protocol
InteractionMode() InteractionMode
PayloadCodec() serviceinfo.PayloadCodec
StreamingMode() StreamingMode
}

// Invocation contains specific information about the call.
Expand Down
8 changes: 8 additions & 0 deletions pkg/rpcinfo/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type MockRPCConfig struct {
IOBufferSizeFunc func() (r int)
TransportProtocolFunc func() transport.Protocol
InteractionModeFunc func() (r rpcinfo.InteractionMode)
StreamingModeFunc func() (r rpcinfo.StreamingMode)
}

func (m *MockRPCConfig) PayloadCodec() serviceinfo.PayloadCodec {
Expand Down Expand Up @@ -90,6 +91,13 @@ func (m *MockRPCConfig) TransportProtocol() (r transport.Protocol) {
return
}

func (m *MockRPCConfig) StreamingMode() (r rpcinfo.StreamingMode) {
if m.StreamingModeFunc != nil {
return m.StreamingModeFunc()
}
return
}

type MockRPCStats struct{}

func (m *MockRPCStats) Record(context.Context, stats.Event, stats.Status, string) {}
Expand Down
1 change: 1 addition & 0 deletions pkg/rpcinfo/mutable.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type MutableRPCConfig interface {
CopyFrom(from RPCConfig)
ImmutableView() RPCConfig
SetPayloadCodec(codec serviceinfo.PayloadCodec)
SetStreamingMode(mode StreamingMode)
}

// MutableRPCStats is used to change the information in the RPCStats.
Expand Down
19 changes: 19 additions & 0 deletions pkg/rpcinfo/rpcconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ const (
Streaming InteractionMode = 2
)

type StreamingMode int32

const (
None StreamingMode = 0b0000
Unary StreamingMode = 0b0001
ClientStreaming StreamingMode = 0b0010
ServerStreaming StreamingMode = 0b0100
BidirectionalStreaming StreamingMode = 0b0110
)

// rpcConfig is a set of configurations used during RPC calls.
type rpcConfig struct {
readOnlyMask int
Expand All @@ -66,6 +76,7 @@ type rpcConfig struct {
transportProtocol transport.Protocol
interactionMode InteractionMode
payloadCodec serviceinfo.PayloadCodec
streamingMode StreamingMode
}

func init() {
Expand Down Expand Up @@ -193,6 +204,14 @@ func (r *rpcConfig) PayloadCodec() serviceinfo.PayloadCodec {
return r.payloadCodec
}

func (r *rpcConfig) SetStreamingMode(mode StreamingMode) {
r.streamingMode = mode
}

func (r *rpcConfig) StreamingMode() StreamingMode {
return r.streamingMode
}

// Clone returns a copy of the current rpcConfig.
func (r *rpcConfig) Clone() MutableRPCConfig {
r2 := rpcConfigPool.Get().(*rpcConfig)
Expand Down

0 comments on commit cbdff65

Please sign in to comment.