Skip to content

Commit

Permalink
Add desired option
Browse files Browse the repository at this point in the history
  • Loading branch information
zoncoen committed Sep 1, 2017
1 parent a37398b commit 4cab9f6
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
12 changes: 12 additions & 0 deletions examle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,15 @@ func ExampleConcurrency() {
// Output:
// wait 2s
}

func ExampleDesired() {
g, ctx, _ := racegroup.WithContext(context.Background(), racegroup.Desired(2))
g.Go(wait(ctx, 3*time.Second))
g.Go(wait(ctx, 2*time.Second))
g.Go(wait(ctx, 1*time.Second))
g.Wait()

// Output:
// wait 1s
// wait 2s
}
11 changes: 11 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,14 @@ func Concurrency(i int) Option {
return nil
}
}

// Desired returns an Option that sets number of desired completed tasks.
func Desired(i int) Option {
return func(g *Group) error {
if i < 1 {
return errors.New("desired option must be greater than zero")
}
g.desired = int64(i)
return nil
}
}
12 changes: 9 additions & 3 deletions racegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package racegroup
import (
"context"
"sync"
"sync/atomic"
)

// A Group is a collection of goroutines working on subtasks.
Expand All @@ -14,12 +15,14 @@ type Group struct {

errHandler func(error)
semaphore chan struct{}
desired int64
completed int64
}

// WithContext returns a new Group and an associated Context derived from ctx.
func WithContext(ctx context.Context, opts ...Option) (*Group, context.Context, error) {
ctx, cancel := context.WithCancel(ctx)
g := &Group{cancel: cancel}
g := &Group{cancel: cancel, desired: 1}
for _, opt := range opts {
if err := opt(g); err != nil {
return nil, nil, err
Expand All @@ -38,7 +41,8 @@ func (g *Group) Wait() {

// Go calls the given function in a new goroutine.
//
// The first call to return a nil error cancels the group.
// If more than or equal to desired count subtasks are completed,
// cancels the group.
func (g *Group) Go(f func() error) {
g.wg.Add(1)
if g.semaphore != nil {
Expand All @@ -58,7 +62,9 @@ func (g *Group) Go(f func() error) {
g.errHandler(err)
}
} else {
g.cancel()
if atomic.AddInt64(&g.completed, 1) >= g.desired {
g.cancel()
}
}
}()
}

0 comments on commit 4cab9f6

Please sign in to comment.