Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/config #26

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,4 @@ https://github.com/NetEase-Media/easy-ngo-examples

## 微信交流群
欢迎大家扫描二维码加入我们
![微信群](https://netease-media.github.io/easy-ngo-website/assets/images/Wechateasyngo-592e98ca3cabaa61781c9481983fda14.jpeg)
![微信群](https://netease-media.github.io/easy-ngo-website/assets/images/Wechateasyngo-819b541c30f12dd6a5779298eccd7884.jpeg)
35 changes: 35 additions & 0 deletions clients/xgoroutinepool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# xgoroutinepool

## Introduction

#### warning
we refer these code from [gopool]("https://github.com/bytedance/gopkg/tree/develop/util/gopool")

`gopool` is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines.

It is an alternative to the `go` keyword.

## Features

- High Performance
- Auto-recovering Panics
- Limit Goroutine Numbers
- Reuse Goroutine Stack

## QuickStart

Just replace your `go func(){...}` with `gopool.Go(func(){...})`.

old:
```go
go func() {
// do your job
}()
```

new:
```go
gopool.Go(func(){
/// do your job
})
```
35 changes: 35 additions & 0 deletions clients/xgoroutinepool/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2021 ByteDance Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xgoroutinepool

const (
defaultScalaThreshold = 1
)

// Config is used to config pool.
type Config struct {
// threshold for scale.
// new goroutine is created if len(task chan) > ScaleThreshold.
// defaults to defaultScalaThreshold.
ScaleThreshold int32
}

// NewConfig creates a default Config.
func NewConfig() *Config {
c := &Config{
ScaleThreshold: defaultScalaThreshold,
}
return c
}
5 changes: 5 additions & 0 deletions clients/xgoroutinepool/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/NetEase-Media/easy-ngo/clients/xgoroutinepool

go 1.18

require github.com/bytedance/gopkg v0.0.0-20230531144706-a12972768317
15 changes: 15 additions & 0 deletions clients/xgoroutinepool/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
github.com/bytedance/gopkg v0.0.0-20230531144706-a12972768317 h1:SReMVmTCeJ5Nf0hU8nyWu7gAaFVD8mu5yvSH/+uLT1E=
github.com/bytedance/gopkg v0.0.0-20230531144706-a12972768317/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
80 changes: 80 additions & 0 deletions clients/xgoroutinepool/gopool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 ByteDance Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xgoroutinepool

import (
"context"
"fmt"
"math"
"sync"
)

// defaultPool is the global default pool.
var defaultPool Pool

var poolMap sync.Map

func init() {
defaultPool = NewPool("gopool.DefaultPool", math.MaxInt32, NewConfig())
}

// Go is an alternative to the go keyword, which is able to recover panic.
// gopool.Go(func(arg interface{}){
// ...
// }(nil))
func Go(f func()) {
CtxGo(context.Background(), f)
}

// CtxGo is preferred than Go.
func CtxGo(ctx context.Context, f func()) {
defaultPool.CtxGo(ctx, f)
}

// SetCap is not recommended to be called, this func changes the global pool's capacity which will affect other callers.
func SetCap(cap int32) {
defaultPool.SetCap(cap)
}

// SetPanicHandler sets the panic handler for the global pool.
func SetPanicHandler(f func(context.Context, interface{})) {
defaultPool.SetPanicHandler(f)
}

// WorkerCount returns the number of global default pool's running workers
func WorkerCount() int32 {
return defaultPool.WorkerCount()
}

// RegisterPool registers a new pool to the global map.
// GetPool can be used to get the registered pool by name.
// returns error if the same name is registered.
func RegisterPool(p Pool) error {
_, loaded := poolMap.LoadOrStore(p.Name(), p)
if loaded {
return fmt.Errorf("name: %s already registered", p.Name())
}
return nil
}

// GetPool gets the registered pool by name.
// Returns nil if not registered.
func GetPool(name string) Pool {
p, ok := poolMap.Load(name)
if !ok {
return nil
}
return p.(Pool)
}
21 changes: 21 additions & 0 deletions clients/xgoroutinepool/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 NetEase Media Technology(Beijing)Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xgoroutinepool

type Option struct {
Name string // 客户端名称,需要唯一
Capacity int32 // 协程池上限
ScaleThreshold int32 //
}
162 changes: 162 additions & 0 deletions clients/xgoroutinepool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2021 ByteDance Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xgoroutinepool

import (
"context"
"sync"
"sync/atomic"
)

type Pool interface {
// Name returns the corresponding pool name.
Name() string
// SetCap sets the goroutine capacity of the pool.
SetCap(cap int32)
// Go executes f.
Go(f func())
// CtxGo executes f and accepts the context.
CtxGo(ctx context.Context, f func())
// SetPanicHandler sets the panic handler.
SetPanicHandler(f func(context.Context, interface{}))
// WorkerCount returns the number of running workers
WorkerCount() int32
}

var taskPool sync.Pool

func init() {
taskPool.New = newTask
}

type task struct {
ctx context.Context
f func()

next *task
}

func (t *task) zero() {
t.ctx = nil
t.f = nil
t.next = nil
}

func (t *task) Recycle() {
t.zero()
taskPool.Put(t)
}

func newTask() interface{} {
return &task{}
}

type taskList struct {
sync.Mutex
taskHead *task
taskTail *task
}

type pool struct {
// The name of the pool
name string

// capacity of the pool, the maximum number of goroutines that are actually working
cap int32
// Configuration information
config *Config
// linked list of tasks
taskHead *task
taskTail *task
taskLock sync.Mutex
taskCount int32

// Record the number of running workers
workerCount int32

// This method will be called when the worker panic
panicHandler func(context.Context, interface{})
}

func New(opt *Option) Pool {
return NewPool(opt.Name, opt.Capacity, &Config{
ScaleThreshold: opt.ScaleThreshold,
})
}

// NewPool creates a new pool with the given name, cap and config.
func NewPool(name string, cap int32, config *Config) Pool {
p := &pool{
name: name,
cap: cap,
config: config,
}
return p
}

func (p *pool) Name() string {
return p.name
}

func (p *pool) SetCap(cap int32) {
atomic.StoreInt32(&p.cap, cap)
}

func (p *pool) Go(f func()) {
p.CtxGo(context.Background(), f)
}

func (p *pool) CtxGo(ctx context.Context, f func()) {
t := taskPool.Get().(*task)
t.ctx = ctx
t.f = f
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t
}
p.taskLock.Unlock()
atomic.AddInt32(&p.taskCount, 1)
// The following two conditions are met:
// 1. the number of tasks is greater than the threshold.
// 2. The current number of workers is less than the upper limit p.cap.
// or there are currently no workers.
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
p.incWorkerCount()
w := workerPool.Get().(*worker)
w.pool = p
w.run()
}
}

// SetPanicHandler the func here will be called after the panic has been recovered.
func (p *pool) SetPanicHandler(f func(context.Context, interface{})) {
p.panicHandler = f
}

func (p *pool) WorkerCount() int32 {
return atomic.LoadInt32(&p.workerCount)
}

func (p *pool) incWorkerCount() {
atomic.AddInt32(&p.workerCount, 1)
}

func (p *pool) decWorkerCount() {
atomic.AddInt32(&p.workerCount, -1)
}
Loading