diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go index 7d7806fb8c0..9401b58d7e6 100644 --- a/go/store/datas/pull/puller.go +++ b/go/store/datas/pull/puller.go @@ -343,6 +343,9 @@ func (p *Puller) Pull(ctx context.Context) error { if err != nil { return err } + if cChk.IsGhost() { + return fmt.Errorf("attempted to push or pull ghost chunk: %w", nbs.ErrGhostChunkRequested) + } if len(cChk.FullCompressedChunk) == 0 { return errors.New("failed to get all chunks.") } diff --git a/go/store/datas/pull/puller_test.go b/go/store/datas/pull/puller_test.go index 0fcef246a70..71d14e7496d 100644 --- a/go/store/datas/pull/puller_test.go +++ b/go/store/datas/pull/puller_test.go @@ -71,6 +71,41 @@ func TestChunkJournalPuller(t *testing.T) { }) } +func TestPuller(t *testing.T) { + t.Run("GhostChunk", func(t *testing.T) { + ctx := context.Background() + gs, err := nbs.NewGhostBlockStore(t.TempDir()) + require.NoError(t, err) + waf, err := types.WalkAddrsForChunkStore(gs) + require.NoError(t, err) + + ghost := hash.Parse("e6esqr35dkqnc7updhj6ap5v82sahm9r") + + require.NoError(t, gs.PersistGhostHashes(ctx, hash.NewHashSet(ghost))) + + statsCh := make(chan Stats) + go func() { + for range statsCh { + } + }() + + dir := filepath.Join(os.TempDir(), uuid.New().String()) + err = os.MkdirAll(dir, os.ModePerm) + require.NoError(t, err) + + nbf := types.Format_Default.VersionString() + q := nbs.NewUnlimitedMemQuotaProvider() + + st, err := nbs.NewLocalJournalingStore(ctx, nbf, dir, q) + require.NoError(t, err) + + plr, err := NewPuller(ctx, t.TempDir(), 128, gs, st, waf, []hash.Hash{ghost}, statsCh) + require.NoError(t, err) + err = plr.Pull(ctx) + require.ErrorIs(t, err, nbs.ErrGhostChunkRequested) + }) +} + func addTableValues(ctx context.Context, vrw types.ValueReadWriter, m types.Map, tableName string, alternatingKeyVals ...types.Value) (types.Map, error) { val, ok, err := m.MaybeGet(ctx, types.String(tableName)) diff --git a/go/store/nbs/cmp_chunk_table_writer.go b/go/store/nbs/cmp_chunk_table_writer.go 
index 308ccf2b17c..905154463ea 100644 --- a/go/store/nbs/cmp_chunk_table_writer.go +++ b/go/store/nbs/cmp_chunk_table_writer.go @@ -76,6 +76,12 @@ func (tw *CmpChunkTableWriter) GetMD5() []byte { // AddCmpChunk adds a compressed chunk func (tw *CmpChunkTableWriter) AddCmpChunk(c CompressedChunk) error { + if c.IsGhost() { + // Ghost chunks cannot be written to a table file. They should + // always be filtered by the write processes before landing + // here. + return ErrGhostChunkRequested + } if len(c.CompressedData) == 0 { panic("NBS blocks cannot be zero length") } diff --git a/go/store/nbs/cmp_chunk_table_writer_test.go b/go/store/nbs/cmp_chunk_table_writer_test.go index cd1c7450d41..170cc43cb64 100644 --- a/go/store/nbs/cmp_chunk_table_writer_test.go +++ b/go/store/nbs/cmp_chunk_table_writer_test.go @@ -95,6 +95,12 @@ func TestCmpChunkTableWriter(t *testing.T) { compareContentsOfTables(t, ctx, hashes, tr, outputTR) } +func TestCmpChunkTableWriterGhostChunk(t *testing.T) { + tw, err := NewCmpChunkTableWriter("") + require.NoError(t, err) + require.Error(t, tw.AddCmpChunk(NewGhostCompressedChunk(hash.Parse("6af71afc2ea0hmp4olev0vp9q1q5gvb1")))) +} + func TestContainsDuplicates(t *testing.T) { t.Run("Empty", func(t *testing.T) { require.False(t, containsDuplicates(prefixIndexSlice{})) diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index d88169be6a1..c2396ecc2b4 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -28,10 +28,10 @@ import ( ) var _ chunks.ChunkStore = (*GenerationalNBS)(nil) -var _ chunks.GenerationalCS = (*GenerationalNBS)(nil) var _ chunks.TableFileStore = (*GenerationalNBS)(nil) var _ chunks.GenerationalCS = (*GenerationalNBS)(nil) var _ chunks.ChunkStoreGarbageCollector = (*GenerationalNBS)(nil) +var _ NBSCompressedChunkStore = (*GenerationalNBS)(nil) type GenerationalNBS struct { oldGen *NomsBlockStore @@ -143,60 +143,38 @@ func (gcs 
*GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, fo } func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error { - mu := &sync.Mutex{} + var mu sync.Mutex notInOldGen := hashes.Copy() err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(ctx context.Context, chunk CompressedChunk) { - func() { - mu.Lock() - defer mu.Unlock() - delete(notInOldGen, chunk.Hash()) - }() - + mu.Lock() + delete(notInOldGen, chunk.Hash()) + mu.Unlock() found(ctx, chunk) }) - if err != nil { return err } - if len(notInOldGen) == 0 { return nil } notFound := notInOldGen.Copy() err = gcs.newGen.GetManyCompressed(ctx, notInOldGen, func(ctx context.Context, chunk CompressedChunk) { - func() { - mu.Lock() - defer mu.Unlock() - delete(notFound, chunk.Hash()) - }() + mu.Lock() + delete(notFound, chunk.Hash()) + mu.Unlock() found(ctx, chunk) }) if err != nil { return err } - if len(notFound) == 0 { return nil } - // We are definitely missing some chunks. Check if any are ghost chunks, mainly to give a better error message. + // The missing chunks may be ghost chunks. if gcs.ghostGen != nil { - // If any of the hashes are in the ghost store. - ghostFound := false - err := gcs.ghostGen.GetMany(ctx, hashes, func(ctx context.Context, chunk *chunks.Chunk) { - // This should be true for all chunks in the ghost store. 
- if chunk.IsGhost() { - ghostFound = true - } - }) - - if err != nil { - return err - } - if ghostFound { - return ErrGhostChunkRequested - } + return gcs.ghostGen.GetManyCompressed(ctx, notFound, found) } return nil } diff --git a/go/store/nbs/ghost_store.go b/go/store/nbs/ghost_store.go index 45411a74419..11d23de6a68 100644 --- a/go/store/nbs/ghost_store.go +++ b/go/store/nbs/ghost_store.go @@ -24,6 +24,7 @@ import ( "path/filepath" "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/constants" "github.com/dolthub/dolt/go/store/hash" ) @@ -32,8 +33,9 @@ type GhostBlockStore struct { ghostObjectsFile string } -// We use the Has, HasMany, Get, GetMany, and PersistGhostHashes methods from the ChunkStore interface. All other methods are not supported. -var _ chunks.ChunkStore = &GhostBlockStore{} +// We use the Has, HasMany, Get, GetMany, GetManyCompressed, and PersistGhostHashes methods from the ChunkStore interface. All other methods are not supported. +var _ chunks.ChunkStore = (*GhostBlockStore)(nil) +var _ NBSCompressedChunkStore = (*GhostBlockStore)(nil) // NewGhostBlockStore returns a new GhostBlockStore instance. Currently the only parameter is the path to the directory // where we will create a text file called ghostObjects.txt. This file will contain the hashes of the ghost objects. Creation
return nil, err } + defer f.Close() scanner := bufio.NewScanner(f) skiplist := &hash.HashSet{} for scanner.Scan() { @@ -87,6 +90,15 @@ func (g GhostBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found return nil } +func (g GhostBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error { + for h := range hashes { + if g.skippedRefs.Has(h) { + found(ctx, NewGhostCompressedChunk(h)) + } + } + return nil +} + func (g *GhostBlockStore) PersistGhostHashes(ctx context.Context, hashes hash.HashSet) error { if hashes.Size() == 0 { return fmt.Errorf("runtime error. PersistGhostHashes called with empty hash set") @@ -96,6 +108,7 @@ func (g *GhostBlockStore) PersistGhostHashes(ctx context.Context, hashes hash.Ha if err != nil { return err } + defer f.Close() for h := range hashes { if _, err := f.WriteString(h.String() + "\n"); err != nil { @@ -137,7 +150,8 @@ func (g GhostBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunk } func (g GhostBlockStore) Version() string { - panic("GhostBlockStore does not support Version") + // This should never be used, but it makes testing a bit more ergonomic in a few places. + return constants.FormatDefaultString } func (g GhostBlockStore) AccessMode() chunks.ExclusiveAccessMode { diff --git a/go/store/nbs/ghost_store_test.go b/go/store/nbs/ghost_store_test.go new file mode 100644 index 00000000000..e6fc6146f7e --- /dev/null +++ b/go/store/nbs/ghost_store_test.go @@ -0,0 +1,86 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nbs + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" +) + +func TestGhostBlockStore(t *testing.T) { + ctx := context.Background() + path := t.TempDir() + bs, err := NewGhostBlockStore(path) + require.NoError(t, err) + ghost, absent := hash.Parse("ifho8m890r9787lrpthif5ce6ru353fr"), hash.Parse("6af71afc2ea0hmp4olev0vp9q1q5gvb1") + require.NoError(t, bs.PersistGhostHashes(context.Background(), hash.NewHashSet(ghost))) + t.Run("Get", func(t *testing.T) { + t.Run("Ghost", func(t *testing.T) { + c, err := bs.Get(ctx, ghost) + require.NoError(t, err) + require.True(t, c.IsGhost()) + }) + t.Run("Absent", func(t *testing.T) { + c, err := bs.Get(ctx, absent) + require.NoError(t, err) + require.False(t, c.IsGhost()) + require.True(t, c.IsEmpty()) + }) + }) + t.Run("Has", func(t *testing.T) { + t.Run("Ghost", func(t *testing.T) { + h, err := bs.Has(ctx, ghost) + require.NoError(t, err) + require.True(t, h) + }) + t.Run("Absent", func(t *testing.T) { + h, err := bs.Has(ctx, absent) + require.NoError(t, err) + require.False(t, h) + }) + }) + t.Run("HasMany", func(t *testing.T) { + a, err := bs.HasMany(ctx, hash.NewHashSet(absent, ghost)) + require.NoError(t, err) + require.Len(t, a, 1) + require.True(t, a.Has(absent)) + require.False(t, a.Has(ghost)) + }) + t.Run("GetMany", func(t *testing.T) { + var got []chunks.Chunk + err := bs.GetMany(ctx, hash.NewHashSet(absent, ghost), func(_ context.Context, c *chunks.Chunk) { + got = 
append(got, *c) + }) + require.NoError(t, err) + require.Len(t, got, 1) + require.True(t, got[0].IsGhost()) + require.Equal(t, ghost, got[0].Hash()) + }) + t.Run("GetManyCompressed", func(t *testing.T) { + var got []CompressedChunk + err := bs.GetManyCompressed(ctx, hash.NewHashSet(absent, ghost), func(_ context.Context, c CompressedChunk) { + got = append(got, c) + }) + require.NoError(t, err) + require.Len(t, got, 1) + require.True(t, got[0].IsGhost()) + require.Equal(t, ghost, got[0].Hash()) + }) +} diff --git a/go/store/nbs/table_reader.go b/go/store/nbs/table_reader.go index 820dc9f81f0..3ff059fb480 100644 --- a/go/store/nbs/table_reader.go +++ b/go/store/nbs/table_reader.go @@ -48,6 +48,9 @@ type CompressedChunk struct { // CompressedData is just the snappy encoded byte buffer that stores the chunk data CompressedData []byte + + // true if the chunk is a ghost chunk. + ghost bool } // NewCompressedChunk creates a CompressedChunk @@ -64,14 +67,20 @@ func NewCompressedChunk(h hash.Hash, buff []byte) (CompressedChunk, error) { return CompressedChunk{H: h, FullCompressedChunk: buff, CompressedData: compressedData}, nil } +func NewGhostCompressedChunk(h hash.Hash) CompressedChunk { + return CompressedChunk{H: h, ghost: true} +} + // ToChunk snappy decodes the compressed data and returns a chunks.Chunk func (cmp CompressedChunk) ToChunk() (chunks.Chunk, error) { - data, err := snappy.Decode(nil, cmp.CompressedData) + if cmp.IsGhost() { + return *chunks.NewGhostChunk(cmp.H), nil + } + data, err := snappy.Decode(nil, cmp.CompressedData) if err != nil { return chunks.Chunk{}, err } - return chunks.NewChunkWithHash(cmp.H, data), nil } @@ -96,6 +105,10 @@ func (cmp CompressedChunk) IsEmpty() bool { return len(cmp.CompressedData) == 0 || (len(cmp.CompressedData) == 1 && cmp.CompressedData[0] == 0) } +func (cmp CompressedChunk) IsGhost() bool { + return cmp.ghost +} + // CompressedSize returns the size of this CompressedChunk. 
func (cmp CompressedChunk) CompressedSize() int { return len(cmp.CompressedData) diff --git a/go/utils/copyrightshdrs/main.go b/go/utils/copyrightshdrs/main.go index c31b453b336..c7895da6e4a 100644 --- a/go/utils/copyrightshdrs/main.go +++ b/go/utils/copyrightshdrs/main.go @@ -23,7 +23,7 @@ import ( "strings" ) -var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2023|2024|2019-2020|2019-2021|2019-2022|2019-2023|2019-2024|2020-2021|2020-2022|2020-2023|2020-2024|2021-2022|2021-2023|2021-2024|2022-2023|2022-2024|2023-2024) Dolthub, Inc. +var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2023|2024|2025|2019-2020|2019-2021|2019-2022|2019-2023|2019-2024|2019-2025|2020-2021|2020-2022|2020-2023|2020-2024|2020-2025|2021-2022|2021-2023|2021-2024|2021-2025|2022-2023|2022-2024|2022-2025|2023-2024|2023-2025|2024-2025) Dolthub, Inc. // // Licensed under the Apache License, Version 2.0 \(the "License"\); // you may not use this file except in compliance with the License.