diff --git a/ants/official_exampal_1.go b/ants/official_exampal_1.go new file mode 100644 index 0000000..7333eb4 --- /dev/null +++ b/ants/official_exampal_1.go @@ -0,0 +1,93 @@ +package main + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/panjf2000/ants/v2" +) + +var sum int32 + +func myFunc(i interface{}) { + n := i.(int32) + atomic.AddInt32(&sum, n) + fmt.Printf("run with %d\n", n) +} + +func demoFunc() { + time.Sleep(10 * time.Millisecond) + fmt.Println("Hello World!") +} + +func main() { + defer ants.Release() + + runTimes := 1000 + + // Use the common pool. + var wg sync.WaitGroup + syncCalculateSum := func() { + demoFunc() + wg.Done() + } + for i := 0; i < runTimes; i++ { + wg.Add(1) + _ = ants.Submit(syncCalculateSum) + } + wg.Wait() + fmt.Printf("running goroutines: %d\n", ants.Running()) + fmt.Printf("finish all tasks.\n") + + // Use the pool with a function, + // set 10 to the capacity of goroutine pool and 1 second for expired duration. + p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { + myFunc(i) + wg.Done() + }) + defer p.Release() + // Submit tasks one by one. + for i := 0; i < runTimes; i++ { + wg.Add(1) + _ = p.Invoke(int32(i)) + } + + wg.Wait() + fmt.Printf("running goroutines: %d\n", p.Running()) + fmt.Printf("finish all tasks, result is %d\n", sum) + if sum != 499500 { + panic("the final result is wrong!!!") + } + + // Use the MultiPool and set the capacity of the 10 goroutine pools to unlimited. + // If you use -1 as the pool size parameter, the size will be unlimited. + // There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks. + mp, _ := ants.NewMultiPool(10, -1, ants.RoundRobin) + defer mp.ReleaseTimeout(5 * time.Second) + for i := 0; i < runTimes; i++ { + wg.Add(1) + _ = mp.Submit(syncCalculateSum) + } + wg.Wait() + fmt.Printf("running goroutines: %d\n", mp.Running()) + fmt.Printf("finish all tasks.\n") + + // Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10). + mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) { + myFunc(i) + wg.Done() + }, ants.LeastTasks) + defer mpf.ReleaseTimeout(5 * time.Second) + for i := 0; i < runTimes; i++ { + wg.Add(1) + _ = mpf.Invoke(int32(i)) + } + wg.Wait() + fmt.Printf("running goroutines: %d\n", mpf.Running()) + fmt.Printf("finish all tasks, result is %d\n", sum) + if sum != 499500*2 { + panic("the final result is wrong!!!") + } +} diff --git a/ants/panic_example.go b/ants/panic_example.go new file mode 100644 index 0000000..1d3c11d --- /dev/null +++ b/ants/panic_example.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "github.com/panjf2000/ants/v2" + "log" + "sync" + "time" +) + +func wrapper(i int, wg *sync.WaitGroup) func() { + return func() { + fmt.Printf("hello from task:%d\n", i) + if i%2 == 0 { + panic(fmt.Sprintf("panic from task:%d", i)) + } + wg.Done() + } +} + +func ServicePanicHandler(info interface{}) { + c := fmt.Sprintf("%#v", info) + log.Println("ServicePanicHandler handler: ", c) + +} + +func main() { + p, _ := ants.NewPool(2, ants.WithPanicHandler(ServicePanicHandler)) + defer p.Release() + + var wg sync.WaitGroup + wg.Add(3) + for i := 1; i <= 2; i++ { + p.Submit(wrapper(i, &wg)) + } + + time.Sleep(1 * time.Second) + p.Submit(wrapper(3, &wg)) + p.Submit(wrapper(5, &wg)) + p.Submit(wrapper(6, &wg)) + wg.Wait() +} diff --git a/ants/reademe.md b/ants/reademe.md index 4945250..0cb7200 100644 --- a/ants/reademe.md +++ b/ants/reademe.md @@ -8,4 +8,11 @@ Golang 提供的 go 关键字能很方便地将多个协程塞在一个进程中 +# 一些参数 +1. ExpiryDuration:过期时间。表示 goroutine 空闲多长时间之后会被ants池回收 +2. PreAlloc:预分配。调用NewPool()/NewPoolWithFunc()之后预分配worker(管理一个工作 goroutine 的结构体)切片。而且使用预分配与否会直接影响池中管理worker的结构。 +3. MaxBlockingTasks:最大阻塞任务数量。即池中 goroutine 数量已到池容量,且所有 goroutine 都处理繁忙状态,这时到来的任务会在阻塞列表等待。这个选项设置的是列表的最大长度。阻塞的任务数量达到这个值后,后续任务提交直接返回失败 +4. Nonblocking:池是否阻塞,默认阻塞。提交任务时,如果ants池中 goroutine 已到上限且全部繁忙,阻塞的池会将任务添加的阻塞列表等待(当然受限于阻塞列表长度,见上一个选项)。非阻塞的池直接返回失败 +5. PanicHandler:panic 处理。遇到 **panic 会调用这里设置的处理函数** +6. Logger:指定日志记录器 diff --git a/cc/a.go b/cc/a.go index 31563e0..ef56652 100644 --- a/cc/a.go +++ b/cc/a.go @@ -71,48 +71,56 @@ func pack2uhex(size int, data interface{}) []byte { } } -func main() { - //data1 := uint64(42) - //data2 := uint64(1024) - //data3 := []uint64{255, 42} - //data4 := uint64(123456789) - //data5 := []uint64{123456789, 255} - //data6 := []uint64{65536, 42} - //data7 := []uint64{65536, 42, 255} - //data8 := uint64(12345678901234567890) - //data9 := []uint64{12345678901234567890, 255} - //data10 := []uint64{12345678901234567890, 65535} - //data11 := []uint64{12345678901234567890, 65535, 255} - - data1 := uint64(99) - data2 := uint64(2048) - data3 := []uint64{128, 99} - data4 := uint64(987654321) - data5 := []uint64{987654321, 128} - data6 := []uint64{8192, 99} - data7 := []uint64{8192, 99, 128} - data8 := uint64(9876543210123456789) - data9 := []uint64{9876543210123456789, 128} - data10 := []uint64{9876543210123456789, 32767} - data11 := []uint64{9876543210123456789, 32767, 128} - - testData := map[int]interface{}{ - 1: data1, - 2: data2, - 3: data3, - 4: data4, - 5: data5, - 6: data6, - 7: data7, - 8: data8, - 9: data9, - 10: data10, - 11: data11, - } +// +//func main() { +// //data1 := uint64(42) +// //data2 := uint64(1024) +// //data3 := []uint64{255, 42} +// //data4 := uint64(123456789) +// //data5 := []uint64{123456789, 255} +// //data6 := []uint64{65536, 42} +// //data7 := []uint64{65536, 42, 255} +// //data8 := uint64(12345678901234567890) +// //data9 := []uint64{12345678901234567890, 255} +// //data10 := []uint64{12345678901234567890, 65535} +// //data11 := []uint64{12345678901234567890, 65535, 255} +// +// data1 := uint64(99) +// data2 := uint64(2048) +// data3 := []uint64{128, 99} +// data4 := uint64(987654321) +// data5 := []uint64{987654321, 128} +// data6 := []uint64{8192, 99} +// data7 := []uint64{8192, 99, 128} +// data8 := uint64(9876543210123456789) +// data9 := []uint64{9876543210123456789, 128} +// data10 := []uint64{9876543210123456789, 32767} +// data11 := []uint64{9876543210123456789, 32767, 128} +// +// testData := map[int]interface{}{ +// 1: data1, +// 2: data2, +// 3: data3, +// 4: data4, +// 5: data5, +// 6: data6, +// 7: data7, +// 8: data8, +// 9: data9, +// 10: data10, +// 11: data11, +// } +// +// for size := 1; size <= 11; size++ { +// data := testData[size] +// result := pack2uhex(size, data) +// fmt.Printf("size=%d, data=%v, result=%v\n", size, data, result) +// } +//} - for size := 1; size <= 11; size++ { - data := testData[size] - result := pack2uhex(size, data) - fmt.Printf("size=%d, data=%v, result=%v\n", size, data, result) - } +func main() { + var a any = nil + var c *int = nil + fmt.Println(a == c) + fmt.Printf("%#v", a) } diff --git a/sync_/main.go b/sync_/main.go new file mode 100644 index 0000000..7eb56da --- /dev/null +++ b/sync_/main.go @@ -0,0 +1 @@ +package sync_ diff --git a/sync_/semaphore_test.go b/sync_/semaphore_test.go new file mode 100644 index 0000000..6dbfd23 --- /dev/null +++ b/sync_/semaphore_test.go @@ -0,0 +1,22 @@ +package sync_ + +import ( + "fmt" + "testing" + "unsafe" +) + +type Aa struct { + a int + b float64 + c []int +} + +func TestSemaphore(t *testing.T) { + c := Aa{} + fmt.Println(unsafe.Sizeof(c)) + c.a = 55 + c.b = 99 + fmt.Println(unsafe.Sizeof(c)) + +}