-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrouped.go
88 lines (74 loc) · 1.68 KB
/
grouped.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Copyright 2019 John Papandriopoulos. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package concurrent
import (
"runtime"
"sync"
"sync/atomic"
)
// RunGroupedErr the given func f concurrently using up to the specified number of
// maxThreads, or equal to the number of CPUs on the system if passed zero.
// The number of jobs is given by count, and the range of jobs [m, n) are
// passed to the callback f.
func RunGroupedErr(count, maxThreads int, f func(m, n int) error) error {
if count == 0 {
return nil
}
if count == 1 {
// Run it on the current goroutine.
return f(0, 1)
}
if maxThreads == 0 {
maxThreads = runtime.NumCPU()
}
if maxThreads > count {
maxThreads = count
}
if maxThreads == 1 {
return f(0, count)
}
countPerThread := count / maxThreads
m := 0
n := countPerThread
q := make(chan struct{}, maxThreads)
nerr := uint32(0)
var firstErr error
var wg sync.WaitGroup
for i := 0; i < count; i += countPerThread {
if atomic.LoadUint32(&nerr) > 0 {
break
}
// Block if we exceed maxThreads
q <- struct{}{}
wg.Add(1)
// Run job
go func(a, b int) {
err := f(a, b)
if err != nil {
if atomic.AddUint32(&nerr, 1) == 1 {
firstErr = err
}
}
// We're done
wg.Done()
<-q
}(m, n)
m += countPerThread
n += countPerThread
if n > count {
// Truncate the last job, as required
n = count
}
}
wg.Wait()
close(q)
return firstErr
}
// RunGrouped is like RunGroupedErr but without errors.
func RunGrouped(count, maxThreads int, f func(m, n int)) {
RunGroupedErr(count, maxThreads, func(m, n int) error {
f(m, n)
return nil
})
}