Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically clean up cached bundles directory #5976

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/5737.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/runtime/bundle: Cleanup bundles on runtime upgrade
5 changes: 5 additions & 0 deletions go/common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (v Version) Cmp(other Version) int {
return int(v.Patch) - int(other.Patch)
}

// Less checks if the current Version is strictly less than the given Version.
func (v Version) Less(other Version) bool {
return v.Cmp(other) < 0
}

// ToU64 returns the version as platform-dependent uint64.
func (v Version) ToU64() uint64 {
return (uint64(v.Major) << 32) | (uint64(v.Minor) << 16) | (uint64(v.Patch))
Expand Down
26 changes: 26 additions & 0 deletions go/common/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,29 @@ func TestProtocolVersionCompatible(t *testing.T) {
require.Equal(t, v.isCompatible, Versions.Compatible(v.versions()), v.msg)
}
}

func TestLess(t *testing.T) {
tests := []struct {
name string
v1 Version
v2 Version
expect bool
}{
{"v1 less than v2 (major)", Version{1, 0, 0}, Version{2, 1, 3}, true},
{"v1 greater than v2 (major)", Version{2, 0, 0}, Version{1, 1, 3}, false},
{"v1 less than v2 (minor)", Version{1, 1, 0}, Version{1, 2, 5}, true},
{"v1 greater than v2 (minor)", Version{1, 2, 0}, Version{1, 1, 5}, false},
{"v1 less than v2 (patch)", Version{1, 0, 1}, Version{1, 0, 2}, true},
{"v1 greater than v2 (patch)", Version{1, 0, 2}, Version{1, 0, 1}, false},
{"v1 equal to v2", Version{1, 0, 0}, Version{1, 0, 0}, false},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
result := tc.v1.Less(tc.v2)
if result != tc.expect {
t.Errorf("Less(%v, %v) = %v, want %v", tc.v1, tc.v2, result, tc.expect)
}
})
}
}
4 changes: 4 additions & 0 deletions go/oasis-test-runner/oasis/oasis.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ type Node struct { // nolint: maligned
entity *Entity
}

func (n *Node) Dir() string {
return n.dir.String()
}

// SetArchiveMode sets the archive mode.
func (n *Node) SetArchiveMode(archive bool) {
n.consensus.EnableArchiveMode = archive
Expand Down
70 changes: 70 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/runtime_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common/logging"
cmSync "github.com/oasisprotocol/oasis-core/go/common/sync"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli"
Expand Down Expand Up @@ -106,9 +107,78 @@ func (sc *runtimeUpgradeImpl) Run(ctx context.Context, childEnv *env.Env) error
// Run client again.
sc.Logger.Info("starting a second client to check if runtime works")
sc.Scenario.TestClient = NewTestClient().WithSeed("seed2").WithScenario(InsertRemoveEncWithSecretsScenarioV2)

// Ensure that after upgrade, every compute worker had its old exploded
// bundle removed from its bundles dir.
for _, worker := range sc.Net.ComputeWorkers() {
if err := ensureCorrectBundlesDir(sc.Logger, worker.Name, worker.Dir()); err != nil {
sc.Logger.Error("compute worker bundle dir clean-up error",
"worker", worker.Name,
"err", err,
)
return err
}
}

return sc.RunTestClientAndCheckLogs(ctx, childEnv)
}

func ensureCorrectBundlesDir(logger *logging.Logger, workerName, workerDir string) error {
logger.Info("ensuring cached exploded bundle for version 0.0.0 was removed",
"worker", workerName)

// There should be only one exploded bundle dir.
bundlesDir := bundle.ExplodedPath(workerDir)
entry, err := ensureSingleEntry(logger, bundlesDir)
explDir := path.Join(bundlesDir, entry)
if err != nil {
return fmt.Errorf("ensureSingleEntry(logger, %s): %w", workerDir, err)
}

// Ensure exploded cached bundle is for the latest version (0.1.0).
manifest, err := bundle.ReadManifest(explDir)
if err != nil {
return fmt.Errorf("bundle.ReadManifest(%s)", explDir)
}
want := version.Version{Minor: uint16(1)}
got := manifest.GetVersion()
if want != got {
return fmt.Errorf("unexpected bundle version: want %v, but %v got", want, got)
}
return nil
}

func ensureSingleEntry(logger *logging.Logger, dir string) (string, error) {
entries, err := os.ReadDir(dir)
if err != nil {
logger.Error("failed to read dir")
return "", err
}

if n := len(entries); n != 1 {
var entryNames []string
for _, entry := range entries {
entryNames = append(entryNames, entry.Name())
}
logger.Error("failed to ensure only single dir entry",
"entries", strings.Join(entryNames, "; "),
"dir", dir,
)
return "", fmt.Errorf("unexpected number of dir entries: expected 1, but %d got", n)
}

entry := entries[0]

if !entry.IsDir() {
logger.Error("failed to ensure single dir entry: entry is not dir",
"entry", entry.Name(),
)
return "", fmt.Errorf("%s is not a dir", entry)
}

return entry.Name(), nil
}

type bundleServer struct {
startOne cmSync.One

Expand Down
16 changes: 16 additions & 0 deletions go/runtime/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,22 @@ func Open(fn string, opts ...OpenOption) (_ *Bundle, err error) {
return bnd, nil
}

// ReadManifest reads and parses a Manifest from the exploded bundle directory.
//
// Returns an error if the manifest file cannot be read or parsed.
func ReadManifest(bundleDir string) (*Manifest, error) {
b, err := os.ReadFile(filepath.Join(bundleDir, manifestName))
if err != nil {
return nil, fmt.Errorf("failed to read manifest: %w", err)
}

var manifest Manifest
if err = json.Unmarshal(b, &manifest); err != nil {
return nil, fmt.Errorf("failed to parse manifest: %w", err)
}
return &manifest, nil
}

// Data is a data item in the bundle.
type Data interface {
// Open returns an io.ReadCloser that can be used to access the underlying data.
Expand Down
81 changes: 72 additions & 9 deletions go/runtime/bundle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bundle
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/logging"
cmSync "github.com/oasisprotocol/oasis-core/go/common/sync"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/config"
)

Expand Down Expand Up @@ -46,6 +46,13 @@ type ManifestStore interface {
// AddManifest adds the provided manifest, whose components were extracted
// to the specified directory, to the store.
AddManifest(manifest *Manifest, dir string) error

// RemoveManifests removes all regular manifests for the specified runtimeID
// whose versions are lower than the provided active version.
//
// It returns a list of directories for the removed exploded bundles or an error
// if the operation fails.
RemoveManifests(runtimeID common.Namespace, active version.Version) ([]string, error)
}

// Manager is responsible for managing bundles.
Expand All @@ -64,6 +71,8 @@ type Manager struct {

downloadCh chan struct{}
downloadQueue map[common.Namespace][]hash.Hash
cleanupCh chan struct{}
cleanupQueue map[common.Namespace]version.Version

client *http.Client
store ManifestStore
Expand Down Expand Up @@ -121,6 +130,8 @@ func NewManager(dataDir string, runtimeIDs []common.Namespace, store ManifestSto
runtimeBaseURLs: runtimeBaseURLs,
downloadCh: make(chan struct{}, 1),
downloadQueue: make(map[common.Namespace][]hash.Hash),
cleanupCh: make(chan struct{}, 1),
cleanupQueue: make(map[common.Namespace]version.Version),
client: &client,
store: store,
logger: *logger,
Expand Down Expand Up @@ -193,6 +204,9 @@ func (m *Manager) run(ctx context.Context) {
select {
case <-ticker.C:
case <-m.downloadCh:
case <-m.cleanupCh:
m.CleanStaleBundles()
continue
case <-ctx.Done():
m.logger.Info("stopping")
return
Expand Down Expand Up @@ -243,13 +257,38 @@ func (m *Manager) Queue(runtimeID common.Namespace, manifestHashes []hash.Hash)
}
}

// QueueCleanup updates the active version for the runtime ID,
// triggering clean-up of regular bundles for version lower than active.
func (m *Manager) QueueCleanup(runtimeID common.Namespace, active version.Version) {
// Update the active versions.
m.mu.Lock()
defer m.mu.Unlock()

m.cleanupQueue[runtimeID] = active

// Trigger immediate clean-up of stale bundles.
select {
case m.cleanupCh <- struct{}{}:
default:
}
}

// Download tries to download bundles in the queue.
func (m *Manager) Download() {
for runtimeID := range m.runtimeIDs {
m.downloadBundles(runtimeID)
}
}

// CleanStaleBundles removes outdated manifest hashes and deletes corresponding
// exploded bundles for runtimes in the clean-up queue.
func (m *Manager) CleanStaleBundles() {
m.logger.Info("removing regular bundles with version less than active")
for runtimeID := range m.runtimeIDs {
m.cleanStaleBundles(runtimeID)
}
}

func (m *Manager) downloadBundles(runtimeID common.Namespace) {
// Try to download queued bundles.
m.mu.RLock()
Expand Down Expand Up @@ -437,6 +476,32 @@ func (m *Manager) fetchBundle(url string) (string, error) {
return file.Name(), nil
}

func (m *Manager) cleanStaleBundles(runtimeID common.Namespace) {
m.mu.Lock()
active, ok := m.cleanupQueue[runtimeID]
m.mu.Unlock()

if !ok {
return
}

// TODO should you also remove dirs for manifests successfully removed,
// even if error?
dirs, err := m.store.RemoveManifests(runtimeID, active)
Comment on lines +488 to +490
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

if err != nil {
m.logger.Error("failed to remove regular manifests from the registry",
"runtimeID", runtimeID,
"version", active,
"err", err,
)
return
}

for _, dir := range dirs {
_ = m.removeBundle(dir)
}
}

func (m *Manager) loadManifests() (map[string]*Manifest, error) {
m.logger.Info("loading manifests")

Expand All @@ -453,22 +518,20 @@ func (m *Manager) loadManifests() (map[string]*Manifest, error) {
}
dir := filepath.Join(m.bundleDir, entry.Name())

b, err := os.ReadFile(filepath.Join(dir, manifestName))
manifest, err := ReadManifest(dir)
if err != nil {
return nil, fmt.Errorf("failed to read manifest: %w", err)
}

var manifest Manifest
if err = json.Unmarshal(b, &manifest); err != nil {
return nil, fmt.Errorf("failed to parse manifest: %w", err)
m.logger.Error("failed to read manifest",
"err", err,
)
return nil, fmt.Errorf("ReadManifest(%s): %w", dir, err)
}

m.logger.Info("manifest loaded",
"name", manifest.Name,
"hash", manifest.Hash(),
)

manifests[dir] = &manifest
manifests[dir] = manifest
}

return manifests, nil
Expand Down
6 changes: 6 additions & 0 deletions go/runtime/bundle/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/version"
)

type mockStore struct {
Expand All @@ -28,6 +30,10 @@ func (r *mockStore) AddManifest(manifest *Manifest, _ string) error {
return nil
}

func (r *mockStore) RemoveManifests(_ common.Namespace, _ version.Version) ([]string, error) {
panic("RemoveManifests not implemented")
}

func TestRegisterManifest(t *testing.T) {
store := newMockStore()
manager, err := NewManager("", nil, store)
Expand Down
Loading
Loading