Skip to content

Commit

Permalink
test: add snapshot tests
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianKramm committed Feb 12, 2025
1 parent eba9140 commit fc72540
Show file tree
Hide file tree
Showing 413 changed files with 45,617 additions and 4,472 deletions.
28 changes: 12 additions & 16 deletions cmd/vcluster/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,15 @@ import (
"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/constants"
"github.com/loft-sh/vcluster/pkg/etcd"
"github.com/loft-sh/vcluster/pkg/snapshot/file"
"github.com/loft-sh/vcluster/pkg/snapshot/s3"
"github.com/loft-sh/vcluster/pkg/snapshot"
"github.com/spf13/cobra"
"k8s.io/klog/v2"
)

type RestoreOptions struct {
S3 s3.Options
File file.Options
Snapshot snapshot.Options

Compress bool
Storage string
Config string
}

Expand All @@ -36,26 +33,23 @@ func NewRestoreCommand() *cobra.Command {
if err != nil {
klog.Warningf("Error parsing environment variables: %v", err)
} else {
options.S3 = envOptions.S3
options.File = envOptions.File
options.Snapshot = *envOptions
}

cmd := &cobra.Command{
Use: "restore",
Short: "restore a vCluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, _ []string) error {
return options.Run(cmd.Context())
},
}

cmd.Flags().StringVar(&options.Config, "config", constants.DefaultVClusterConfigLocation, "The path where to find the vCluster config to load")
cmd.Flags().StringVar(&options.Storage, "storage", "s3", "The storage to backup to. Can be either s3 or file")
cmd.Flags().BoolVar(&options.Compress, "compress", false, "If the backup should be compressed")

// add storage flags
file.AddFileFlags(cmd.Flags(), &options.File)
s3.AddS3Flags(cmd.Flags(), &options.S3)
snapshot.AddFlags(cmd.Flags(), &options.Snapshot)
return cmd
}

Expand All @@ -67,7 +61,7 @@ func (o *RestoreOptions) Run(ctx context.Context) error {
}

// make sure to validate options
err = validateOptions(vConfig, o.Storage, &o.S3, &o.File)
err = validateOptions(vConfig, &o.Snapshot)
if err != nil {
return err
}
Expand All @@ -79,21 +73,24 @@ func (o *RestoreOptions) Run(ctx context.Context) error {
}

// create store
objectStore, err := createStore(ctx, o.Storage, &o.S3, &o.File)
objectStore, err := createStore(ctx, &o.Snapshot)
if err != nil {
return fmt.Errorf("failed to create store: %w", err)
}

// now stream objects from object store to etcd
reader, err := objectStore.GetObject()
reader, err := objectStore.GetObject(ctx)
if err != nil {
return fmt.Errorf("failed to get backup: %w", err)
}
defer reader.Close()

// print log message that we start restoring
klog.Infof("Start restoring etcd snapshot from %s...", objectStore.Target())

// optionally decompress
gzipReader := reader
if o.Compress {
if o.Compress || o.Snapshot.Type == "oci" {
gzipReader, err = gzip.NewReader(reader)
if err != nil {
return fmt.Errorf("failed to create gzip reader: %w", err)
Expand All @@ -105,7 +102,6 @@ func (o *RestoreOptions) Run(ctx context.Context) error {
tarReader := tar.NewReader(gzipReader)

// now restore each key value
klog.FromContext(ctx).Info("Start restoring etcd snapshot...")
restoredKeys := 0
for {
// read from archive
Expand Down
135 changes: 72 additions & 63 deletions cmd/vcluster/cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"path/filepath"
"time"

"github.com/go-logr/logr"
"github.com/loft-sh/log/logr/zapr"
vclusterconfig "github.com/loft-sh/vcluster/config"
"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/constants"
Expand All @@ -21,24 +23,25 @@ import (
"github.com/loft-sh/vcluster/pkg/setup"
"github.com/loft-sh/vcluster/pkg/snapshot"
"github.com/loft-sh/vcluster/pkg/snapshot/file"
"github.com/loft-sh/vcluster/pkg/snapshot/oci"
"github.com/loft-sh/vcluster/pkg/snapshot/s3"
"github.com/loft-sh/vcluster/pkg/util/servicecidr"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc/grpclog"
"k8s.io/klog/v2"
)

type Storage interface {
Target() string
PutObject(body io.Reader) error
GetObject() (io.ReadCloser, error)
PutObject(ctx context.Context, body io.Reader) error
GetObject(ctx context.Context) (io.ReadCloser, error)
}

type SnapshotOptions struct {
S3 s3.Options
File file.Options
Snapshot snapshot.Options

Compress bool
Storage string
Prefix string
Config string
}
Expand All @@ -49,27 +52,24 @@ func NewSnapshotCommand() *cobra.Command {
if err != nil {
klog.Warningf("Error parsing environment variables: %v", err)
} else {
options.S3 = envOptions.S3
options.File = envOptions.File
options.Snapshot = *envOptions
}

cmd := &cobra.Command{
Use: "snapshot",
Short: "snapshot a vCluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, _ []string) error {
return options.Run(cmd.Context())
},
}

cmd.Flags().StringVar(&options.Config, "config", constants.DefaultVClusterConfigLocation, "The path where to find the vCluster config to load")
cmd.Flags().StringVar(&options.Prefix, "prefix", "/", "The prefix to use to snapshot the etcd")
cmd.Flags().StringVar(&options.Storage, "storage", "s3", "The storage to snapshot to. Can be either s3 or file")
cmd.Flags().BoolVar(&options.Compress, "compress", false, "If the snapshot should be compressed")

// add storage flags
file.AddFileFlags(cmd.Flags(), &options.File)
s3.AddS3Flags(cmd.Flags(), &options.S3)
snapshot.AddFlags(cmd.Flags(), &options.Snapshot)
return cmd
}

Expand All @@ -81,7 +81,7 @@ func (o *SnapshotOptions) Run(ctx context.Context) error {
}

// make sure to validate options
err = validateOptions(vConfig, o.Storage, &o.S3, &o.File)
err = validateOptions(vConfig, &o.Snapshot)
if err != nil {
return err
}
Expand All @@ -93,7 +93,7 @@ func (o *SnapshotOptions) Run(ctx context.Context) error {
}

// create store
objectStore, err := createStore(ctx, o.Storage, &o.S3, &o.File)
objectStore, err := createStore(ctx, &o.Snapshot)
if err != nil {
return fmt.Errorf("failed to create store: %w", err)
}
Expand All @@ -119,15 +119,15 @@ func (o *SnapshotOptions) writeSnapshot(ctx context.Context, etcdClient etcd.Cli
defer writer.Close()
go func() {
defer reader.Close()
errChan <- objectStore.PutObject(reader)
errChan <- objectStore.PutObject(ctx, reader)
}()

// start listing the keys
listChan := etcdClient.ListStream(ctx, o.Prefix)

// optionally compress
gzipWriter := io.WriteCloser(writer)
if o.Compress {
if o.Compress || o.Snapshot.Type == "oci" {
gzipWriter = gzip.NewWriter(writer)
defer gzipWriter.Close()
}
Expand Down Expand Up @@ -179,21 +179,11 @@ func (o *SnapshotOptions) writeSnapshot(ctx context.Context, etcdClient etcd.Cli
}
}

func validateOptions(vConfig *config.VirtualClusterConfig, storage string, s3Options *s3.Options, fileOptions *file.Options) error {
func validateOptions(vConfig *config.VirtualClusterConfig, options *snapshot.Options) error {
// storage needs to be either s3 or file
if storage == "s3" {
if s3Options.Key == "" {
return fmt.Errorf("--s3-key must be specified")
}
if s3Options.Bucket == "" {
return fmt.Errorf("--s3-bucket must be specified")
}
} else if storage == "file" {
if fileOptions.Path == "" {
return fmt.Errorf("--file-path must be specified")
}
} else {
return fmt.Errorf("--storage must be either 's3' or 'file'")
err := snapshot.Validate(options)
if err != nil {
return err
}

// only support k3s and k8s distro
Expand Down Expand Up @@ -225,6 +215,11 @@ func newEtcdClient(ctx context.Context, vConfig *config.VirtualClusterConfig, st
return nil, fmt.Errorf("start external database backing store: %w", err)
}
}
} else if vConfig.BackingStoreType() == vclusterconfig.StoreTypeExternalEtcd {
_, err := generateCertificates(ctx, vConfig)
if err != nil {
return nil, fmt.Errorf("failed to get certificates: %w", err)
}
}

// create the etcd client
Expand All @@ -241,7 +236,10 @@ func isEtcdReachable(ctx context.Context, endpoint string, certificates *etcd.Ce
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

etcdClient, err := etcd.GetEtcdClient(ctx, certificates, endpoint)
zapLog := zap.NewNop()
grpclog.SetLoggerV2(grpclog.NewLoggerV2(io.Discard, io.Discard, io.Discard))
ctx = logr.NewContext(ctx, zapr.NewLoggerWithOptions(zapLog))
etcdClient, err := etcd.GetEtcdClient(ctx, zapLog, certificates, endpoint)
if err == nil {
defer func() {
_ = etcdClient.Close()
Expand Down Expand Up @@ -297,34 +295,10 @@ func startEmbeddedBackingStore(ctx context.Context, vConfig *config.VirtualClust

// embedded etcd
if vConfig.BackingStoreType() == vclusterconfig.StoreTypeEmbeddedEtcd {
var err error
klog.FromContext(ctx).Info("Starting embedded etcd...")

// init the clients
vConfig.ControlPlaneConfig, vConfig.ControlPlaneNamespace, vConfig.ControlPlaneService, vConfig.WorkloadConfig, vConfig.WorkloadNamespace, vConfig.WorkloadService, err = pro.GetRemoteClient(vConfig)
if err != nil {
return err
}
err = setup.InitClients(vConfig)
certificatesDir, err := generateCertificates(ctx, vConfig)
if err != nil {
return err
}

// retrieve service cidr
serviceCIDR := vConfig.ServiceCIDR
if serviceCIDR == "" {
var warning string
serviceCIDR, warning = servicecidr.GetServiceCIDR(ctx, vConfig.WorkloadClient, vConfig.WorkloadNamespace)
if warning != "" {
klog.Warning(warning)
}
}

// generate etcd certificates
certificatesDir := "/data/pki"
err = setup.GenerateCerts(ctx, vConfig.ControlPlaneClient, vConfig.Name, vConfig.ControlPlaneNamespace, serviceCIDR, certificatesDir, vConfig)
if err != nil {
return err
return fmt.Errorf("failed to get certificates: %w", err)
}

// we need to run this with the parent ctx as otherwise this context
Expand All @@ -346,20 +320,55 @@ func startEmbeddedBackingStore(ctx context.Context, vConfig *config.VirtualClust
return nil
}

func createStore(ctx context.Context, storage string, s3Options *s3.Options, fileOptions *file.Options) (Storage, error) {
if storage == "s3" {
objectStore := s3.NewObjectStore(klog.FromContext(ctx))
err := objectStore.Init(s3Options)
func generateCertificates(ctx context.Context, vConfig *config.VirtualClusterConfig) (string, error) {
var err error

// init the clients
vConfig.ControlPlaneConfig, vConfig.ControlPlaneNamespace, vConfig.ControlPlaneService, vConfig.WorkloadConfig, vConfig.WorkloadNamespace, vConfig.WorkloadService, err = pro.GetRemoteClient(vConfig)
if err != nil {
return "", err
}
err = setup.InitClients(vConfig)
if err != nil {
return "", err
}

// retrieve service cidr
serviceCIDR := vConfig.ServiceCIDR
if serviceCIDR == "" {
var warning string
serviceCIDR, warning = servicecidr.GetServiceCIDR(ctx, vConfig.WorkloadClient, vConfig.WorkloadNamespace)
if warning != "" {
klog.Warning(warning)
}
}

// generate etcd certificates
certificatesDir := "/data/pki"
err = setup.GenerateCerts(ctx, vConfig.ControlPlaneClient, vConfig.Name, vConfig.ControlPlaneNamespace, serviceCIDR, certificatesDir, vConfig)
if err != nil {
return "", err
}

return certificatesDir, nil
}

func createStore(ctx context.Context, options *snapshot.Options) (Storage, error) {
if options.Type == "s3" {
objectStore := s3.NewStore(klog.FromContext(ctx))
err := objectStore.Init(&options.S3)
if err != nil {
return nil, fmt.Errorf("failed to init s3 object store: %w", err)
}

return objectStore, nil
} else if storage == "file" {
return file.NewFileStore(fileOptions), nil
} else if options.Type == "file" {
return file.NewStore(&options.File), nil
} else if options.Type == "oci" {
return oci.NewStore(&options.OCI), nil
}

return nil, fmt.Errorf("unknown storage: %s", storage)
return nil, fmt.Errorf("unknown storage: %s", options.Type)
}

func writeKeyValue(tarWriter *tar.Writer, key, value []byte) error {
Expand Down
Loading

0 comments on commit fc72540

Please sign in to comment.