diff --git a/cmd/provider.go b/cmd/provider.go index 00051c84..7f408b57 100644 --- a/cmd/provider.go +++ b/cmd/provider.go @@ -98,7 +98,7 @@ func makeProviderCmd() *cobra.Command { DeployHandler: handlers.MakeDeployHandler(client, cni, baseUserSecretsPath, alwaysPull), FunctionReader: handlers.MakeReadHandler(client), ReplicaReader: handlers.MakeReplicaReaderHandler(client), - ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni), + ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni, invokeResolver), UpdateHandler: handlers.MakeUpdateHandler(client, cni, baseUserSecretsPath, alwaysPull), HealthHandler: func(w http.ResponseWriter, r *http.Request) {}, InfoHandler: handlers.MakeInfoHandler(Version, GitCommit), diff --git a/pkg/provider/handlers/scale.go b/pkg/provider/handlers/scale.go index f705dc2f..c2d39193 100644 --- a/pkg/provider/handlers/scale.go +++ b/pkg/provider/handlers/scale.go @@ -6,16 +6,20 @@ import ( "fmt" "io/ioutil" "log" + "net" "net/http" + "net/url" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" gocni "github.com/containerd/go-cni" + "github.com/openfaas/faas-provider/proxy" "github.com/openfaas/faas-provider/types" ) -func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) { +func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI, resolver proxy.BaseURLResolver) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { @@ -30,12 +34,9 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h log.Printf("[Scale] request: %s\n", string(body)) req := types.ScaleServiceRequest{} - err := json.Unmarshal(body, &req) - - if err != nil { + if err := json.Unmarshal(body, &req); err != nil { log.Printf("[Scale] error parsing input: %s\n", err) http.Error(w, err.Error(), http.StatusBadRequest) - return } @@ -55,18 +56,23 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h name := req.ServiceName - if _, err := GetFunction(client, name, namespace); err != nil { + fn, err := GetFunction(client, name, namespace) + if err != nil { msg := fmt.Sprintf("service %s not found", name) log.Printf("[Scale] %s\n", msg) http.Error(w, msg, http.StatusNotFound) return } - ctx := namespaces.WithNamespace(context.Background(), namespace) + healthPath := "/_/healthz" + if v := fn.annotations["com.openfaas.health.http.path"]; len(v) > 0 { + healthPath = v + } - ctr, ctrErr := client.LoadContainer(ctx, name) - if ctrErr != nil { - msg := fmt.Sprintf("cannot load service %s, error: %s", name, ctrErr) + ctx := namespaces.WithNamespace(context.Background(), namespace) + ctr, err := client.LoadContainer(ctx, name) + if err != nil { + msg := fmt.Sprintf("cannot load service %s, error: %s", name, err) log.Printf("[Scale] %s\n", msg) http.Error(w, msg, http.StatusNotFound) return @@ -75,16 +81,16 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h var taskExists bool var taskStatus *containerd.Status - task, taskErr := ctr.Task(ctx, nil) - if taskErr != nil { - msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr) + task, err := ctr.Task(ctx, nil) + if err != nil { + msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, err) log.Printf("[Scale] %s\n", msg) taskExists = false } else { taskExists = true - status, statusErr := task.Status(ctx) - if statusErr != nil { - msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, statusErr) + status, err := task.Status(ctx) + if err != nil { + msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, err) log.Printf("[Scale] %s\n", msg) http.Error(w, msg, http.StatusInternalServerError) return @@ -99,28 +105,31 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h if req.Replicas == 0 { // If a task is running, pause it if taskExists && taskStatus.Status == containerd.Running { - if pauseErr := task.Pause(ctx); pauseErr != nil { - wrappedPauseErr := fmt.Errorf("error pausing task %s, error: %s", name, pauseErr) - log.Printf("[Scale] %s\n", wrappedPauseErr.Error()) - http.Error(w, wrappedPauseErr.Error(), http.StatusNotFound) + if err := task.Pause(ctx); err != nil { + werr := fmt.Errorf("error pausing task %s, error: %s", name, err) + log.Printf("[Scale] %s\n", werr.Error()) + http.Error(w, werr.Error(), http.StatusNotFound) return } } + + // Otherwise, no action is required + return } if taskExists { if taskStatus != nil { if taskStatus.Status == containerd.Paused { - if resumeErr := task.Resume(ctx); resumeErr != nil { - log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr) - http.Error(w, resumeErr.Error(), http.StatusBadRequest) + if err := task.Resume(ctx); err != nil { + log.Printf("[Scale] error resuming task %s, error: %s\n", name, err) + http.Error(w, err.Error(), http.StatusBadRequest) return } } else if taskStatus.Status == containerd.Stopped { // Stopped tasks cannot be restarted, must be removed, and created again - if _, delErr := task.Delete(ctx); delErr != nil { - log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, delErr) - http.Error(w, delErr.Error(), http.StatusBadRequest) + if _, err := task.Delete(ctx); err != nil { + log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, err) + http.Error(w, err.Error(), http.StatusBadRequest) return } createNewTask = true @@ -131,12 +140,70 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h } if createNewTask { - deployErr := createTask(ctx, client, ctr, cni) - if deployErr != nil { - log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr) - http.Error(w, deployErr.Error(), http.StatusBadRequest) + err := createTask(ctx, client, ctr, cni) + if err != nil { + log.Printf("[Scale] error deploying %s, error: %s\n", name, err) + http.Error(w, err.Error(), http.StatusBadRequest) return } } + + if err := waitUntilHealthy(name, resolver, healthPath); err != nil { + log.Printf("[Scale] error waiting for function %s to become ready, error: %s\n", name, err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } +} + +// waitUntilHealthy blocks until the healthPath returns a HTTP 200 for the +// IP address resolved for the given function. +// Maximum retries: 100 +// Delay between each attempt: 20ms +// A custom path can be set via an annotation in the function's spec: +// com.openfaas.health.http.path: /handlers/ready +// +func waitUntilHealthy(name string, resolver proxy.BaseURLResolver, healthPath string) error { + endpoint, err := resolver.Resolve(name) + if err != nil { + return err } + + host, port, _ := net.SplitHostPort(endpoint.Host) + u, err := url.Parse(fmt.Sprintf("http://%s:%s%s", host, port, healthPath)) + if err != nil { + return err + } + + // Try to hit the health endpoint and block until + // ready. + attempts := 100 + pause := time.Millisecond * 20 + for i := 0; i < attempts; i++ { + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + if res.Body != nil { + res.Body.Close() + } + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected health status: %d", res.StatusCode) + } + + if err == nil { + break + } + + time.Sleep(pause) + } + + return nil }