Skip to content

Commit

Permalink
实体与运行时RPC支持异步返回值
Browse files Browse the repository at this point in the history
  • Loading branch information
pangdogs committed Jan 17, 2025
1 parent 3d11384 commit 713d10c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
28 changes: 26 additions & 2 deletions addins/rpc/rpcpcsr/forward_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,19 @@ func (p *_ForwardProcessor) acceptRequest(svc, src, dst, transit string, req *ga
}

go func() {
ret := asyncRet.Wait(p.svcCtx)
var ret async.Ret

for {
ret = asyncRet.Wait(p.svcCtx)
if !ret.OK() {
break
}
asyncRet, _ = ret.Value.(async.AsyncRet)
if asyncRet == nil {
break
}
}

if !ret.OK() {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error)
p.reply(src, transit, req.CorrId, nil, ret.Error)
Expand All @@ -221,7 +233,19 @@ func (p *_ForwardProcessor) acceptRequest(svc, src, dst, transit string, req *ga
}

go func() {
ret := asyncRet.Wait(p.svcCtx)
var ret async.Ret

for {
ret = asyncRet.Wait(p.svcCtx)
if !ret.OK() {
break
}
asyncRet, _ = ret.Value.(async.AsyncRet)
if asyncRet == nil {
break
}
}

if !ret.OK() {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, src:%q, dst:%q, transit:%q, path:%q, %s", req.CorrId, cp.Id, cp.Script, cp.Method, src, dst, transit, req.Path, ret.Error)
p.reply(src, transit, req.CorrId, nil, ret.Error)
Expand Down
28 changes: 26 additions & 2 deletions addins/rpc/rpcpcsr/service_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,19 @@ func (p *_ServiceProcessor) acceptRequest(svc, src string, req *gap.MsgRPCReques
}

go func() {
ret := asyncRet.Wait(p.svcCtx)
var ret async.Ret

for {
ret = asyncRet.Wait(p.svcCtx)
if !ret.OK() {
break
}
asyncRet, _ = ret.Value.(async.AsyncRet)
if asyncRet == nil {
break
}
}

if !ret.OK() {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, runtime addIn:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, ret.Error)
p.reply(src, req.CorrId, nil, ret.Error)
Expand All @@ -199,7 +211,19 @@ func (p *_ServiceProcessor) acceptRequest(svc, src string, req *gap.MsgRPCReques
}

go func() {
ret := asyncRet.Wait(p.svcCtx)
var ret async.Ret

for {
ret = asyncRet.Wait(p.svcCtx)
if !ret.OK() {
break
}
asyncRet, _ = ret.Value.(async.AsyncRet)
if asyncRet == nil {
break
}
}

if !ret.OK() {
log.Errorf(p.svcCtx, "rpc request(%d) entity:%q, component:%q, method:%q calls failed, %s", req.CorrId, cp.Id, cp.Script, cp.Method, ret.Error)
p.reply(src, req.CorrId, nil, ret.Error)
Expand Down

0 comments on commit 713d10c

Please sign in to comment.