From 8bfe9f62be0eb139318d78cf7ef3eb530632905d Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Tue, 7 Jan 2025 05:31:17 +0100 Subject: [PATCH] go/runtime/registry: Cleanup bundles for versions lower then active --- .changelog/5737.feature.md | 1 + go/runtime/bundle/discovery_test.go | 5 ++ go/runtime/bundle/registry.go | 92 +++++++++++++++++++++++++++++ go/runtime/bundle/registry_test.go | 52 +++++++++++++++- go/runtime/registry/registry.go | 19 +++++- 5 files changed, 167 insertions(+), 2 deletions(-) diff --git a/.changelog/5737.feature.md b/.changelog/5737.feature.md index dc796efeaaf..5daeaf11e9b 100644 --- a/.changelog/5737.feature.md +++ b/.changelog/5737.feature.md @@ -7,3 +7,4 @@ From now on, we are removing them automatically: 1. At initialization time, for bundles no longer present in the configuration file (both regular and detached). +2. For versions lower than the active runtime version (regular bundles). diff --git a/go/runtime/bundle/discovery_test.go b/go/runtime/bundle/discovery_test.go index a278ad1ec91..db74f2bb965 100644 --- a/go/runtime/bundle/discovery_test.go +++ b/go/runtime/bundle/discovery_test.go @@ -59,6 +59,11 @@ func (r *mockRegistry) GetComponents(common.Namespace, version.Version) ([]*Expl panic("unimplemented") } +// CleanStaleBundles implements Registry. +func (r *mockRegistry) CleanStaleBundles(context.Context, common.Namespace, version.Version) { + panic("unimplemented") +} + func newMockListener() *mockRegistry { return &mockRegistry{ manifestHashes: make(map[hash.Hash]struct{}), diff --git a/go/runtime/bundle/registry.go b/go/runtime/bundle/registry.go index aa084249fd9..2184d955609 100644 --- a/go/runtime/bundle/registry.go +++ b/go/runtime/bundle/registry.go @@ -1,8 +1,11 @@ package bundle import ( + "context" + "errors" "fmt" "maps" + "os" "slices" "sync" @@ -34,6 +37,11 @@ type Registry interface { // AddBundle adds a bundle from the given path. AddBundle(path string, manifestHash hash.Hash) error + // CleanStaleBundles removes (regular) bundles (components and manifests) + // for versions less then active. It also removes bundle file and + // corresponding exploded dir. + CleanStaleBundles(ctx context.Context, runtimeID common.Namespace, active version.Version) + // GetVersions returns versions for the given runtime, sorted in ascending // order. GetVersions(runtimeID common.Namespace) []version.Version @@ -202,6 +210,90 @@ func (r *registry) AddBundle(path string, manifestHash hash.Hash) error { return nil } +// CleanStaleBundles implements Registry. +func (r *registry) CleanStaleBundles(ctx context.Context, runtimeID common.Namespace, active version.Version) { + for _, v := range r.GetVersions(runtimeID) { + select { + case <-ctx.Done(): + return + default: + if v.Less(active) { + r.logger.Info("Removing bundle with version lower then active", + "runtimeId", runtimeID, + "version", v, + "activeVersion", active, + ) + r.cleanBundle(runtimeID, v) + } + } + } +} + +// cleanBundle removes (regular) bundle from the registry. +// +// Additionally, it removes exploded subdir and bundle file from the bundle dir. +func (r *registry) cleanBundle(runtimeID common.Namespace, version version.Version) { + explDir, err := r.removeBundle(runtimeID, version) + if err != nil { + r.logger.Warn("Bundle clean-up unsuccessful: invalid registry state", + "error", err, + "id", runtimeID, + "runtime_version", version, + ) + } + + if err := os.RemoveAll(explDir); err != nil { + r.logger.Error("failed to remove stale exploded (regular) bundle dir", + "id", runtimeID, + "runtime_version", version, + "ExplodedDataDir", explDir, + "err", err, + ) + } + + if err := os.Remove(explDir + FileExtension); err != nil { + r.logger.Error("failed to remove stale exploded (regular) bundle file", + "id", runtimeID, + "runtime_version", version, + "ExplodedDataDir", explDir, + "err", err, + ) + } +} + +// removeBundle removes bundle for the runtimeID and version from the registry, +// if it exists. +func (r *registry) removeBundle(runtimeID common.Namespace, version version.Version) (string, error) { + r.mu.Lock() + defer r.mu.Unlock() + var explDir string + manifest, ok := r.manifests[runtimeID][version] + if !ok { + return explDir, errors.New("registry missing Manifest") + } + explComp := r.components[runtimeID][component.ID_RONL][version] + if explComp == nil { + return explDir, errors.New("components missing RONL component") + } + explDir = explComp.ExplodedDataDir + manifestHash := manifest.Hash() + + r.logger.Debug("Removing (regular) bundle from registry", + "ronl_version", version, + "runtimeID", runtimeID, + "manifestHash", manifestHash, + "exploded_subdir", explDir, + ) + + // Removal should be atomic. + delete(r.manifests[runtimeID], version) + delete(r.bundles, manifestHash) + for _, c := range manifest.Components { + delete(r.components[runtimeID][c.ID()], c.Version) + } + return explDir, nil +} + // GetVersions implements Registry. func (r *registry) GetVersions(runtimeID common.Namespace) []version.Version { r.mu.RLock() diff --git a/go/runtime/bundle/registry_test.go b/go/runtime/bundle/registry_test.go index 1dde2ccfd18..5c4ffa0388a 100644 --- a/go/runtime/bundle/registry_test.go +++ b/go/runtime/bundle/registry_test.go @@ -1,6 +1,8 @@ package bundle import ( + "context" + "os" "path/filepath" "strings" "testing" @@ -44,7 +46,10 @@ func TestBundleRegistry(t *testing.T) { path3, err := createSyntheticBundle(dir, runtimeID1, version1, []component.Kind{component.RONL}) require.NoError(t, err) - paths := []string{path0, path1, path2, path3} + path4, err := createSyntheticBundle(dir, runtimeID1, version2, []component.Kind{component.RONL}) + require.NoError(t, err) + + paths := []string{path0, path1, path2, path3, path4} // Compute manifest hashes. var hashes []hash.Hash @@ -100,4 +105,49 @@ func TestBundleRegistry(t *testing.T) { _, err = registry.GetComponents(runtimeID1, version3) require.Error(t, err) require.ErrorContains(t, err, "component 'ronl', version '3.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found") + + // Add fifth bundle (new RONL component for runtime 1). + err = registry.AddBundle(paths[4], hashes[4]) + require.NoError(t, err) + + // We should have 3 exploded subdirs for regular bundles. + require.Equal(t, 3, explodedRegBundleNum(t, dir)) + + // We should have 2 versions for runtime 1. + versions := registry.GetVersions(runtimeID1) + require.Equal(t, 2, len(versions)) + + // Fetch components for runtime 1, version 2. + comps, err = registry.GetComponents(runtimeID1, version2) + require.NoError(t, err) + require.Equal(t, 2, len(comps)) + require.Equal(t, version2, comps[0].Version) + require.Equal(t, version2, comps[1].Version) + + // Remove version 1 for runtime 1 from the registry. + registry.CleanStaleBundles(context.Background(), runtimeID1, version2) + + // We should have 1 versions for runtime 1. + versions = registry.GetVersions(runtimeID1) + require.Equal(t, 1, len(versions)) + + // Attempt to fetch components for runtime 1, version 1 (just deleted). + _, err = registry.GetComponents(runtimeID1, version1) + require.Error(t, err) + require.ErrorContains(t, err, "component 'ronl', version '1.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found") + + // We should have two exploded subdirs for regular bundles. + require.Equal(t, 2, explodedRegBundleNum(t, dir)) +} + +func explodedRegBundleNum(t *testing.T, dir string) int { + entries, err := os.ReadDir(ExplodedPath(dir)) + require.NoError(t, err) + var res int + for _, entry := range entries { + if entry.Name() != "detached" { + res++ + } + } + return res } diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index af22f35bb87..8be3f240c4e 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -383,16 +383,33 @@ func (r *runtime) run(ctx context.Context) { } defer regSub.Close() + wg := &sync.WaitGroup{} var regInitialized, activeInitialized bool for { select { case <-ctx.Done(): + // Ensure bundle clean-up terminates before returning. + wg.Wait() return - case <-epoCh: + case epoch := <-epoCh: if up := r.updateActiveDescriptor(ctx); up && !activeInitialized { close(r.activeDescriptorCh) activeInitialized = true } + + r.RLock() + rt := r.activeDescriptor + r.RUnlock() + if rt != nil { + if active := rt.ActiveDeployment(epoch); active != nil { + wg.Add(1) + go func() { + r.bundleRegistry.CleanStaleBundles(ctx, r.ID(), active.Version) + wg.Done() + }() + } + } + case rt := <-regCh: if !rt.ID.Equal(&r.id) { continue