From 6217f2ba71900468f273d79c0c166885e7a4b403 Mon Sep 17 00:00:00 2001 From: icexin Date: Sat, 7 Oct 2023 15:43:48 +0800 Subject: [PATCH] fix concurrent rpc call --- protocol/brpc-std/codec.go | 16 +++--- protocol/brpc-std/server.go | 1 + protocol/brpc-std/server_test.go | 88 ++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 8 deletions(-) create mode 100644 protocol/brpc-std/server_test.go diff --git a/protocol/brpc-std/codec.go b/protocol/brpc-std/codec.go index 6dc74c4..fcd18e5 100644 --- a/protocol/brpc-std/codec.go +++ b/protocol/brpc-std/codec.go @@ -133,14 +133,14 @@ func (c *codec) Write(meta *metapb.RpcMeta, x interface{}, cw compressWriter) er // // user must not retain the returned buffer func (c *codec) readBuffer(n int) ([]byte, error) { - if n < c.r.Size() { - buf, err := c.r.Peek(n) - if err != nil { - return nil, err - } - c.r.Discard(n) - return buf, nil - } + // if n < c.r.Size() { + // buf, err := c.r.Peek(n) + // if err != nil { + // return nil, err + // } + // c.r.Discard(n) + // return buf, nil + // } buf := make([]byte, n) _, err := io.ReadFull(c.r, buf) return buf, err diff --git a/protocol/brpc-std/server.go b/protocol/brpc-std/server.go index 2b464cb..14dee4a 100644 --- a/protocol/brpc-std/server.go +++ b/protocol/brpc-std/server.go @@ -64,6 +64,7 @@ func (s *server) ServeConn(conn net.Conn) { defer func() { r := recover() if r != nil { + log.Printf("panic:%v", r) debug.PrintStack() s.sendResponse(writeLock, codec, fmt.Errorf("panic:%v", r), req, nil, nil) } diff --git a/protocol/brpc-std/server_test.go b/protocol/brpc-std/server_test.go new file mode 100644 index 0000000..3d5bafa --- /dev/null +++ b/protocol/brpc-std/server_test.go @@ -0,0 +1,88 @@ +package bstd + +import ( + "context" + "io" + "net" + "testing" + + "github.com/icexin/brpc-go" + "github.com/icexin/brpc-go/examples/echo" +) + +type echoService struct { + echo.UnimplementedEchoServerServer +} + +func (s *echoService) Echo(ctx context.Context, req *echo.EchoRequest) (*echo.EchoResponse, error) { + return &echo.EchoResponse{ + Message: "reply: " + req.Message, + }, nil +} + +func newTestServer() (func(), string) { + l, err := net.Listen("tcp", ":0") + if err != nil { + panic(err) + } + server := brpc.NewServer(ProtocolName) + echo.RegisterEchoServerServer(server, &echoService{}) + go server.Serve(l) + + addr := l.Addr().String() + return func() { + l.Close() + }, addr +} + +func newTestClient(addr string) (func(), echo.EchoServerClient) { + conn, err := brpc.Dial(ProtocolName, addr) + if err != nil { + panic(err) + } + closefunc := func() { + conn.(io.Closer).Close() + } + return closefunc, echo.NewEchoServerClient(conn) +} + +func TestServer(t *testing.T) { + closefunc, addr := newTestServer() + defer closefunc() + + closefunc, client := newTestClient(addr) + defer closefunc() + + resp, err := client.Echo(context.Background(), &echo.EchoRequest{ + Message: "hello", + }) + if err != nil { + t.Fatal(err) + } + if resp.Message != "reply: hello" { + t.Fatalf("unexpected response: %s", resp.Message) + } +} + +func BenchmarkServer(b *testing.B) { + closefunc, addr := newTestServer() + defer closefunc() + + closefunc, client := newTestClient(addr) + defer closefunc() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + resp, err := client.Echo(context.Background(), &echo.EchoRequest{ + Message: "hello", + }) + if err != nil { + b.Fatal(err) + } + if resp.Message != "reply: hello" { + b.Fatalf("unexpected response: %s", resp.Message) + } + } + }) +}