Skip to content

Commit

Permalink
test: add snapshot tests
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianKramm committed Feb 12, 2025
1 parent eba9140 commit 13e3a4c
Show file tree
Hide file tree
Showing 18 changed files with 674 additions and 492 deletions.
2 changes: 1 addition & 1 deletion cmd/vcluster/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewRestoreCommand() *cobra.Command {
Use: "restore",
Short: "restore a vCluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, _ []string) error {
return options.Run(cmd.Context())
},
}
Expand Down
11 changes: 9 additions & 2 deletions cmd/vcluster/cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"path/filepath"
"time"

"github.com/go-logr/logr"
"github.com/loft-sh/log/logr/zapr"
vclusterconfig "github.com/loft-sh/vcluster/config"
"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/constants"
Expand All @@ -24,6 +26,8 @@ import (
"github.com/loft-sh/vcluster/pkg/snapshot/s3"
"github.com/loft-sh/vcluster/pkg/util/servicecidr"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc/grpclog"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -57,7 +61,7 @@ func NewSnapshotCommand() *cobra.Command {
Use: "snapshot",
Short: "snapshot a vCluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, _ []string) error {
return options.Run(cmd.Context())
},
}
Expand Down Expand Up @@ -241,7 +245,10 @@ func isEtcdReachable(ctx context.Context, endpoint string, certificates *etcd.Ce
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

etcdClient, err := etcd.GetEtcdClient(ctx, certificates, endpoint)
zapLog := zap.NewNop()
grpclog.SetLoggerV2(grpclog.NewLoggerV2(io.Discard, io.Discard, io.Discard))
ctx = logr.NewContext(ctx, zapr.NewLoggerWithOptions(zapLog))
etcdClient, err := etcd.GetEtcdClient(ctx, zapLog, certificates, endpoint)
if err == nil {
defer func() {
_ = etcdClient.Close()
Expand Down
207 changes: 16 additions & 191 deletions cmd/vclusterctl/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package cmd
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"

"github.com/blang/semver/v4"
"github.com/loft-sh/log"
Expand All @@ -19,23 +15,21 @@ import (
"github.com/loft-sh/vcluster/pkg/lifecycle"
"github.com/loft-sh/vcluster/pkg/snapshot"
"github.com/loft-sh/vcluster/pkg/snapshot/file"
"github.com/loft-sh/vcluster/pkg/snapshot/pod"
"github.com/loft-sh/vcluster/pkg/snapshot/s3"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)

type RestoreCmd struct {
*flags.GlobalFlags

S3 s3.Options
File file.Options

Storage string

Snapshot snapshot.Options
Pod pod.Options

Log log.Logger
}

Expand Down Expand Up @@ -68,8 +62,9 @@ vcluster restore test --namespace test
cobraCmd.Flags().StringVar(&cmd.Storage, "storage", "s3", "The storage to restore from. Can be either s3 or file")

// add storage flags
file.AddFileFlags(cobraCmd.Flags(), &cmd.File)
s3.AddS3Flags(cobraCmd.Flags(), &cmd.S3)
file.AddFileFlags(cobraCmd.Flags(), &cmd.Snapshot.File)
s3.AddS3Flags(cobraCmd.Flags(), &cmd.Snapshot.S3)
pod.AddPodFlags(cobraCmd.Flags(), &cmd.Pod)
return cobraCmd
}

Expand All @@ -91,13 +86,9 @@ func (cmd *RestoreCmd) Run(ctx context.Context, args []string) error {
}

// check if snapshot is supported
if vCluster.Version != "dev-next" {
version, err := semver.Parse(strings.TrimPrefix(vCluster.Version, "v"))
if err != nil {
return fmt.Errorf("parsing vCluster version: %w", err)
}

// check if version matches
version, err := semver.Parse(strings.TrimPrefix(vCluster.Version, "v"))
if err == nil {
// only check if version matches if vCluster actually has a parsable version
if version.LT(semver.MustParse(minSnapshotVersion)) {
return fmt.Errorf("vCluster version %s snapshotting is not supported", vCluster.Version)
}
Expand All @@ -119,16 +110,6 @@ func (cmd *RestoreCmd) Run(ctx context.Context, args []string) error {
}
}()

// now restore vCluster
err = cmd.restoreVCluster(ctx, kubeClient, vCluster)
if err != nil {
return fmt.Errorf("restore vCluster %s: %w", vCluster.Name, err)
}

return nil
}

func (cmd *RestoreCmd) restoreVCluster(ctx context.Context, kubeClient *kubernetes.Clientset, vCluster *find.VCluster) error {
// get pod spec
var podSpec *corev1.PodSpec
if vCluster.StatefulSet != nil {
Expand All @@ -139,166 +120,10 @@ func (cmd *RestoreCmd) restoreVCluster(ctx context.Context, kubeClient *kubernet
return fmt.Errorf("vCluster %s has no StatefulSet or Deployment", vCluster.Name)
}

// now start the snapshot pod that takes the snapshot
restorePod, err := cmd.startRestorePod(ctx, kubeClient, vCluster.Namespace, vCluster.Name, podSpec)
if err != nil {
return fmt.Errorf("starting snapshot pod: %w", err)
}

// create interrupt channel
sigint := make(chan os.Signal, 1)
defer func() {
// make sure we won't interfere with interrupts anymore
signal.Stop(sigint)

// delete the restore pod when we are done
_ = kubeClient.CoreV1().Pods(restorePod.Namespace).Delete(ctx, restorePod.Name, metav1.DeleteOptions{})
}()

// also delete on interrupt
go func() {
// interrupt signal sent from terminal
signal.Notify(sigint, os.Interrupt)
// sigterm signal sent from kubernetes
signal.Notify(sigint, syscall.SIGTERM)

// wait until we get killed
<-sigint

// cleanup virtual cluster
err = kubeClient.CoreV1().Pods(restorePod.Namespace).Delete(ctx, restorePod.Name, metav1.DeleteOptions{
GracePeriodSeconds: ptr.To(int64(1)),
})
if err != nil {
klog.Warningf("Error deleting snapshot pod: %v", err)
}
os.Exit(1)
}()

// wait for pod to become ready
err = waitForReadyPod(ctx, kubeClient, restorePod.Namespace, restorePod.Name, "restore", cmd.Log)
if err != nil {
return fmt.Errorf("waiting for restore pod to become ready: %w", err)
}

// now log the snapshot pod
reader, err := kubeClient.CoreV1().Pods(restorePod.Namespace).GetLogs(restorePod.Name, &corev1.PodLogOptions{
Follow: true,
}).Stream(ctx)
if err != nil {
return fmt.Errorf("stream restore pod logs: %w", err)
}
defer reader.Close()

// stream into stdout
cmd.Log.Infof("Printing logs of restore pod...")
_, err = io.Copy(os.Stdout, reader)
if err != nil {
return fmt.Errorf("write restore pod logs: %w", err)
}

// check restore pod for exit code
exitCode, err := waitForCompletedPod(ctx, kubeClient, restorePod.Namespace, restorePod.Name, "restore", cmd.Log)
if err != nil {
return err
}

// check exit code of restore pod
if exitCode != 0 {
return fmt.Errorf("restore pod failed: exit code %d", exitCode)
}

return nil
}

func (cmd *RestoreCmd) startRestorePod(ctx context.Context, kubeClient *kubernetes.Clientset, namespace, vCluster string, podSpec *corev1.PodSpec) (*corev1.Pod, error) {
var syncerContainer *corev1.Container
for _, container := range podSpec.Containers {
if container.Name == "syncer" {
syncerContainer = &container
break
}
}
if syncerContainer == nil {
return nil, fmt.Errorf("couldn't find syncer container")
}

// build args
env := syncerContainer.Env
options, err := toOptionsString(&snapshot.Options{
S3: cmd.S3,
File: cmd.File,
})
if err != nil {
return nil, err
}
env = append(env, corev1.EnvVar{
Name: "VCLUSTER_STORAGE_OPTIONS",
Value: options,
})

// build the pod spec
newPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "vcluster-restore-",
Namespace: namespace,
Labels: map[string]string{
"app": "vcluster-restore",
},
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
ServiceAccountName: podSpec.ServiceAccountName,
TerminationGracePeriodSeconds: ptr.To(int64(1)),
NodeSelector: podSpec.NodeSelector,
Affinity: podSpec.Affinity,
Tolerations: podSpec.Tolerations,
SecurityContext: podSpec.SecurityContext,
Volumes: podSpec.Volumes,
Containers: []corev1.Container{
{
Name: "restore",
Image: syncerContainer.Image,
Command: []string{"/vcluster", "restore", "--storage", cmd.Storage},
SecurityContext: syncerContainer.SecurityContext,
Env: env,
EnvFrom: syncerContainer.EnvFrom,
VolumeMounts: syncerContainer.VolumeMounts,
},
},
},
}

// add persistent volume claim volume if necessary
for _, volumeMount := range syncerContainer.VolumeMounts {
if volumeMount.Name == "data" {
// check if its part of the pod spec
found := false
for _, volume := range newPod.Spec.Volumes {
if volume.Name == volumeMount.Name {
found = true
break
}
}
if !found {
newPod.Spec.Volumes = append(newPod.Spec.Volumes, corev1.Volume{
Name: volumeMount.Name,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "data-" + vCluster + "-0",
},
},
})
}
}
}

// create the pod
cmd.Log.Infof("Starting restore pod for vCluster %s/%s...", namespace, vCluster)
newPod, err = kubeClient.CoreV1().Pods(namespace).Create(ctx, newPod, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("creating restore pod: %w", err)
}

return newPod, nil
// set missing pod options and run snapshot restore pod
cmd.Pod.Namespace = vCluster.Namespace
cmd.Pod.VCluster = vCluster.Name
cmd.Pod.PodSpec = podSpec
cmd.Pod.Command = []string{"/vcluster", "restore", "--storage", cmd.Storage}
return pod.RunSnapshotPod(ctx, kubeClient, &cmd.Pod, &cmd.Snapshot, cmd.Log)
}
Loading

0 comments on commit 13e3a4c

Please sign in to comment.