[no-release-notes] go/store/nbs: Make CompressedChunk support ghost chunks and make GhostBlockStore support GetManyCompressed. #8748

Merged: 5 commits, Jan 15, 2025
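Taken together, the changes below let the compressed-chunk codepaths represent ghost chunks (chunks whose hashes are known but whose bytes are absent, e.g. in a shallow clone) instead of failing on empty data. A minimal sketch of the resulting behavior, assuming the usual github.com/dolthub/dolt import paths; illustrative code, not part of the diff:

package example

import (
	"fmt"

	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

func ExampleGhostChunk() {
	var h hash.Hash // zero-value hash, used only for illustration

	// A ghost CompressedChunk carries a hash but no compressed bytes.
	cc := nbs.NewGhostCompressedChunk(h)
	fmt.Println(cc.IsGhost()) // true

	// ToChunk now short-circuits for ghosts instead of snappy-decoding nil data.
	chk, err := cc.ToChunk()
	if err != nil {
		panic(err)
	}
	fmt.Println(chk.IsGhost()) // true
}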
3 changes: 3 additions & 0 deletions go/store/datas/pull/puller.go
@@ -343,6 +343,9 @@ func (p *Puller) Pull(ctx context.Context) error {
if err != nil {
return err
}
+if cChk.IsGhost() {
+return fmt.Errorf("attempted to push or pull ghost chunk: %w", nbs.ErrGhostChunkRequested)
+}
if len(cChk.FullCompressedChunk) == 0 {
return errors.New("failed to get all chunks.")
}
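Because the new error wraps nbs.ErrGhostChunkRequested with %w, callers of Pull can detect the ghost-chunk case with errors.Is. A hedged sketch; the helper name and message are illustrative, not from the PR:

package example

import (
	"context"
	"errors"
	"fmt"

	"github.com/dolthub/dolt/go/store/datas/pull"
	"github.com/dolthub/dolt/go/store/nbs"
)

// pullWithGhostCheck is a hypothetical wrapper, not part of the PR; it shows how the
// %w-wrapped sentinel above can be detected by callers of Puller.Pull.
func pullWithGhostCheck(ctx context.Context, p *pull.Puller) error {
	err := p.Pull(ctx)
	if err != nil && errors.Is(err, nbs.ErrGhostChunkRequested) {
		// A ghost chunk was reached while assembling chunks to push or pull.
		return fmt.Errorf("cannot push or pull from a shallow history: %w", err)
	}
	return err
}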
6 changes: 6 additions & 0 deletions go/store/nbs/cmp_chunk_table_writer.go
@@ -76,6 +76,12 @@ func (tw *CmpChunkTableWriter) GetMD5() []byte {

// AddCmpChunk adds a compressed chunk
func (tw *CmpChunkTableWriter) AddCmpChunk(c CompressedChunk) error {
+if c.IsGhost() {
+// Ghost chunks cannot be written to a table file. They should
+// always be filtered by the write processes before landing
+// here.
+return ErrGhostChunkRequested
+}
if len(c.CompressedData) == 0 {
panic("NBS blocks cannot be zero length")
}
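AddCmpChunk now rejects ghost chunks outright, so anything feeding a CmpChunkTableWriter is expected to filter them first. A hypothetical filtering loop, assuming the chunks arrive as a slice; not code from the PR:

package example

import (
	"github.com/dolthub/dolt/go/store/nbs"
)

// writeAll is a hypothetical helper (not in the PR) that skips ghost chunks before
// handing work to the table writer, matching the contract documented in AddCmpChunk.
func writeAll(tw *nbs.CmpChunkTableWriter, cmpChunks []nbs.CompressedChunk) error {
	for _, c := range cmpChunks {
		if c.IsGhost() {
			// A ghost chunk has no bytes to store; AddCmpChunk would return
			// ErrGhostChunkRequested if it were passed through.
			continue
		}
		if err := tw.AddCmpChunk(c); err != nil {
			return err
		}
	}
	return nil
}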
42 changes: 10 additions & 32 deletions go/store/nbs/generational_chunk_store.go
@@ -28,10 +28,10 @@ import (
)

var _ chunks.ChunkStore = (*GenerationalNBS)(nil)
-var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ chunks.TableFileStore = (*GenerationalNBS)(nil)
+var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ chunks.ChunkStoreGarbageCollector = (*GenerationalNBS)(nil)
+var _ NBSCompressedChunkStore = (*GenerationalNBS)(nil)

type GenerationalNBS struct {
oldGen *NomsBlockStore
@@ -143,60 +143,38 @@ func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, fo
}

func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
-mu := &sync.Mutex{}
+var mu sync.Mutex
notInOldGen := hashes.Copy()
err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(ctx context.Context, chunk CompressedChunk) {
-func() {
-mu.Lock()
-defer mu.Unlock()
-delete(notInOldGen, chunk.Hash())
-}()

+mu.Lock()
+delete(notInOldGen, chunk.Hash())
+mu.Unlock()
found(ctx, chunk)
})

if err != nil {
return err
}

if len(notInOldGen) == 0 {
return nil
}

notFound := notInOldGen.Copy()
err = gcs.newGen.GetManyCompressed(ctx, notInOldGen, func(ctx context.Context, chunk CompressedChunk) {
-func() {
-mu.Lock()
-defer mu.Unlock()
-delete(notFound, chunk.Hash())
-}()
+mu.Lock()
+delete(notFound, chunk.Hash())
+mu.Unlock()
found(ctx, chunk)
})
if err != nil {
return err
}

if len(notFound) == 0 {
return nil
}

-// We are definitely missing some chunks. Check if any are ghost chunks, mainly to give a better error message.
+// The missing chunks may be ghost chunks.
if gcs.ghostGen != nil {
-// If any of the hashes are in the ghost store.
-ghostFound := false
-err := gcs.ghostGen.GetMany(ctx, hashes, func(ctx context.Context, chunk *chunks.Chunk) {
-// This should be true for all chunks in the ghost store.
-if chunk.IsGhost() {
-ghostFound = true
-}
-})
-
-if err != nil {
-return err
-}
-if ghostFound {
-return ErrGhostChunkRequested
-}
+return gcs.ghostGen.GetManyCompressed(ctx, notFound, found)
}
return nil
}
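GetManyCompressed now checks the old generation, then the new generation, and finally hands any still-missing hashes to the ghost store, which reports them as ghost CompressedChunks rather than returning ErrGhostChunkRequested. A sketch of a caller that separates the two kinds of results, assuming NBSCompressedChunkStore exposes GetManyCompressed as the assertion above implies; illustrative only:

package example

import (
	"context"
	"sync"

	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// partitionCompressed is a hypothetical caller (not in the PR). The found callback may
// be invoked concurrently, so results are guarded by a mutex, mirroring the notFound
// bookkeeping inside the method itself.
func partitionCompressed(ctx context.Context, cs nbs.NBSCompressedChunkStore, hs hash.HashSet) (present, ghosts []nbs.CompressedChunk, err error) {
	var mu sync.Mutex
	err = cs.GetManyCompressed(ctx, hs, func(_ context.Context, cc nbs.CompressedChunk) {
		mu.Lock()
		defer mu.Unlock()
		if cc.IsGhost() {
			ghosts = append(ghosts, cc)
		} else {
			present = append(present, cc)
		}
	})
	return present, ghosts, err
}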
14 changes: 12 additions & 2 deletions go/store/nbs/ghost_store.go
@@ -32,8 +32,9 @@ type GhostBlockStore struct {
ghostObjectsFile string
}

-// We use the Has, HasMany, Get, GetMany, and PersistGhostHashes methods from the ChunkStore interface. All other methods are not supported.
-var _ chunks.ChunkStore = &GhostBlockStore{}
+// We use the Has, HasMany, Get, GetMany, GetManyCompressed, and PersistGhostHashes methods from the ChunkStore interface. All other methods are not supported.
+var _ chunks.ChunkStore = (*GhostBlockStore)(nil)
+var _ NBSCompressedChunkStore = (*GenerationalNBS)(nil)

// NewGhostBlockStore returns a new GhostBlockStore instance. Currently the only parameter is the path to the directory
// where we will create a text file called ghostObjects.txt. This file will contain the hashes of the ghost objects. Creation
@@ -87,6 +88,15 @@ func (g GhostBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found
return nil
}

+func (g GhostBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
+for h := range hashes {
+if g.skippedRefs.Has(h) {
+found(ctx, NewGhostCompressedChunk(h))
+}
+}
+return nil
+}

func (g *GhostBlockStore) PersistGhostHashes(ctx context.Context, hashes hash.HashSet) error {
if hashes.Size() == 0 {
return fmt.Errorf("runtime error. PersistGhostHashes called with empty hash set")
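The ghost store's GetManyCompressed only invokes the callback for hashes previously recorded via PersistGhostHashes, and every chunk it produces is a ghost. A small hypothetical helper built on that contract; not code from the PR:

package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// countGhosts is a hypothetical helper (not in the PR): it asks the ghost store about a
// set of hashes and counts how many come back. Hashes that were never persisted as
// ghosts simply produce no callback.
func countGhosts(ctx context.Context, gs *nbs.GhostBlockStore, hs hash.HashSet) (int, error) {
	n := 0
	err := gs.GetManyCompressed(ctx, hs, func(_ context.Context, cc nbs.CompressedChunk) {
		if cc.IsGhost() { // always true for chunks served by the ghost store
			n++
		}
	})
	return n, err
}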
17 changes: 15 additions & 2 deletions go/store/nbs/table_reader.go
@@ -48,6 +48,9 @@ type CompressedChunk struct {

// CompressedData is just the snappy encoded byte buffer that stores the chunk data
CompressedData []byte

+// true if the chunk is a ghost chunk.
+ghost bool
}

// NewCompressedChunk creates a CompressedChunk
@@ -64,14 +67,20 @@ func NewCompressedChunk(h hash.Hash, buff []byte) (CompressedChunk, error) {
return CompressedChunk{H: h, FullCompressedChunk: buff, CompressedData: compressedData}, nil
}

+func NewGhostCompressedChunk(h hash.Hash) CompressedChunk {
+return CompressedChunk{H: h, ghost: true}
+}

// ToChunk snappy decodes the compressed data and returns a chunks.Chunk
func (cmp CompressedChunk) ToChunk() (chunks.Chunk, error) {
-data, err := snappy.Decode(nil, cmp.CompressedData)
+if cmp.IsGhost() {
+return *chunks.NewGhostChunk(cmp.H), nil
+}

+data, err := snappy.Decode(nil, cmp.CompressedData)
if err != nil {
return chunks.Chunk{}, err
}

return chunks.NewChunkWithHash(cmp.H, data), nil
}

@@ -96,6 +105,10 @@ func (cmp CompressedChunk) IsEmpty() bool {
return len(cmp.CompressedData) == 0 || (len(cmp.CompressedData) == 1 && cmp.CompressedData[0] == 0)
}

+func (cmp CompressedChunk) IsGhost() bool {
+return cmp.ghost
+}

// CompressedSize returns the size of this CompressedChunk.
func (cmp CompressedChunk) CompressedSize() int {
return len(cmp.CompressedData)
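One consequence of the new field worth noting: a ghost CompressedChunk carries no CompressedData, so IsEmpty() also reports true for it, while ToChunk short-circuits to a ghost chunks.Chunk instead of snappy-decoding nil bytes. A hypothetical accessor-level summary, not part of the PR:

package example

import (
	"fmt"

	"github.com/dolthub/dolt/go/store/nbs"
)

// describe is a hypothetical helper (not in the PR) that summarizes a CompressedChunk
// using the accessors above. The ghost case is checked first, because a ghost chunk
// also reports IsEmpty().
func describe(cc nbs.CompressedChunk) string {
	switch {
	case cc.IsGhost():
		return fmt.Sprintf("ghost chunk %s", cc.Hash())
	case cc.IsEmpty():
		return fmt.Sprintf("empty chunk %s", cc.Hash())
	default:
		return fmt.Sprintf("chunk %s (%d compressed bytes)", cc.Hash(), cc.CompressedSize())
	}
}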