Skip to content

Commit

Permalink
Run repository delete for yum/static plugins as a synchronous operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
cclerget committed Dec 11, 2023
1 parent f7a66a1 commit 43c04ac
Show file tree
Hide file tree
Showing 23 changed files with 519 additions and 264 deletions.
2 changes: 1 addition & 1 deletion build/mage/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var binaries = map[string]binaryConfig{
interfaceName: "Static",
},
useProto: true,
baseImage: "alpine:3.17",
baseImage: BaseImage,
},
}

Expand Down
66 changes: 61 additions & 5 deletions internal/pkg/repository/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package repository

import (
"context"
"errors"
"io"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/remote"
Expand Down Expand Up @@ -48,15 +50,19 @@ type RepoHandler struct {
startedCh chan struct{}

cancel context.CancelFunc

syncArtifactsMutex sync.RWMutex
syncArtifacts map[string]chan error
}

func NewRepoHandler(repository string, params *HandlerParams, cancel context.CancelFunc) *RepoHandler {
return &RepoHandler{
Repository: repository,
Params: params,
Queued: make(chan struct{}, 1),
startedCh: make(chan struct{}),
cancel: cancel,
Repository: repository,
Params: params,
Queued: make(chan struct{}, 1),
startedCh: make(chan struct{}),
cancel: cancel,
syncArtifacts: make(map[string]chan error),
}
}

Expand Down Expand Up @@ -162,3 +168,53 @@ func (rh *RepoHandler) DeleteManifest(ref string) (errFn error) {
}
return remote.Delete(namedRef, rh.Params.RemoteOptions...)
}

func (rh *RepoHandler) SyncArtifact(ctx context.Context, name string, timeout time.Duration) (chan error, func() error) {
errCh := make(chan error, 1)

rh.syncArtifactsMutex.Lock()
rh.syncArtifacts[name] = errCh
rh.syncArtifactsMutex.Unlock()

return errCh, func() error {
var err error

pkgCtx, cancel := context.WithTimeout(ctx, timeout)

select {
case <-pkgCtx.Done():
if errors.Is(pkgCtx.Err(), context.Canceled) {
err = errors.New("waiting timeout")
} else {
err = errors.New("waiting interruption")
}
case err = <-errCh:
}

rh.syncArtifactsMutex.Lock()
close(errCh)
delete(rh.syncArtifacts, name)
rh.syncArtifactsMutex.Unlock()

cancel()

return err
}
}

func (rh *RepoHandler) SyncArtifactResult(name string, err error) {
rh.syncArtifactsMutex.RLock()
if errCh, ok := rh.syncArtifacts[name]; ok {
errCh <- err
}
rh.syncArtifactsMutex.RUnlock()
}

func (rh *RepoHandler) SyncArtifactReset() {
rh.syncArtifactsMutex.Lock()
for k, v := range rh.syncArtifacts {
delete(rh.syncArtifacts, k)
close(v)
}
rh.syncArtifactsMutex.Unlock()
}
29 changes: 21 additions & 8 deletions internal/pkg/sqlite/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func New(ctx context.Context, name string, storage Storage) (*DB, error) {
func (db *DB) migrate() error {
migrations, err := schema.FSMigrations(db.storage.SchemaFS, db.storage.SchemaGlob)
if err != nil {
return fmt.Errorf("while setting %s DB migration: %w", db.name, err)
return fmt.Errorf("while setting %s database migration: %w", db.name, err)
}

migrator := schema.NewMigrator(schema.WithDialect(schema.SQLite))
if err := migrator.Apply(db.DB, migrations); err != nil {
return fmt.Errorf("while applying %s DB migration: %w", db.name, err)
return fmt.Errorf("while applying %s database migration: %w", db.name, err)
}

return nil
Expand Down Expand Up @@ -85,7 +85,7 @@ func (db *DB) Open(ctx context.Context) error {
defer remoteReader.Close()

if err := pull(db.path, remoteReader); err != nil {
return fmt.Errorf("while pulling log DB: %w", err)
return fmt.Errorf("while pulling %s database: %w", db.name, err)
}
}
} else if err != nil {
Expand All @@ -94,7 +94,7 @@ func (db *DB) Open(ctx context.Context) error {

db.DB, err = sqlx.Open("sqlite", db.path)
if err != nil {
return fmt.Errorf("while opening log DB %s: %w", db.path, err)
return fmt.Errorf("while opening %s database %s: %w", db.name, db.path, err)
}
db.SetMaxOpenConns(1)

Expand All @@ -110,28 +110,28 @@ func (db *DB) Sync(ctx context.Context) error {

remoteWriter, err := db.storage.Bucket.NewWriter(ctx, key, &blob.WriterOptions{})
if err != nil {
return fmt.Errorf("while initializing s3 object writer: %w", err)
return fmt.Errorf("while initializing object writer: %w", err)
}

db.Lock()
defer db.Unlock()

if err := push(db.path, remoteWriter); err != nil {
_ = remoteWriter.Close()
return fmt.Errorf("while pushing log database to s3 bucket: %w", err)
return fmt.Errorf("while pushing %s database to storage bucket: %w", db.name, err)
}

return remoteWriter.Close()
}

func (db *DB) Close(removeDB bool) error {
func (db *DB) Close(removeLocalDB bool) error {
db.Lock()
defer db.Unlock()

if db.DB != nil && db.Reference.Load() == 0 {
err := db.DB.Close()
db.DB = nil
if removeDB {
if removeLocalDB {
if removeErr := os.Remove(db.path); removeErr != nil && err == nil {
err = removeErr
}
Expand All @@ -141,3 +141,16 @@ func (db *DB) Close(removeDB bool) error {

return nil
}

func (db *DB) Delete(ctx context.Context) error {
db.Lock()
defer db.Unlock()

key := filepath.Join(db.storage.Repository, db.storage.CompressedFilename)

if err := db.storage.Bucket.Delete(ctx, key); err != nil {
return fmt.Errorf("while deleting %s database on object storage: %w", db.name, err)
}

return nil
}
4 changes: 2 additions & 2 deletions internal/plugins/static/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ func checkRepository(repository string) error {
return nil
}

func (p *Plugin) DeleteRepository(ctx context.Context, repository string, deletePackages bool) (err error) {
func (p *Plugin) DeleteRepository(ctx context.Context, repository string, deleteFiles bool) (err error) {
if err := checkRepository(repository); err != nil {
return err
}
return p.repositoryManager.Get(ctx, repository).DeleteRepository(ctx, repository, deletePackages)
return p.repositoryManager.Get(ctx, repository).DeleteRepository(ctx, deleteFiles)
}

func (p *Plugin) ListRepositoryLogs(ctx context.Context, repository string, page *apiv1.Page) (logs []apiv1.RepositoryLog, err error) {
Expand Down
27 changes: 27 additions & 0 deletions internal/plugins/static/pkg/staticdb/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,30 @@ func (db *RepositoryDB) WalkFiles(ctx context.Context, walkFn WalkFileFunc) erro

return nil
}

func (db *RepositoryDB) CountFiles(ctx context.Context) (int, error) {
db.Reference.Add(1)
defer db.Reference.Add(-1)

if err := db.Open(ctx); err != nil {
return 0, err
}

rows, err := db.QueryxContext(ctx, "SELECT COUNT(tag) FROM files")
if err != nil {
return 0, err
}
defer rows.Close()

count := 0

if rows.Next() {
if err := rows.Scan(&count); err != nil {
return 0, err
}
} else {
return 0, fmt.Errorf("no rows returned from count query")
}

return count, nil
}
100 changes: 67 additions & 33 deletions internal/plugins/static/pkg/staticrepository/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,65 @@ import (

"github.com/RussellLuo/kun/pkg/werror"
"github.com/RussellLuo/kun/pkg/werror/gcode"
"github.com/hashicorp/go-multierror"
"go.ciq.dev/beskar/internal/plugins/static/pkg/staticdb"
apiv1 "go.ciq.dev/beskar/pkg/plugins/static/api/v1"
"golang.org/x/sync/semaphore"
)

func (h *Handler) DeleteRepository(ctx context.Context, repository string, deletePackages bool) (err error) {
func (h *Handler) DeleteRepository(ctx context.Context, deleteFiles bool) (err error) {
if !h.Started() {
return werror.Wrap(gcode.ErrUnavailable, err)
} else if h.delete.Swap(true) {
return werror.Wrap(gcode.ErrAlreadyExists, fmt.Errorf("repository %s is being deleted", h.Repository))
}
defer func() {
h.SyncArtifactReset()

db, err := h.getRepositoryDB(ctx)
if err != nil {
return werror.Wrap(gcode.ErrInternal, err)
}
if err == nil {
// stop the repo handler and trigger cleanup
h.Stop()
} else {
h.delete.Store(false)
}
}()

var repositoryFiles []*apiv1.RepositoryFile
err = db.WalkFiles(ctx, func(file *staticdb.RepositoryFile) error {
repositoryFiles = append(repositoryFiles, toRepositoryFileAPI(file))
return nil
})
db, err := h.getRepositoryDB(ctx)
if err != nil {
return werror.Wrap(gcode.ErrInternal, err)
}

if len(repositoryFiles) > 0 {
if !deletePackages {
return werror.Wrap(gcode.ErrInternal, fmt.Errorf("err deleting static repository: files must be deleted from repository"))
if !deleteFiles {
count, err := db.CountFiles(ctx)
if err != nil {
return werror.Wrap(gcode.ErrInternal, err)
} else if count > 0 {
return werror.Wrap(gcode.ErrFailedPrecondition, fmt.Errorf("repository %s has %d files associated with it", h.Repository, count))
}

for _, file := range repositoryFiles {
err = h.RemoveRepositoryFile(ctx, file.Tag)
if err != nil {
return werror.Wrap(gcode.ErrInternal, err)
} else {
deleteFile := new(multierror.Group)
// maximum parallel file deletion
sem := semaphore.NewWeighted(100)

// delete all files associated with this repo
err = db.WalkFiles(ctx, func(file *staticdb.RepositoryFile) error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
deleteFile.Go(func() error {
defer sem.Release(1)
return h.removeRepositoryFile(ctx, file)
})
return nil
})
if err != nil {
return werror.Wrap(gcode.ErrInternal, err)
}
if err := deleteFile.Wait().ErrorOrNil(); err != nil {
return werror.Wrap(gcode.ErrInternal, err)
}
}

// delete repo from mutex (use the remove in manager)
h.RepoHandler.Params.Remove(repository)

return nil
}

Expand Down Expand Up @@ -79,9 +99,34 @@ func (h *Handler) ListRepositoryLogs(ctx context.Context, _ *apiv1.Page) (logs [
return logs, nil
}

func (h *Handler) removeRepositoryFile(ctx context.Context, file *staticdb.RepositoryFile) error {
tagRef := filepath.Join(h.Repository, "files:"+file.Tag)

digest, err := h.GetManifestDigest(tagRef)
if err != nil {
return err
}

errCh, waitSync := h.SyncArtifact(ctx, file.Name, time.Minute)

digestRef := filepath.Join(h.Repository, "files@"+digest)

if err := h.DeleteManifest(digestRef); err != nil {
errCh <- err
}

if err := waitSync(); err != nil {
return fmt.Errorf("file %s removal processing error: %w", file.Name, err)
}

return nil
}

func (h *Handler) RemoveRepositoryFile(ctx context.Context, tag string) (err error) {
if !h.Started() {
return werror.Wrap(gcode.ErrUnavailable, err)
} else if h.delete.Load() {
return werror.Wrap(gcode.ErrAlreadyExists, fmt.Errorf("repository %s is being deleted", h.Repository))
}

db, err := h.getRepositoryDB(ctx)
Expand All @@ -95,18 +140,7 @@ func (h *Handler) RemoveRepositoryFile(ctx context.Context, tag string) (err err
return werror.Wrap(gcode.ErrInternal, err)
} else if file.Tag == "" {
return werror.Wrap(gcode.ErrNotFound, fmt.Errorf("file with tag %s not found", tag))
}

tagRef := filepath.Join(h.Repository, "files:"+file.Tag)

digest, err := h.GetManifestDigest(tagRef)
if err != nil {
return werror.Wrap(gcode.ErrInternal, err)
}

digestRef := filepath.Join(h.Repository, "files@"+digest)

if err := h.DeleteManifest(digestRef); err != nil {
} else if err := h.removeRepositoryFile(ctx, file); err != nil {
return werror.Wrap(gcode.ErrInternal, err)
}

Expand Down
4 changes: 3 additions & 1 deletion internal/plugins/static/pkg/staticrepository/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (h *Handler) processFileManifest(ctx context.Context, fileManifest *v1.Mani
fileName := fileLayer.Annotations[imagespec.AnnotationTitle]

defer func() {
h.SyncArtifactResult(fileName, errFn)
if errFn == nil {
return
}
Expand Down Expand Up @@ -63,6 +64,7 @@ func (h *Handler) deleteFileManifest(ctx context.Context, fileManifest *v1.Manif
fileName := fileLayer.Annotations[imagespec.AnnotationTitle]

defer func() {
h.SyncArtifactResult(fileName, errFn)
if errFn == nil {
return
}
Expand All @@ -72,7 +74,7 @@ func (h *Handler) deleteFileManifest(ctx context.Context, fileManifest *v1.Manif

err = h.removeFileFromRepositoryDatabase(ctx, fileName)
if err != nil {
return fmt.Errorf("while removing package %s from metadata database: %w", fileName, err)
return fmt.Errorf("while removing file %s from metadata database: %w", fileName, err)
}

return nil
Expand Down
Loading

0 comments on commit 43c04ac

Please sign in to comment.