diff --git a/go/libraries/doltcore/remotestorage/chunk_fetcher.go b/go/libraries/doltcore/remotestorage/chunk_fetcher.go
index 91c7ef835f0..24bf2d5bab7 100644
--- a/go/libraries/doltcore/remotestorage/chunk_fetcher.go
+++ b/go/libraries/doltcore/remotestorage/chunk_fetcher.go
@@ -18,9 +18,12 @@ import (
 	"context"
 	"errors"
 	"io"
+	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/cenkalti/backoff/v4"
+	"github.com/dolthub/gozstd"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 
@@ -57,8 +60,9 @@ type ChunkFetcher struct {
 	// buy having a Hash, but are empty. NM4.
 	resCh chan nbs.ToChunker
 
-	abortCh chan struct{}
-	stats   StatsRecorder
+	abortCh   chan struct{}
+	stats     StatsRecorder
+	dictCache *DictionaryCache
 }
 
 const (
@@ -68,7 +72,14 @@ const (
 	reliableCallDeliverRespTimeout = 15 * time.Second
 )
 
+var globalDictCache *DictionaryCache
+var once sync.Once
+
 func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher {
+	once.Do(func() {
+		globalDictCache = NewDictionaryCache(newDownloads(), dcs.csClient)
+	})
+
 	eg, ctx := errgroup.WithContext(ctx)
 	ret := &ChunkFetcher{
 		eg: eg,
@@ -370,19 +381,27 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
 		d.refreshes[path] = refresh
 	}
 	for _, r := range gr.Ranges {
-		// NM4 - this is where the offset is read!! do something here or nearby.
-		d.ranges.Insert(gr.Url, r.Hash, r.Offset, r.Length)
+		// NM4 - Split at this point? Break the dictionary into its own request.
+		d.ranges.Insert(gr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength)
+		// if r.DictionaryLength == 0 {
+		//	// NM4 - maybe invert the hash, and add it to a set of..... not sure.
+		//	d.ranges.Insert(gr.Url, r.Hash, r.DictionaryOffset, r.DictionaryLength)
+		// }
 	}
 }
 
+// NM4 - On the client side, we only request HttpRanges for raw bytes. The struct includes the dictionary offset and
+// length, but those only make sense in the response to DownloadLocations.
 func toGetRange(rs []*ranges.GetRange) *GetRange {
 	ret := new(GetRange)
 	for _, r := range rs {
 		ret.Url = r.Url
 		ret.Ranges = append(ret.Ranges, &remotesapi.RangeChunk{
-			Hash:   r.Hash,
-			Offset: r.Offset,
-			Length: r.Length,
+			Hash:             r.Hash,
+			Offset:           r.Offset,
+			Length:           r.Length,
+			DictionaryOffset: r.DictionaryOffset,
+			DictionaryLength: r.DictionaryLength,
 		})
 	}
 	return ret
@@ -596,3 +615,112 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
 		}
 	}
 }
+
+///////
+
+type DictionaryKey struct {
+	url string
+	off uint64
+	len uint32
+}
+
+type DictionaryCache struct {
+	mu     sync.Mutex
+	cache  map[DictionaryKey]*gozstd.DDict
+	client remotesapi.ChunkStoreServiceClient
+	dlds   downloads
+}
+
+func NewDictionaryCache(downloads downloads, client remotesapi.ChunkStoreServiceClient) *DictionaryCache {
+	return &DictionaryCache{
+		mu:     sync.Mutex{},
+		cache:  make(map[DictionaryKey]*gozstd.DDict),
+		client: client,
+		dlds:   downloads,
+	}
+}
+
+func (dc *DictionaryCache) Get(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
+	// Holding the lock across the entire fetch is way too coarse. Use a real cache for production;
+	// this is prototype code.
+	dc.mu.Lock()
+	defer dc.mu.Unlock()
+
+	path := rang.ResourcePath()
+	off := rang.Ranges[idx].DictionaryOffset
+	ln := rang.Ranges[idx].DictionaryLength
+
+	key := DictionaryKey{path, off, ln}
+	if v, ok := dc.cache[key]; ok {
+		return v, nil
+	} else {
+		pathToUrl := dc.dlds.refreshes[path]
+		if pathToUrl == nil {
+			// Mimic what downloads.Add does for a path we haven't seen yet:
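+			// fabricate a one-range DownloadLoc covering just the dictionary bytes so that the
+			// existing locationRefresh machinery can re-resolve the URL on retries.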
+			refresh := new(locationRefresh)
+
+			sRang := &remotesapi.HttpGetRange{}
+			sRang.Url = rang.Url
+			sRang.Ranges = append(sRang.Ranges, &remotesapi.RangeChunk{Offset: off, Length: ln})
+			loc := &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: sRang}
+			dl := &remotesapi.DownloadLoc{Location: loc}
+
+			refresh.Add(dl)
+			dc.dlds.refreshes[path] = refresh
+
+			pathToUrl = refresh
+		}
+
+		ctx := context.Background()
+		fetcher := globalHttpFetcher
+
+		urlF := func(lastError error) (string, error) {
+			earl, err := pathToUrl.GetURL(ctx, lastError, dc.client)
+			if err != nil {
+				return "", err
+			}
+			if earl == "" {
+				earl = path
+			}
+			return earl, nil
+		}
+
+		resp := reliable.StreamingRangeDownload(ctx, reliable.StreamingRangeRequest{
+			Fetcher: fetcher,
+			Offset:  off,
+			Length:  uint64(ln),
+			UrlFact: urlF,
+			Stats:   stats,
+			Health:  recorder,
+			BackOffFact: func(ctx context.Context) backoff.BackOff {
+				return downloadBackOff(ctx, 3) // Should be params.DownloadRetryCount.
+			},
+			Throughput: reliable.MinimumThroughputCheck{
+				CheckInterval: defaultRequestParams.ThroughputMinimumCheckInterval,
+				BytesPerCheck: defaultRequestParams.ThroughputMinimumBytesPerCheck,
+				NumIntervals:  defaultRequestParams.ThroughputMinimumNumIntervals,
+			},
+			RespHeadersTimeout: defaultRequestParams.RespHeadersTimeout,
+		})
+		defer resp.Close()
+
+		buf := make([]byte, ln)
+		_, err := io.ReadFull(resp.Body, buf)
+		if err != nil {
+			return nil, err
+		}
+
+		rawDict, err := gozstd.Decompress(nil, buf)
+		if err != nil {
+			return nil, err
+		}
+
+		dict, err := gozstd.NewDDict(rawDict)
+		if err != nil {
+			return nil, err
+		}
+
+		dc.cache[key] = dict
+		return dict, nil
+	}
+}
diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go
index 72241b2d312..777febd4f54 100644
--- a/go/libraries/doltcore/remotestorage/chunk_store.go
+++ b/go/libraries/doltcore/remotestorage/chunk_store.go
@@ -32,6 +32,7 @@ import (
 	"time"
 
 	"github.com/cenkalti/backoff/v4"
+	"github.com/dolthub/gozstd"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -371,6 +372,7 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
 	return nil
 }
 
+// NM4 - Extending the protobuf isn't really necessary. Possibly split this out into a new struct.
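+// GetRange is a local alias of the protobuf HttpGetRange. With the archive changes, each
+// RangeChunk can carry a dictionary offset/length alongside the chunk's own byte range.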
 type GetRange remotesapi.HttpGetRange
 
 func (gr *GetRange) ResourcePath() string {
@@ -436,6 +438,7 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
 	if len(gr.Ranges) == 0 {
 		return func() error { return nil }
 	}
+
 	return func() error {
 		urlF := func(lastError error) (string, error) {
 			url, err := pathToUrl(ctx, lastError, gr.ResourcePath())
@@ -466,9 +469,9 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
 			RespHeadersTimeout: params.RespHeadersTimeout,
 		})
 		defer resp.Close()
-		reader := &RangeChunkReader{GetRange: gr, Reader: resp.Body}
+		reader := &RangeChunkReader{Path: gr.ResourcePath(), GetRange: gr, Reader: resp.Body}
 		for {
-			cc, err := reader.ReadChunk()
+			cc, err := reader.ReadChunk(stats, health)
 			if errors.Is(err, io.EOF) {
 				return nil
 			}
@@ -484,14 +487,59 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
 	}
 }
 
+type ArchiveToChunker struct {
+	h          hash.Hash
+	dictionary *gozstd.DDict
+	chunkData  []byte
+}
+
+func (a ArchiveToChunker) Hash() hash.Hash {
+	return a.h
+}
+
+func (a ArchiveToChunker) ToChunk() (chunks.Chunk, error) {
+	dict := a.dictionary
+	data := a.chunkData
+	rawChunk, err := gozstd.DecompressDict(nil, data, dict)
+	if err != nil {
+		return chunks.Chunk{}, err
+	}
+
+	// NM4 - verify the chunk address for safety while testing.
+	newChunk := chunks.NewChunk(rawChunk)
+	if newChunk.Hash() != a.h {
+		panic("Hash Mismatch!!")
+	}
+
+	return newChunk, nil
+}
+
+func (a ArchiveToChunker) FullCompressedChunkLen() uint32 {
+	// TODO: Not sure what the right impl for this is.... NM4.
+	return uint32(len(a.chunkData)) // + dictionary???
+}
+
+func (a ArchiveToChunker) IsEmpty() bool {
+	return len(a.chunkData) == 0
+}
+
+func (a ArchiveToChunker) IsGhost() bool {
+	// NM4 - TODO: implement me. Or maybe not????
+	return false
+}
+
+var _ nbs.ToChunker = (*ArchiveToChunker)(nil)
+
 type RangeChunkReader struct {
+	Path     string
 	GetRange *GetRange
 	Reader   io.Reader
 	i        int
 	skip     int
 }
 
-func (r *RangeChunkReader) ReadChunk() (nbs.CompressedChunk, error) {
+// NM4 - This is the place where we need to intercept responses and conjure the "full" chunk.
+func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.HealthRecorder) (nbs.ToChunker, error) {
 	if r.skip > 0 {
 		_, err := io.CopyN(io.Discard, r.Reader, int64(r.skip))
 		if err != nil {
@@ -499,21 +547,41 @@ func (r *RangeChunkReader) ReadChunk() (nbs.CompressedChunk, error) {
 		}
 		r.skip = 0
 	}
-	if r.i >= len(r.GetRange.Ranges) {
+
+	idx := r.i
+	r.i += 1
+
+	if idx >= len(r.GetRange.Ranges) {
 		return nbs.CompressedChunk{}, io.EOF
 	}
-	if r.i < len(r.GetRange.Ranges)-1 {
-		r.skip = int(r.GetRange.GapBetween(r.i, r.i+1))
+	if idx < len(r.GetRange.Ranges)-1 {
+		r.skip = int(r.GetRange.GapBetween(idx, idx+1))
 	}
-	l := r.GetRange.Ranges[r.i].Length
-	h := hash.New(r.GetRange.Ranges[r.i].Hash)
-	r.i += 1
+
+	rang := r.GetRange.Ranges[idx]
+	l := rang.Length
+	h := hash.New(rang.Hash)
+
 	buf := make([]byte, l)
 	_, err := io.ReadFull(r.Reader, buf)
 	if err != nil {
 		return nbs.CompressedChunk{}, err
 	} else {
-		return nbs.NewCompressedChunk(h, buf)
+		if rang.DictionaryLength == 0 {
+			// NOMS snappy-compressed chunk.
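+			// A zero dictionary length marks a legacy snappy chunk from a table file; a nonzero
+			// length means zstd-compressed archive data that needs its dictionary to decompress.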
+			return nbs.NewCompressedChunk(h, buf)
+		} else {
+			dict, err := globalDictCache.Get(r.GetRange, idx, stats, health)
+			if err != nil {
+				return nbs.CompressedChunk{}, err
+			}
+
+			return ArchiveToChunker{h: h, dictionary: dict, chunkData: buf}, nil
+		}
 	}
 }
diff --git a/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go b/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go
index de1600bd0a4..9e3ebe16236 100644
--- a/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go
+++ b/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go
@@ -33,6 +33,11 @@ type GetRange struct {
 	Offset uint64
 	Length uint32
 	Region *Region
+
+	// The archive file format requires the url and dictionary offset/length to be carried through to fully resolve
+	// the chunk. This information is not used within the range calculations at all, as the dictionary's location is
+	// unrelated to the chunk's own byte range.
+	DictionaryOffset uint64
+	DictionaryLength uint32
 }
 
 // A |Region| represents a continuous range of bytes within in a Url.
@@ -145,12 +150,14 @@ func (t *Tree) Len() int {
 	return t.t.Len()
 }
 
-func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32) {
+func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, dictOffset uint64, dictLength uint32) {
 	ins := &GetRange{
-		Url:    t.intern(url),
-		Hash:   hash,
-		Offset: offset,
-		Length: length,
+		Url:              t.intern(url),
+		Hash:             hash,
+		Offset:           offset,
+		Length:           length,
+		DictionaryOffset: dictOffset,
+		DictionaryLength: dictLength,
 	}
 
 	t.t.ReplaceOrInsert(ins)
diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go
index 3e6a277c3e6..53beec0adbd 100644
--- a/go/store/datas/pull/puller.go
+++ b/go/store/datas/pull/puller.go
@@ -28,6 +28,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
@@ -371,8 +372,14 @@ func (p *Puller) Pull(ctx context.Context) error {
 				if err != nil {
 					return err
 				}
-			} else {
-				panic("TODO: handle ZStd-CompressedChunk") // NM4.
+			} else if _, ok := cChk.(remotestorage.ArchiveToChunker); ok {
+				// NM4 - Until we can write quickly to archives, convert back to a snappy-compressed chunk.
+				cc := nbs.ChunkToCompressedChunk(chnk)
+
+				err = p.wr.AddCompressedChunk(ctx, cc)
+				if err != nil {
+					return err
+				}
 			}
 		}
 	})
diff --git a/go/store/nbs/archive.go b/go/store/nbs/archive.go
index 650b45a0c96..65c4f3e6e11 100644
--- a/go/store/nbs/archive.go
+++ b/go/store/nbs/archive.go
@@ -27,7 +27,7 @@ Chunks from the Archive.
 ByteSpans are arbitrary offset/lengths into the file which store (1) zstd dictionary data, and (2) compressed chunk
 data. Each Chunk is stored as a pair of ByteSpans (dict,data). Dictionary ByteSpans can (should) be used by multiple
 Chunks, so there are more ByteSpans than Chunks. The Index is used to map Chunks to ByteSpan pairs. These pairs are
-called ChunkRefs, and were store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
+called ChunkRefs, and we store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
 given Chunk with minimal processing at load time.
 A Dolt Archive file follows the following format:
diff --git a/go/store/nbs/archive_build.go b/go/store/nbs/archive_build.go
index 071521b8c74..8a8ecf58737 100644
--- a/go/store/nbs/archive_build.go
+++ b/go/store/nbs/archive_build.go
@@ -57,7 +57,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
 			return err
 		}
 		if exists {
-			// We have a fast path to follow because oritinal table file is still on disk.
+			// We have a fast path to follow because the original table file is still on disk.
 			swapMap[arc.hash()] = orginTfId
 		} else {
 			// We don't have the original table file id, so we have to create a new one.
diff --git a/go/store/nbs/archive_chunk_source.go b/go/store/nbs/archive_chunk_source.go
index ebf1a854156..8ef6f0d710f 100644
--- a/go/store/nbs/archive_chunk_source.go
+++ b/go/store/nbs/archive_chunk_source.go
@@ -196,7 +196,7 @@ func (acs archiveChunkSource) getRecordRanges(_ context.Context, requests []getR
 			result[hAddr] = rng
 		}
 	}
-	return result, gcBehavior_Block, nil // NM4 - FIXME. Merging. This is wrong. Use the keeperF
+	return result, gcBehavior_Continue, nil // NM4 - FIXME. Merging. This is wrong. Use the keeperF
 }
 
 func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
diff --git a/integration-tests/bats/archive.bats b/integration-tests/bats/archive.bats
index 90ced9566cf..32100da5d30 100755
--- a/integration-tests/bats/archive.bats
+++ b/integration-tests/bats/archive.bats
@@ -115,19 +115,6 @@ mutations_and_gc_statement() {
     [ "$files" -eq "2" ]
 }
 
-@test "archive: archive with remotesrv no go" {
-    dolt sql -q "$(mutations_and_gc_statement)"
-    dolt archive
-
-    run dolt sql-server --remotesapi-port=12321
-    [ "$status" -eq 1 ]
-    [[ "$output" =~ "archive files present" ]] || false
-
-    run remotesrv --repo-mode
-    [ "$status" -eq 1 ]
-    [[ "$output" =~ "archive files present" ]] || false
-}
-
 @test "archive: archive --revert (fast)" {
     dolt sql -q "$(mutations_and_gc_statement)"
     dolt archive
@@ -227,9 +214,50 @@ mutations_and_gc_statement() {
     [[ "$output" =~ "151525" ]] || false # i = 1 - 550, sum is 151525
 }
 
+@test "archive: can fetch chunks from an archived repo" {
+    mkdir -p remote/.dolt
+    mkdir cloned
+
+    # Copy the archive test repo to the remote directory.
+    cp -R $BATS_TEST_DIRNAME/archive-test-repo/* remote/.dolt
+    cd remote
+
+    port=$( definePORT )
+
+    remotesrv --http-port $port --grpc-port $port --repo-mode &
+    remotesrv_pid=$!
+    [[ "$remotesrv_pid" -gt 0 ]] || false
+
+    cd ../cloned
+    dolt clone http://localhost:$port/test-org/test-repo repo1
+    # Fetch when there are no changes.
+    cd repo1
+    dolt fetch
+
+    ## Update the remote repo directly. We need to run the archive command while the server is stopped.
+    ## This will result in archived files on the remote, which we will need to read chunks from when we fetch.
+    cd ../../remote
     kill $remotesrv_pid
     wait $remotesrv_pid || :
     remotesrv_pid=""
+    dolt sql -q "$(mutations_and_gc_statement)"
+    dolt archive
+
+    remotesrv --http-port $port --grpc-port $port --repo-mode &
+    remotesrv_pid=$!
+    [[ "$remotesrv_pid" -gt 0 ]] || false
+
+    cd ../cloned/repo1
+
+    run dolt fetch
+    [ "$status" -eq 0 ]
+
+    run dolt status
+    [ "$status" -eq 0 ]
+
+    [[ "$output" =~ "Your branch is behind 'origin/main' by 20 commits, and can be fast-forwarded" ]] || false
+    # Verify the repo has integrity.
+    dolt fsck
 }