diff --git a/pkg/commands/apply.go b/pkg/commands/apply.go index 8ec1276c49..204ef553b5 100644 --- a/pkg/commands/apply.go +++ b/pkg/commands/apply.go @@ -15,6 +15,7 @@ package commands import ( + "fmt" "log" "os" "os/exec" @@ -22,6 +23,7 @@ import ( "github.com/google/ko/pkg/commands/options" "github.com/spf13/cobra" "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" "k8s.io/cli-runtime/pkg/genericclioptions" ) @@ -110,7 +112,12 @@ func addApply(topLevel *cobra.Command) { log.Fatalf("error piping to 'kubectl apply': %v", err) } - go func() { + // Cancel on signals. + ctx := createCancellableContext() + + // Make sure builds are cancelled if kubectl apply fails. + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { // kubectl buffers data before starting to apply it, which // can lead to resources being created more slowly than desired. // In the case of --watch, it can lead to resources not being @@ -122,13 +129,19 @@ func addApply(topLevel *cobra.Command) { stdin.Write([]byte("---\n")) } // Once primed kick things off. - ctx := createCancellableContext() - resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, stdin) - }() + return resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, stdin) + }) + + g.Go(func() error { + // Run it. + if err := kubectlCmd.Run(); err != nil { + return fmt.Errorf("error executing 'kubectl apply': %v", err) + } + return nil + }) - // Run it. - if err := kubectlCmd.Run(); err != nil { - log.Fatalf("error executing 'kubectl apply': %v", err) + if err := g.Wait(); err != nil { + log.Fatal(err) } }, } diff --git a/pkg/commands/create.go b/pkg/commands/create.go index b5347ab8d2..e46bf1bc99 100644 --- a/pkg/commands/create.go +++ b/pkg/commands/create.go @@ -15,6 +15,7 @@ package commands import ( + "fmt" "log" "os" "os/exec" @@ -22,6 +23,7 @@ import ( "github.com/google/ko/pkg/commands/options" "github.com/spf13/cobra" "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" "k8s.io/cli-runtime/pkg/genericclioptions" ) @@ -110,7 +112,12 @@ func addCreate(topLevel *cobra.Command) { log.Fatalf("error piping to 'kubectl create': %v", err) } - go func() { + // Cancel on signals. + ctx := createCancellableContext() + + // Make sure builds are cancelled if kubectl create fails. + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { // kubectl buffers data before starting to create it, which // can lead to resources being created more slowly than desired. // In the case of --watch, it can lead to resources not being @@ -122,13 +129,19 @@ func addCreate(topLevel *cobra.Command) { stdin.Write([]byte("---\n")) } // Once primed kick things off. - ctx := createCancellableContext() - resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, stdin) - }() + return resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, stdin) + }) + + g.Go(func() error { + // Run it. + if err := kubectlCmd.Run(); err != nil { + return fmt.Errorf("error executing 'kubectl create': %v", err) + } + return nil + }) - // Run it. - if err := kubectlCmd.Run(); err != nil { - log.Fatalf("error executing 'kubectl create': %v", err) + if err := g.Wait(); err != nil { + log.Fatal(err) } }, } diff --git a/pkg/commands/resolve.go b/pkg/commands/resolve.go index 706120dc4f..71ac1f2e5d 100644 --- a/pkg/commands/resolve.go +++ b/pkg/commands/resolve.go @@ -67,7 +67,9 @@ func addResolve(topLevel *cobra.Command) { log.Fatalf("error creating publisher: %v", err) } ctx := createCancellableContext() - resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, os.Stdout) + if err := resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, os.Stdout); err != nil { + log.Fatal(err) + } }, } options.AddLocalArg(resolve, lo) diff --git a/pkg/commands/resolver.go b/pkg/commands/resolver.go index 773a6d3b86..342961e7fc 100644 --- a/pkg/commands/resolver.go +++ b/pkg/commands/resolver.go @@ -33,6 +33,7 @@ import ( "github.com/google/ko/pkg/publish" "github.com/google/ko/pkg/resolve" "github.com/mattmoor/dep-notify/pkg/graph" + "golang.org/x/sync/errgroup" "gopkg.in/yaml.v3" "k8s.io/apimachinery/pkg/labels" ) @@ -82,7 +83,7 @@ func gobuildOptions(bo *options.BuildOptions) ([]build.Option, error) { func makeBuilder(bo *options.BuildOptions) (*build.Caching, error) { opt, err := gobuildOptions(bo) if err != nil { - log.Fatalf("error setting up builder options: %v", err) + return nil, fmt.Errorf("error setting up builder options: %v", err) } innerBuilder, err := build.NewGo(opt...) if err != nil { @@ -154,7 +155,7 @@ func resolveFilesToWriter( fo *options.FilenameOptions, so *options.SelectorOptions, sto *options.StrictOptions, - out io.WriteCloser) { + out io.WriteCloser) error { defer out.Close() // By having this as a channel, we can hook this up to a filesystem @@ -189,12 +190,16 @@ func resolveFilesToWriter( }) }) if err != nil { - log.Fatalf("Error creating dep-notify graph: %v", err) + return fmt.Errorf("creating dep-notify graph: %v", err) } // Cleanup the fsnotify hooks when we're done. defer g.Shutdown() } + // This tracks resolution errors and ensures we cancel other builds if an + // individual build fails. + errs, ctx := errgroup.WithContext(ctx) + var futures []resolvedFuture for { // Each iteration, if there is anything in the list of futures, @@ -212,7 +217,7 @@ func resolveFilesToWriter( } select { - case f, ok := <-fs: + case file, ok := <-fs: if !ok { // a nil channel is never available to receive on. // This allows us to drain the list of in-process @@ -229,7 +234,8 @@ func resolveFilesToWriter( // Kick off the resolution that will respond with its bytes on // the future. - go func(f string) { + f := file // defensive copy + errs.Go(func() error { defer close(ch) // Record the builds we do via this builder. recordingBuilder := &build.Recorder{ @@ -237,13 +243,14 @@ func resolveFilesToWriter( } b, err := resolveFile(ctx, f, recordingBuilder, publisher, so, sto) if err != nil { - // Don't let build errors disrupt the watch. - lg := log.Fatalf + // This error is sometimes expected during watch mode, so this + // isn't fatal. Just print it and keep the watch open. + err := fmt.Errorf("error processing import paths in %q: %v", f, err) if fo.Watch { - lg = log.Printf + log.Print(err) + return nil } - lg("error processing import paths in %q: %v", f, err) - return + return err } // Associate with this file the collection of binary import paths. sm.Store(f, recordingBuilder.ImportPaths) @@ -255,11 +262,15 @@ func resolveFilesToWriter( // notifications that they change will result in no affected // yamls, and no new builds or deploys. if err := g.Add(ip); err != nil { - log.Fatalf("Error adding importpath to dep graph: %v", err) + // If we're in watch mode, just fail. + err := fmt.Errorf("adding importpath to dep graph: %v", err) + errCh <- err + return err } } } - }(f) + return nil + }) case b, ok := <-bf: // Once the head channel returns something, dequeue it. @@ -275,9 +286,13 @@ func resolveFilesToWriter( } case err := <-errCh: - log.Fatalf("Error watching dependencies: %v", err) + return fmt.Errorf("watching dependencies: %v", err) } } + + // Make sure we exit with an error. + // See https://github.com/google/ko/issues/84 + return errs.Wait() } func resolveFile( @@ -351,5 +366,4 @@ func resolveFile( e.Close() return buf.Bytes(), nil - }