Skip to content

Commit

Permalink
支持RPC异步调用
Browse files Browse the repository at this point in the history
  • Loading branch information
pangdogs committed Jan 20, 2025
1 parent 8340cee commit 26a4ab0
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 89 deletions.
51 changes: 46 additions & 5 deletions addins/rpc/rpcpcsr/callmethod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package rpcpcsr

import (
"context"
"fmt"
"git.golaxy.org/core"
"git.golaxy.org/core/ec"
Expand All @@ -42,7 +43,7 @@ var (
callChainRT = reflect.TypeFor[rpcstack.CallChain]()
)

func CallService(svcCtx service.Context, cc rpcstack.CallChain, addInName, method string, args variant.Array) (rets variant.Array, err error) {
func CallService(svcCtx service.Context, cc rpcstack.CallChain, addInName, method string, args variant.Array) (_ variant.Array, err error) {
defer func() {
if panicErr := types.Panic2Err(recover()); panicErr != nil {
err = fmt.Errorf("%w: %w", core.ErrPanicked, panicErr)
Expand Down Expand Up @@ -87,7 +88,7 @@ func CallService(svcCtx service.Context, cc rpcstack.CallChain, addInName, metho
return variant.MakeSerializedArray(methodRV.Call(argsRV))
}

func CallRuntime(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, addInName, method string, args variant.Array) (asyncRet async.AsyncRet, err error) {
func CallRuntime(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, addInName, method string, args variant.Array) (_ async.AsyncRet, err error) {
defer func() {
if panicErr := types.Panic2Err(recover()); panicErr != nil {
err = fmt.Errorf("%w: %w", core.ErrPanicked, panicErr)
Expand Down Expand Up @@ -134,11 +135,18 @@ func CallRuntime(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id,
rpcstack.UnsafeRPCStack(stack).PushCallChain(cc)
defer rpcstack.UnsafeRPCStack(stack).PopCallChain()

return async.MakeRet(variant.MakeSerializedArray(methodRV.Call(argsRV)))
retsRV := methodRV.Call(argsRV)
if len(retsRV) == 1 {
if asyncRet, ok := retsRV[0].Interface().(async.AsyncRet); ok {
return async.MakeRet(asyncRet, nil)
}
}

return async.MakeRet(variant.MakeSerializedArray(retsRV))
}), nil
}

func CallEntity(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, component, method string, args variant.Array) (asyncRet async.AsyncRet, err error) {
func CallEntity(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, component, method string, args variant.Array) (_ async.AsyncRet, err error) {
defer func() {
if panicErr := types.Panic2Err(recover()); panicErr != nil {
err = fmt.Errorf("%w: %w", core.ErrPanicked, panicErr)
Expand Down Expand Up @@ -180,7 +188,14 @@ func CallEntity(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id,
rpcstack.UnsafeRPCStack(stack).PushCallChain(cc)
defer rpcstack.UnsafeRPCStack(stack).PopCallChain()

return async.MakeRet(variant.MakeSerializedArray(methodRV.Call(argsRV)))
retsRV := methodRV.Call(argsRV)
if len(retsRV) == 1 {
if asyncRet, ok := retsRV[0].Interface().(async.AsyncRet); ok {
return async.MakeRet(asyncRet, nil)
}
}

return async.MakeRet(variant.MakeSerializedArray(retsRV))
}), nil
}

Expand Down Expand Up @@ -215,3 +230,29 @@ func parseArgs(methodRV reflect.Value, cc rpcstack.CallChain, args variant.Array

return argsRV, nil
}

func waitAsyncRet(ctx context.Context, asyncRet async.AsyncRet) (variant.Array, error) {
for {
ret := asyncRet.Wait(ctx)
if !ret.OK() {
return nil, ret.Error
}

var ok bool
asyncRet, ok = ret.Value.(async.AsyncRet)
if ok {
continue
}

if rets, ok := ret.Value.(variant.Array); ok {
return rets, nil
}

rets, err := variant.MakeSerializedArray([]any{ret.Value})
if err != nil {
return nil, err
}

return rets, nil
}
}
60 changes: 18 additions & 42 deletions addins/rpc/rpcpcsr/forward_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ func (p *_ForwardProcessor) acceptNotify(svc, src, dst, transit string, req *gap
}

go func() {
ret := asyncRet.Wait(p.svcCtx)
if !ret.OK() {
log.Errorf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error)
rets, err := waitAsyncRet(p.svcCtx, asyncRet)
if err != nil {
log.Errorf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, err)
} else {
log.Debugf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls finished, src:%q, dst:%q, transit:%q, path:%q,", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path)
ret.Value.(variant.Array).Release()
rets.Release()
}
}()

Expand All @@ -137,12 +137,12 @@ func (p *_ForwardProcessor) acceptNotify(svc, src, dst, transit string, req *gap
}

go func() {
ret := asyncRet.Wait(p.svcCtx)
if !ret.OK() {
log.Errorf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error)
rets, err := waitAsyncRet(p.svcCtx, asyncRet)
if err != nil {
log.Errorf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, err)
} else {
log.Debugf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls finished, src:%q, dst:%q, transit:%q, path:%q", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path)
ret.Value.(variant.Array).Release()
rets.Release()
}
}()

Expand Down Expand Up @@ -200,25 +200,13 @@ func (p *_ForwardProcessor) acceptRequest(svc, src, dst, transit string, req *ga
}

go func() {
var ret async.Ret

for {
ret = asyncRet.Wait(p.svcCtx)
if !ret.OK() {
break
}
asyncRet, _ = ret.Value.(async.AsyncRet)
if asyncRet == nil {
break
}
}

if !ret.OK() {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error)
p.reply(src, transit, req.CorrId, nil, ret.Error)
rets, err := waitAsyncRet(p.svcCtx, asyncRet)
if err != nil {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, err)
p.reply(src, transit, req.CorrId, nil, err)
} else {
log.Debugf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls finished, src:%q, dst:%q, transit:%q, path:%q", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path)
p.reply(src, transit, req.CorrId, ret.Value.(variant.Array), nil)
p.reply(src, transit, req.CorrId, rets, nil)
}
}()

Expand All @@ -233,25 +221,13 @@ func (p *_ForwardProcessor) acceptRequest(svc, src, dst, transit string, req *ga
}

go func() {
var ret async.Ret

for {
ret = asyncRet.Wait(p.svcCtx)
if !ret.OK() {
break
}
asyncRet, _ = ret.Value.(async.AsyncRet)
if asyncRet == nil {
break
}
}

if !ret.OK() {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error)
p.reply(src, transit, req.CorrId, nil, ret.Error)
rets, err := waitAsyncRet(p.svcCtx, asyncRet)
if err != nil {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, err)
p.reply(src, transit, req.CorrId, nil, err)
} else {
log.Debugf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls finished, src:%q, dst:%q, transit:%q, path:%q", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path)
p.reply(src, transit, req.CorrId, ret.Value.(variant.Array), nil)
p.reply(src, transit, req.CorrId, rets, nil)
}
}()

Expand Down
60 changes: 18 additions & 42 deletions addins/rpc/rpcpcsr/service_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ func (p *_ServiceProcessor) acceptNotify(svc, src string, req *gap.MsgOnewayRPC)
}

go func() {
ret := asyncRet.Wait(p.svcCtx)
if !ret.OK() {
log.Errorf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls failed, %s", cp.Id, cp.Script, cp.Method, ret.Error)
rets, err := waitAsyncRet(p.svcCtx, asyncRet)
if err != nil {
log.Errorf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls failed, %s", cp.Id, cp.Script, cp.Method, err)
} else {
log.Debugf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls finished", cp.Id, cp.Script, cp.Method)
ret.Value.(variant.Array).Release()
rets.Release()
}
}()

Expand All @@ -116,12 +116,12 @@ func (p *_ServiceProcessor) acceptNotify(svc, src string, req *gap.MsgOnewayRPC)
}

go func() {
ret := asyncRet.Wait(p.svcCtx)
if !ret.OK() {
log.Errorf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls failed, %s", cp.Id, cp.Script, cp.Method, ret.Error)
rets, err := waitAsyncRet(p.svcCtx, asyncRet)
if err != nil {
log.Errorf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls failed, %s", cp.Id, cp.Script, cp.Method, err)
} else {
log.Debugf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls finished", cp.Id, cp.Script, cp.Method)
ret.Value.(variant.Array).Release()
rets.Release()
}
}()

Expand Down Expand Up @@ -178,25 +178,13 @@ func (p *_ServiceProcessor) acceptRequest(svc, src string, req *gap.MsgRPCReques
}

go func() {
var ret async.Ret

for {
ret = asyncRet.Wait(p.svcCtx)
if !ret.OK() {
break
}
asyncRet, _ = ret.Value.(async.AsyncRet)
if asyncRet == nil {
break
}
}

if !ret.OK() {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, ret.Error)
p.reply(src, req.CorrId, nil, ret.Error)
rets, err := waitAsyncRet(p.svcCtx, asyncRet)
if err != nil {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, err)
p.reply(src, req.CorrId, nil, err)
} else {
log.Debugf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls finished", req.CorrId, cp.Id, cp.Script, cp.Method)
p.reply(src, req.CorrId, ret.Value.(variant.Array), nil)
p.reply(src, req.CorrId, rets, nil)
}
}()

Expand All @@ -211,25 +199,13 @@ func (p *_ServiceProcessor) acceptRequest(svc, src string, req *gap.MsgRPCReques
}

go func() {
var ret async.Ret

for {
ret = asyncRet.Wait(p.svcCtx)
if !ret.OK() {
break
}
asyncRet, _ = ret.Value.(async.AsyncRet)
if asyncRet == nil {
break
}
}

if !ret.OK() {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, ret.Error)
p.reply(src, req.CorrId, nil, ret.Error)
rets, err := waitAsyncRet(p.svcCtx, asyncRet)
if err != nil {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, err)
p.reply(src, req.CorrId, nil, err)
} else {
log.Debugf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls finished", req.CorrId, cp.Id, cp.Script, cp.Method)
p.reply(src, req.CorrId, ret.Value.(variant.Array), nil)
p.reply(src, req.CorrId, rets, nil)
}
}()

Expand Down

0 comments on commit 26a4ab0

Please sign in to comment.