Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[no-release-notes] go/store/nbs: generational_chunk_store.go: In GCMode_Full, also take dependencies on chunks read from OldGen. #8796

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ type ChunkStoreGarbageCollector interface {
//
// This function should not block indefinitely and should return an
// error if a GC is already in progress.
BeginGC(addChunk func(hash.Hash) bool) error
BeginGC(addChunk func(hash.Hash) bool, mode GCMode) error

// EndGC indicates that the GC is over. The previously provided
// addChunk function must not be called after this function.
EndGC()
EndGC(mode GCMode)

// MarkAndSweepChunks returns a handle that can be used to supply
// hashes which should be saved into |dest|. The hashes are
Expand Down Expand Up @@ -257,6 +257,12 @@ type GenerationalCS interface {
NewGen() ChunkStoreGarbageCollector
OldGen() ChunkStoreGarbageCollector
GhostGen() ChunkStore

// Has the same return values as OldGen().HasMany, but should be used by a
// generational GC process as the filter function instead of
// OldGen().HasMany. This function never takes read dependencies on the
// chunks that it queries.
OldGenGCFilter() HasManyFunc
}

var ErrUnsupportedOperation = errors.New("operation not supported")
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,11 @@ func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash)
return success, nil
}

func (ms *MemoryStoreView) BeginGC(keeper func(hash.Hash) bool) error {
func (ms *MemoryStoreView) BeginGC(keeper func(hash.Hash) bool, _ GCMode) error {
return ms.transitionToGC(keeper)
}

func (ms *MemoryStoreView) EndGC() {
func (ms *MemoryStoreView) EndGC(_ GCMode) {
ms.transitionToNoGC()
}

Expand Down
8 changes: 4 additions & 4 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,20 @@ func (s *TestStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCurry
return s.ChunkStore.Put(ctx, c, getAddrs)
}

func (s *TestStoreView) BeginGC(keeper func(hash.Hash) bool) error {
func (s *TestStoreView) BeginGC(keeper func(hash.Hash) bool, mode GCMode) error {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok {
return ErrUnsupportedOperation
}
return collector.BeginGC(keeper)
return collector.BeginGC(keeper, mode)
}

func (s *TestStoreView) EndGC() {
func (s *TestStoreView) EndGC(mode GCMode) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok {
panic(ErrUnsupportedOperation)
}
collector.EndGC()
collector.EndGC(mode)
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
Expand Down
33 changes: 29 additions & 4 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,37 @@ func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash
return gcs.newGen.UpdateManifest(ctx, updates)
}

func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool) error {
return gcs.newGen.BeginGC(keeper)
func (gcs *GenerationalNBS) OldGenGCFilter() chunks.HasManyFunc {
return func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
return gcs.oldGen.hasManyDep(ctx, hashes, gcDependencyMode_NoDependency)
}
}

func (gcs *GenerationalNBS) EndGC() {
gcs.newGen.EndGC()
func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCMode) error {
err := gcs.newGen.BeginGC(keeper, mode)
if err != nil {
return err
}
// In GCMode_Full, the OldGen is also being collected. In normal
// operation, the OldGen is not being collected because it is
// still growing monotonically and nothing in it is at risk of
// going away. In Full mode, we want to take read dependencies
// from the OldGen as well.
if mode == chunks.GCMode_Full {
err = gcs.oldGen.BeginGC(keeper, mode)
if err != nil {
gcs.newGen.EndGC(mode)
return err
}
}
return nil
}

func (gcs *GenerationalNBS) EndGC(mode chunks.GCMode) {
if mode == chunks.GCMode_Full {
gcs.oldGen.EndGC(mode)
}
gcs.newGen.EndGC(mode)
}

func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
Expand Down
8 changes: 4 additions & 4 deletions go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() chunks.TableFileStoreOps {
return nbsMW.nbs.SupportedOperations()
}

func (nbsMW *NBSMetricWrapper) BeginGC(keeper func(hash.Hash) bool) error {
return nbsMW.nbs.BeginGC(keeper)
func (nbsMW *NBSMetricWrapper) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCMode) error {
return nbsMW.nbs.BeginGC(keeper, mode)
}

func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
func (nbsMW *NBSMetricWrapper) EndGC(mode chunks.GCMode) {
nbsMW.nbs.EndGC(mode)
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
Expand Down
32 changes: 14 additions & 18 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ func (nbs *NomsBlockStore) conjoinIfRequired(ctx context.Context) (bool, error)
func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
err = nbs.waitForGC(ctx)
if err != nil {
return
}

err = nbs.checkAllManifestUpdatesExist(ctx, updates)
if err != nil {
return
Expand Down Expand Up @@ -361,11 +356,6 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
err = nbs.waitForGC(ctx)
if err != nil {
return
}

err = nbs.checkAllManifestUpdatesExist(ctx, updates)
if err != nil {
return
Expand Down Expand Up @@ -517,11 +507,6 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root hash.Hash, tableFiles map[hash.Hash]uint32, appendixTableFiles map[hash.Hash]uint32) (err error) {
store.mu.Lock()
defer store.mu.Unlock()
err = store.waitForGC(ctx)
if err != nil {
return
}

contents := manifestContents{
root: root,
nbfVers: store.upstream.nbfVers,
Expand Down Expand Up @@ -1128,6 +1113,10 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
}

func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
return nbs.hasManyDep(ctx, hashes, gcDependencyMode_TakeDependency)
}

func (nbs *NomsBlockStore) hasManyDep(ctx context.Context, hashes hash.HashSet, gcDepMode gcDependencyMode) (hash.HashSet, error) {
if hashes.Size() == 0 {
return nil, nil
}
Expand All @@ -1143,7 +1132,11 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha

nbs.mu.Lock()
if nbs.mt != nil {
remaining, gcb, err := nbs.mt.hasMany(reqs, nbs.keeperFunc)
keeper := nbs.keeperFunc
if gcDepMode == gcDependencyMode_NoDependency {
keeper = nil
}
remaining, gcb, err := nbs.mt.hasMany(reqs, keeper)
if err != nil {
nbs.mu.Unlock()
return nil, err
Expand All @@ -1162,6 +1155,9 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
}
}
tables, keeper, endRead := nbs.tables, nbs.keeperFunc, nbs.beginRead()
if gcDepMode == gcDependencyMode_NoDependency {
keeper = nil
}
nbs.mu.Unlock()

remaining, gcb, err := tables.hasMany(reqs, keeper)
Expand Down Expand Up @@ -1730,7 +1726,7 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) {
}, mtime)
}

func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool) error {
func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool, _ chunks.GCMode) error {
nbs.cond.L.Lock()
defer nbs.cond.L.Unlock()
if nbs.gcInProgress {
Expand All @@ -1742,7 +1738,7 @@ func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool) error {
return nil
}

func (nbs *NomsBlockStore) EndGC() {
func (nbs *NomsBlockStore) EndGC(_ chunks.GCMode) {
nbs.cond.L.Lock()
defer nbs.cond.L.Unlock()
if !nbs.gcInProgress {
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func TestNBSCopyGC(t *testing.T) {
require.NoError(t, err)
require.True(t, ok)

require.NoError(t, st.BeginGC(nil))
require.NoError(t, st.BeginGC(nil, chunks.GCMode_Full))
noopFilter := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
return hashes, nil
}
Expand All @@ -349,7 +349,7 @@ func TestNBSCopyGC(t *testing.T) {
require.NoError(t, err)
require.NoError(t, sweeper.Close(ctx))
require.NoError(t, finalizer.SwapChunksInStore(ctx))
st.EndGC()
st.EndGC(chunks.GCMode_Full)

for h, c := range keepers {
out, err := st.Get(ctx, h)
Expand Down
12 changes: 6 additions & 6 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
var oldGenHasMany chunks.HasManyFunc
switch mode {
case GCModeDefault:
oldGenHasMany = oldGen.HasMany
oldGenHasMany = gcs.OldGenGCFilter()
chksMode = chunks.GCMode_Default
case GCModeFull:
oldGenHasMany = unfilteredHashFunc
Expand All @@ -601,11 +601,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}

err := func() error {
err := collector.BeginGC(lvs.gcAddChunk)
err := collector.BeginGC(lvs.gcAddChunk, chksMode)
if err != nil {
return err
}
defer collector.EndGC()
defer collector.EndGC(chksMode)

var callCancelSafepoint bool
if safepoint != nil {
Expand Down Expand Up @@ -650,7 +650,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}

if mode == GCModeDefault {
oldGenHasMany = oldGen.HasMany
oldGenHasMany = gcs.OldGenGCFilter()
} else {
oldGenHasMany = newFileHasMany
}
Expand Down Expand Up @@ -685,11 +685,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.InsertAll(oldGenRefs)

err := func() error {
err := collector.BeginGC(lvs.gcAddChunk)
err := collector.BeginGC(lvs.gcAddChunk, chunks.GCMode_Full)
if err != nil {
return err
}
defer collector.EndGC()
defer collector.EndGC(chunks.GCMode_Full)

var callCancelSafepoint bool
if safepoint != nil {
Expand Down
56 changes: 38 additions & 18 deletions integration-tests/go-sql-server-driver/concurrent_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,49 @@ import (
)

func TestConcurrentGC(t *testing.T) {
var gct gcTest
gct.numThreads = 8
gct.duration = 10 * time.Second
t.Run("NoCommits", func(t *testing.T) {
gct.run(t)
t.Run("Normal", func(t *testing.T) {
var gct = gcTest{
numThreads: 8,
duration: 10 * time.Second,
}
gct.run(t)
})
t.Run("Full", func(t *testing.T) {
var gct = gcTest{
numThreads: 8,
duration: 10 * time.Second,
full: true,
}
gct.run(t)
})
})
gct.commit = true
t.Run("WithCommits", func(t *testing.T) {
gct.run(t)
t.Run("Normal", func(t *testing.T) {
var gct = gcTest{
numThreads: 8,
duration: 10 * time.Second,
commit: true,
}
gct.run(t)
})
t.Run("Full", func(t *testing.T) {
var gct = gcTest{
numThreads: 8,
duration: 10 * time.Second,
commit: true,
full: true,
}
gct.run(t)
})
})
}

type gcTest struct {
numThreads int
duration time.Duration
commit bool
full bool
}

func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) {
Expand Down Expand Up @@ -118,19 +145,12 @@ func (gct gcTest) doGC(t *testing.T, ctx context.Context, db *sql.DB) error {
})
}()
b := time.Now()
_, err = conn.ExecContext(ctx, "call dolt_gc()")
if err != nil {
if !assert.NotContains(t, err.Error(), "dangling ref") {
return err
}
if !assert.NotContains(t, err.Error(), "is unexpected noms value") {
return err
}
if !assert.NotContains(t, err.Error(), "interface conversion: types.Value is nil") {
return err
}
t.Logf("err in Exec dolt_gc: %v", err)
if !gct.full {
_, err = conn.ExecContext(ctx, "call dolt_gc()")
} else {
_, err = conn.ExecContext(ctx, `call dolt_gc("--full")`)
}
if assert.NoError(t, err) {
t.Logf("successful dolt_gc took %v", time.Since(b))
}
return nil
Expand Down
Loading