Skip to content

Commit

Permalink
go/runtime/registry: Cleanup bundles for versions lower then active
Browse files Browse the repository at this point in the history
  • Loading branch information
martintomazic committed Jan 14, 2025
1 parent 149f7af commit 8bfe9f6
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 2 deletions.
1 change: 1 addition & 0 deletions .changelog/5737.feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ From now on, we are removing them automatically:

1. At initialization time, for bundles no longer present in the configuration
file (both regular and detached).
2. For versions lower than the active runtime version (regular bundles).
5 changes: 5 additions & 0 deletions go/runtime/bundle/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (r *mockRegistry) GetComponents(common.Namespace, version.Version) ([]*Expl
panic("unimplemented")
}

// CleanStaleBundles implements Registry.
func (r *mockRegistry) CleanStaleBundles(context.Context, common.Namespace, version.Version) {
panic("unimplemented")
}

func newMockListener() *mockRegistry {
return &mockRegistry{
manifestHashes: make(map[hash.Hash]struct{}),
Expand Down
92 changes: 92 additions & 0 deletions go/runtime/bundle/registry.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package bundle

import (
"context"
"errors"
"fmt"
"maps"
"os"
"slices"
"sync"

Expand Down Expand Up @@ -34,6 +37,11 @@ type Registry interface {
// AddBundle adds a bundle from the given path.
AddBundle(path string, manifestHash hash.Hash) error

// CleanStaleBundles removes (regular) bundles (components and manifests)
// for versions less then active. It also removes bundle file and
// corresponding exploded dir.
CleanStaleBundles(ctx context.Context, runtimeID common.Namespace, active version.Version)

// GetVersions returns versions for the given runtime, sorted in ascending
// order.
GetVersions(runtimeID common.Namespace) []version.Version
Expand Down Expand Up @@ -202,6 +210,90 @@ func (r *registry) AddBundle(path string, manifestHash hash.Hash) error {
return nil
}

// CleanStaleBundles implements Registry.
func (r *registry) CleanStaleBundles(ctx context.Context, runtimeID common.Namespace, active version.Version) {
for _, v := range r.GetVersions(runtimeID) {
select {
case <-ctx.Done():
return
default:
if v.Less(active) {
r.logger.Info("Removing bundle with version lower then active",
"runtimeId", runtimeID,
"version", v,
"activeVersion", active,
)
r.cleanBundle(runtimeID, v)
}
}
}
}

// cleanBundle removes (regular) bundle from the registry.
//
// Additionally, it removes exploded subdir and bundle file from the bundle dir.
func (r *registry) cleanBundle(runtimeID common.Namespace, version version.Version) {
explDir, err := r.removeBundle(runtimeID, version)
if err != nil {
r.logger.Warn("Bundle clean-up unsuccessful: invalid registry state",
"error", err,
"id", runtimeID,
"runtime_version", version,
)
}

if err := os.RemoveAll(explDir); err != nil {
r.logger.Error("failed to remove stale exploded (regular) bundle dir",
"id", runtimeID,
"runtime_version", version,
"ExplodedDataDir", explDir,
"err", err,
)
}

if err := os.Remove(explDir + FileExtension); err != nil {
r.logger.Error("failed to remove stale exploded (regular) bundle file",
"id", runtimeID,
"runtime_version", version,
"ExplodedDataDir", explDir,
"err", err,
)
}
}

// removeBundle removes bundle for the runtimeID and version from the registry,
// if it exists.
func (r *registry) removeBundle(runtimeID common.Namespace, version version.Version) (string, error) {
r.mu.Lock()
defer r.mu.Unlock()
var explDir string
manifest, ok := r.manifests[runtimeID][version]
if !ok {
return explDir, errors.New("registry missing Manifest")
}
explComp := r.components[runtimeID][component.ID_RONL][version]
if explComp == nil {
return explDir, errors.New("components missing RONL component")
}
explDir = explComp.ExplodedDataDir
manifestHash := manifest.Hash()

r.logger.Debug("Removing (regular) bundle from registry",
"ronl_version", version,
"runtimeID", runtimeID,
"manifestHash", manifestHash,
"exploded_subdir", explDir,
)

// Removal should be atomic.
delete(r.manifests[runtimeID], version)
delete(r.bundles, manifestHash)
for _, c := range manifest.Components {
delete(r.components[runtimeID][c.ID()], c.Version)
}
return explDir, nil
}

// GetVersions implements Registry.
func (r *registry) GetVersions(runtimeID common.Namespace) []version.Version {
r.mu.RLock()
Expand Down
52 changes: 51 additions & 1 deletion go/runtime/bundle/registry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package bundle

import (
"context"
"os"
"path/filepath"
"strings"
"testing"
Expand Down Expand Up @@ -44,7 +46,10 @@ func TestBundleRegistry(t *testing.T) {
path3, err := createSyntheticBundle(dir, runtimeID1, version1, []component.Kind{component.RONL})
require.NoError(t, err)

paths := []string{path0, path1, path2, path3}
path4, err := createSyntheticBundle(dir, runtimeID1, version2, []component.Kind{component.RONL})
require.NoError(t, err)

paths := []string{path0, path1, path2, path3, path4}

// Compute manifest hashes.
var hashes []hash.Hash
Expand Down Expand Up @@ -100,4 +105,49 @@ func TestBundleRegistry(t *testing.T) {
_, err = registry.GetComponents(runtimeID1, version3)
require.Error(t, err)
require.ErrorContains(t, err, "component 'ronl', version '3.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found")

// Add fifth bundle (new RONL component for runtime 1).
err = registry.AddBundle(paths[4], hashes[4])
require.NoError(t, err)

// We should have 3 exploded subdirs for regular bundles.
require.Equal(t, 3, explodedRegBundleNum(t, dir))

// We should have 2 versions for runtime 1.
versions := registry.GetVersions(runtimeID1)
require.Equal(t, 2, len(versions))

// Fetch components for runtime 1, version 2.
comps, err = registry.GetComponents(runtimeID1, version2)
require.NoError(t, err)
require.Equal(t, 2, len(comps))
require.Equal(t, version2, comps[0].Version)
require.Equal(t, version2, comps[1].Version)

// Remove version 1 for runtime 1 from the registry.
registry.CleanStaleBundles(context.Background(), runtimeID1, version2)

// We should have 1 versions for runtime 1.
versions = registry.GetVersions(runtimeID1)
require.Equal(t, 1, len(versions))

// Attempt to fetch components for runtime 1, version 1 (just deleted).
_, err = registry.GetComponents(runtimeID1, version1)
require.Error(t, err)
require.ErrorContains(t, err, "component 'ronl', version '1.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found")

// We should have two exploded subdirs for regular bundles.
require.Equal(t, 2, explodedRegBundleNum(t, dir))
}

func explodedRegBundleNum(t *testing.T, dir string) int {
entries, err := os.ReadDir(ExplodedPath(dir))
require.NoError(t, err)
var res int
for _, entry := range entries {
if entry.Name() != "detached" {
res++
}
}
return res
}
19 changes: 18 additions & 1 deletion go/runtime/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,16 +383,33 @@ func (r *runtime) run(ctx context.Context) {
}
defer regSub.Close()

wg := &sync.WaitGroup{}
var regInitialized, activeInitialized bool
for {
select {
case <-ctx.Done():
// Ensure bundle clean-up terminates before returning.
wg.Wait()
return
case <-epoCh:
case epoch := <-epoCh:
if up := r.updateActiveDescriptor(ctx); up && !activeInitialized {
close(r.activeDescriptorCh)
activeInitialized = true
}

r.RLock()
rt := r.activeDescriptor
r.RUnlock()
if rt != nil {
if active := rt.ActiveDeployment(epoch); active != nil {
wg.Add(1)
go func() {
r.bundleRegistry.CleanStaleBundles(ctx, r.ID(), active.Version)
wg.Done()
}()
}
}

case rt := <-regCh:
if !rt.ID.Equal(&r.id) {
continue
Expand Down

0 comments on commit 8bfe9f6

Please sign in to comment.