Skip to content

Commit

Permalink
Merge pull request #8748 from dolthub/aaron/ghost-compressed-chunks
Browse files Browse the repository at this point in the history
[no-release-notes] go/store/nbs: Make CompressedChunk support ghost chunks and make GhostBlockStore support GetManyCompressed.
  • Loading branch information
reltuk authored Jan 15, 2025
2 parents 0582af3 + 0b17081 commit 1681cbd
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 38 deletions.
3 changes: 3 additions & 0 deletions go/store/datas/pull/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ func (p *Puller) Pull(ctx context.Context) error {
if err != nil {
return err
}
if cChk.IsGhost() {
return fmt.Errorf("attempted to push or pull ghost chunk: %w", nbs.ErrGhostChunkRequested)
}
if len(cChk.FullCompressedChunk) == 0 {
return errors.New("failed to get all chunks.")
}
Expand Down
34 changes: 34 additions & 0 deletions go/store/datas/pull/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,40 @@ func TestChunkJournalPuller(t *testing.T) {
})
}

// TestPuller exercises Puller error paths. The GhostChunk subtest verifies
// that attempting to pull a ghost chunk fails with ErrGhostChunkRequested
// rather than silently materializing an empty chunk.
func TestPuller(t *testing.T) {
	t.Run("GhostChunk", func(t *testing.T) {
		ctx := context.Background()

		// Source store: a ghost block store that knows about exactly one
		// ghost hash.
		gs, err := nbs.NewGhostBlockStore(t.TempDir())
		require.NoError(t, err)
		waf, err := types.WalkAddrsForChunkStore(gs)
		require.NoError(t, err)

		ghost := hash.Parse("e6esqr35dkqnc7updhj6ap5v82sahm9r")

		require.NoError(t, gs.PersistGhostHashes(ctx, hash.NewHashSet(ghost)))

		// Drain the stats channel so Pull never blocks reporting progress.
		statsCh := make(chan Stats)
		go func() {
			for range statsCh {
			}
		}()

		dir := filepath.Join(os.TempDir(), uuid.New().String())
		err = os.MkdirAll(dir, os.ModePerm)
		require.NoError(t, err)

		nbf := types.Format_Default.VersionString()
		q := nbs.NewUnlimitedMemQuotaProvider()

		// Destination store to pull into.
		st, err := nbs.NewLocalJournalingStore(ctx, nbf, dir, q)
		require.NoError(t, err)

		plr, err := NewPuller(ctx, t.TempDir(), 128, gs, st, waf, []hash.Hash{ghost}, statsCh)
		require.NoError(t, err)
		err = plr.Pull(ctx)
		require.ErrorIs(t, err, nbs.ErrGhostChunkRequested)
	})
}

func addTableValues(ctx context.Context, vrw types.ValueReadWriter, m types.Map, tableName string, alternatingKeyVals ...types.Value) (types.Map, error) {
val, ok, err := m.MaybeGet(ctx, types.String(tableName))

Expand Down
6 changes: 6 additions & 0 deletions go/store/nbs/cmp_chunk_table_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func (tw *CmpChunkTableWriter) GetMD5() []byte {

// AddCmpChunk adds a compressed chunk
func (tw *CmpChunkTableWriter) AddCmpChunk(c CompressedChunk) error {
if c.IsGhost() {
// Ghost chunks cannot be written to a table file. They should
// always be filtered by the write processes before landing
// here.
return ErrGhostChunkRequested
}
if len(c.CompressedData) == 0 {
panic("NBS blocks cannot be zero length")
}
Expand Down
6 changes: 6 additions & 0 deletions go/store/nbs/cmp_chunk_table_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func TestCmpChunkTableWriter(t *testing.T) {
compareContentsOfTables(t, ctx, hashes, tr, outputTR)
}

// TestCmpChunkTableWriterGhostChunk verifies that a ghost chunk is rejected
// when added to a chunk table writer.
func TestCmpChunkTableWriterGhostChunk(t *testing.T) {
	writer, err := NewCmpChunkTableWriter("")
	require.NoError(t, err)

	ghost := NewGhostCompressedChunk(hash.Parse("6af71afc2ea0hmp4olev0vp9q1q5gvb1"))
	require.Error(t, writer.AddCmpChunk(ghost))
}

func TestContainsDuplicates(t *testing.T) {
t.Run("Empty", func(t *testing.T) {
require.False(t, containsDuplicates(prefixIndexSlice{}))
Expand Down
42 changes: 10 additions & 32 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
)

var _ chunks.ChunkStore = (*GenerationalNBS)(nil)
var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ chunks.TableFileStore = (*GenerationalNBS)(nil)
var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ chunks.ChunkStoreGarbageCollector = (*GenerationalNBS)(nil)
var _ NBSCompressedChunkStore = (*GenerationalNBS)(nil)

type GenerationalNBS struct {
oldGen *NomsBlockStore
Expand Down Expand Up @@ -143,60 +143,38 @@ func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, fo
}

// GetManyCompressed fetches the requested chunks as CompressedChunks,
// checking the old generation first, then the new generation, and finally the
// ghost store (if one is configured) for anything still unresolved. |found| is
// invoked once per located chunk; hashes found nowhere are silently skipped,
// matching GetMany's contract.
//
// NOTE(review): this span in SOURCE interleaved the pre- and post-change diff
// lines (duplicate mutex declarations, duplicate delete closures, and the old
// ghostFound/ErrGhostChunkRequested path next to the new delegation). This is
// the reconstructed post-change version.
func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
	// mu guards the not-yet-found sets; |found| callbacks may run concurrently.
	var mu sync.Mutex
	notInOldGen := hashes.Copy()
	err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(ctx context.Context, chunk CompressedChunk) {
		mu.Lock()
		delete(notInOldGen, chunk.Hash())
		mu.Unlock()
		found(ctx, chunk)
	})
	if err != nil {
		return err
	}
	if len(notInOldGen) == 0 {
		return nil
	}

	notFound := notInOldGen.Copy()
	err = gcs.newGen.GetManyCompressed(ctx, notInOldGen, func(ctx context.Context, chunk CompressedChunk) {
		mu.Lock()
		delete(notFound, chunk.Hash())
		mu.Unlock()
		found(ctx, chunk)
	})
	if err != nil {
		return err
	}
	if len(notFound) == 0 {
		return nil
	}

	// The missing chunks may be ghost chunks.
	if gcs.ghostGen != nil {
		return gcs.ghostGen.GetManyCompressed(ctx, notFound, found)
	}
	return nil
}
Expand Down
20 changes: 17 additions & 3 deletions go/store/nbs/ghost_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"

"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/constants"
"github.com/dolthub/dolt/go/store/hash"
)

Expand All @@ -32,8 +33,9 @@ type GhostBlockStore struct {
ghostObjectsFile string
}

// We use the Has, HasMany, Get, GetMany, and PersistGhostHashes methods from the ChunkStore interface. All other methods are not supported.
var _ chunks.ChunkStore = &GhostBlockStore{}
// We use the Has, HasMany, Get, GetMany, GetManyCompressed, and PersistGhostHashes methods from the ChunkStore interface. All other methods are not supported.
var _ chunks.ChunkStore = (*GhostBlockStore)(nil)
var _ NBSCompressedChunkStore = (*GenerationalNBS)(nil)

// NewGhostBlockStore returns a new GhostBlockStore instance. Currently the only parameter is the path to the directory
// where we will create a text file called ghostObjects.txt. This file will contain the hashes of the ghost objects. Creation
Expand All @@ -52,6 +54,7 @@ func NewGhostBlockStore(nomsPath string) (*GhostBlockStore, error) {
// Other error, permission denied, etc, we want to hear about.
return nil, err
}
defer f.Close()
scanner := bufio.NewScanner(f)
skiplist := &hash.HashSet{}
for scanner.Scan() {
Expand Down Expand Up @@ -87,6 +90,15 @@ func (g GhostBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found
return nil
}

// GetManyCompressed invokes |found| with a ghost CompressedChunk for each
// requested hash that this store has recorded as a ghost. Hashes that are not
// ghosts are skipped entirely, mirroring GetMany's behavior for absent chunks.
func (g GhostBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
	for h := range hashes {
		if !g.skippedRefs.Has(h) {
			continue
		}
		found(ctx, NewGhostCompressedChunk(h))
	}
	return nil
}

func (g *GhostBlockStore) PersistGhostHashes(ctx context.Context, hashes hash.HashSet) error {
if hashes.Size() == 0 {
return fmt.Errorf("runtime error. PersistGhostHashes called with empty hash set")
Expand All @@ -96,6 +108,7 @@ func (g *GhostBlockStore) PersistGhostHashes(ctx context.Context, hashes hash.Ha
if err != nil {
return err
}
defer f.Close()

for h := range hashes {
if _, err := f.WriteString(h.String() + "\n"); err != nil {
Expand Down Expand Up @@ -137,7 +150,8 @@ func (g GhostBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunk
}

// Version returns the default storage format version string.
//
// A GhostBlockStore has no manifest of its own, so there is no "real" version
// to report; returning the default (rather than panicking, as the other
// unsupported ChunkStore methods do) makes the store more ergonomic to use in
// tests that treat it as a generic ChunkStore.
func (g GhostBlockStore) Version() string {
	// This should never be used, but it makes testing a bit more ergonomic in a few places.
	return constants.FormatDefaultString
}

func (g GhostBlockStore) AccessMode() chunks.ExclusiveAccessMode {
Expand Down
86 changes: 86 additions & 0 deletions go/store/nbs/ghost_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nbs

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)

// TestGhostBlockStore covers the supported read methods of GhostBlockStore:
// a persisted ghost hash is reported present and surfaced as a ghost chunk,
// while an absent hash is reported missing / returned as the empty chunk.
func TestGhostBlockStore(t *testing.T) {
	ctx := context.Background()
	path := t.TempDir()
	bs, err := NewGhostBlockStore(path)
	require.NoError(t, err)
	// |ghost| is persisted to the store; |absent| is never stored.
	ghost, absent := hash.Parse("ifho8m890r9787lrpthif5ce6ru353fr"), hash.Parse("6af71afc2ea0hmp4olev0vp9q1q5gvb1")
	// Consistency fix: use the |ctx| declared above instead of a second
	// context.Background().
	require.NoError(t, bs.PersistGhostHashes(ctx, hash.NewHashSet(ghost)))
	t.Run("Get", func(t *testing.T) {
		t.Run("Ghost", func(t *testing.T) {
			c, err := bs.Get(ctx, ghost)
			require.NoError(t, err)
			require.True(t, c.IsGhost())
		})
		t.Run("Absent", func(t *testing.T) {
			// Absent hashes come back as the empty chunk, not a ghost.
			c, err := bs.Get(ctx, absent)
			require.NoError(t, err)
			require.False(t, c.IsGhost())
			require.True(t, c.IsEmpty())
		})
	})
	t.Run("Has", func(t *testing.T) {
		t.Run("Ghost", func(t *testing.T) {
			h, err := bs.Has(ctx, ghost)
			require.NoError(t, err)
			require.True(t, h)
		})
		t.Run("Absent", func(t *testing.T) {
			h, err := bs.Has(ctx, absent)
			require.NoError(t, err)
			require.False(t, h)
		})
	})
	t.Run("HasMany", func(t *testing.T) {
		// HasMany returns the set of hashes that are NOT in the store.
		a, err := bs.HasMany(ctx, hash.NewHashSet(absent, ghost))
		require.NoError(t, err)
		require.Len(t, a, 1)
		require.True(t, a.Has(absent))
		require.False(t, a.Has(ghost))
	})
	t.Run("GetMany", func(t *testing.T) {
		var got []chunks.Chunk
		err := bs.GetMany(ctx, hash.NewHashSet(absent, ghost), func(_ context.Context, c *chunks.Chunk) {
			got = append(got, *c)
		})
		require.NoError(t, err)
		require.Len(t, got, 1)
		require.True(t, got[0].IsGhost())
		require.Equal(t, ghost, got[0].Hash())
	})
	t.Run("GetManyCompressed", func(t *testing.T) {
		var got []CompressedChunk
		err := bs.GetManyCompressed(ctx, hash.NewHashSet(absent, ghost), func(_ context.Context, c CompressedChunk) {
			got = append(got, c)
		})
		require.NoError(t, err)
		require.Len(t, got, 1)
		require.True(t, got[0].IsGhost())
		require.Equal(t, ghost, got[0].Hash())
	})
}
17 changes: 15 additions & 2 deletions go/store/nbs/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type CompressedChunk struct {

// CompressedData is just the snappy encoded byte buffer that stores the chunk data
CompressedData []byte

// true if the chunk is a ghost chunk.
ghost bool
}

// NewCompressedChunk creates a CompressedChunk
Expand All @@ -64,14 +67,20 @@ func NewCompressedChunk(h hash.Hash, buff []byte) (CompressedChunk, error) {
return CompressedChunk{H: h, FullCompressedChunk: buff, CompressedData: compressedData}, nil
}

// NewGhostCompressedChunk creates a CompressedChunk that represents a ghost
// chunk: the address |h| is known, but no chunk data is present, so both
// FullCompressedChunk and CompressedData are left nil.
func NewGhostCompressedChunk(h hash.Hash) CompressedChunk {
	return CompressedChunk{H: h, ghost: true}
}

// ToChunk snappy decodes the compressed data and returns a chunks.Chunk. If
// this CompressedChunk is a ghost, a ghost chunks.Chunk is returned instead —
// there is no data to decode.
//
// NOTE(review): the SOURCE span showed both the pre- and post-change diff
// lines, leaving a duplicated `data, err := snappy.Decode` statement before
// the ghost check; this is the reconstructed post-change version.
func (cmp CompressedChunk) ToChunk() (chunks.Chunk, error) {
	if cmp.IsGhost() {
		return *chunks.NewGhostChunk(cmp.H), nil
	}

	data, err := snappy.Decode(nil, cmp.CompressedData)
	if err != nil {
		return chunks.Chunk{}, err
	}

	return chunks.NewChunkWithHash(cmp.H, data), nil
}

Expand All @@ -96,6 +105,10 @@ func (cmp CompressedChunk) IsEmpty() bool {
return len(cmp.CompressedData) == 0 || (len(cmp.CompressedData) == 1 && cmp.CompressedData[0] == 0)
}

// IsGhost reports whether this CompressedChunk represents a ghost chunk — an
// address that is known, but whose data is not present in this store.
func (cmp CompressedChunk) IsGhost() bool {
	return cmp.ghost
}

// CompressedSize returns the size of this CompressedChunk.
func (cmp CompressedChunk) CompressedSize() int {
return len(cmp.CompressedData)
Expand Down
2 changes: 1 addition & 1 deletion go/utils/copyrightshdrs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"strings"
)

var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2023|2024|2019-2020|2019-2021|2019-2022|2019-2023|2019-2024|2020-2021|2020-2022|2020-2023|2020-2024|2021-2022|2021-2023|2021-2024|2022-2023|2022-2024|2023-2024) Dolthub, Inc.
var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2023|2024|2025|2019-2020|2019-2021|2019-2022|2019-2023|2019-2024|2019-2025|2020-2021|2020-2022|2020-2023|2020-2024|2020-2025|2021-2022|2021-2023|2021-2024|2021-2025|2022-2023|2022-2024|2022-2025|2023-2024|2023-2025|2024-2025) Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 \(the "License"\);
// you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 1681cbd

Please sign in to comment.