Skip to content

Commit

Permalink
go/runtime/registry: Cleanup bundles for versions lower then active
Browse files Browse the repository at this point in the history
  • Loading branch information
martintomazic committed Jan 10, 2025
1 parent 327668c commit f3f52e3
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 2 deletions.
1 change: 1 addition & 0 deletions .changelog/5737.feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ From now on, we are removing them automatically:

1. At initialization time, for bundles no longer present in the configuration
file (both regular and detached).
2. For versions lower than the active runtime version (regular bundles).
5 changes: 5 additions & 0 deletions go/runtime/bundle/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (r *mockRegistry) GetComponents(common.Namespace, version.Version) ([]*Expl
panic("unimplemented")
}

// CleanStaleBundles implements Registry.
func (r *mockRegistry) CleanStaleBundles(context.Context, common.Namespace, version.Version) {
panic("unimplemented")
}

func newMockListener() *mockRegistry {
return &mockRegistry{
manifestHashes: make(map[hash.Hash]struct{}),
Expand Down
65 changes: 65 additions & 0 deletions go/runtime/bundle/registry.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package bundle

import (
"context"
"fmt"
"maps"
"os"
"slices"
"sync"

Expand Down Expand Up @@ -34,6 +36,11 @@ type Registry interface {
// AddBundle adds a bundle from the given path.
AddBundle(path string, manifestHash hash.Hash) error

// CleanStaleBundles removes (regular) bundles (components and manifests)
// for versions less then active. It also removes bundle file and
// corresponding exploded dir.
CleanStaleBundles(ctx context.Context, wg *sync.WaitGroup, runtimeID common.Namespace, active version.Version)

// GetVersions returns versions for the given runtime, sorted in ascending
// order.
GetVersions(runtimeID common.Namespace) []version.Version
Expand Down Expand Up @@ -202,6 +209,64 @@ func (r *registry) AddBundle(path string, manifestHash hash.Hash) error {
return nil
}

// CleanStaleBundles implements Registry.
func (r *registry) CleanStaleBundles(ctx context.Context, wg *sync.WaitGroup, runtimeID common.Namespace, active version.Version) {
defer wg.Done()
for _, v := range r.GetVersions(runtimeID) {
select {
case <-ctx.Done():
return
default:
if v.Less(active) {
r.removeBundle(runtimeID, v, active)
}
}
}
}

func (r *registry) removeBundle(runtimeID common.Namespace, version version.Version, active version.Version) {
r.mu.Lock()
manifest, ok := r.manifests[runtimeID][version]
if !ok {
return
}
explDir := r.components[runtimeID][component.ID_RONL][version].ExplodedDataDir
manifestHash := manifest.Hash()
r.logger.Info("Removing (regular) bundle, together with exploded subdir with version lower than active",
"active_version", active,
"ronl_version", version,
"runtimeID", runtimeID,
"manifestHash", manifestHash,
"exploded_subdir", explDir,
)

delete(r.manifests[runtimeID], version)
delete(r.bundles, manifestHash)
for _, c := range manifest.Components {
delete(r.components[runtimeID][c.ID()], c.Version)
}
r.mu.Unlock()

// Don't lock when removing files.
if err := os.RemoveAll(explDir); err != nil {
r.logger.Error("failed to remove stale exploded (regular) bundle",
"id", runtimeID,
"runtime_version", version,
"ExplodedDataDir", explDir,
"err", err,
)
}

if err := os.Remove(explDir + FileExtension); err != nil {
r.logger.Error("failed to remove stale (regular) bundle file",
"id", runtimeID,
"runtime_version", version,
"ExplodedDataDir", explDir,
"err", err,
)
}
}

// GetVersions implements Registry.
func (r *registry) GetVersions(runtimeID common.Namespace) []version.Version {
r.mu.RLock()
Expand Down
52 changes: 51 additions & 1 deletion go/runtime/bundle/registry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package bundle

import (
"context"
"os"
"path/filepath"
"strings"
"testing"
Expand Down Expand Up @@ -44,7 +46,10 @@ func TestBundleRegistry(t *testing.T) {
path3, err := createSyntheticBundle(dir, runtimeID1, version1, []component.Kind{component.RONL})
require.NoError(t, err)

paths := []string{path0, path1, path2, path3}
path4, err := createSyntheticBundle(dir, runtimeID1, version2, []component.Kind{component.RONL})
require.NoError(t, err)

paths := []string{path0, path1, path2, path3, path4}

// Compute manifest hashes.
var hashes []hash.Hash
Expand Down Expand Up @@ -100,4 +105,49 @@ func TestBundleRegistry(t *testing.T) {
_, err = registry.GetComponents(runtimeID1, version3)
require.Error(t, err)
require.ErrorContains(t, err, "component 'ronl', version '3.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found")

// Add fifth bundle (new RONL component for runtime 1).
err = registry.AddBundle(paths[4], hashes[4])
require.NoError(t, err)

// We should have 3 exploded subdirs for regular bundles.
require.Equal(t, 3, explodedRegBundleNum(t, dir))

// We should have 2 versions for runtime 1.
versions := registry.GetVersions(runtimeID1)
require.Equal(t, 2, len(versions))

// Fetch components for runtime 1, version 2.
comps, err = registry.GetComponents(runtimeID1, version2)
require.NoError(t, err)
require.Equal(t, 2, len(comps))
require.Equal(t, version2, comps[0].Version)
require.Equal(t, version2, comps[1].Version)

// Remove version 1 for runtime 1 from the registry.
registry.CleanStaleBundles(context.Background(), runtimeID1, version2)

// We should have 1 versions for runtime 1.
versions = registry.GetVersions(runtimeID1)
require.Equal(t, 1, len(versions))

// Attempt to fetch components for runtime 1, version 1 (just deleted).
_, err = registry.GetComponents(runtimeID1, version1)
require.Error(t, err)
require.ErrorContains(t, err, "component 'ronl', version '1.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found")

// We should have two exploded subdirs for regular bundles.
require.Equal(t, 2, explodedRegBundleNum(t, dir))
}

func explodedRegBundleNum(t *testing.T, dir string) int {
entries, err := os.ReadDir(ExplodedPath(dir))
require.NoError(t, err)
var res int
for _, entry := range entries {
if entry.Name() != "detached" {
res++
}
}
return res
}
16 changes: 15 additions & 1 deletion go/runtime/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,16 +383,30 @@ func (r *runtime) run(ctx context.Context) {
}
defer regSub.Close()

wg := &sync.WaitGroup{}
var regInitialized, activeInitialized bool
for {
select {
case <-ctx.Done():
// Ensure bundle clean-up terminates before returning.
wg.Wait()
return
case <-epoCh:
case epoch := <-epoCh:
if up := r.updateActiveDescriptor(ctx); up && !activeInitialized {
close(r.activeDescriptorCh)
activeInitialized = true
}

r.RLock()
rt := r.activeDescriptor
r.RUnlock()
if rt != nil {
if active := rt.ActiveDeployment(epoch); active != nil {
wg.Add(1)
go r.bundleRegistry.CleanStaleBundles(ctx, wg, r.ID(), active.Version)
}
}

case rt := <-regCh:
if !rt.ID.Equal(&r.id) {
continue
Expand Down

0 comments on commit f3f52e3

Please sign in to comment.