diff --git a/go/store/nbs/archive_chunk_source.go b/go/store/nbs/archive_chunk_source.go index 1b1f55d826..3d3a59ec4d 100644 --- a/go/store/nbs/archive_chunk_source.go +++ b/go/store/nbs/archive_chunk_source.go @@ -177,7 +177,7 @@ func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgrou }, keeper, stats) } -func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error { +func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk), _ *Stats) error { addrCount := uint32(len(acs.aRdr.prefixes)) for i := uint32(0); i < addrCount; i++ { var h hash.Hash diff --git a/go/store/nbs/archive_test.go b/go/store/nbs/archive_test.go index c78d3b5710..4de745e6a5 100644 --- a/go/store/nbs/archive_test.go +++ b/go/store/nbs/archive_test.go @@ -716,6 +716,6 @@ func (tcs *testChunkSource) currentSize() uint64 { panic("never used") } -func (tcs *testChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk)) error { +func (tcs *testChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk), _ *Stats) error { panic("never used") } diff --git a/go/store/nbs/chunk_source_adapter.go b/go/store/nbs/chunk_source_adapter.go index db1eb72fd6..f86d7527b4 100644 --- a/go/store/nbs/chunk_source_adapter.go +++ b/go/store/nbs/chunk_source_adapter.go @@ -17,7 +17,6 @@ package nbs import ( "context" - "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" ) @@ -54,7 +53,3 @@ func (csa chunkSourceAdapter) clone() (chunkSource, error) { } return &chunkSourceAdapter{tr, csa.h}, nil } - -func (csa chunkSourceAdapter) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error { - panic("unimplemented") -} diff --git a/go/store/nbs/empty_chunk_source.go b/go/store/nbs/empty_chunk_source.go index 8d00c820de..60b7f953d2 100644 --- a/go/store/nbs/empty_chunk_source.go +++ b/go/store/nbs/empty_chunk_source.go @@ -94,6 +94,6 @@ func (ecs emptyChunkSource) clone() (chunkSource, error) { return ecs, nil } -func (ecs emptyChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk)) error { +func (ecs emptyChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk), _ *Stats) error { return nil } diff --git a/go/store/nbs/file_table_reader.go b/go/store/nbs/file_table_reader.go index 873450efd2..a95c2109c3 100644 --- a/go/store/nbs/file_table_reader.go +++ b/go/store/nbs/file_table_reader.go @@ -30,7 +30,6 @@ import ( "path/filepath" "time" - "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" ) @@ -170,49 +169,6 @@ func (ftr *fileTableReader) hash() hash.Hash { return ftr.h } -func (ftr *fileTableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { - count := ftr.idx.chunkCount() - - rdr, err := ftr.tableReader.r.Reader(ctx) - if err != nil { - return err - } - - skr, ok := rdr.(io.ReadSeeker) - if !ok { - return errors.New("runtime error: reader does not support seeking") - } - - for i := uint32(0); i < count; i++ { - if ctx.Err() != nil { - return ctx.Err() - } - - var h hash.Hash - ie, err := ftr.idx.indexEntry(i, &h) - if err != nil { - return err - } - - readNBytes, err := readNFrom(skr, ie.Offset(), ie.Length()) - if err != nil { - return err - } - - cchk, err := NewCompressedChunk(h, readNBytes) - if err != nil { - return err - } - chk, err := cchk.ToChunk() - if err != nil { - return err - } - - cb(chk) - } - return nil -} - func (ftr *fileTableReader) Close() error { return ftr.tableReader.close() } diff --git a/go/store/nbs/journal_chunk_source.go b/go/store/nbs/journal_chunk_source.go index e7bf50dc4a..10a5548b0c 100644 --- a/go/store/nbs/journal_chunk_source.go +++ b/go/store/nbs/journal_chunk_source.go @@ -230,7 +230,7 @@ func (s journalChunkSource) close() error { return nil } -func (s journalChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error { +func (s journalChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk), _ *Stats) error { var finalErr error // TODO - a less time consuming lock is possible here. Using s.journal.snapshot and processJournalRecords() diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index a9c4911e36..b1f0ba99ea 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1979,7 +1979,7 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error { func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { for _, v := range nbs.tables.novel { - err := v.iterateAllChunks(ctx, cb) + err := v.iterateAllChunks(ctx, cb, nbs.stats) if err != nil { return err } @@ -1988,7 +1988,7 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c } } for _, v := range nbs.tables.upstream { - err := v.iterateAllChunks(ctx, cb) + err := v.iterateAllChunks(ctx, cb, nbs.stats) if err != nil { return err } diff --git a/go/store/nbs/table.go b/go/store/nbs/table.go index c476398973..234163be86 100644 --- a/go/store/nbs/table.go +++ b/go/store/nbs/table.go @@ -265,7 +265,7 @@ type chunkSource interface { // If there is a failure reading the chunk, the error will be returned - note that this can happen in the middle of // the scan, and will likely mean that the scan didn't complete. Note that errors returned by this method are not // related to the callback - if the callback discovers an error, it must manage that out of band. - iterateAllChunks(context.Context, func(chunk chunks.Chunk)) error + iterateAllChunks(context.Context, func(chunk chunks.Chunk), *Stats) error } type chunkSources []chunkSource diff --git a/go/store/nbs/table_reader.go b/go/store/nbs/table_reader.go index c55d48c28a..de66ffd27b 100644 --- a/go/store/nbs/table_reader.go +++ b/go/store/nbs/table_reader.go @@ -754,3 +754,39 @@ func (tr tableReader) clone() (tableReader, error) { blockSize: tr.blockSize, }, nil } + +func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk), stats *Stats) error { + count := tr.idx.chunkCount() + for i := uint32(0); i < count; i++ { + if ctx.Err() != nil { + return ctx.Err() + } + + var h hash.Hash + ie, err := tr.idx.indexEntry(i, &h) + if err != nil { + return err + } + + res := make([]byte, ie.Length()) + n, err := tr.r.ReadAtWithStats(ctx, res, int64(ie.Offset()), stats) + if err != nil { + return err + } + if uint32(n) != ie.Length() { + return errors.New("failed to read all data") + } + + cchk, err := NewCompressedChunk(h, res) + if err != nil { + return err + } + chk, err := cchk.ToChunk() + if err != nil { + return err + } + + cb(chk) + } + return nil +}