diff --git a/cmd/vcluster/cmd/restore.go b/cmd/vcluster/cmd/restore.go index 4dfc09288f..82dfe468f3 100644 --- a/cmd/vcluster/cmd/restore.go +++ b/cmd/vcluster/cmd/restore.go @@ -44,7 +44,7 @@ func NewRestoreCommand() *cobra.Command { Use: "restore", Short: "restore a vCluster", Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, _ []string) error { return options.Run(cmd.Context()) }, } diff --git a/cmd/vcluster/cmd/snapshot.go b/cmd/vcluster/cmd/snapshot.go index 50f3e9d936..930a5ef0ff 100644 --- a/cmd/vcluster/cmd/snapshot.go +++ b/cmd/vcluster/cmd/snapshot.go @@ -12,6 +12,8 @@ import ( "path/filepath" "time" + "github.com/go-logr/logr" + "github.com/loft-sh/log/logr/zapr" vclusterconfig "github.com/loft-sh/vcluster/config" "github.com/loft-sh/vcluster/pkg/config" "github.com/loft-sh/vcluster/pkg/constants" @@ -24,6 +26,8 @@ import ( "github.com/loft-sh/vcluster/pkg/snapshot/s3" "github.com/loft-sh/vcluster/pkg/util/servicecidr" "github.com/spf13/cobra" + "go.uber.org/zap" + "google.golang.org/grpc/grpclog" "k8s.io/klog/v2" ) @@ -57,7 +61,7 @@ func NewSnapshotCommand() *cobra.Command { Use: "snapshot", Short: "snapshot a vCluster", Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, _ []string) error { return options.Run(cmd.Context()) }, } @@ -241,7 +245,10 @@ func isEtcdReachable(ctx context.Context, endpoint string, certificates *etcd.Ce ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - etcdClient, err := etcd.GetEtcdClient(ctx, certificates, endpoint) + zapLog := zap.NewNop() + grpclog.SetLoggerV2(grpclog.NewLoggerV2(io.Discard, io.Discard, io.Discard)) + ctx = logr.NewContext(ctx, zapr.NewLoggerWithOptions(zapLog)) + etcdClient, err := etcd.GetEtcdClient(ctx, zapLog, certificates, endpoint) if err == nil { defer func() { _ = etcdClient.Close() diff --git a/cmd/vclusterctl/cmd/restore.go b/cmd/vclusterctl/cmd/restore.go index 0721abb111..202659d494 100644 --- a/cmd/vclusterctl/cmd/restore.go +++ b/cmd/vclusterctl/cmd/restore.go @@ -3,11 +3,7 @@ package cmd import ( "context" "fmt" - "io" - "os" - "os/signal" "strings" - "syscall" "github.com/blang/semver/v4" "github.com/loft-sh/log" @@ -19,23 +15,21 @@ import ( "github.com/loft-sh/vcluster/pkg/lifecycle" "github.com/loft-sh/vcluster/pkg/snapshot" "github.com/loft-sh/vcluster/pkg/snapshot/file" + "github.com/loft-sh/vcluster/pkg/snapshot/pod" "github.com/loft-sh/vcluster/pkg/snapshot/s3" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" - "k8s.io/utils/ptr" ) type RestoreCmd struct { *flags.GlobalFlags - S3 s3.Options - File file.Options - Storage string + Snapshot snapshot.Options + Pod pod.Options + Log log.Logger } @@ -68,8 +62,9 @@ vcluster restore test --namespace test cobraCmd.Flags().StringVar(&cmd.Storage, "storage", "s3", "The storage to restore from. 
Can be either s3 or file") // add storage flags - file.AddFileFlags(cobraCmd.Flags(), &cmd.File) - s3.AddS3Flags(cobraCmd.Flags(), &cmd.S3) + file.AddFileFlags(cobraCmd.Flags(), &cmd.Snapshot.File) + s3.AddS3Flags(cobraCmd.Flags(), &cmd.Snapshot.S3) + pod.AddPodFlags(cobraCmd.Flags(), &cmd.Pod) return cobraCmd } @@ -91,13 +86,9 @@ func (cmd *RestoreCmd) Run(ctx context.Context, args []string) error { } // check if snapshot is supported - if vCluster.Version != "dev-next" { - version, err := semver.Parse(strings.TrimPrefix(vCluster.Version, "v")) - if err != nil { - return fmt.Errorf("parsing vCluster version: %w", err) - } - - // check if version matches + version, err := semver.Parse(strings.TrimPrefix(vCluster.Version, "v")) + if err == nil { + // only check if version matches if vCluster actually has a parsable version if version.LT(semver.MustParse(minSnapshotVersion)) { return fmt.Errorf("vCluster version %s snapshotting is not supported", vCluster.Version) } @@ -119,16 +110,6 @@ func (cmd *RestoreCmd) Run(ctx context.Context, args []string) error { } }() - // now restore vCluster - err = cmd.restoreVCluster(ctx, kubeClient, vCluster) - if err != nil { - return fmt.Errorf("restore vCluster %s: %w", vCluster.Name, err) - } - - return nil -} - -func (cmd *RestoreCmd) restoreVCluster(ctx context.Context, kubeClient *kubernetes.Clientset, vCluster *find.VCluster) error { // get pod spec var podSpec *corev1.PodSpec if vCluster.StatefulSet != nil { @@ -139,166 +120,10 @@ func (cmd *RestoreCmd) restoreVCluster(ctx context.Context, kubeClient *kubernet return fmt.Errorf("vCluster %s has no StatefulSet or Deployment", vCluster.Name) } - // now start the snapshot pod that takes the snapshot - restorePod, err := cmd.startRestorePod(ctx, kubeClient, vCluster.Namespace, vCluster.Name, podSpec) - if err != nil { - return fmt.Errorf("starting snapshot pod: %w", err) - } - - // create interrupt channel - sigint := make(chan os.Signal, 1) - defer func() { - // make sure we won't interfere with interrupts anymore - signal.Stop(sigint) - - // delete the restore pod when we are done - _ = kubeClient.CoreV1().Pods(restorePod.Namespace).Delete(ctx, restorePod.Name, metav1.DeleteOptions{}) - }() - - // also delete on interrupt - go func() { - // interrupt signal sent from terminal - signal.Notify(sigint, os.Interrupt) - // sigterm signal sent from kubernetes - signal.Notify(sigint, syscall.SIGTERM) - - // wait until we get killed - <-sigint - - // cleanup virtual cluster - err = kubeClient.CoreV1().Pods(restorePod.Namespace).Delete(ctx, restorePod.Name, metav1.DeleteOptions{ - GracePeriodSeconds: ptr.To(int64(1)), - }) - if err != nil { - klog.Warningf("Error deleting snapshot pod: %v", err) - } - os.Exit(1) - }() - - // wait for pod to become ready - err = waitForReadyPod(ctx, kubeClient, restorePod.Namespace, restorePod.Name, "restore", cmd.Log) - if err != nil { - return fmt.Errorf("waiting for restore pod to become ready: %w", err) - } - - // now log the snapshot pod - reader, err := kubeClient.CoreV1().Pods(restorePod.Namespace).GetLogs(restorePod.Name, &corev1.PodLogOptions{ - Follow: true, - }).Stream(ctx) - if err != nil { - return fmt.Errorf("stream restore pod logs: %w", err) - } - defer reader.Close() - - // stream into stdout - cmd.Log.Infof("Printing logs of restore pod...") - _, err = io.Copy(os.Stdout, reader) - if err != nil { - return fmt.Errorf("write restore pod logs: %w", err) - } - - // check restore pod for exit code - exitCode, err := waitForCompletedPod(ctx, kubeClient, 
restorePod.Namespace, restorePod.Name, "restore", cmd.Log) - if err != nil { - return err - } - - // check exit code of restore pod - if exitCode != 0 { - return fmt.Errorf("restore pod failed: exit code %d", exitCode) - } - - return nil -} - -func (cmd *RestoreCmd) startRestorePod(ctx context.Context, kubeClient *kubernetes.Clientset, namespace, vCluster string, podSpec *corev1.PodSpec) (*corev1.Pod, error) { - var syncerContainer *corev1.Container - for _, container := range podSpec.Containers { - if container.Name == "syncer" { - syncerContainer = &container - break - } - } - if syncerContainer == nil { - return nil, fmt.Errorf("couldn't find syncer container") - } - - // build args - env := syncerContainer.Env - options, err := toOptionsString(&snapshot.Options{ - S3: cmd.S3, - File: cmd.File, - }) - if err != nil { - return nil, err - } - env = append(env, corev1.EnvVar{ - Name: "VCLUSTER_STORAGE_OPTIONS", - Value: options, - }) - - // build the pod spec - newPod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "vcluster-restore-", - Namespace: namespace, - Labels: map[string]string{ - "app": "vcluster-restore", - }, - }, - Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyNever, - ServiceAccountName: podSpec.ServiceAccountName, - TerminationGracePeriodSeconds: ptr.To(int64(1)), - NodeSelector: podSpec.NodeSelector, - Affinity: podSpec.Affinity, - Tolerations: podSpec.Tolerations, - SecurityContext: podSpec.SecurityContext, - Volumes: podSpec.Volumes, - Containers: []corev1.Container{ - { - Name: "restore", - Image: syncerContainer.Image, - Command: []string{"/vcluster", "restore", "--storage", cmd.Storage}, - SecurityContext: syncerContainer.SecurityContext, - Env: env, - EnvFrom: syncerContainer.EnvFrom, - VolumeMounts: syncerContainer.VolumeMounts, - }, - }, - }, - } - - // add persistent volume claim volume if necessary - for _, volumeMount := range syncerContainer.VolumeMounts { - if volumeMount.Name == "data" { - // check if its part of the pod spec - found := false - for _, volume := range newPod.Spec.Volumes { - if volume.Name == volumeMount.Name { - found = true - break - } - } - if !found { - newPod.Spec.Volumes = append(newPod.Spec.Volumes, corev1.Volume{ - Name: volumeMount.Name, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "data-" + vCluster + "-0", - }, - }, - }) - } - } - } - - // create the pod - cmd.Log.Infof("Starting restore pod for vCluster %s/%s...", namespace, vCluster) - newPod, err = kubeClient.CoreV1().Pods(namespace).Create(ctx, newPod, metav1.CreateOptions{}) - if err != nil { - return nil, fmt.Errorf("creating restore pod: %w", err) - } - - return newPod, nil + // set missing pod options and run snapshot restore pod + cmd.Pod.Namespace = vCluster.Namespace + cmd.Pod.VCluster = vCluster.Name + cmd.Pod.PodSpec = podSpec + cmd.Pod.Command = []string{"/vcluster", "restore", "--storage", cmd.Storage} + return pod.RunSnapshotPod(ctx, kubeClient, &cmd.Pod, &cmd.Snapshot, cmd.Log) } diff --git a/cmd/vclusterctl/cmd/snapshot.go b/cmd/vclusterctl/cmd/snapshot.go index b06400f7e1..727f7d5719 100644 --- a/cmd/vclusterctl/cmd/snapshot.go +++ b/cmd/vclusterctl/cmd/snapshot.go @@ -2,15 +2,8 @@ package cmd import ( "context" - "encoding/base64" - "encoding/json" "fmt" - "io" - "os" - "os/signal" "strings" - "syscall" - "time" "github.com/blang/semver/v4" "github.com/loft-sh/log" @@ -20,15 +13,12 @@ import ( "github.com/loft-sh/vcluster/pkg/cli/util" 
"github.com/loft-sh/vcluster/pkg/snapshot" "github.com/loft-sh/vcluster/pkg/snapshot/file" + "github.com/loft-sh/vcluster/pkg/snapshot/pod" "github.com/loft-sh/vcluster/pkg/snapshot/s3" - "github.com/loft-sh/vcluster/pkg/util/clihelper" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" - "k8s.io/utils/ptr" ) var minSnapshotVersion = "0.23.0-alpha.8" @@ -36,11 +26,11 @@ var minSnapshotVersion = "0.23.0-alpha.8" type SnapshotCmd struct { *flags.GlobalFlags - S3 s3.Options - File file.Options - Storage string + Snapshot snapshot.Options + Pod pod.Options + Log log.Logger } @@ -73,8 +63,9 @@ vcluster snapshot test --namespace test cobraCmd.Flags().StringVar(&cmd.Storage, "storage", "s3", "The storage to snapshot to. Can be either s3 or file") // add storage flags - file.AddFileFlags(cobraCmd.Flags(), &cmd.File) - s3.AddS3Flags(cobraCmd.Flags(), &cmd.S3) + file.AddFileFlags(cobraCmd.Flags(), &cmd.Snapshot.File) + s3.AddS3Flags(cobraCmd.Flags(), &cmd.Snapshot.S3) + pod.AddPodFlags(cobraCmd.Flags(), &cmd.Pod) return cobraCmd } @@ -110,13 +101,9 @@ func (cmd *SnapshotCmd) Run(ctx context.Context, args []string) error { } // check if snapshot is supported - if vCluster.Version != "dev-next" { - version, err := semver.Parse(strings.TrimPrefix(vCluster.Version, "v")) - if err != nil { - return fmt.Errorf("parsing vCluster version %s: %w", vCluster.Version, err) - } - - // check if version matches + version, err := semver.Parse(strings.TrimPrefix(vCluster.Version, "v")) + if err == nil { + // only check if version matches if vCluster actually has a parsable version if version.LT(semver.MustParse(minSnapshotVersion)) { return fmt.Errorf("vCluster version %s snapshotting is not supported", vCluster.Version) } @@ -133,268 +120,9 @@ func (cmd *SnapshotCmd) Run(ctx context.Context, args []string) error { } // now start the snapshot pod that takes the snapshot - snapshotPod, err := cmd.startSnapshotPod(ctx, kubeClient, vClusterPod) - if err != nil { - return fmt.Errorf("starting snapshot pod: %w", err) - } - defer func() { - // delete the snapshot pod when we are done - _ = kubeClient.CoreV1().Pods(snapshotPod.Namespace).Delete(ctx, snapshotPod.Name, metav1.DeleteOptions{}) - }() - - // also delete on interrupt - go func() { - sigint := make(chan os.Signal, 1) - // interrupt signal sent from terminal - signal.Notify(sigint, os.Interrupt) - // sigterm signal sent from kubernetes - signal.Notify(sigint, syscall.SIGTERM) - - // wait until we get killed - <-sigint - - // cleanup virtual cluster - err = kubeClient.CoreV1().Pods(snapshotPod.Namespace).Delete(ctx, snapshotPod.Name, metav1.DeleteOptions{ - GracePeriodSeconds: ptr.To(int64(1)), - }) - if err != nil { - klog.Warningf("Error deleting snapshot pod: %v", err) - } - os.Exit(1) - }() - - // wait for pod to become ready - err = waitForReadyPod(ctx, kubeClient, snapshotPod.Namespace, snapshotPod.Name, "snapshot", cmd.Log) - if err != nil { - return fmt.Errorf("waiting for snapshot pod to become ready: %w", err) - } - - // now log the snapshot pod - reader, err := kubeClient.CoreV1().Pods(snapshotPod.Namespace).GetLogs(snapshotPod.Name, &corev1.PodLogOptions{ - Follow: true, - }).Stream(ctx) - if err != nil { - return fmt.Errorf("stream snapshot pod logs: %w", err) - } - defer reader.Close() - - // stream into stdout - cmd.Log.Infof("Printing logs of snapshot pod...") - _, err = io.Copy(os.Stdout, reader) - if err != nil { - 
return fmt.Errorf("write snapshot pod logs: %w", err) - } - - // check snapshot pod for exit code - exitCode, err := waitForCompletedPod(ctx, kubeClient, snapshotPod.Namespace, snapshotPod.Name, "snapshot", cmd.Log) - if err != nil { - return err - } - - // check exit code of snapshot container - if exitCode != 0 { - return fmt.Errorf("snapshot pod failed: exit code %d", exitCode) - } - - return nil -} - -func (cmd *SnapshotCmd) startSnapshotPod(ctx context.Context, kubeClient *kubernetes.Clientset, vClusterPod *corev1.Pod) (*corev1.Pod, error) { - var syncerContainer *corev1.Container - for _, container := range vClusterPod.Spec.Containers { - if container.Name == "syncer" { - syncerContainer = &container - break - } - } - if syncerContainer == nil { - return nil, fmt.Errorf("couldn't find syncer container") - } - - // build args - env := syncerContainer.Env - options, err := toOptionsString(&snapshot.Options{ - S3: cmd.S3, - File: cmd.File, - }) - if err != nil { - return nil, err - } - env = append(env, corev1.EnvVar{ - Name: "VCLUSTER_STORAGE_OPTIONS", - Value: options, - }) - - // build the pod spec - newPod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "vcluster-snapshot-", - Namespace: vClusterPod.Namespace, - Labels: map[string]string{ - "app": "vcluster-snapshot", - }, - }, - Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyNever, - ServiceAccountName: vClusterPod.Spec.ServiceAccountName, - AutomountServiceAccountToken: ptr.To(false), - TerminationGracePeriodSeconds: ptr.To(int64(1)), - NodeName: vClusterPod.Spec.NodeName, - Tolerations: vClusterPod.Spec.Tolerations, - SecurityContext: vClusterPod.Spec.SecurityContext, - Volumes: vClusterPod.Spec.Volumes, - Containers: []corev1.Container{ - { - Name: "snapshot", - Image: syncerContainer.Image, - Command: []string{"/vcluster", "snapshot", "--storage", cmd.Storage}, - SecurityContext: syncerContainer.SecurityContext, - Env: env, - EnvFrom: syncerContainer.EnvFrom, - VolumeMounts: syncerContainer.VolumeMounts, - }, - }, - }, - } - - // create the pod - cmd.Log.Infof("Starting snapshot pod %s/%s...", vClusterPod.Namespace, vClusterPod.Name) - newPod, err = kubeClient.CoreV1().Pods(vClusterPod.Namespace).Create(ctx, newPod, metav1.CreateOptions{}) - if err != nil { - return nil, fmt.Errorf("creating snapshot pod: %w", err) - } - - return newPod, nil -} - -func getExitCode(pod *corev1.Pod, container string) int32 { - for _, containerStatus := range pod.Status.ContainerStatuses { - if containerStatus.Name != container { - continue - } - - if containerStatus.State.Terminated != nil { - return containerStatus.State.Terminated.ExitCode - } - - return -1 - } - - return -1 -} - -func waitForCompletedPod(ctx context.Context, kubeClient *kubernetes.Clientset, namespace, name, container string, log log.Logger) (int32, error) { - exitCode := int32(-1) - err := wait.PollUntilContextTimeout(ctx, time.Second*2, time.Minute, true, func(ctx context.Context) (bool, error) { - pod, err := kubeClient.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) - if err != nil { - // this is a fatal - return false, fmt.Errorf("error trying to retrieve pod %s/%s: %w", namespace, name, err) - } - - for _, containerStatus := range pod.Status.ContainerStatuses { - if containerStatus.Name != container { - continue - } - - if containerStatus.State.Running != nil { - return false, nil - } else if containerStatus.State.Terminated != nil { - exitCode = containerStatus.State.Terminated.ExitCode - return true, nil - } else if 
containerStatus.State.Waiting != nil { - if containerStatus.State.Waiting.Message != "" { - return false, fmt.Errorf("error: %s container is waiting: %s (%s)", container, containerStatus.State.Waiting.Message, containerStatus.State.Waiting.Reason) - } else if containerStatus.State.Waiting.Reason != "" { - return false, fmt.Errorf("error: %s container is waiting: %s", container, containerStatus.State.Waiting.Reason) - } - - return false, fmt.Errorf("error: %s container is waiting", container) - } - - return false, nil - } - - return false, nil - }) - if err != nil { - return exitCode, err - } - - return exitCode, nil -} - -func waitForReadyPod(ctx context.Context, kubeClient kubernetes.Interface, namespace, name, container string, log log.Logger) error { - now := time.Now() - err := wait.PollUntilContextTimeout(ctx, time.Second*2, time.Minute, true, func(ctx context.Context) (bool, error) { - pod, err := kubeClient.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) - if err != nil { - // this is a fatal - return false, fmt.Errorf("error trying to retrieve pod %s/%s: %w", namespace, name, err) - } - - found := false - for _, containerStatus := range pod.Status.ContainerStatuses { - if containerStatus.State.Running != nil && containerStatus.Ready { - if containerStatus.Name == container { - found = true - } - - continue - } else if containerStatus.State.Terminated != nil || (containerStatus.State.Waiting != nil && clihelper.CriticalStatus[containerStatus.State.Waiting.Reason]) { - // if the container is completed that is fine as well - if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode == 0 { - found = true - continue - } - - reason := "" - message := "" - if containerStatus.State.Terminated != nil { - reason = containerStatus.State.Terminated.Reason - message = containerStatus.State.Terminated.Message - } else if containerStatus.State.Waiting != nil { - reason = containerStatus.State.Waiting.Reason - message = containerStatus.State.Waiting.Message - } - - out, err := kubeClient.CoreV1().Pods(namespace).GetLogs(pod.Name, &corev1.PodLogOptions{ - Container: container, - }).Do(context.Background()).Raw() - if err != nil { - return false, fmt.Errorf("there seems to be an issue with pod %s/%s starting up: %s (%s)", namespace, name, message, reason) - } - - return false, fmt.Errorf("there seems to be an issue with pod %s (%s - %s), logs:\n%s", name, message, reason, string(out)) - } else if containerStatus.State.Waiting != nil && time.Now().After(now.Add(time.Second*10)) { - if containerStatus.State.Waiting.Message != "" { - log.Infof("Please keep waiting, %s container is still starting up: %s (%s)", container, containerStatus.State.Waiting.Message, containerStatus.State.Waiting.Reason) - } else if containerStatus.State.Waiting.Reason != "" { - log.Infof("Please keep waiting, %s container is still starting up: %s", container, containerStatus.State.Waiting.Reason) - } else { - log.Infof("Please keep waiting, %s container is still starting up...", container) - } - - now = time.Now() - } - - return false, nil - } - - return found, nil - }) - if err != nil { - return err - } - - return nil -} - -func toOptionsString(options *snapshot.Options) (string, error) { - jsonBytes, err := json.Marshal(options) - if err != nil { - return "", err - } - - return base64.StdEncoding.EncodeToString(jsonBytes), nil + cmd.Pod.Command = []string{"/vcluster", "snapshot", "--storage", cmd.Storage} + cmd.Pod.Namespace = vClusterPod.Namespace + cmd.Pod.VCluster = vCluster.Name 
+ cmd.Pod.PodSpec = &vClusterPod.Spec + return pod.RunSnapshotPod(ctx, kubeClient, &cmd.Pod, &cmd.Snapshot, cmd.Log) } diff --git a/pkg/cli/add_vcluster_helm.go b/pkg/cli/add_vcluster_helm.go index 4d032cf051..9a9a87e074 100644 --- a/pkg/cli/add_vcluster_helm.go +++ b/pkg/cli/add_vcluster_helm.go @@ -157,7 +157,8 @@ func addVClusterHelm( // restart vCluster if options.Restart { - err = lifecycle.DeletePods(ctx, kubeClient, "app=vcluster,release="+vCluster.Name, vCluster.Namespace, log) + log.Infof("Restarting vCluster") + err = lifecycle.DeletePods(ctx, kubeClient, "app=vcluster,release="+vCluster.Name, vCluster.Namespace) if err != nil { return fmt.Errorf("delete vcluster workloads: %w", err) } diff --git a/pkg/cli/pause_helm.go b/pkg/cli/pause_helm.go index bee4ab0ccf..965c6c5b70 100644 --- a/pkg/cli/pause_helm.go +++ b/pkg/cli/pause_helm.go @@ -50,7 +50,7 @@ func PauseVCluster(ctx context.Context, kubeClient *kubernetes.Clientset, vClust return err } - err = lifecycle.DeletePods(ctx, kubeClient, "vcluster.loft.sh/managed-by="+vCluster.Name, vCluster.Namespace, log) + err = lifecycle.DeletePods(ctx, kubeClient, "vcluster.loft.sh/managed-by="+vCluster.Name, vCluster.Namespace) if err != nil { return fmt.Errorf("delete vcluster workloads: %w", err) } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 3868aab6e1..85853ef5b3 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -9,6 +9,7 @@ import ( "github.com/loft-sh/vcluster/pkg/config" "github.com/loft-sh/vcluster/pkg/constants" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" ) type Value struct { @@ -96,7 +97,7 @@ func New(ctx context.Context, certificates *Certificates, endpoints ...string) ( return nil, err } - etcdClient, err := GetEtcdClient(ctx, certificates, endpoints...) + etcdClient, err := GetEtcdClient(ctx, zap.L().Named("etcd-client"), certificates, endpoints...) if err != nil { return nil, err } diff --git a/pkg/etcd/util.go b/pkg/etcd/util.go index 9440ad29ff..b6a99620dc 100644 --- a/pkg/etcd/util.go +++ b/pkg/etcd/util.go @@ -27,7 +27,7 @@ type Certificates struct { func WaitForEtcd(parentCtx context.Context, certificates *Certificates, endpoints ...string) error { var err error waitErr := wait.PollUntilContextTimeout(parentCtx, time.Second, waitForClientTimeout, true, func(ctx context.Context) (bool, error) { - etcdClient, err := GetEtcdClient(ctx, certificates, endpoints...) + etcdClient, err := GetEtcdClient(ctx, zap.L().Named("wait-for-etcd"), certificates, endpoints...) if err == nil { defer func() { _ = etcdClient.Close() @@ -54,8 +54,8 @@ func WaitForEtcd(parentCtx context.Context, certificates *Certificates, endpoint // If the runtime config does not list any endpoints, the default endpoint is used. // The returned client should be closed when no longer needed, in order to avoid leaking GRPC // client goroutines. -func GetEtcdClient(ctx context.Context, certificates *Certificates, endpoints ...string) (*clientv3.Client, error) { - cfg, err := getClientConfig(ctx, certificates, endpoints...) +func GetEtcdClient(ctx context.Context, log *zap.Logger, certificates *Certificates, endpoints ...string) (*clientv3.Client, error) { + cfg, err := getClientConfig(ctx, log, certificates, endpoints...) if err != nil { return nil, err } @@ -65,13 +65,13 @@ func GetEtcdClient(ctx context.Context, certificates *Certificates, endpoints .. // getClientConfig generates an etcd client config connected to the specified endpoints. // If no endpoints are provided, getEndpoints is called to provide defaults. 
-func getClientConfig(ctx context.Context, certificates *Certificates, endpoints ...string) (*clientv3.Config, error) {
+func getClientConfig(ctx context.Context, log *zap.Logger, certificates *Certificates, endpoints ...string) (*clientv3.Config, error) {
 	config := &clientv3.Config{
 		Endpoints:   endpoints,
 		Context:     ctx,
 		DialTimeout: 5 * time.Second,
-		Logger:      zap.L().Named("etcd-client"),
+		Logger:      log,
 	}
 
 	if len(endpoints) > 0 {
diff --git a/pkg/lifecycle/lifecycle.go b/pkg/lifecycle/lifecycle.go
index ce7f244634..764592c6a3 100644
--- a/pkg/lifecycle/lifecycle.go
+++ b/pkg/lifecycle/lifecycle.go
@@ -57,14 +57,13 @@ func PauseVCluster(ctx context.Context, kubeClient *kubernetes.Clientset, name,
 }
 
 // DeletePods deletes all pods associated with a running vcluster
-func DeletePods(ctx context.Context, kubeClient *kubernetes.Clientset, labelSelector, namespace string, log log.BaseLogger) error {
+func DeletePods(ctx context.Context, kubeClient *kubernetes.Clientset, labelSelector, namespace string) error {
 	list, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
 	if err != nil {
 		return err
 	}
 
 	if len(list.Items) > 0 {
-		log.Infof("Relaunching %d vcluster pods", len(list.Items))
 		for _, item := range list.Items {
 			err = kubeClient.CoreV1().Pods(namespace).Delete(ctx, item.Name, metav1.DeleteOptions{})
 			if err != nil {
diff --git a/pkg/snapshot/pod/pod.go b/pkg/snapshot/pod/pod.go
new file mode 100644
index 0000000000..2293fd5083
--- /dev/null
+++ b/pkg/snapshot/pod/pod.go
@@ -0,0 +1,446 @@
+package pod
+
+import (
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"io"
+	"os"
+	"os/signal"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/loft-sh/log"
+	"github.com/loft-sh/vcluster/pkg/snapshot"
+	"github.com/loft-sh/vcluster/pkg/util/clihelper"
+	"github.com/spf13/pflag"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
+)
+
+type Options struct {
+	// command options
+	Command   []string
+	Namespace string
+	VCluster  string
+	PodSpec   *corev1.PodSpec
+
+	// user-defined options
+	Mounts []string
+	Env    []string
+	Image  string
+}
+
+func AddPodFlags(fs *pflag.FlagSet, podOptions *Options) {
+	fs.StringVar(&podOptions.Image, "pod-image", podOptions.Image, "Image to use for the created pod")
+	fs.StringArrayVar(&podOptions.Mounts, "pod-mount", nil, "Additional mounts for the created pod. Use form <type>:<name>/<key>:<path>. Supported types are: pvc, secret, configmap. E.g.: pvc:my-pvc:/path-in-pod or secret:my-secret/my-key:/path-in-pod")
+	fs.StringArrayVar(&podOptions.Env, "pod-env", nil, "Additional environment variables for the created pod. Use key=value. E.g.: MY_ENV=my-value")
+}
+
+func RunSnapshotPod(
+	ctx context.Context,
+	kubeClient *kubernetes.Clientset,
+	options *Options,
+	snapshotOptions *snapshot.Options,
+	log log.Logger,
+) error {
+	// create snapshot pod
+	snapshotPod, err := CreateSnapshotPod(
+		ctx,
+		kubeClient,
+		options,
+		snapshotOptions,
+		log,
+	)
+	if err != nil {
+		return err
+	}
+
+	// create interrupt channel
+	sigint := make(chan os.Signal, 1)
+	defer func() {
+		// make sure we won't interfere with interrupts anymore
+		signal.Stop(sigint)
+
+		// delete the pod when we are done
+		_ = kubeClient.CoreV1().Pods(snapshotPod.Namespace).Delete(ctx, snapshotPod.Name, metav1.DeleteOptions{})
+	}()
+
+	// also delete on interrupt
+	go func() {
+		// interrupt signal sent from terminal
+		signal.Notify(sigint, os.Interrupt)
+		// sigterm signal sent from kubernetes
+		signal.Notify(sigint, syscall.SIGTERM)
+
+		// wait until we get killed
+		<-sigint
+
+		// cleanup pod
+		err = kubeClient.CoreV1().Pods(snapshotPod.Namespace).Delete(ctx, snapshotPod.Name, metav1.DeleteOptions{
+			GracePeriodSeconds: ptr.To(int64(1)),
+		})
+		if err != nil {
+			klog.Warningf("Error deleting snapshot pod: %v", err)
+		}
+		os.Exit(1)
+	}()
+
+	// wait for pod to become ready
+	err = waitForReadyPod(ctx, kubeClient, snapshotPod.Namespace, snapshotPod.Name, "snapshot", log)
+	if err != nil {
+		return fmt.Errorf("waiting for snapshot pod to become ready: %w", err)
+	}
+
+	// now log the snapshot pod
+	reader, err := kubeClient.CoreV1().Pods(snapshotPod.Namespace).GetLogs(snapshotPod.Name, &corev1.PodLogOptions{
+		Follow: true,
+	}).Stream(ctx)
+	if err != nil {
+		return fmt.Errorf("stream snapshot pod logs: %w", err)
+	}
+	defer reader.Close()
+
+	// stream into stdout
+	log.Infof("Printing logs of pod %s...", snapshotPod.Name)
+	_, err = io.Copy(os.Stdout, reader)
+	if err != nil {
+		return fmt.Errorf("write pod logs: %w", err)
+	}
+
+	// check snapshot pod for exit code
+	exitCode, err := waitForCompletedPod(ctx, kubeClient, snapshotPod.Namespace, snapshotPod.Name, "snapshot")
+	if err != nil {
+		return err
+	}
+
+	// check exit code of snapshot pod
+	if exitCode != 0 {
+		return fmt.Errorf("snapshot pod failed: exit code %d", exitCode)
+	}
+
+	return nil
+}
+
+func CreateSnapshotPod(
+	ctx context.Context,
+	kubeClient *kubernetes.Clientset,
+	options *Options,
+	snapshotOptions *snapshot.Options,
+	log log.Logger,
+) (*corev1.Pod, error) {
+	var syncerContainer *corev1.Container
+	for _, container := range options.PodSpec.Containers {
+		if container.Name == "syncer" {
+			syncerContainer = &container
+			break
+		}
+	}
+	if syncerContainer == nil {
+		return nil, fmt.Errorf("couldn't find syncer container")
+	}
+
+	// build args
+	env := syncerContainer.Env
+	optionsString, err := toOptionsString(snapshotOptions)
+	if err != nil {
+		return nil, err
+	}
+	env = append(env, corev1.EnvVar{
+		Name:  "VCLUSTER_STORAGE_OPTIONS",
+		Value: optionsString,
+	})
+
+	// parse extra volumes
+	extraVolumes, extraVolumeMounts, err := parseExtraVolumes(options.Mounts)
+	if err != nil {
+		return nil, fmt.Errorf("parsing extra volumes: %w", err)
+	}
+
+	// parse extra env
+	extraEnv, err := parseExtraEnv(options.Env)
+	if err != nil {
+		return nil, fmt.Errorf("parsing extra env: %w", err)
+	}
+
+	// image
+	image := syncerContainer.Image
+	if options.Image != "" {
+		image = options.Image
+	}
+
+	// build the pod spec
+	newPod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			GenerateName: "vcluster-snapshot-",
+			Namespace:    options.Namespace,
+			Labels: map[string]string{
+
"app": "vcluster-snapshot", + }, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + ServiceAccountName: options.PodSpec.ServiceAccountName, + AutomountServiceAccountToken: ptr.To(false), + TerminationGracePeriodSeconds: ptr.To(int64(1)), + NodeSelector: options.PodSpec.NodeSelector, + Affinity: options.PodSpec.Affinity, + Tolerations: options.PodSpec.Tolerations, + SecurityContext: options.PodSpec.SecurityContext, + Volumes: append(options.PodSpec.Volumes, extraVolumes...), + Containers: []corev1.Container{ + { + Name: "snapshot", + Image: image, + Command: options.Command, + SecurityContext: syncerContainer.SecurityContext, + Env: append(env, extraEnv...), + EnvFrom: syncerContainer.EnvFrom, + VolumeMounts: append(syncerContainer.VolumeMounts, extraVolumeMounts...), + }, + }, + }, + } + + // add persistent volume claim volume if necessary + for _, volumeMount := range syncerContainer.VolumeMounts { + if volumeMount.Name == "data" { + // check if its part of the pod spec + found := false + for _, volume := range newPod.Spec.Volumes { + if volume.Name == volumeMount.Name { + found = true + break + } + } + if !found { + newPod.Spec.Volumes = append(newPod.Spec.Volumes, corev1.Volume{ + Name: volumeMount.Name, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "data-" + options.VCluster + "-0", + }, + }, + }) + } + } + } + + // create the pod + log.Infof("Starting snapshot pod for vCluster %s/%s...", options.Namespace, options.VCluster) + newPod, err = kubeClient.CoreV1().Pods(options.Namespace).Create(ctx, newPod, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("creating pod: %w", err) + } + + return newPod, nil +} + +func toOptionsString(options *snapshot.Options) (string, error) { + jsonBytes, err := json.Marshal(options) + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(jsonBytes), nil +} + +func parseExtraEnv(env []string) ([]corev1.EnvVar, error) { + extraEnv := make([]corev1.EnvVar, 0, len(env)) + for _, envVar := range env { + parts := strings.SplitN(envVar, "=", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid environment variable %s", envVar) + } + + extraEnv = append(extraEnv, corev1.EnvVar{ + Name: parts[0], + Value: parts[1], + }) + } + + return extraEnv, nil +} + +func parseExtraVolumes(volumes []string) ([]corev1.Volume, []corev1.VolumeMount, error) { + extraVolumes := make([]corev1.Volume, 0, len(volumes)) + extraVolumeMounts := make([]corev1.VolumeMount, 0, len(volumes)) + for idx, volume := range volumes { + volumeName := fmt.Sprintf("extra-volume-%d", idx) + volumeSplit := strings.Split(volume, ":") + if len(volumeSplit) != 3 { + return nil, nil, fmt.Errorf("invalid volume format: %s, expected type:name:path", volume) + } + + items := []corev1.KeyToPath{} + name := volumeSplit[1] + nameSplit := strings.Split(volumeSplit[1], "/") + if len(nameSplit) == 2 { + name = nameSplit[0] + items = append(items, corev1.KeyToPath{ + Key: nameSplit[1], + Path: nameSplit[1], + }) + } + + switch volumeSplit[0] { + case "pvc": + if len(items) > 0 { + return nil, nil, fmt.Errorf("invalid name format: %s, expected type:name:path", name) + } + + extraVolumes = append(extraVolumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: volumeSplit[1], + }, + }, + }) + case "secret": + extraVolumes = append(extraVolumes, corev1.Volume{ + Name: 
volumeName, + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: name, + Items: items, + }, + }, + }) + case "configmap": + extraVolumes = append(extraVolumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: name, + }, + Items: items, + }, + }, + }) + default: + return nil, nil, fmt.Errorf("invalid type: %s, expected pvc, secret or configmap", volumeSplit[0]) + } + + extraVolumeMounts = append(extraVolumeMounts, corev1.VolumeMount{ + Name: volumeName, + MountPath: volumeSplit[2], + }) + } + + return extraVolumes, extraVolumeMounts, nil +} + +func waitForReadyPod(ctx context.Context, kubeClient kubernetes.Interface, namespace, name, container string, log log.Logger) error { + now := time.Now() + err := wait.PollUntilContextTimeout(ctx, time.Second*2, time.Minute*2, true, func(ctx context.Context) (bool, error) { + pod, err := kubeClient.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + // this is a fatal + return false, fmt.Errorf("error trying to retrieve pod %s/%s: %w", namespace, name, err) + } + + found := false + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.State.Running != nil && containerStatus.Ready { + if containerStatus.Name == container { + found = true + } + + continue + } else if containerStatus.State.Terminated != nil || (containerStatus.State.Waiting != nil && clihelper.CriticalStatus[containerStatus.State.Waiting.Reason]) { + // if the container is completed that is fine as well + if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode == 0 { + found = true + continue + } + + reason := "" + message := "" + if containerStatus.State.Terminated != nil { + reason = containerStatus.State.Terminated.Reason + message = containerStatus.State.Terminated.Message + } else if containerStatus.State.Waiting != nil { + reason = containerStatus.State.Waiting.Reason + message = containerStatus.State.Waiting.Message + } + + out, err := kubeClient.CoreV1().Pods(namespace).GetLogs(pod.Name, &corev1.PodLogOptions{ + Container: container, + }).Do(context.Background()).Raw() + if err != nil { + return false, fmt.Errorf("there seems to be an issue with pod %s/%s starting up: %s (%s)", namespace, name, message, reason) + } + + return false, fmt.Errorf("there seems to be an issue with pod %s (%s - %s), logs:\n%s", name, message, reason, string(out)) + } else if containerStatus.State.Waiting != nil && time.Now().After(now.Add(time.Second*10)) { + if containerStatus.State.Waiting.Message != "" { + log.Infof("Please keep waiting, %s container is still starting up: %s (%s)", container, containerStatus.State.Waiting.Message, containerStatus.State.Waiting.Reason) + } else if containerStatus.State.Waiting.Reason != "" { + log.Infof("Please keep waiting, %s container is still starting up: %s", container, containerStatus.State.Waiting.Reason) + } else { + log.Infof("Please keep waiting, %s container is still starting up...", container) + } + + now = time.Now() + } + + return false, nil + } + + return found, nil + }) + if err != nil { + return err + } + + return nil +} + +func waitForCompletedPod(ctx context.Context, kubeClient *kubernetes.Clientset, namespace, name, container string) (int32, error) { + exitCode := int32(-1) + err := wait.PollUntilContextTimeout(ctx, time.Second*2, time.Minute, true, func(ctx context.Context) (bool, error) { + pod, 
err := kubeClient.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + // this is a fatal + return false, fmt.Errorf("error trying to retrieve pod %s/%s: %w", namespace, name, err) + } + + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Name != container { + continue + } + + if containerStatus.State.Running != nil { + return false, nil + } else if containerStatus.State.Terminated != nil { + exitCode = containerStatus.State.Terminated.ExitCode + return true, nil + } else if containerStatus.State.Waiting != nil { + if containerStatus.State.Waiting.Message != "" { + return false, fmt.Errorf("error: %s container is waiting: %s (%s)", container, containerStatus.State.Waiting.Message, containerStatus.State.Waiting.Reason) + } else if containerStatus.State.Waiting.Reason != "" { + return false, fmt.Errorf("error: %s container is waiting: %s", container, containerStatus.State.Waiting.Reason) + } + + return false, fmt.Errorf("error: %s container is waiting", container) + } + + return false, nil + } + + return false, nil + }) + if err != nil { + return exitCode, err + } + + return exitCode, nil +} diff --git a/pkg/snapshot/s3/store.go b/pkg/snapshot/s3/store.go index 927784c8cc..c5f4a0d42b 100644 --- a/pkg/snapshot/s3/store.go +++ b/pkg/snapshot/s3/store.go @@ -96,7 +96,6 @@ type ObjectStore struct { kmsKeyID string sseCustomerKey string sseCustomerKeyMd5 string - signatureVersion string serverSideEncryption string tagging string checksumAlg string diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index 6284fd9306..b82bbb24f0 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -24,6 +24,7 @@ import ( _ "github.com/loft-sh/vcluster/test/e2e/manifests" _ "github.com/loft-sh/vcluster/test/e2e/node" _ "github.com/loft-sh/vcluster/test/e2e/servicesync" + _ "github.com/loft-sh/vcluster/test/e2e/snapshot" _ "github.com/loft-sh/vcluster/test/e2e/syncer/fromhost" _ "github.com/loft-sh/vcluster/test/e2e/syncer/networkpolicies" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pods" diff --git a/test/e2e/snapshot/snapshot.go b/test/e2e/snapshot/snapshot.go new file mode 100644 index 0000000000..da29897561 --- /dev/null +++ b/test/e2e/snapshot/snapshot.go @@ -0,0 +1,170 @@ +package snapshot + +import ( + "context" + "os" + "os/exec" + "time" + + "github.com/loft-sh/vcluster/test/framework" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" +) + +var _ = ginkgo.Describe("Snapshot VCluster", func() { + f := framework.DefaultFramework + ginkgo.It("run vcluster snapshot and vcluster restore", func() { + ginkgo.By("Make sure vcluster pods are running") + pods, err := f.HostClient.CoreV1().Pods(f.VclusterNamespace).List(f.Context, metav1.ListOptions{ + LabelSelector: "app=vcluster", + }) + framework.ExpectNoError(err) + framework.ExpectEqual(true, len(pods.Items) > 0) + + // create a pvc we will use to store the snapshot + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "snapshot-pvc", + Namespace: f.VclusterNamespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse("5Gi"), + }, + }, + }, + } + _, err = 
f.HostClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(f.Context, pvc, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+
+		// now create a service that should be there when we restore again
+		_, err = f.VClusterClient.CoreV1().Services("default").Create(f.Context, &corev1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "snapshot-restore",
+				Namespace: "default",
+				Labels: map[string]string{
+					"snapshot": "restore",
+				},
+			},
+			Spec: corev1.ServiceSpec{
+				Ports: []corev1.ServicePort{
+					{
+						Name: "https",
+						Port: 443,
+					},
+				},
+				Type: corev1.ServiceTypeClusterIP,
+			},
+		}, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+
+		ginkgo.By("Snapshot vcluster")
+		cmd := exec.Command(
+			"vcluster",
+			"snapshot",
+			f.VclusterName,
+			"-n", f.VclusterNamespace,
+			"--storage", "file",
+			"--file-path", "/snapshot-pvc/snapshot.tar",
+			"--pod-mount", "pvc:snapshot-pvc:/snapshot-pvc",
+		)
+		cmd.Stdout = os.Stdout
+		cmd.Stderr = os.Stderr
+		err = cmd.Run()
+		framework.ExpectNoError(err)
+
+		// now delete the service
+		err = f.VClusterClient.CoreV1().Services("default").Delete(f.Context, "snapshot-restore", metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+
+		// now create a service that should not be there when we restore
+		_, err = f.VClusterClient.CoreV1().Services("default").Create(f.Context, &corev1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "snapshot-delete",
+				Namespace: "default",
+				Labels: map[string]string{
+					"snapshot": "delete",
+				},
+			},
+			Spec: corev1.ServiceSpec{
+				Ports: []corev1.ServicePort{
+					{
+						Name: "http",
+						Port: 80,
+					},
+				},
+				Type: corev1.ServiceTypeClusterIP,
+			},
+		}, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+
+		ginkgo.By("Restore vcluster")
+		cmd = exec.Command(
+			"vcluster",
+			"restore",
+			f.VclusterName,
+			"-n", f.VclusterNamespace,
+			"--storage", "file",
+			"--file-path", "/snapshot-pvc/snapshot.tar",
+			"--pod-mount", "pvc:snapshot-pvc:/snapshot-pvc",
+		)
+		cmd.Stdout = os.Stdout
+		cmd.Stderr = os.Stderr
+		err = cmd.Run()
+		framework.ExpectNoError(err)
+
+		// wait until vCluster is running
+		err = wait.PollUntilContextTimeout(f.Context, time.Second, time.Minute*2, false, func(ctx context.Context) (done bool, err error) {
+			newPods, _ := f.HostClient.CoreV1().Pods(f.VclusterNamespace).List(ctx, metav1.ListOptions{
+				LabelSelector: "app=vcluster",
+			})
+			p := len(newPods.Items)
+			if p > 0 {
+				// rp counts the running pods
+				rp := 0
+				for _, pod := range newPods.Items {
+					if pod.Status.Phase == corev1.PodRunning {
+						rp = rp + 1
+					}
+				}
+				if rp == p {
+					return true, nil
+				}
+			}
+			return false, nil
+		})
+		framework.ExpectNoError(err)
+
+		// delete the snapshot pvc
+		err = f.HostClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(f.Context, pvc.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+
+		// check for the service getting deleted
+		gomega.Eventually(func() int {
+			services, err := f.HostClient.CoreV1().Services(f.VclusterNamespace).List(f.Context, metav1.ListOptions{
+				LabelSelector: "snapshot=delete",
+			})
+			framework.ExpectNoError(err)
+			return len(services.Items)
+		}).WithPolling(time.Second).
+			WithTimeout(framework.PollTimeout).
+			Should(gomega.Equal(0))
+
+		// check for the restored service getting created again
+		gomega.Eventually(func() int {
+			services, err := f.HostClient.CoreV1().Services(f.VclusterNamespace).List(f.Context, metav1.ListOptions{
+				LabelSelector: "snapshot=restore",
+			})
+			framework.ExpectNoError(err)
+			return len(services.Items)
+		}).WithPolling(time.Second).
+ WithTimeout(framework.PollTimeout). + Should(gomega.Equal(1)) + }) +}) diff --git a/test/e2e_isolation_mode/e2e_isolation_mode_test.go b/test/e2e_isolation_mode/e2e_isolation_mode_test.go index 7bfc9e4796..35f3e3375a 100644 --- a/test/e2e_isolation_mode/e2e_isolation_mode_test.go +++ b/test/e2e_isolation_mode/e2e_isolation_mode_test.go @@ -24,6 +24,7 @@ import ( _ "github.com/loft-sh/vcluster/test/e2e/manifests" _ "github.com/loft-sh/vcluster/test/e2e/node" _ "github.com/loft-sh/vcluster/test/e2e/servicesync" + _ "github.com/loft-sh/vcluster/test/e2e/snapshot" _ "github.com/loft-sh/vcluster/test/e2e/syncer/networkpolicies" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pods" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pvc" diff --git a/test/e2e_node/e2e_node_suite_test.go b/test/e2e_node/e2e_node_suite_test.go index 3ffb0c9940..8edbff6fc0 100644 --- a/test/e2e_node/e2e_node_suite_test.go +++ b/test/e2e_node/e2e_node_suite_test.go @@ -23,6 +23,7 @@ import ( _ "github.com/loft-sh/vcluster/test/e2e/k8sdefaultendpoint" _ "github.com/loft-sh/vcluster/test/e2e/manifests" _ "github.com/loft-sh/vcluster/test/e2e/servicesync" + _ "github.com/loft-sh/vcluster/test/e2e/snapshot" _ "github.com/loft-sh/vcluster/test/e2e/syncer/networkpolicies" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pods" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pvc" diff --git a/test/e2e_rootless/e2e_rootless_mode_suite_test.go b/test/e2e_rootless/e2e_rootless_mode_suite_test.go index cf683775b7..8eb098f37d 100644 --- a/test/e2e_rootless/e2e_rootless_mode_suite_test.go +++ b/test/e2e_rootless/e2e_rootless_mode_suite_test.go @@ -24,6 +24,7 @@ import ( _ "github.com/loft-sh/vcluster/test/e2e/manifests" _ "github.com/loft-sh/vcluster/test/e2e/node" _ "github.com/loft-sh/vcluster/test/e2e/servicesync" + _ "github.com/loft-sh/vcluster/test/e2e/snapshot" _ "github.com/loft-sh/vcluster/test/e2e/syncer/networkpolicies" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pods" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pvc" diff --git a/test/e2e_scheduler/e2e_scheduler_suite_test.go b/test/e2e_scheduler/e2e_scheduler_suite_test.go index 16adb50396..ba48d89cc3 100644 --- a/test/e2e_scheduler/e2e_scheduler_suite_test.go +++ b/test/e2e_scheduler/e2e_scheduler_suite_test.go @@ -23,6 +23,7 @@ import ( _ "github.com/loft-sh/vcluster/test/e2e/k8sdefaultendpoint" _ "github.com/loft-sh/vcluster/test/e2e/manifests" _ "github.com/loft-sh/vcluster/test/e2e/servicesync" + _ "github.com/loft-sh/vcluster/test/e2e/snapshot" _ "github.com/loft-sh/vcluster/test/e2e/syncer/networkpolicies" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pods" _ "github.com/loft-sh/vcluster/test/e2e/syncer/pvc" diff --git a/test/e2e_target_namespace/e2e_target_namespace_test.go b/test/e2e_target_namespace/e2e_target_namespace_test.go index eb41b07caa..c08da34a81 100644 --- a/test/e2e_target_namespace/e2e_target_namespace_test.go +++ b/test/e2e_target_namespace/e2e_target_namespace_test.go @@ -16,6 +16,7 @@ import ( apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" // Enable cloud provider auth + _ "github.com/loft-sh/vcluster/test/e2e/snapshot" _ "github.com/loft-sh/vcluster/test/e2e_target_namespace/targetnamespace" _ "k8s.io/client-go/plugin/pkg/client/auth" // Register tests