-
Notifications
You must be signed in to change notification settings - Fork 58
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add internal/errgroup package to support cancellable error groups. Fixes #147. Authored-by: Sergey Egorov <[email protected]>
- Loading branch information
Showing
11 changed files
with
306 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
// Copyright 2023 The go-zeromq Authors. All rights reserved. | ||
// Use of this source code is governed by a BSD-style | ||
// license that can be found in the LICENSE file. | ||
|
||
// Package errgroup is bit more advanced than golang.org/x/sync/errgroup. | ||
// Major difference is that when error group is created with WithContext | ||
// the parent context would implicitly cancel all functions called by Go method. | ||
package errgroup | ||
|
||
import ( | ||
"context" | ||
|
||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
// The Group is superior errgroup.Group which aborts whole group | ||
// execution when parent context is cancelled | ||
type Group struct { | ||
grp *errgroup.Group | ||
ctx context.Context | ||
} | ||
|
||
// WithContext creates Group and store inside parent context | ||
// so the Go method would respect parent context cancellation | ||
func WithContext(ctx context.Context) (*Group, context.Context) { | ||
grp, child_ctx := errgroup.WithContext(ctx) | ||
return &Group{grp: grp, ctx: ctx}, child_ctx | ||
} | ||
|
||
// Go runs the provided f function in a dedicated goroutine and waits for its | ||
// completion or for the parent context cancellation. | ||
func (g *Group) Go(f func() error) { | ||
g.getErrGroup().Go(g.wrap(f)) | ||
} | ||
|
||
// Wait blocks until all function calls from the Go method have returned, then | ||
// returns the first non-nil error (if any) from them. | ||
// If the error group was created via WithContext then the Wait returns error | ||
// of cancelled parent context prior any functions calls complete. | ||
func (g *Group) Wait() error { | ||
return g.getErrGroup().Wait() | ||
} | ||
|
||
// SetLimit limits the number of active goroutines in this group to at most n. | ||
// A negative value indicates no limit. | ||
// | ||
// Any subsequent call to the Go method will block until it can add an active | ||
// goroutine without exceeding the configured limit. | ||
// | ||
// The limit must not be modified while any goroutines in the group are active. | ||
func (g *Group) SetLimit(n int) { | ||
g.getErrGroup().SetLimit(n) | ||
} | ||
|
||
// TryGo calls the given function in a new goroutine only if the number of | ||
// active goroutines in the group is currently below the configured limit. | ||
// | ||
// The return value reports whether the goroutine was started. | ||
func (g *Group) TryGo(f func() error) bool { | ||
return g.getErrGroup().TryGo(g.wrap(f)) | ||
} | ||
|
||
func (g *Group) wrap(f func() error) func() error { | ||
if g.ctx == nil { | ||
return f | ||
} | ||
|
||
return func() error { | ||
// If parent context is canceled, | ||
// just return its error and do not call func f | ||
select { | ||
case <-g.ctx.Done(): | ||
return g.ctx.Err() | ||
default: | ||
} | ||
|
||
// Create return channel and call func f | ||
// Buffered channel is used as the following select | ||
// may be exiting by context cancellation | ||
// and in such case the write to channel can be block | ||
// and cause the go routine leak | ||
ch := make(chan error, 1) | ||
go func() { | ||
ch <- f() | ||
}() | ||
|
||
// Wait func f complete or | ||
// parent context to be cancelled, | ||
select { | ||
case err := <-ch: | ||
return err | ||
case <-g.ctx.Done(): | ||
return g.ctx.Err() | ||
} | ||
} | ||
} | ||
|
||
// The getErrGroup returns actual x/sync/errgroup.Group. | ||
// If the group is not allocated it would implicitly allocate it. | ||
// Thats allows the internal/errgroup.Group be fully | ||
// compatible to x/sync/errgroup.Group | ||
func (g *Group) getErrGroup() *errgroup.Group { | ||
if g.grp == nil { | ||
g.grp = &errgroup.Group{} | ||
} | ||
return g.grp | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
// Copyright 2023 The go-zeromq Authors. All rights reserved. | ||
// Use of this source code is governed by a BSD-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package errgroup | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
// TestRegularErrGroupDoesNotRespectParentContext checks regular errgroup behavior | ||
// where errgroup.WithContext does not respect the parent context | ||
func TestRegularErrGroupDoesNotRespectParentContext(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
eg, _ := errgroup.WithContext(ctx) | ||
|
||
what := fmt.Errorf("func generated error") | ||
ch := make(chan error) | ||
eg.Go(func() error { return <-ch }) | ||
|
||
cancel() // abort parent context | ||
ch <- what // signal the func in regular errgroup to fail | ||
err := eg.Wait() | ||
|
||
// The error shall be one returned by the function | ||
// as regular errgroup.WithContext does not respect parent context | ||
if err != what { | ||
t.Errorf("invalid error. got=%+v, want=%+v", err, what) | ||
} | ||
} | ||
|
||
// TestErrGroupWithContextCanCallFunctions checks the errgroup operations | ||
// are fine working and errgroup called function can return error | ||
func TestErrGroupWithContextCanCallFunctions(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
eg, _ := WithContext(ctx) | ||
|
||
what := fmt.Errorf("func generated error") | ||
ch := make(chan error) | ||
eg.Go(func() error { return <-ch }) | ||
|
||
ch <- what // signal the func in errgroup to fail | ||
err := eg.Wait() // wait errgroup complete and read error | ||
|
||
// The error shall be one returned by the function | ||
if err != what { | ||
t.Errorf("invalid error. got=%+v, want=%+v", err, what) | ||
} | ||
} | ||
|
||
// TestErrGroupWithContextDoesRespectParentContext checks the errgroup operations | ||
// are cancellable by parent context | ||
func TestErrGroupWithContextDoesRespectParentContext(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
eg, _ := WithContext(ctx) | ||
|
||
s1 := make(chan struct{}) | ||
s2 := make(chan struct{}) | ||
eg.Go(func() error { | ||
s1 <- struct{}{} | ||
<-s2 | ||
return fmt.Errorf("func generated error") | ||
}) | ||
|
||
// We have no set limit to errgroup so | ||
// shall be able to start function via TryGo | ||
if ok := eg.TryGo(func() error { return nil }); !ok { | ||
t.Errorf("Expected TryGo to be able start function!!!") | ||
} | ||
|
||
<-s1 // wait for function to start | ||
cancel() // abort parent context | ||
|
||
eg.Go(func() error { | ||
t.Errorf("The parent context was already cancelled and this function shall not be called!!!") | ||
return nil | ||
}) | ||
|
||
s2 <- struct{}{} // signal the func in regular errgroup to fail | ||
err := eg.Wait() // wait errgroup complete and read error | ||
|
||
// The error shall be one returned by the function | ||
// as regular errgroup.WithContext does not respect parent context | ||
if err != context.Canceled { | ||
t.Errorf("expected a context.Canceled error, got=%+v", err) | ||
} | ||
} | ||
|
||
// TestErrGroupFallback tests fallback logic to be compatible with x/sync/errgroup | ||
func TestErrGroupFallback(t *testing.T) { | ||
eg := Group{} | ||
eg.SetLimit(2) | ||
|
||
ch1 := make(chan error) | ||
eg.Go(func() error { return <-ch1 }) | ||
|
||
ch2 := make(chan error) | ||
ok := eg.TryGo(func() error { return <-ch2 }) | ||
if !ok { | ||
t.Errorf("Expected errgroup.TryGo to success!!!") | ||
} | ||
|
||
// The limit set to 2, so 3rd function shall not be possible to call | ||
ok = eg.TryGo(func() error { | ||
t.Errorf("This function is unexpected to be called!!!") | ||
return nil | ||
}) | ||
if ok { | ||
t.Errorf("Expected errgroup.TryGo to fail!!!") | ||
} | ||
|
||
ch1 <- nil | ||
ch2 <- nil | ||
err := eg.Wait() | ||
|
||
if err != nil { | ||
t.Errorf("expected a nil error, got=%+v", err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.