Skip to content

Commit

Permalink
修改代码
Browse files Browse the repository at this point in the history
  • Loading branch information
pangdogs committed Aug 10, 2024
1 parent ddcd3aa commit d163358
Show file tree
Hide file tree
Showing 19 changed files with 897 additions and 869 deletions.
58 changes: 22 additions & 36 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,66 +27,52 @@ import (
"git.golaxy.org/core/utils/async"
"git.golaxy.org/core/utils/generic"
"time"
_ "unsafe"
)

//go:linkname getCaller git.golaxy.org/core/runtime.getCaller
func getCaller(provider ictx.ConcurrentContextProvider) async.Caller

// Async 异步执行代码,有返回值
func Async(provider ictx.ConcurrentContextProvider, fun generic.FuncVar1[runtime.Context, any, async.Ret], va ...any) async.AsyncRet {
ctx := getCaller(provider)
return ctx.Call(func(va ...any) async.Ret {
ctx := va[0].(runtime.Context)
fun := va[1].(generic.FuncVar1[runtime.Context, any, async.Ret])
funVa := va[2].([]any)
return fun.Exec(ctx, funVa...)
}, ctx, fun, va)
func Async(provider ictx.ConcurrentContextProvider, fun generic.FuncVar1[runtime.Context, any, async.Ret], args ...any) async.AsyncRet {
ctx := runtime.UnsafeConcurrentContext(runtime.Concurrent(provider)).GetContext()
return ctx.Call(func(...any) async.Ret { return fun.Exec(ctx, args...) })
}

// AsyncVoid 异步执行代码,无返回值
func AsyncVoid(provider ictx.ConcurrentContextProvider, fun generic.ActionVar1[runtime.Context, any], va ...any) async.AsyncRet {
ctx := getCaller(provider)
return ctx.CallVoid(func(va ...any) {
ctx := va[0].(runtime.Context)
fun := va[1].(generic.ActionVar1[runtime.Context, any])
funVa := va[2].([]any)
fun.Exec(ctx, funVa...)
}, ctx, fun, va)
func AsyncVoid(provider ictx.ConcurrentContextProvider, fun generic.ActionVar1[runtime.Context, any], args ...any) async.AsyncRet {
ctx := runtime.UnsafeConcurrentContext(runtime.Concurrent(provider)).GetContext()
return ctx.CallVoid(func(...any) { fun.Exec(ctx, args...) })
}

// Go 使用新线程执行代码,有返回值
func Go(ctx context.Context, fun generic.FuncVar1[context.Context, any, async.Ret], va ...any) async.AsyncRet {
func Go(ctx context.Context, fun generic.FuncVar1[context.Context, any, async.Ret], args ...any) async.AsyncRet {
if ctx == nil {
ctx = context.Background()
}

asyncRet := async.MakeAsyncRet()

go func(fun generic.FuncVar1[context.Context, any, async.Ret], ctx context.Context, va []any, asyncRet chan async.Ret) {
ret, panicErr := fun.Invoke(ctx, va...)
go func() {
ret, panicErr := fun.Invoke(ctx, args...)
if panicErr != nil {
ret.Error = panicErr
}
asyncRet <- ret
close(asyncRet)
}(fun, ctx, va, asyncRet)
}()

return asyncRet
}

// GoVoid 使用新线程执行代码,无返回值
func GoVoid(ctx context.Context, fun generic.ActionVar1[context.Context, any], va ...any) async.AsyncRet {
func GoVoid(ctx context.Context, fun generic.ActionVar1[context.Context, any], args ...any) async.AsyncRet {
if ctx == nil {
ctx = context.Background()
}

asyncRet := async.MakeAsyncRet()

go func(fun generic.ActionVar1[context.Context, any], ctx context.Context, va []any, asyncRet chan async.Ret) {
asyncRet <- async.MakeRet(nil, fun.Invoke(ctx, va...))
go func() {
asyncRet <- async.MakeRet(nil, fun.Invoke(ctx, args...))
close(asyncRet)
}(fun, ctx, va, asyncRet)
}()

return asyncRet
}
Expand All @@ -99,7 +85,7 @@ func TimeAfter(ctx context.Context, dur time.Duration) async.AsyncRet {

asyncRet := async.MakeAsyncRet()

go func(ctx context.Context, dur time.Duration, asyncRet chan async.Ret) {
go func() {
timer := time.NewTimer(dur)
defer timer.Stop()

Expand All @@ -111,7 +97,7 @@ func TimeAfter(ctx context.Context, dur time.Duration) async.AsyncRet {
}

close(asyncRet)
}(ctx, dur, asyncRet)
}()

return asyncRet
}
Expand All @@ -124,7 +110,7 @@ func TimeAt(ctx context.Context, at time.Time) async.AsyncRet {

asyncRet := async.MakeAsyncRet()

go func(ctx context.Context, at time.Time, asyncRet chan async.Ret) {
go func() {
timer := time.NewTimer(time.Until(at))
defer timer.Stop()

Expand All @@ -136,7 +122,7 @@ func TimeAt(ctx context.Context, at time.Time) async.AsyncRet {
}

close(asyncRet)
}(ctx, at, asyncRet)
}()

return asyncRet
}
Expand All @@ -149,7 +135,7 @@ func TimeTick(ctx context.Context, dur time.Duration) async.AsyncRet {

asyncRet := async.MakeAsyncRet()

go func(ctx context.Context, dur time.Duration, asyncRet chan async.Ret) {
go func() {
tick := time.NewTicker(dur)
defer tick.Stop()

Expand All @@ -168,7 +154,7 @@ func TimeTick(ctx context.Context, dur time.Duration) async.AsyncRet {
}

close(asyncRet)
}(ctx, dur, asyncRet)
}()

return asyncRet
}
Expand All @@ -185,7 +171,7 @@ func ReadChan[T any](ctx context.Context, ch <-chan T) async.AsyncRet {

asyncRet := async.MakeAsyncRet()

go func(ctx context.Context, ch <-chan T, asyncRet chan async.Ret) {
go func() {
loop:
for {
select {
Expand All @@ -203,7 +189,7 @@ func ReadChan[T any](ctx context.Context, ch <-chan T) async.AsyncRet {
}
}
close(asyncRet)
}(ctx, ch, asyncRet)
}()

return asyncRet
}
94 changes: 33 additions & 61 deletions await.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,16 @@ import (
"git.golaxy.org/core/utils/generic"
"sync"
"sync/atomic"
_ "unsafe"
)

var (
ErrAllFailures = fmt.Errorf("%w: all of async result failures", ErrCore)
)

//go:linkname getRuntimeContext git.golaxy.org/core/runtime.getRuntimeContext
func getRuntimeContext(provider ictx.CurrentContextProvider) runtime.Context

// Await 异步等待结果返回
func Await(provider ictx.CurrentContextProvider, asyncRet ...async.AsyncRet) AwaitDirector {
return AwaitDirector{
rtCtx: getRuntimeContext(provider),
rtCtx: runtime.Current(provider),
asyncRets: asyncRet,
}
}
Expand All @@ -53,7 +49,7 @@ type AwaitDirector struct {
}

// Any 异步等待任意一个结果返回
func (ad AwaitDirector) Any(fun generic.ActionVar2[runtime.Context, async.Ret, any], va ...any) {
func (ad AwaitDirector) Any(fun generic.ActionVar2[runtime.Context, async.Ret, any], args ...any) {
if ad.rtCtx == nil {
panic(fmt.Errorf("%w: setting rtCtx is nil", ErrCore))
}
Expand All @@ -71,9 +67,7 @@ func (ad AwaitDirector) Any(fun generic.ActionVar2[runtime.Context, async.Ret, a
continue
}

go func(b *atomic.Bool, ctx context.Context, cancel context.CancelFunc,
asyncRet async.AsyncRet, rtCtx runtime.Context, fun generic.ActionVar2[runtime.Context, async.Ret, any], va []any) {

go func() {
ret := asyncRet.Wait(ctx)

if !b.CompareAndSwap(false, true) {
Expand All @@ -82,19 +76,15 @@ func (ad AwaitDirector) Any(fun generic.ActionVar2[runtime.Context, async.Ret, a

cancel()

rtCtx.CallVoid(func(va ...any) {
rtCtx := va[0].(runtime.Context)
fun := va[1].(generic.ActionVar2[runtime.Context, async.Ret, any])
ret := va[2].(async.Ret)
funVa := va[3].([]any)
fun.Exec(rtCtx, ret, funVa...)
}, rtCtx, fun, ret, va)
}(&b, ctx, cancel, asyncRet, ad.rtCtx, fun, va)
ad.rtCtx.CallVoid(func(...any) {
fun.Exec(ad.rtCtx, ret, args...)
})
}()
}
}

// AnyOK 异步等待任意一个结果成功返回
func (ad AwaitDirector) AnyOK(fun generic.ActionVar2[runtime.Context, async.Ret, any], va ...any) {
func (ad AwaitDirector) AnyOK(fun generic.ActionVar2[runtime.Context, async.Ret, any], args ...any) {
if ad.rtCtx == nil {
panic(fmt.Errorf("%w: setting rtCtx is nil", ErrCore))
}
Expand All @@ -114,8 +104,7 @@ func (ad AwaitDirector) AnyOK(fun generic.ActionVar2[runtime.Context, async.Ret,
}

wg.Add(1)
go func(wg *sync.WaitGroup, b *atomic.Bool, ctx context.Context, cancel context.CancelFunc,
asyncRet async.AsyncRet, rtCtx runtime.Context, fun generic.ActionVar2[runtime.Context, async.Ret, any], va []any) {
go func() {
defer wg.Done()

ret := asyncRet.Wait(ctx)
Expand All @@ -129,34 +118,27 @@ func (ad AwaitDirector) AnyOK(fun generic.ActionVar2[runtime.Context, async.Ret,

cancel()

rtCtx.CallVoid(func(va ...any) {
rtCtx := va[0].(runtime.Context)
fun := va[1].(generic.ActionVar2[runtime.Context, async.Ret, any])
ret := va[2].(async.Ret)
funVa := va[3].([]any)
fun.Exec(rtCtx, ret, funVa...)
}, rtCtx, fun, ret, va)
}(&wg, &b, ctx, cancel, asyncRet, ad.rtCtx, fun, va)
ad.rtCtx.CallVoid(func(...any) {
fun.Exec(ad.rtCtx, ret, args...)
})
}()
}

go func(wg *sync.WaitGroup, b *atomic.Bool, rtCtx runtime.Context, fun generic.ActionVar2[runtime.Context, async.Ret, any], va []any) {
go func() {
wg.Wait()

if b.Load() {
return
}

rtCtx.CallVoid(func(va ...any) {
rtCtx := va[0].(runtime.Context)
fun := va[1].(generic.ActionVar2[runtime.Context, async.Ret, any])
funVa := va[2].([]any)
fun.Exec(rtCtx, async.MakeRet(nil, ErrAllFailures), funVa...)
}, rtCtx, fun, va)
}(&wg, &b, ad.rtCtx, fun, va)
ad.rtCtx.CallVoid(func(...any) {
fun.Exec(ad.rtCtx, async.MakeRet(nil, ErrAllFailures), args...)
})
}()
}

// All 异步等待所有结果返回
func (ad AwaitDirector) All(fun generic.ActionVar2[runtime.Context, []async.Ret, any], va ...any) {
func (ad AwaitDirector) All(fun generic.ActionVar2[runtime.Context, []async.Ret, any], args ...any) {
if ad.rtCtx == nil {
panic(fmt.Errorf("%w: setting rtCtx is nil", ErrCore))
}
Expand All @@ -166,7 +148,6 @@ func (ad AwaitDirector) All(fun generic.ActionVar2[runtime.Context, []async.Ret,
}

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(ad.rtCtx)
rets := make([]async.Ret, len(ad.asyncRets))

for i := range ad.asyncRets {
Expand All @@ -176,27 +157,22 @@ func (ad AwaitDirector) All(fun generic.ActionVar2[runtime.Context, []async.Ret,
}

wg.Add(1)
go func(wg *sync.WaitGroup, ctx context.Context, cancel context.CancelFunc, ret *async.Ret, asyncRet async.AsyncRet) {
go func(ret *async.Ret) {
defer wg.Done()
*ret = asyncRet.Wait(ctx)
}(&wg, ctx, cancel, &rets[i], asyncRet)
*ret = asyncRet.Wait(ad.rtCtx)
}(&rets[i])
}

go func(wg *sync.WaitGroup, rtCtx runtime.Context, fun generic.ActionVar2[runtime.Context, []async.Ret, any], rets []async.Ret, va []any) {
go func() {
wg.Wait()

rtCtx.CallVoid(func(va ...any) {
rtCtx := va[0].(runtime.Context)
fun := va[1].(generic.ActionVar2[runtime.Context, []async.Ret, any])
rets := va[2].([]async.Ret)
funVa := va[3].([]any)
fun.Exec(rtCtx, rets, funVa...)
}, rtCtx, fun, rets, va)
}(&wg, ad.rtCtx, fun, rets, va)
ad.rtCtx.CallVoid(func(...any) {
fun.Exec(ad.rtCtx, rets, args...)
})
}()
}

// Pipe 异步等待管道返回
func (ad AwaitDirector) Pipe(ctx context.Context, fun generic.ActionVar2[runtime.Context, async.Ret, any], va ...any) {
func (ad AwaitDirector) Pipe(ctx context.Context, fun generic.ActionVar2[runtime.Context, async.Ret, any], args ...any) {
if ctx == nil {
ctx = context.Background()
}
Expand All @@ -215,24 +191,20 @@ func (ad AwaitDirector) Pipe(ctx context.Context, fun generic.ActionVar2[runtime
continue
}

go func(ctx context.Context, rtCtx runtime.Context, asyncRet async.AsyncRet, fun generic.ActionVar2[runtime.Context, async.Ret, any], va []any) {
go func() {
for {
select {
case ret, ok := <-asyncRet:
if !ok {
return
}
rtCtx.CallVoid(func(va ...any) {
rtCtx := va[0].(runtime.Context)
fun := va[1].(generic.ActionVar2[runtime.Context, async.Ret, any])
ret := va[2].(async.Ret)
funVa := va[3].([]any)
fun.Exec(rtCtx, ret, funVa...)
}, rtCtx, fun, ret, va)
ad.rtCtx.CallVoid(func(...any) {
fun.Exec(ad.rtCtx, ret, args...)
})
case <-ctx.Done():
return
}
}
}(ctx, ad.rtCtx, asyncRet, fun, va)
}()
}
}
1 change: 1 addition & 0 deletions ec/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func UnsafeNewEntity(options EntityOptions) Entity {
// Entity 实体接口
type Entity interface {
iEntity
iConcurrentEntity
iComponentMgr
iTreeNode
ictx.CurrentContextProvider
Expand Down
9 changes: 9 additions & 0 deletions ec/entity_concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

// ConcurrentEntity 多线程安全的实体接口
type ConcurrentEntity interface {
iConcurrentEntity
ictx.ConcurrentContextProvider
context.Context
fmt.Stringer
Expand All @@ -37,3 +38,11 @@ type ConcurrentEntity interface {
// GetPrototype 获取实体原型
GetPrototype() string
}

type iConcurrentEntity interface {
getEntity() Entity
}

func (entity *EntityBehavior) getEntity() Entity {
return entity.opts.CompositeFace.Iface
}
Loading

0 comments on commit d163358

Please sign in to comment.