From ca8786ae966e9e54abd4fc4b2bc7c4c3e5799231 Mon Sep 17 00:00:00 2001 From: Ian Kaneshiro Date: Thu, 28 Mar 2024 14:57:59 -0700 Subject: [PATCH] Mirror Plugin: Mirror to mirror synchronization --- .../mirror/pkg/mirrorrepository/api.go | 14 + .../mirror/pkg/mirrorrepository/mirrorsync.go | 338 ++++++++++++++++++ .../mirror/pkg/mirrorrepository/sync.go | 203 +++++++---- pkg/utils/time.go | 8 + 4 files changed, 490 insertions(+), 73 deletions(-) create mode 100644 internal/plugins/mirror/pkg/mirrorrepository/mirrorsync.go diff --git a/internal/plugins/mirror/pkg/mirrorrepository/api.go b/internal/plugins/mirror/pkg/mirrorrepository/api.go index c2d61ad..cfe3be7 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/api.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/api.go @@ -535,3 +535,17 @@ func toRepositoryFileAPI(file *mirrordb.RepositoryFile) *apiv1.RepositoryFile { ConfigID: file.ConfigID, } } + +func toRepositoryFileDB(file *apiv1.RepositoryFile) *mirrordb.RepositoryFile { + return &mirrordb.RepositoryFile{ + Tag: file.Tag, + Name: file.Name, + Reference: file.Reference, + Parent: file.Parent, + Link: file.Link, + ModifiedTime: utils.StringToTime(file.ModifiedTime), + Mode: file.Mode, + Size: file.Size, + ConfigID: file.ConfigID, + } +} diff --git a/internal/plugins/mirror/pkg/mirrorrepository/mirrorsync.go b/internal/plugins/mirror/pkg/mirrorrepository/mirrorsync.go new file mode 100644 index 0000000..dd958e9 --- /dev/null +++ b/internal/plugins/mirror/pkg/mirrorrepository/mirrorsync.go @@ -0,0 +1,338 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024, CIQ, Inc. All rights reserved +// SPDX-License-Identifier: Apache-2.0 + +package mirrorrepository + +import ( + "context" + "crypto/md5" //nolint:gosec + "encoding/hex" + "io" + "net/http" + "net/url" + "os" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/cenkalti/backoff" + "go.ciq.dev/beskar/internal/plugins/mirror/pkg/mirrordb" + "go.ciq.dev/beskar/pkg/oras" + "go.ciq.dev/beskar/pkg/orasmirror" + "go.ciq.dev/beskar/pkg/plugins/httpcodec" + apiv1 "go.ciq.dev/beskar/pkg/plugins/mirror/api/v1" + "go.ciq.dev/go-rsync/rsync" +) + +type MirrorSyncerPlan struct { + AddRemoteFiles []*mirrordb.RepositoryFile + DeleteLocalFiles []*mirrordb.RepositoryFile +} + +type MirrorSyncer struct { + h *Handler + config mirrorConfig + configID uint64 + parallelism int + + client *apiv1.HTTPClient + upstreamRepository string +} + +func NewMirrorSyncer(h *Handler, config mirrorConfig, configID uint64, parallelism int) (*MirrorSyncer, error) { + _, repository := path.Split(config.URL.Path) + + baseURL := &url.URL{ + Scheme: "https", + Host: config.URL.Host, + User: config.URL.User, + Path: apiv1.URLPath, + } + + client, err := apiv1.NewHTTPClient(httpcodec.JSONCodec, http.DefaultClient, baseURL.String()) + if err != nil { + h.logger.Error("Failed to create HTTP client", "error", err) + return nil, err + } + + return &MirrorSyncer{ + h: h, + config: config, + configID: configID, + parallelism: parallelism, + client: client, + upstreamRepository: path.Join("artifacts/mirror", repository), + }, nil +} + +func (s *MirrorSyncer) Plan() (*MirrorSyncerPlan, error) { + // Fetch remote files + remoteAPIFiles, err := s.client.ListRepositoryFiles(context.Background(), s.upstreamRepository, nil) + if err != nil { + s.h.logger.Error("Failed to list remote files", "error", err) + return nil, err + } + + // Convert to db file structure + remoteFiles := make([]*mirrordb.RepositoryFile, 0, len(remoteAPIFiles)) + for _, f := range remoteAPIFiles { + remoteFiles = append(remoteFiles, toRepositoryFileDB(f)) + } + + // Fetch local files + localFiles, err := s.h.listRepositoryFilesByConfigID(context.Background(), s.configID) + if err != nil { + s.h.logger.Error("Failed to list local files", "error", err) + return nil, err + } + + add, del := diff(localFiles, remoteFiles) + + return &MirrorSyncerPlan{ + AddRemoteFiles: add, + DeleteLocalFiles: del, + }, nil +} + +func diff(local, remote []*mirrordb.RepositoryFile) (add, del []*mirrordb.RepositoryFile) { + mLocal := make(map[string]*mirrordb.RepositoryFile, len(local)) + for _, l := range local { + mLocal[l.Name] = l + } + + // Find items in remote that are not in local + for _, r := range remote { + if _, found := mLocal[r.Name]; !found { + add = append(add, r) + } + } + + mRemote := make(map[string]*mirrordb.RepositoryFile, len(remote)) + for _, r := range remote { + mRemote[r.Name] = r + } + + // Find items in local that are not in remote + for _, l := range local { + if _, found := mRemote[l.Name]; !found { + del = append(del, l) + } + } + + return add, del +} + +func (s *MirrorSyncer) filePush(remoteFile *mirrordb.RepositoryFile) error { + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(remoteFile.Name))) + + // Generate GET URL + u := &url.URL{ + Scheme: "https", + Host: s.config.URL.Host, + User: s.config.URL.User, + Path: path.Join(s.config.URL.Path, remoteFile.Name), + } + + // Fetch file from remote + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, u.String(), nil) + if err != nil { + s.h.logger.Error("Failed to create request", "file", remoteFile.Name, "error", err) + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + s.h.logger.Error("Failed to fetch file", "file", remoteFile.Name, "error", err) + return err + } + + f, err := os.CreateTemp(s.h.downloadDir(), "") + if err != nil { + s.h.logger.Error("Failed to create temp file", "file", remoteFile.Name, "error", err) + resp.Body.Close() + return err + } + defer os.Remove(f.Name()) + + s.h.logger.Debug("Downloading", "file", remoteFile.Name, "temp", f.Name()) + _, err = io.Copy(f, resp.Body) + if err != nil { + s.h.logger.Error("Failed to download file", "file", remoteFile.Name, "error", err) + resp.Body.Close() + return err + } + resp.Body.Close() + + // Commit content to storage + if err := f.Sync(); err != nil { + return err + } + + cb := backoff.WithMaxRetries( + backoff.NewConstantBackOff(5*time.Second), + 3, + ) + err = backoff.Retry(func() error { + // Seek to start of file + if _, err := f.Seek(0, 0); err != nil { + return err + } + + // Push file to storage + repoPath := filepath.Join(s.h.Repository, filepath.Dir(remoteFile.Name)) + s.h.logger.Debug("Pushing", "file", remoteFile.Name, "repo", repoPath) + pusher, err := orasmirror.NewStaticFileStreamPusher(f, strings.ToLower(filepath.Base(remoteFile.Name)), strings.ToLower(repoPath), s.h.Params.NameOptions...) + if err != nil { + s.h.logger.Error("Failed to create pusher", "file", remoteFile.Name, "error", err) + return err + } + + err = oras.Push(pusher, s.h.Params.RemoteOptions...) + if err != nil { + s.h.logger.Error("Failed to push file", "file", remoteFile.Name, "error", err) + return err + } + + return nil + }, cb) + if err != nil { + return err + } + + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(fileReference)) + tag := hex.EncodeToString(sum[:]) + + err = s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: remoteFile.Name, + Reference: fileReference, + Parent: filepath.Dir(remoteFile.Name), + Link: "", + ModifiedTime: remoteFile.ModifiedTime, + Mode: remoteFile.Mode, + Size: remoteFile.Size, + ConfigID: s.configID, + }) + if err != nil { + s.h.logger.Error("Failed to add file to repository database", "file", remoteFile.Name, "error", err) + return err + } + + return nil +} + +func (s *MirrorSyncer) fileWorker(c chan *mirrordb.RepositoryFile, wg *sync.WaitGroup) { + wg.Add(1) + defer wg.Done() + + for remoteFile := range c { + if err := s.filePush(remoteFile); err != nil { + s.h.logger.Error("Failed to push file", "file", remoteFile.Name, "error", err) + } + } +} + +func (s *MirrorSyncer) Sync() error { + // Generate plan + plan, err := s.Plan() + if err != nil { + s.h.logger.Error("Failed to generate sync plan", "error", err) + return err + } + + // Create push channel and wait group + pushChan := make(chan *mirrordb.RepositoryFile) + wg := new(sync.WaitGroup) + + // Ensure download directory exists + if err := os.MkdirAll(s.h.downloadDir(), 0o755); err != nil { + return err + } + + // Start worker pool + for i := 0; i < s.parallelism; i++ { + go s.fileWorker(pushChan, wg) + } + + // Fetch/Update remote files + for _, remoteFile := range plan.AddRemoteFiles { + s.h.logger.Debug("Processing", "file", remoteFile.Name) + + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(remoteFile.Name))) + + if rsync.FileMode(remoteFile.Mode).IsREG() { + // Process file in worker pool + pushChan <- remoteFile + } else if rsync.FileMode(remoteFile.Mode).IsDIR() { + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(fileReference)) + tag := hex.EncodeToString(sum[:]) + + err := s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: remoteFile.Name, + Reference: fileReference, + Parent: filepath.Dir(remoteFile.Name), + Link: "", + ModifiedTime: remoteFile.ModifiedTime, + Mode: remoteFile.Mode, + Size: remoteFile.Size, + ConfigID: s.configID, + }) + if err != nil { + return err + } + } else if rsync.FileMode(remoteFile.Mode).IsLNK() { + s.h.logger.Debug("Processing Link", "content", remoteFile.Link) + + intermediate := strings.TrimPrefix(remoteFile.Link, s.upstreamRepository) + link := s.h.generateFileReference(strings.ToLower(intermediate)) + + // Add entry to DB + //nolint:gosec + sum := md5.Sum([]byte(remoteFile.Name)) + tag := hex.EncodeToString(sum[:]) + + err := s.h.addFileToRepositoryDatabase(context.Background(), &mirrordb.RepositoryFile{ + Tag: tag, + Name: remoteFile.Name, + Reference: remoteFile.Name, + Parent: filepath.Dir(remoteFile.Name), + Link: link, + ModifiedTime: remoteFile.ModifiedTime, + Mode: remoteFile.Mode, + Size: remoteFile.Size, + ConfigID: s.configID, + }) + if err != nil { + return err + } + } + } + + close(pushChan) + + // Wait for all files to be processed + wg.Wait() + + // Remove local files + for _, localFile := range plan.DeleteLocalFiles { + s.h.logger.Debug("Removing", "file", localFile.Name) + + fileReference := filepath.Clean(s.h.generateFileReference(strings.ToLower(localFile.Name))) + + // Remove entry from DB + err := s.h.removeFileFromRepositoryDatabase(context.Background(), fileReference) + if err != nil { + s.h.logger.Error("Failed to remove file from repository database", "file", localFile.Name, "error", err) + return err + } + } + + return nil +} diff --git a/internal/plugins/mirror/pkg/mirrorrepository/sync.go b/internal/plugins/mirror/pkg/mirrorrepository/sync.go index 277891f..c71f866 100644 --- a/internal/plugins/mirror/pkg/mirrorrepository/sync.go +++ b/internal/plugins/mirror/pkg/mirrorrepository/sync.go @@ -5,6 +5,7 @@ package mirrorrepository import ( "context" + "fmt" "io" "os" @@ -47,51 +48,18 @@ func (h *Handler) repositorySync(_ context.Context) (errFn error) { } for i, config := range h.mirrorConfigs { - addr, module, path, err := rsync.SplitURL(config.URL) - if err != nil { - return err - } - - cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} - if len(config.Exclusions) > 0 { - cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) - } - - if config.URL.User != nil { - password, _ := config.URL.User.Password() - cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) - } - - s := NewStorage(h, config, uint64(i)) - - ppath := rsync.TrimPrepath(path) - client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) - if err != nil { - s.Close() - return err - } - - if config.HTTPURL != nil { - sp, err := client.GetSyncPlan() - if err != nil { - s.Close() + switch config.URL.Scheme { + case "rsync": + if err := h.rsync(config, i); err != nil { return err } - - ps := NewPlanSyncer(h, config, uint64(i), h.Params.Sync.MaxWorkerCount, sp) - - if err := ps.Sync(); err != nil { - s.Close() - return err - } - } else { - if err := client.Sync(); err != nil { - s.Close() + case "beskar-mirror": + if err := h.mirrorSync(config, i); err != nil { return err } + default: + return fmt.Errorf("unsupported scheme: %s", config.URL.Scheme) } - - s.Close() } h.logger.Debug("generating index.html files") @@ -122,52 +90,141 @@ func copyTo(src io.Reader, dest string) error { return nil } -func (h *Handler) getSyncPlan() (*apiv1.RepositorySyncPlan, error) { - plan := &apiv1.RepositorySyncPlan{ - Add: []string{}, - Remove: []string{}, - } - +func (h *Handler) getSyncPlan() (plan *apiv1.RepositorySyncPlan, err error) { for i, config := range h.mirrorConfigs { - addr, module, path, err := rsync.SplitURL(config.URL) - if err != nil { - return nil, err + switch config.URL.Scheme { + case "rsync": + plan, err = h.rsyncPlan(config, i) + if err != nil { + return nil, err + } + case "beskar-mirror": + plan, err = h.mirrorSyncPlan(config, i) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("unsupported scheme: %s", config.URL.Scheme) } + } - cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} - if len(config.Exclusions) > 0 { - cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) - } + return plan, nil +} - if config.URL.User != nil { - password, _ := config.URL.User.Password() - cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) - } +func (h *Handler) rsync(config mirrorConfig, configIndex int) error { + addr, module, path, err := rsync.SplitURL(config.URL) + if err != nil { + return err + } - s := NewStorage(h, config, uint64(i)) - defer s.Close() + cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} + if len(config.Exclusions) > 0 { + cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) + } - ppath := rsync.TrimPrepath(path) - client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) - if err != nil { - s.Close() - return nil, err - } + if config.URL.User != nil { + password, _ := config.URL.User.Password() + cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) + } + + s := NewStorage(h, config, uint64(configIndex)) + defer s.Close() + + ppath := rsync.TrimPrepath(path) + client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) + if err != nil { + return err + } + if config.HTTPURL != nil { sp, err := client.GetSyncPlan() if err != nil { - s.Close() - return nil, err + return err } - for _, f := range sp.AddRemoteFiles { - plan.Add = append(plan.Add, string(sp.RemoteFiles[f].Path)) + ps := NewPlanSyncer(h, config, uint64(configIndex), h.Params.Sync.MaxWorkerCount, sp) + if err := ps.Sync(); err != nil { + return err } - - for _, f := range sp.DeleteLocalFiles { - plan.Remove = append(plan.Remove, string(sp.LocalFiles[f].Path)) + } else { + if err := client.Sync(); err != nil { + return err } } - return plan, nil + return nil +} + +func (h *Handler) rsyncPlan(config mirrorConfig, configIndex int) (*apiv1.RepositorySyncPlan, error) { + addr, module, path, err := rsync.SplitURL(config.URL) + if err != nil { + return nil, err + } + + cOpts := []rsync.ClientOption{rsync.WithLogger(h.logger)} + if len(config.Exclusions) > 0 { + cOpts = append(cOpts, rsync.WithExclusionList(config.Exclusions)) + } + + if config.URL.User != nil { + password, _ := config.URL.User.Password() + cOpts = append(cOpts, rsync.WithClientAuth(config.URL.User.Username(), password)) + } + + s := NewStorage(h, config, uint64(configIndex)) + defer s.Close() + + ppath := rsync.TrimPrepath(path) + client, err := rsync.SocketClient(s, addr, module, ppath, cOpts...) + if err != nil { + return nil, err + } + + sp, err := client.GetSyncPlan() + if err != nil { + return nil, err + } + + var plan apiv1.RepositorySyncPlan + for _, f := range sp.AddRemoteFiles { + plan.Add = append(plan.Add, string(sp.RemoteFiles[f].Path)) + } + + for _, f := range sp.DeleteLocalFiles { + plan.Remove = append(plan.Remove, string(sp.LocalFiles[f].Path)) + } + + return &plan, nil +} + +func (h *Handler) mirrorSync(config mirrorConfig, configIndex int) error { + syncer, err := NewMirrorSyncer(h, config, uint64(configIndex), h.Params.Sync.MaxWorkerCount) + if err != nil { + return err + } + + return syncer.Sync() +} + +func (h *Handler) mirrorSyncPlan(config mirrorConfig, configIndex int) (*apiv1.RepositorySyncPlan, error) { + syncer, err := NewMirrorSyncer(h, config, uint64(configIndex), h.Params.Sync.MaxWorkerCount) + if err != nil { + return nil, err + } + + p, err := syncer.Plan() + if err != nil { + return nil, err + } + + var plan apiv1.RepositorySyncPlan + for _, f := range p.AddRemoteFiles { + plan.Add = append(plan.Add, f.Name) + } + + for _, f := range p.DeleteLocalFiles { + plan.Remove = append(plan.Remove, f.Name) + } + + return &plan, nil } diff --git a/pkg/utils/time.go b/pkg/utils/time.go index 16981bd..c83e3bb 100644 --- a/pkg/utils/time.go +++ b/pkg/utils/time.go @@ -13,3 +13,11 @@ func TimeToString(t int64) string { } return time.Unix(t, 0).Format(timeFormat) } + +func StringToTime(s string) int64 { + if s == "" { + return 0 + } + t, _ := time.Parse(timeFormat, s) + return t.Unix() +}