diff --git a/addins/rpc/rpcpcsr/callmethod.go b/addins/rpc/rpcpcsr/callmethod.go index b610f4e..63d788e 100644 --- a/addins/rpc/rpcpcsr/callmethod.go +++ b/addins/rpc/rpcpcsr/callmethod.go @@ -20,6 +20,7 @@ package rpcpcsr import ( + "context" "fmt" "git.golaxy.org/core" "git.golaxy.org/core/ec" @@ -42,7 +43,7 @@ var ( callChainRT = reflect.TypeFor[rpcstack.CallChain]() ) -func CallService(svcCtx service.Context, cc rpcstack.CallChain, addInName, method string, args variant.Array) (rets variant.Array, err error) { +func CallService(svcCtx service.Context, cc rpcstack.CallChain, addInName, method string, args variant.Array) (_ variant.Array, err error) { defer func() { if panicErr := types.Panic2Err(recover()); panicErr != nil { err = fmt.Errorf("%w: %w", core.ErrPanicked, panicErr) @@ -87,7 +88,7 @@ func CallService(svcCtx service.Context, cc rpcstack.CallChain, addInName, metho return variant.MakeSerializedArray(methodRV.Call(argsRV)) } -func CallRuntime(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, addInName, method string, args variant.Array) (asyncRet async.AsyncRet, err error) { +func CallRuntime(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, addInName, method string, args variant.Array) (_ async.AsyncRet, err error) { defer func() { if panicErr := types.Panic2Err(recover()); panicErr != nil { err = fmt.Errorf("%w: %w", core.ErrPanicked, panicErr) @@ -134,11 +135,18 @@ func CallRuntime(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, rpcstack.UnsafeRPCStack(stack).PushCallChain(cc) defer rpcstack.UnsafeRPCStack(stack).PopCallChain() - return async.MakeRet(variant.MakeSerializedArray(methodRV.Call(argsRV))) + retsRV := methodRV.Call(argsRV) + if len(retsRV) == 1 { + if asyncRet, ok := retsRV[0].Interface().(async.AsyncRet); ok { + return async.MakeRet(asyncRet, nil) + } + } + + return async.MakeRet(variant.MakeSerializedArray(retsRV)) }), nil } -func CallEntity(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, component, method string, args variant.Array) (asyncRet async.AsyncRet, err error) { +func CallEntity(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, component, method string, args variant.Array) (_ async.AsyncRet, err error) { defer func() { if panicErr := types.Panic2Err(recover()); panicErr != nil { err = fmt.Errorf("%w: %w", core.ErrPanicked, panicErr) @@ -180,7 +188,14 @@ func CallEntity(svcCtx service.Context, cc rpcstack.CallChain, entityId uid.Id, rpcstack.UnsafeRPCStack(stack).PushCallChain(cc) defer rpcstack.UnsafeRPCStack(stack).PopCallChain() - return async.MakeRet(variant.MakeSerializedArray(methodRV.Call(argsRV))) + retsRV := methodRV.Call(argsRV) + if len(retsRV) == 1 { + if asyncRet, ok := retsRV[0].Interface().(async.AsyncRet); ok { + return async.MakeRet(asyncRet, nil) + } + } + + return async.MakeRet(variant.MakeSerializedArray(retsRV)) }), nil } @@ -215,3 +230,29 @@ func parseArgs(methodRV reflect.Value, cc rpcstack.CallChain, args variant.Array return argsRV, nil } + +func waitAsyncRet(ctx context.Context, asyncRet async.AsyncRet) (variant.Array, error) { + for { + ret := asyncRet.Wait(ctx) + if !ret.OK() { + return nil, ret.Error + } + + var ok bool + asyncRet, ok = ret.Value.(async.AsyncRet) + if ok { + continue + } + + if rets, ok := ret.Value.(variant.Array); ok { + return rets, nil + } + + rets, err := variant.MakeSerializedArray([]any{ret.Value}) + if err != nil { + return nil, err + } + + return rets, nil + } +} diff --git a/addins/rpc/rpcpcsr/forward_dispatcher.go b/addins/rpc/rpcpcsr/forward_dispatcher.go index 55d99c7..413e2a0 100644 --- a/addins/rpc/rpcpcsr/forward_dispatcher.go +++ b/addins/rpc/rpcpcsr/forward_dispatcher.go @@ -118,12 +118,12 @@ func (p *_ForwardProcessor) acceptNotify(svc, src, dst, transit string, req *gap } go func() { - ret := asyncRet.Wait(p.svcCtx) - if !ret.OK() { - log.Errorf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error) + rets, err := waitAsyncRet(p.svcCtx, asyncRet) + if err != nil { + log.Errorf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, err) } else { log.Debugf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls finished, src:%q, dst:%q, transit:%q, path:%q,", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path) - ret.Value.(variant.Array).Release() + rets.Release() } }() @@ -137,12 +137,12 @@ func (p *_ForwardProcessor) acceptNotify(svc, src, dst, transit string, req *gap } go func() { - ret := asyncRet.Wait(p.svcCtx) - if !ret.OK() { - log.Errorf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error) + rets, err := waitAsyncRet(p.svcCtx, asyncRet) + if err != nil { + log.Errorf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, err) } else { log.Debugf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls finished, src:%q, dst:%q, transit:%q, path:%q", cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path) - ret.Value.(variant.Array).Release() + rets.Release() } }() @@ -200,25 +200,13 @@ func (p *_ForwardProcessor) acceptRequest(svc, src, dst, transit string, req *ga } go func() { - var ret async.Ret - - for { - ret = asyncRet.Wait(p.svcCtx) - if !ret.OK() { - break - } - asyncRet, _ = ret.Value.(async.AsyncRet) - if asyncRet == nil { - break - } - } - - if !ret.OK() { - log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error) - p.reply(src, transit, req.CorrId, nil, ret.Error) + rets, err := waitAsyncRet(p.svcCtx, asyncRet) + if err != nil { + log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, err) + p.reply(src, transit, req.CorrId, nil, err) } else { log.Debugf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls finished, src:%q, dst:%q, transit:%q, path:%q", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path) - p.reply(src, transit, req.CorrId, ret.Value.(variant.Array), nil) + p.reply(src, transit, req.CorrId, rets, nil) } }() @@ -233,25 +221,13 @@ func (p *_ForwardProcessor) acceptRequest(svc, src, dst, transit string, req *ga } go func() { - var ret async.Ret - - for { - ret = asyncRet.Wait(p.svcCtx) - if !ret.OK() { - break - } - asyncRet, _ = ret.Value.(async.AsyncRet) - if asyncRet == nil { - break - } - } - - if !ret.OK() { - log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error) - p.reply(src, transit, req.CorrId, nil, ret.Error) + rets, err := waitAsyncRet(p.svcCtx, asyncRet) + if err != nil { + log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, err) + p.reply(src, transit, req.CorrId, nil, err) } else { log.Debugf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls finished, src:%q, dst:%q, transit:%q, path:%q", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path) - p.reply(src, transit, req.CorrId, ret.Value.(variant.Array), nil) + p.reply(src, transit, req.CorrId, rets, nil) } }() diff --git a/addins/rpc/rpcpcsr/service_dispatcher.go b/addins/rpc/rpcpcsr/service_dispatcher.go index 03b2b84..79c82ad 100644 --- a/addins/rpc/rpcpcsr/service_dispatcher.go +++ b/addins/rpc/rpcpcsr/service_dispatcher.go @@ -97,12 +97,12 @@ func (p *_ServiceProcessor) acceptNotify(svc, src string, req *gap.MsgOnewayRPC) } go func() { - ret := asyncRet.Wait(p.svcCtx) - if !ret.OK() { - log.Errorf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls failed, %s", cp.Id, cp.Script, cp.Method, ret.Error) + rets, err := waitAsyncRet(p.svcCtx, asyncRet) + if err != nil { + log.Errorf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls failed, %s", cp.Id, cp.Script, cp.Method, err) } else { log.Debugf(p.svcCtx, "rpc notify entity:%q, runtime addIn:%q, method:%q calls finished", cp.Id, cp.Script, cp.Method) - ret.Value.(variant.Array).Release() + rets.Release() } }() @@ -116,12 +116,12 @@ func (p *_ServiceProcessor) acceptNotify(svc, src string, req *gap.MsgOnewayRPC) } go func() { - ret := asyncRet.Wait(p.svcCtx) - if !ret.OK() { - log.Errorf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls failed, %s", cp.Id, cp.Script, cp.Method, ret.Error) + rets, err := waitAsyncRet(p.svcCtx, asyncRet) + if err != nil { + log.Errorf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls failed, %s", cp.Id, cp.Script, cp.Method, err) } else { log.Debugf(p.svcCtx, "rpc notify entity:%q, component:%q, method:%q calls finished", cp.Id, cp.Script, cp.Method) - ret.Value.(variant.Array).Release() + rets.Release() } }() @@ -178,25 +178,13 @@ func (p *_ServiceProcessor) acceptRequest(svc, src string, req *gap.MsgRPCReques } go func() { - var ret async.Ret - - for { - ret = asyncRet.Wait(p.svcCtx) - if !ret.OK() { - break - } - asyncRet, _ = ret.Value.(async.AsyncRet) - if asyncRet == nil { - break - } - } - - if !ret.OK() { - log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, ret.Error) - p.reply(src, req.CorrId, nil, ret.Error) + rets, err := waitAsyncRet(p.svcCtx, asyncRet) + if err != nil { + log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, err) + p.reply(src, req.CorrId, nil, err) } else { log.Debugf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls finished", req.CorrId, cp.Id, cp.Script, cp.Method) - p.reply(src, req.CorrId, ret.Value.(variant.Array), nil) + p.reply(src, req.CorrId, rets, nil) } }() @@ -211,25 +199,13 @@ func (p *_ServiceProcessor) acceptRequest(svc, src string, req *gap.MsgRPCReques } go func() { - var ret async.Ret - - for { - ret = asyncRet.Wait(p.svcCtx) - if !ret.OK() { - break - } - asyncRet, _ = ret.Value.(async.AsyncRet) - if asyncRet == nil { - break - } - } - - if !ret.OK() { - log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, ret.Error) - p.reply(src, req.CorrId, nil, ret.Error) + rets, err := waitAsyncRet(p.svcCtx, asyncRet) + if err != nil { + log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, err) + p.reply(src, req.CorrId, nil, err) } else { log.Debugf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls finished", req.CorrId, cp.Id, cp.Script, cp.Method) - p.reply(src, req.CorrId, ret.Value.(variant.Array), nil) + p.reply(src, req.CorrId, rets, nil) } }()