From f3f52e3a923a7194221e18451e56799184f8d879 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Tue, 7 Jan 2025 05:31:17 +0100 Subject: [PATCH] go/runtime/registry: Cleanup bundles for versions lower then active --- .changelog/5737.feature.md | 1 + go/runtime/bundle/discovery_test.go | 5 +++ go/runtime/bundle/registry.go | 65 +++++++++++++++++++++++++++++ go/runtime/bundle/registry_test.go | 52 ++++++++++++++++++++++- go/runtime/registry/registry.go | 16 ++++++- 5 files changed, 137 insertions(+), 2 deletions(-) diff --git a/.changelog/5737.feature.md b/.changelog/5737.feature.md index dc796efeaaf..5daeaf11e9b 100644 --- a/.changelog/5737.feature.md +++ b/.changelog/5737.feature.md @@ -7,3 +7,4 @@ From now on, we are removing them automatically: 1. At initialization time, for bundles no longer present in the configuration file (both regular and detached). +2. For versions lower than the active runtime version (regular bundles). diff --git a/go/runtime/bundle/discovery_test.go b/go/runtime/bundle/discovery_test.go index 7ffef0870b1..00a88301ed1 100644 --- a/go/runtime/bundle/discovery_test.go +++ b/go/runtime/bundle/discovery_test.go @@ -59,6 +59,11 @@ func (r *mockRegistry) GetComponents(common.Namespace, version.Version) ([]*Expl panic("unimplemented") } +// CleanStaleBundles implements Registry. +func (r *mockRegistry) CleanStaleBundles(context.Context, common.Namespace, version.Version) { + panic("unimplemented") +} + func newMockListener() *mockRegistry { return &mockRegistry{ manifestHashes: make(map[hash.Hash]struct{}), diff --git a/go/runtime/bundle/registry.go b/go/runtime/bundle/registry.go index aa084249fd9..c34825cddb9 100644 --- a/go/runtime/bundle/registry.go +++ b/go/runtime/bundle/registry.go @@ -1,8 +1,10 @@ package bundle import ( + "context" "fmt" "maps" + "os" "slices" "sync" @@ -34,6 +36,11 @@ type Registry interface { // AddBundle adds a bundle from the given path. AddBundle(path string, manifestHash hash.Hash) error + // CleanStaleBundles removes (regular) bundles (components and manifests) + // for versions less then active. It also removes bundle file and + // corresponding exploded dir. + CleanStaleBundles(ctx context.Context, wg *sync.WaitGroup, runtimeID common.Namespace, active version.Version) + // GetVersions returns versions for the given runtime, sorted in ascending // order. GetVersions(runtimeID common.Namespace) []version.Version @@ -202,6 +209,64 @@ func (r *registry) AddBundle(path string, manifestHash hash.Hash) error { return nil } +// CleanStaleBundles implements Registry. +func (r *registry) CleanStaleBundles(ctx context.Context, wg *sync.WaitGroup, runtimeID common.Namespace, active version.Version) { + defer wg.Done() + for _, v := range r.GetVersions(runtimeID) { + select { + case <-ctx.Done(): + return + default: + if v.Less(active) { + r.removeBundle(runtimeID, v, active) + } + } + } +} + +func (r *registry) removeBundle(runtimeID common.Namespace, version version.Version, active version.Version) { + r.mu.Lock() + manifest, ok := r.manifests[runtimeID][version] + if !ok { + return + } + explDir := r.components[runtimeID][component.ID_RONL][version].ExplodedDataDir + manifestHash := manifest.Hash() + r.logger.Info("Removing (regular) bundle, together with exploded subdir with version lower than active", + "active_version", active, + "ronl_version", version, + "runtimeID", runtimeID, + "manifestHash", manifestHash, + "exploded_subdir", explDir, + ) + + delete(r.manifests[runtimeID], version) + delete(r.bundles, manifestHash) + for _, c := range manifest.Components { + delete(r.components[runtimeID][c.ID()], c.Version) + } + r.mu.Unlock() + + // Don't lock when removing files. + if err := os.RemoveAll(explDir); err != nil { + r.logger.Error("failed to remove stale exploded (regular) bundle", + "id", runtimeID, + "runtime_version", version, + "ExplodedDataDir", explDir, + "err", err, + ) + } + + if err := os.Remove(explDir + FileExtension); err != nil { + r.logger.Error("failed to remove stale (regular) bundle file", + "id", runtimeID, + "runtime_version", version, + "ExplodedDataDir", explDir, + "err", err, + ) + } +} + // GetVersions implements Registry. func (r *registry) GetVersions(runtimeID common.Namespace) []version.Version { r.mu.RLock() diff --git a/go/runtime/bundle/registry_test.go b/go/runtime/bundle/registry_test.go index 1dde2ccfd18..5c4ffa0388a 100644 --- a/go/runtime/bundle/registry_test.go +++ b/go/runtime/bundle/registry_test.go @@ -1,6 +1,8 @@ package bundle import ( + "context" + "os" "path/filepath" "strings" "testing" @@ -44,7 +46,10 @@ func TestBundleRegistry(t *testing.T) { path3, err := createSyntheticBundle(dir, runtimeID1, version1, []component.Kind{component.RONL}) require.NoError(t, err) - paths := []string{path0, path1, path2, path3} + path4, err := createSyntheticBundle(dir, runtimeID1, version2, []component.Kind{component.RONL}) + require.NoError(t, err) + + paths := []string{path0, path1, path2, path3, path4} // Compute manifest hashes. var hashes []hash.Hash @@ -100,4 +105,49 @@ func TestBundleRegistry(t *testing.T) { _, err = registry.GetComponents(runtimeID1, version3) require.Error(t, err) require.ErrorContains(t, err, "component 'ronl', version '3.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found") + + // Add fifth bundle (new RONL component for runtime 1). + err = registry.AddBundle(paths[4], hashes[4]) + require.NoError(t, err) + + // We should have 3 exploded subdirs for regular bundles. + require.Equal(t, 3, explodedRegBundleNum(t, dir)) + + // We should have 2 versions for runtime 1. + versions := registry.GetVersions(runtimeID1) + require.Equal(t, 2, len(versions)) + + // Fetch components for runtime 1, version 2. + comps, err = registry.GetComponents(runtimeID1, version2) + require.NoError(t, err) + require.Equal(t, 2, len(comps)) + require.Equal(t, version2, comps[0].Version) + require.Equal(t, version2, comps[1].Version) + + // Remove version 1 for runtime 1 from the registry. + registry.CleanStaleBundles(context.Background(), runtimeID1, version2) + + // We should have 1 versions for runtime 1. + versions = registry.GetVersions(runtimeID1) + require.Equal(t, 1, len(versions)) + + // Attempt to fetch components for runtime 1, version 1 (just deleted). + _, err = registry.GetComponents(runtimeID1, version1) + require.Error(t, err) + require.ErrorContains(t, err, "component 'ronl', version '1.0.0', for runtime '8000000000000000000000000000000000000000000000000000000000000001' not found") + + // We should have two exploded subdirs for regular bundles. + require.Equal(t, 2, explodedRegBundleNum(t, dir)) +} + +func explodedRegBundleNum(t *testing.T, dir string) int { + entries, err := os.ReadDir(ExplodedPath(dir)) + require.NoError(t, err) + var res int + for _, entry := range entries { + if entry.Name() != "detached" { + res++ + } + } + return res } diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index e84fb26c314..2edbf879a06 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -383,16 +383,30 @@ func (r *runtime) run(ctx context.Context) { } defer regSub.Close() + wg := &sync.WaitGroup{} var regInitialized, activeInitialized bool for { select { case <-ctx.Done(): + // Ensure bundle clean-up terminates before returning. + wg.Wait() return - case <-epoCh: + case epoch := <-epoCh: if up := r.updateActiveDescriptor(ctx); up && !activeInitialized { close(r.activeDescriptorCh) activeInitialized = true } + + r.RLock() + rt := r.activeDescriptor + r.RUnlock() + if rt != nil { + if active := rt.ActiveDeployment(epoch); active != nil { + wg.Add(1) + go r.bundleRegistry.CleanStaleBundles(ctx, wg, r.ID(), active.Version) + } + } + case rt := <-regCh: if !rt.ID.Equal(&r.id) { continue