Skip to content

Commit

Permalink
fix concurrent rpc call
Browse files Browse the repository at this point in the history
  • Loading branch information
icexin committed Oct 7, 2023
1 parent f6953ce commit 6217f2b
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 8 deletions.
16 changes: 8 additions & 8 deletions protocol/brpc-std/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ func (c *codec) Write(meta *metapb.RpcMeta, x interface{}, cw compressWriter) er
//
// user must not retain the returned buffer
func (c *codec) readBuffer(n int) ([]byte, error) {
if n < c.r.Size() {
buf, err := c.r.Peek(n)
if err != nil {
return nil, err
}
c.r.Discard(n)
return buf, nil
}
// if n < c.r.Size() {
// buf, err := c.r.Peek(n)
// if err != nil {
// return nil, err
// }
// c.r.Discard(n)
// return buf, nil
// }
buf := make([]byte, n)
_, err := io.ReadFull(c.r, buf)
return buf, err
Expand Down
1 change: 1 addition & 0 deletions protocol/brpc-std/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *server) ServeConn(conn net.Conn) {
defer func() {
r := recover()
if r != nil {
log.Printf("panic:%v", r)
debug.PrintStack()
s.sendResponse(writeLock, codec, fmt.Errorf("panic:%v", r), req, nil, nil)
}
Expand Down
88 changes: 88 additions & 0 deletions protocol/brpc-std/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package bstd

import (
"context"
"io"
"net"
"testing"

"github.com/icexin/brpc-go"
"github.com/icexin/brpc-go/examples/echo"
)

type echoService struct {
echo.UnimplementedEchoServerServer
}

func (s *echoService) Echo(ctx context.Context, req *echo.EchoRequest) (*echo.EchoResponse, error) {
return &echo.EchoResponse{
Message: "reply: " + req.Message,
}, nil
}

func newTestServer() (func(), string) {
l, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
server := brpc.NewServer(ProtocolName)
echo.RegisterEchoServerServer(server, &echoService{})
go server.Serve(l)

addr := l.Addr().String()
return func() {
l.Close()
}, addr
}

func newTestClient(addr string) (func(), echo.EchoServerClient) {
conn, err := brpc.Dial(ProtocolName, addr)
if err != nil {
panic(err)
}
closefunc := func() {
conn.(io.Closer).Close()
}
return closefunc, echo.NewEchoServerClient(conn)
}

func TestServer(t *testing.T) {
closefunc, addr := newTestServer()
defer closefunc()

closefunc, client := newTestClient(addr)
defer closefunc()

resp, err := client.Echo(context.Background(), &echo.EchoRequest{
Message: "hello",
})
if err != nil {
t.Fatal(err)
}
if resp.Message != "reply: hello" {
t.Fatalf("unexpected response: %s", resp.Message)
}
}

func BenchmarkServer(b *testing.B) {
closefunc, addr := newTestServer()
defer closefunc()

closefunc, client := newTestClient(addr)
defer closefunc()

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
resp, err := client.Echo(context.Background(), &echo.EchoRequest{
Message: "hello",
})
if err != nil {
b.Fatal(err)
}
if resp.Message != "reply: hello" {
b.Fatalf("unexpected response: %s", resp.Message)
}
}
})
}

0 comments on commit 6217f2b

Please sign in to comment.