Skip to content

Commit

Permalink
Deterministically fail ko {apply, create} (#133)
Browse files Browse the repository at this point in the history
When resolving files, we would just log.Fatal if we encountered an
error. This seems to be racy and causes ko to exit with a 0 error code
when it shouldn't. To fix this, we synchronize the builder goroutines
with the kubectl go routine and exit with an error if either of them
failed.

This fix also happened to fix a goroutine leak. If the kubectl goroutine
failed, we never properly cancelled the builds, which would happily
conitnue compiling packages and consuming resources.
  • Loading branch information
jonjohnsonjr authored Feb 11, 2020
1 parent c3a657a commit cfd680d
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 29 deletions.
27 changes: 20 additions & 7 deletions pkg/commands/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package commands

import (
"fmt"
"log"
"os"
"os/exec"

"github.com/google/ko/pkg/commands/options"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"k8s.io/cli-runtime/pkg/genericclioptions"
)

Expand Down Expand Up @@ -110,7 +112,12 @@ func addApply(topLevel *cobra.Command) {
log.Fatalf("error piping to 'kubectl apply': %v", err)
}

go func() {
// Cancel on signals.
ctx := createCancellableContext()

// Make sure builds are cancelled if kubectl apply fails.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
// kubectl buffers data before starting to apply it, which
// can lead to resources being created more slowly than desired.
// In the case of --watch, it can lead to resources not being
Expand All @@ -122,13 +129,19 @@ func addApply(topLevel *cobra.Command) {
stdin.Write([]byte("---\n"))
}
// Once primed kick things off.
ctx := createCancellableContext()
resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, stdin)
}()
return resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, stdin)
})

g.Go(func() error {
// Run it.
if err := kubectlCmd.Run(); err != nil {
return fmt.Errorf("error executing 'kubectl apply': %v", err)
}
return nil
})

// Run it.
if err := kubectlCmd.Run(); err != nil {
log.Fatalf("error executing 'kubectl apply': %v", err)
if err := g.Wait(); err != nil {
log.Fatal(err)
}
},
}
Expand Down
27 changes: 20 additions & 7 deletions pkg/commands/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package commands

import (
"fmt"
"log"
"os"
"os/exec"

"github.com/google/ko/pkg/commands/options"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"k8s.io/cli-runtime/pkg/genericclioptions"
)

Expand Down Expand Up @@ -110,7 +112,12 @@ func addCreate(topLevel *cobra.Command) {
log.Fatalf("error piping to 'kubectl create': %v", err)
}

go func() {
// Cancel on signals.
ctx := createCancellableContext()

// Make sure builds are cancelled if kubectl create fails.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
// kubectl buffers data before starting to create it, which
// can lead to resources being created more slowly than desired.
// In the case of --watch, it can lead to resources not being
Expand All @@ -122,13 +129,19 @@ func addCreate(topLevel *cobra.Command) {
stdin.Write([]byte("---\n"))
}
// Once primed kick things off.
ctx := createCancellableContext()
resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, stdin)
}()
return resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, stdin)
})

g.Go(func() error {
// Run it.
if err := kubectlCmd.Run(); err != nil {
return fmt.Errorf("error executing 'kubectl create': %v", err)
}
return nil
})

// Run it.
if err := kubectlCmd.Run(); err != nil {
log.Fatalf("error executing 'kubectl create': %v", err)
if err := g.Wait(); err != nil {
log.Fatal(err)
}
},
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/commands/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func addResolve(topLevel *cobra.Command) {
log.Fatalf("error creating publisher: %v", err)
}
ctx := createCancellableContext()
resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, os.Stdout)
if err := resolveFilesToWriter(ctx, builder, publisher, fo, so, sto, os.Stdout); err != nil {
log.Fatal(err)
}
},
}
options.AddLocalArg(resolve, lo)
Expand Down
42 changes: 28 additions & 14 deletions pkg/commands/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/google/ko/pkg/publish"
"github.com/google/ko/pkg/resolve"
"github.com/mattmoor/dep-notify/pkg/graph"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/labels"
)
Expand Down Expand Up @@ -82,7 +83,7 @@ func gobuildOptions(bo *options.BuildOptions) ([]build.Option, error) {
func makeBuilder(bo *options.BuildOptions) (*build.Caching, error) {
opt, err := gobuildOptions(bo)
if err != nil {
log.Fatalf("error setting up builder options: %v", err)
return nil, fmt.Errorf("error setting up builder options: %v", err)
}
innerBuilder, err := build.NewGo(opt...)
if err != nil {
Expand Down Expand Up @@ -154,7 +155,7 @@ func resolveFilesToWriter(
fo *options.FilenameOptions,
so *options.SelectorOptions,
sto *options.StrictOptions,
out io.WriteCloser) {
out io.WriteCloser) error {
defer out.Close()

// By having this as a channel, we can hook this up to a filesystem
Expand Down Expand Up @@ -189,12 +190,16 @@ func resolveFilesToWriter(
})
})
if err != nil {
log.Fatalf("Error creating dep-notify graph: %v", err)
return fmt.Errorf("creating dep-notify graph: %v", err)
}
// Cleanup the fsnotify hooks when we're done.
defer g.Shutdown()
}

// This tracks resolution errors and ensures we cancel other builds if an
// individual build fails.
errs, ctx := errgroup.WithContext(ctx)

var futures []resolvedFuture
for {
// Each iteration, if there is anything in the list of futures,
Expand All @@ -212,7 +217,7 @@ func resolveFilesToWriter(
}

select {
case f, ok := <-fs:
case file, ok := <-fs:
if !ok {
// a nil channel is never available to receive on.
// This allows us to drain the list of in-process
Expand All @@ -229,21 +234,23 @@ func resolveFilesToWriter(

// Kick off the resolution that will respond with its bytes on
// the future.
go func(f string) {
f := file // defensive copy
errs.Go(func() error {
defer close(ch)
// Record the builds we do via this builder.
recordingBuilder := &build.Recorder{
Builder: builder,
}
b, err := resolveFile(ctx, f, recordingBuilder, publisher, so, sto)
if err != nil {
// Don't let build errors disrupt the watch.
lg := log.Fatalf
// This error is sometimes expected during watch mode, so this
// isn't fatal. Just print it and keep the watch open.
err := fmt.Errorf("error processing import paths in %q: %v", f, err)
if fo.Watch {
lg = log.Printf
log.Print(err)
return nil
}
lg("error processing import paths in %q: %v", f, err)
return
return err
}
// Associate with this file the collection of binary import paths.
sm.Store(f, recordingBuilder.ImportPaths)
Expand All @@ -255,11 +262,15 @@ func resolveFilesToWriter(
// notifications that they change will result in no affected
// yamls, and no new builds or deploys.
if err := g.Add(ip); err != nil {
log.Fatalf("Error adding importpath to dep graph: %v", err)
// If we're in watch mode, just fail.
err := fmt.Errorf("adding importpath to dep graph: %v", err)
errCh <- err
return err
}
}
}
}(f)
return nil
})

case b, ok := <-bf:
// Once the head channel returns something, dequeue it.
Expand All @@ -275,9 +286,13 @@ func resolveFilesToWriter(
}

case err := <-errCh:
log.Fatalf("Error watching dependencies: %v", err)
return fmt.Errorf("watching dependencies: %v", err)
}
}

// Make sure we exit with an error.
// See https://github.com/google/ko/issues/84
return errs.Wait()
}

func resolveFile(
Expand Down Expand Up @@ -351,5 +366,4 @@ func resolveFile(
e.Close()

return buf.Bytes(), nil

}

0 comments on commit cfd680d

Please sign in to comment.