Skip to content

Commit

Permalink
go/runtime/registry: Cleanup bundles for versions lower then active
Browse files Browse the repository at this point in the history
  • Loading branch information
martintomazic committed Jan 14, 2025
1 parent 15881ec commit 3d4b25d
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 2 deletions.
1 change: 1 addition & 0 deletions .changelog/5737.feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ From now on, we are removing them automatically:

1. At initialization time, for bundles no longer present in the configuration
file (both regular and detached).
2. For versions lower than the active runtime version (regular bundles).
6 changes: 6 additions & 0 deletions go/runtime/bundle/discovery_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bundle

import (
"context"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -58,6 +59,11 @@ func (r *mockRegistry) GetComponents(common.Namespace, version.Version) ([]*Expl
panic("unimplemented")
}

// CleanStaleBundles implements Registry.
func (r *mockRegistry) CleanStaleBundles(context.Context, common.Namespace, version.Version) {
panic("unimplemented")
}

func newMockListener() *mockRegistry {
return &mockRegistry{
manifestHashes: make(map[hash.Hash]struct{}),
Expand Down
92 changes: 92 additions & 0 deletions go/runtime/bundle/registry.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package bundle

import (
"context"
"errors"
"fmt"
"maps"
"os"
"slices"
"sync"

Expand Down Expand Up @@ -34,6 +37,11 @@ type Registry interface {
// AddBundle adds a bundle from the given path.
AddBundle(path string, manifestHash hash.Hash) error

// CleanStaleBundles removes (regular) bundles (components and manifests)
// for versions less then active. It also removes bundle file and
// corresponding exploded dir.
CleanStaleBundles(ctx context.Context, runtimeID common.Namespace, active version.Version)

// GetVersions returns versions for the given runtime, sorted in ascending
// order.
GetVersions(runtimeID common.Namespace) []version.Version
Expand Down Expand Up @@ -202,6 +210,90 @@ func (r *registry) AddBundle(path string, manifestHash hash.Hash) error {
return nil
}

// CleanStaleBundles implements Registry.
func (r *registry) CleanStaleBundles(ctx context.Context, runtimeID common.Namespace, active version.Version) {
for _, v := range r.GetVersions(runtimeID) {
select {
case <-ctx.Done():
return

Check warning on line 218 in go/runtime/bundle/registry.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/bundle/registry.go#L217-L218

Added lines #L217 - L218 were not covered by tests
default:
if v.Less(active) {
r.logger.Info("Removing bundle with version lower then active",
"runtimeId", runtimeID,
"version", v,
"activeVersion", active,
)
r.cleanBundle(runtimeID, v)
}
}
}
}

// cleanBundle removes (regular) bundle from the registry.
//
// Additionally, it removes exploded subdir and bundle file from the bundle dir.
func (r *registry) cleanBundle(runtimeID common.Namespace, version version.Version) {
explDir, err := r.removeBundle(runtimeID, version)
if err != nil {
r.logger.Warn("Bundle clean-up unsuccessful: invalid registry state",
"error", err,
"id", runtimeID,
"runtime_version", version,
)

Check warning on line 242 in go/runtime/bundle/registry.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/bundle/registry.go#L238-L242

Added lines #L238 - L242 were not covered by tests
}

if err := os.RemoveAll(explDir); err != nil {
r.logger.Error("failed to remove stale exploded (regular) bundle dir",
"id", runtimeID,
"runtime_version", version,
"ExplodedDataDir", explDir,
"err", err,
)

Check warning on line 251 in go/runtime/bundle/registry.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/bundle/registry.go#L246-L251

Added lines #L246 - L251 were not covered by tests
}

if err := os.Remove(explDir + FileExtension); err != nil {
r.logger.Error("failed to remove stale exploded (regular) bundle file",
"id", runtimeID,
"runtime_version", version,
"ExplodedDataDir", explDir,
"err", err,
)
}
}

// removeBundle removes bundle for the runtimeID and version from the registry,
// if it exists.
func (r *registry) removeBundle(runtimeID common.Namespace, version version.Version) (string, error) {
r.mu.Lock()
defer r.mu.Unlock()
var explDir string
manifest, ok := r.manifests[runtimeID][version]
if !ok {
return explDir, errors.New("registry missing Manifest")

Check warning on line 272 in go/runtime/bundle/registry.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/bundle/registry.go#L272

Added line #L272 was not covered by tests
}
explComp := r.components[runtimeID][component.ID_RONL][version]
if explComp == nil {
return explDir, errors.New("components missing RONL component")

Check warning on line 276 in go/runtime/bundle/registry.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/bundle/registry.go#L276

Added line #L276 was not covered by tests
}
explDir = explComp.ExplodedDataDir
manifestHash := manifest.Hash()

r.logger.Debug("Removing (regular) bundle from registry",
"ronl_version", version,
"runtimeID", runtimeID,
"manifestHash", manifestHash,
"exploded_subdir", explDir,
)

// Removal should be atomic.
delete(r.manifests[runtimeID], version)
delete(r.bundles, manifestHash)
for _, c := range manifest.Components {
delete(r.components[runtimeID][c.ID()], c.Version)
}
return explDir, nil
}

// GetVersions implements Registry.
func (r *registry) GetVersions(runtimeID common.Namespace) []version.Version {
r.mu.RLock()
Expand Down
52 changes: 51 additions & 1 deletion go/runtime/bundle/registry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package bundle

import (
"context"
"os"
"path/filepath"
"slices"
"strings"
Expand Down Expand Up @@ -91,7 +93,10 @@ func TestBundleRegistry(t *testing.T) {
path3, err := createSyntheticBundle(dir, runtimeID1, version1, []component.Kind{component.RONL})
require.NoError(t, err)

paths := []string{path0, path1, path2, path3}
path4, err := createSyntheticBundle(dir, runtimeID1, version2, []component.Kind{component.RONL})
require.NoError(t, err)

paths := []string{path0, path1, path2, path3, path4}

// Compute manifest hashes.
var hashes []hash.Hash
Expand Down Expand Up @@ -147,4 +152,49 @@ func TestBundleRegistry(t *testing.T) {
_, err = registry.GetComponents(runtimeID1, version3)
require.Error(t, err)
require.ErrorContains(t, err, "component 'ronl', version '3.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found")

// Add fifth bundle (new RONL component for runtime 1).
err = registry.AddBundle(paths[4], hashes[4])
require.NoError(t, err)

// We should have 3 exploded subdirs for regular bundles.
require.Equal(t, 3, explodedRegBundleNum(t, dir))

// We should have 2 versions for runtime 1.
versions := registry.GetVersions(runtimeID1)
require.Equal(t, 2, len(versions))

// Fetch components for runtime 1, version 2.
comps, err = registry.GetComponents(runtimeID1, version2)
require.NoError(t, err)
require.Equal(t, 2, len(comps))
require.Equal(t, version2, comps[0].Version)
require.Equal(t, version2, comps[1].Version)

// Remove version 1 for runtime 1 from the registry.
registry.CleanStaleBundles(context.Background(), runtimeID1, version2)

// We should have 1 versions for runtime 1.
versions = registry.GetVersions(runtimeID1)
require.Equal(t, 1, len(versions))

// Attempt to fetch components for runtime 1, version 1 (just deleted).
_, err = registry.GetComponents(runtimeID1, version1)
require.Error(t, err)
require.ErrorContains(t, err, "component 'ronl', version '1.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found")

// We should have two exploded subdirs for regular bundles.
require.Equal(t, 2, explodedRegBundleNum(t, dir))
}

func explodedRegBundleNum(t *testing.T, dir string) int {
entries, err := os.ReadDir(ExplodedPath(dir))
require.NoError(t, err)
var res int
for _, entry := range entries {
if entry.Name() != "detached" {
res++
}
}
return res
}
19 changes: 18 additions & 1 deletion go/runtime/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,16 +383,33 @@ func (r *runtime) run(ctx context.Context) {
}
defer regSub.Close()

wg := &sync.WaitGroup{}
var regInitialized, activeInitialized bool
for {
select {
case <-ctx.Done():
// Ensure bundle clean-up terminates before returning.
wg.Wait()
return
case <-epoCh:
case epoch := <-epoCh:
if up := r.updateActiveDescriptor(ctx); up && !activeInitialized {
close(r.activeDescriptorCh)
activeInitialized = true
}

r.RLock()
rt := r.activeDescriptor
r.RUnlock()
if rt != nil {
if active := rt.ActiveDeployment(epoch); active != nil {
wg.Add(1)
go func() {
r.bundleRegistry.CleanStaleBundles(ctx, r.ID(), active.Version)
wg.Done()
}()
}
}

case rt := <-regCh:
if !rt.ID.Equal(&r.id) {
continue
Expand Down

0 comments on commit 3d4b25d

Please sign in to comment.