Skip to content

Commit

Permalink
支持削减callpath大小,减少rpc消耗
Browse files Browse the repository at this point in the history
  • Loading branch information
pangdogs committed Sep 28, 2024
1 parent 57011db commit b9009e8
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 22 deletions.
2 changes: 1 addition & 1 deletion plugins/rpc/rpc_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type _Option struct{}

func (_Option) Default() option.Setting[RPCOptions] {
return func(options *RPCOptions) {
With.Processors(rpcpcsr.NewServiceProcessor(nil))(options)
With.Processors(rpcpcsr.NewServiceProcessor(nil, true))(options)
}
}

Expand Down
31 changes: 18 additions & 13 deletions plugins/rpc/rpcli/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ import (

// CreateRPCli 创建RPC客户端
func CreateRPCli() RPCliCreator {
return RPCliCreator{}
return RPCliCreator{
rttSampling: 3,
msgCreator: gap.DefaultMsgCreator(),
reduceCP: true,
}
}

// RPCliCreator RPC客户端构建器
type RPCliCreator struct {
settings []option.Setting[cli.ClientOptions]
rttSampling int
msgCreator gap.IMsgCreator
reduceCP bool
mainProc IProcedure
}

Expand Down Expand Up @@ -226,8 +231,8 @@ func (ctor RPCliCreator) AuthExtensions(extensions []byte) RPCliCreator {
return ctor
}

func (ctor RPCliCreator) ZapLogger(logger *zap.Logger) RPCliCreator {
ctor.settings = append(ctor.settings, cli.With.ZapLogger(logger))
func (ctor RPCliCreator) ReduceCP(b bool) RPCliCreator {
ctor.reduceCP = b
return ctor
}

Expand All @@ -240,6 +245,11 @@ func (ctor RPCliCreator) MainProcedure(proc any) RPCliCreator {
return ctor
}

func (ctor RPCliCreator) ZapLogger(logger *zap.Logger) RPCliCreator {
ctor.settings = append(ctor.settings, cli.With.ZapLogger(logger))
return ctor
}

func (ctor RPCliCreator) Connect(ctx context.Context, endpoint string) (*RPCli, error) {
client, err := cli.Connect(ctx, endpoint, ctor.settings...)
if err != nil {
Expand All @@ -248,7 +258,7 @@ func (ctor RPCliCreator) Connect(ctx context.Context, endpoint string) (*RPCli,

var remoteTime *cli.ResponseTime

for range max(3, ctor.rttSampling) {
for range ctor.rttSampling {
respTime := <-client.RequestTime(ctx)
if !respTime.OK() {
return nil, respTime.Error
Expand All @@ -265,19 +275,14 @@ func (ctor RPCliCreator) Connect(ctx context.Context, endpoint string) (*RPCli,

rpcli := &RPCli{
Client: client,
remoteTime: *remoteTime,
encoder: codec.MakeEncoder(),
}

if ctor.msgCreator != nil {
rpcli.decoder = codec.MakeDecoder(ctor.msgCreator)
} else {
rpcli.decoder = codec.MakeDecoder(gap.DefaultMsgCreator())
decoder: codec.MakeDecoder(ctor.msgCreator),
remoteTime: *remoteTime,
reduceCP: ctor.reduceCP,
}

if ctor.mainProc != nil {
ctor.mainProc.init(rpcli, Main, ctor.mainProc)
rpcli.procs.Add(Main, ctor.mainProc)
rpcli.AddProcedure(Main, ctor.mainProc)
}

rpcli.WatchData(context.Background(), generic.MakeDelegateFunc1(rpcli.handleRecvData))
Expand Down
5 changes: 3 additions & 2 deletions plugins/rpc/rpcli/rpcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type RPCli struct {
encoder codec.Encoder
decoder codec.Decoder
remoteTime cli.ResponseTime
reduceCP bool
procs generic.SliceMap[string, IProcedure]
}

Expand All @@ -70,7 +71,7 @@ func (c *RPCli) RPC(service, comp, method string, args ...any) async.AsyncRet {
Method: method,
}

cpbs, err := cp.Encode(false)
cpbs, err := cp.Encode(c.reduceCP)
if err != nil {
future.Cancel(err)
return ret.ToAsyncRet()
Expand Down Expand Up @@ -124,7 +125,7 @@ func (c *RPCli) OnewayRPC(service, comp, method string, args ...any) error {
Method: method,
}

cpbs, err := cp.Encode(false)
cpbs, err := cp.Encode(c.reduceCP)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion plugins/rpc/rpcpcsr/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ import (
type PermissionValidator = generic.DelegateFunc2[rpcstack.CallChain, callpath.CallPath, bool]

// NewForwardProcessor RPC转发处理器,用于S<->G的通信
func NewForwardProcessor(transitService string, mc gap.IMsgCreator, permValidator PermissionValidator) any {
func NewForwardProcessor(transitService string, mc gap.IMsgCreator, permValidator PermissionValidator, reduceCP bool) any {
return &_ForwardProcessor{
encoder: codec.MakeEncoder(),
decoder: codec.MakeDecoder(mc),
transitService: transitService,
permValidator: permValidator,
reduceCP: reduceCP,
}
}

Expand All @@ -56,6 +57,7 @@ type _ForwardProcessor struct {
transitService string
transitBroadcastAddr string
permValidator PermissionValidator
reduceCP bool
watcher dsvc.IWatcher
}

Expand Down
4 changes: 2 additions & 2 deletions plugins/rpc/rpcpcsr/forward_deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (p *_ForwardProcessor) Request(svcCtx service.Context, dst string, cc rpcst
return ret.ToAsyncRet()
}

cpbs, err := cp.Encode(false)
cpbs, err := cp.Encode(p.reduceCP)
if err != nil {
future.Cancel(err)
return ret.ToAsyncRet()
Expand Down Expand Up @@ -116,7 +116,7 @@ func (p *_ForwardProcessor) Notify(svcCtx service.Context, dst string, cc rpcsta
return err
}

cpbs, err := cp.Encode(false)
cpbs, err := cp.Encode(p.reduceCP)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion plugins/rpc/rpcpcsr/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
)

// NewServiceProcessor 创建分布式服务间的RPC处理器
func NewServiceProcessor(permValidator PermissionValidator) any {
func NewServiceProcessor(permValidator PermissionValidator, reduceCP bool) any {
return &_ServiceProcessor{
permValidator: permValidator,
reduceCP: reduceCP,
}
}

Expand All @@ -41,6 +42,7 @@ type _ServiceProcessor struct {
dist dsvc.IDistService
watcher dsvc.IWatcher
permValidator PermissionValidator
reduceCP bool
}

// Init 初始化
Expand Down
4 changes: 2 additions & 2 deletions plugins/rpc/rpcpcsr/service_deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (p *_ServiceProcessor) Request(svcCtx service.Context, dst string, cc rpcst
return ret.ToAsyncRet()
}

cpbs, err := cp.Encode(false)
cpbs, err := cp.Encode(p.reduceCP)
if err != nil {
future.Cancel(err)
return ret.ToAsyncRet()
Expand Down Expand Up @@ -88,7 +88,7 @@ func (p *_ServiceProcessor) Notify(svcCtx service.Context, dst string, cc rpcsta
return err
}

cpbs, err := cp.Encode(false)
cpbs, err := cp.Encode(p.reduceCP)
if err != nil {
return err
}
Expand Down

0 comments on commit b9009e8

Please sign in to comment.