Skip to content

Commit

Permalink
[no-release-notes] go: nbs/store: Small cleanups to iterateAllChunks.
Browse files Browse the repository at this point in the history
Move it from fileTableReader onto tableReader, make it use ReadAt,
have it take Stats.
  • Loading branch information
reltuk committed Feb 4, 2025
1 parent b4b6d71 commit e701e5e
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 56 deletions.
2 changes: 1 addition & 1 deletion go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgrou
}, keeper, stats)
}

func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk), _ *Stats) error {
addrCount := uint32(len(acs.aRdr.prefixes))
for i := uint32(0); i < addrCount; i++ {
var h hash.Hash
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,6 @@ func (tcs *testChunkSource) currentSize() uint64 {
panic("never used")
}

func (tcs *testChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk)) error {
// iterateAllChunks satisfies the chunkSource interface for this test double;
// the tests that use testChunkSource never invoke it.
func (tcs *testChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk), _ *Stats) error {
	panic("never used")
}
5 changes: 0 additions & 5 deletions go/store/nbs/chunk_source_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package nbs
import (
"context"

"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)

Expand Down Expand Up @@ -54,7 +53,3 @@ func (csa chunkSourceAdapter) clone() (chunkSource, error) {
}
return &chunkSourceAdapter{tr, csa.h}, nil
}

func (csa chunkSourceAdapter) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
panic("unimplemented")
}
2 changes: 1 addition & 1 deletion go/store/nbs/empty_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ func (ecs emptyChunkSource) clone() (chunkSource, error) {
return ecs, nil
}

func (ecs emptyChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk)) error {
// iterateAllChunks on an empty source has no chunks to visit, so it returns
// nil immediately without ever calling the callback.
func (ecs emptyChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk), _ *Stats) error {
	return nil
}
44 changes: 0 additions & 44 deletions go/store/nbs/file_table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"path/filepath"
"time"

"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)

Expand Down Expand Up @@ -170,49 +169,6 @@ func (ftr *fileTableReader) hash() hash.Hash {
return ftr.h
}

// iterateAllChunks walks every chunk record in this table file, decodes each
// one, and hands the resulting chunk to cb. The underlying reader must
// support seeking. The walk aborts on the first error — including context
// cancellation — so a non-nil return likely means the scan did not complete.
func (ftr *fileTableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
	rdr, err := ftr.tableReader.r.Reader(ctx)
	if err != nil {
		return err
	}

	seeker, ok := rdr.(io.ReadSeeker)
	if !ok {
		return errors.New("runtime error: reader does not support seeking")
	}

	for idx, total := uint32(0), ftr.idx.chunkCount(); idx < total; idx++ {
		if err := ctx.Err(); err != nil {
			return err
		}

		var addr hash.Hash
		entry, err := ftr.idx.indexEntry(idx, &addr)
		if err != nil {
			return err
		}

		// Pull the (still compressed) record bytes for this entry off disk.
		raw, err := readNFrom(seeker, entry.Offset(), entry.Length())
		if err != nil {
			return err
		}

		compressed, err := NewCompressedChunk(addr, raw)
		if err != nil {
			return err
		}
		decoded, err := compressed.ToChunk()
		if err != nil {
			return err
		}

		cb(decoded)
	}
	return nil
}

// Close releases the resources held by the embedded tableReader.
func (ftr *fileTableReader) Close() error {
	return ftr.tableReader.close()
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/journal_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (s journalChunkSource) close() error {
return nil
}

func (s journalChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
func (s journalChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk), _ *Stats) error {
var finalErr error

// TODO - a less time consuming lock is possible here. Using s.journal.snapshot and processJournalRecords()
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,7 +1979,7 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {

func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
for _, v := range nbs.tables.novel {
err := v.iterateAllChunks(ctx, cb)
err := v.iterateAllChunks(ctx, cb, nbs.stats)
if err != nil {
return err
}
Expand All @@ -1988,7 +1988,7 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c
}
}
for _, v := range nbs.tables.upstream {
err := v.iterateAllChunks(ctx, cb)
err := v.iterateAllChunks(ctx, cb, nbs.stats)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ type chunkSource interface {
// If there is a failure reading the chunk, the error will be returned - note that this can happen in the middle of
// the scan, and will likely mean that the scan didn't complete. Note that errors returned by this method are not
// related to the callback - if the callback discovers an error, it must manage that out of band.
iterateAllChunks(context.Context, func(chunk chunks.Chunk)) error
iterateAllChunks(context.Context, func(chunk chunks.Chunk), *Stats) error
}

type chunkSources []chunkSource
Expand Down
36 changes: 36 additions & 0 deletions go/store/nbs/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,3 +754,39 @@ func (tr tableReader) clone() (tableReader, error) {
blockSize: tr.blockSize,
}, nil
}

// iterateAllChunks scans every chunk record in the table via positional reads,
// decompresses each record, and invokes cb with the resulting chunk. Read
// activity is accounted against stats. The scan stops at the first error —
// including context cancellation — so a non-nil return likely means the
// traversal did not complete.
func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk), stats *Stats) error {
	for idx, total := uint32(0), tr.idx.chunkCount(); idx < total; idx++ {
		if err := ctx.Err(); err != nil {
			return err
		}

		var addr hash.Hash
		entry, err := tr.idx.indexEntry(idx, &addr)
		if err != nil {
			return err
		}

		// Read the raw (compressed) record bytes for this entry. A fresh
		// buffer is allocated per record because NewCompressedChunk may
		// retain it.
		raw := make([]byte, entry.Length())
		got, err := tr.r.ReadAtWithStats(ctx, raw, int64(entry.Offset()), stats)
		if err != nil {
			return err
		}
		if uint32(got) != entry.Length() {
			return errors.New("failed to read all data")
		}

		compressed, err := NewCompressedChunk(addr, raw)
		if err != nil {
			return err
		}
		decoded, err := compressed.ToChunk()
		if err != nil {
			return err
		}

		cb(decoded)
	}
	return nil
}

0 comments on commit e701e5e

Please sign in to comment.