Skip to content

Commit

Permalink
First steel thread for archive fetch.
Browse files Browse the repository at this point in the history
  • Loading branch information
macneale4 committed Feb 5, 2025
1 parent c9b9be7 commit 08e0bd9
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 40 deletions.
142 changes: 135 additions & 7 deletions go/libraries/doltcore/remotestorage/chunk_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dolthub/gozstd"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

Expand Down Expand Up @@ -57,8 +60,9 @@ type ChunkFetcher struct {
// buy having a Hash, but are empty. NM4.
resCh chan nbs.ToChunker

abortCh chan struct{}
stats StatsRecorder
abortCh chan struct{}
stats StatsRecorder
dictCache *DictionaryCache
}

const (
Expand All @@ -68,7 +72,14 @@ const (
reliableCallDeliverRespTimeout = 15 * time.Second
)

var globalDictCache *DictionaryCache
var once sync.Once

func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher {
once.Do(func() {
globalDictCache = NewDictionaryCache(newDownloads(), dcs.csClient)
})

eg, ctx := errgroup.WithContext(ctx)
ret := &ChunkFetcher{
eg: eg,
Expand Down Expand Up @@ -370,19 +381,27 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
d.refreshes[path] = refresh
}
for _, r := range gr.Ranges {
// NM4 - this is where the offset is read!! do something here or nearby.
d.ranges.Insert(gr.Url, r.Hash, r.Offset, r.Length)
// NM4 - Split at this point? Break the dictionary into its own request.
d.ranges.Insert(gr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength)
// if r.DictionaryLength == 0 {
// // NM4 - maybe invert the hash, and add it to a set of..... not sure.
// d.ranges.Insert(gr.Url, r.Hash, r.DictionaryOffset, r.DictionaryLength)
// }
}
}

// NM4 - On the client side, we only request HttpRanges for raw bytes. The struct includes the dictionary offset and length,
// but those only make sense in the response of DownloadLocations.
func toGetRange(rs []*ranges.GetRange) *GetRange {
ret := new(GetRange)
for _, r := range rs {
ret.Url = r.Url
ret.Ranges = append(ret.Ranges, &remotesapi.RangeChunk{
Hash: r.Hash,
Offset: r.Offset,
Length: r.Length,
Hash: r.Hash,
Offset: r.Offset,
Length: r.Length,
DictionaryOffset: r.DictionaryOffset,
DictionaryLength: r.DictionaryLength,
})
}
return ret
Expand Down Expand Up @@ -596,3 +615,112 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
}
}
}

///////

type DictionaryKey struct {
url string
off uint64
len uint32
}

type DictionaryCache struct {
mu sync.Mutex
cache map[DictionaryKey]*gozstd.DDict
client remotesapi.ChunkStoreServiceClient
dlds downloads
}

func NewDictionaryCache(downloads downloads, client remotesapi.ChunkStoreServiceClient) *DictionaryCache {
return &DictionaryCache{
mu: sync.Mutex{},
cache: make(map[DictionaryKey]*gozstd.DDict),
client: client,
dlds: downloads,
}
}

func (dc *DictionaryCache) Get(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
// Way too granular... but I'll use a real cache for production. prototype maddddddneeesssss
dc.mu.Lock()
defer dc.mu.Unlock()

path := rang.ResourcePath()
off := rang.Ranges[idx].DictionaryOffset
ln := rang.Ranges[idx].DictionaryLength

key := DictionaryKey{path, off, ln}
if v, ok := dc.cache[key]; ok {
return v, nil
} else {

pathToUrl := dc.dlds.refreshes[path]
if pathToUrl == nil {
// Kinda do what Add does....
refresh := new(locationRefresh)

sRang := &remotesapi.HttpGetRange{}
sRang.Url = rang.Url
sRang.Ranges = append(sRang.Ranges, &remotesapi.RangeChunk{Offset: off, Length: ln})
rang := &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: sRang}
dl := &remotesapi.DownloadLoc{Location: rang}

refresh.Add(dl)
dc.dlds.refreshes[path] = refresh

pathToUrl = refresh
}

ctx := context.Background()
fetcher := globalHttpFetcher

urlF := func(lastError error) (string, error) {
earl, err := pathToUrl.GetURL(ctx, lastError, dc.client)
if err != nil {
return "", err
}
if earl == "" {
earl = path
}
return earl, nil
}

resp := reliable.StreamingRangeDownload(ctx, reliable.StreamingRangeRequest{
Fetcher: fetcher,
Offset: off,
Length: uint64(ln),
UrlFact: urlF,
Stats: stats,
Health: recorder,
BackOffFact: func(ctx context.Context) backoff.BackOff {
return downloadBackOff(ctx, 3) // params.DownloadRetryCount)
},
Throughput: reliable.MinimumThroughputCheck{
CheckInterval: defaultRequestParams.ThroughputMinimumCheckInterval,
BytesPerCheck: defaultRequestParams.ThroughputMinimumBytesPerCheck,
NumIntervals: defaultRequestParams.ThroughputMinimumNumIntervals,
},
RespHeadersTimeout: defaultRequestParams.RespHeadersTimeout,
})
defer resp.Close()

buf := make([]byte, ln)
_, err := io.ReadFull(resp.Body, buf)
if err != nil {
return nil, err
}

rawDict, err := gozstd.Decompress(nil, buf)
if err != nil {
return nil, err
}

dict, err := gozstd.NewDDict(rawDict)
if err != nil {
return nil, err
}

dc.cache[key] = dict
return dict, nil
}
}
88 changes: 78 additions & 10 deletions go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dolthub/gozstd"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -371,6 +372,7 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
return nil
}

// NM4 - Extending the protobuf isn't not really necesary. Possible split this out into a new struct.
type GetRange remotesapi.HttpGetRange

func (gr *GetRange) ResourcePath() string {
Expand Down Expand Up @@ -436,6 +438,7 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
if len(gr.Ranges) == 0 {
return func() error { return nil }
}

return func() error {
urlF := func(lastError error) (string, error) {
url, err := pathToUrl(ctx, lastError, gr.ResourcePath())
Expand Down Expand Up @@ -466,9 +469,9 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
RespHeadersTimeout: params.RespHeadersTimeout,
})
defer resp.Close()
reader := &RangeChunkReader{GetRange: gr, Reader: resp.Body}
reader := &RangeChunkReader{Path: gr.ResourcePath(), GetRange: gr, Reader: resp.Body}
for {
cc, err := reader.ReadChunk()
cc, err := reader.ReadChunk(stats, health)
if errors.Is(err, io.EOF) {
return nil
}
Expand All @@ -484,36 +487,101 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
}
}

type ArchiveToChunker struct {
h hash.Hash
dictionary *gozstd.DDict
chunkData []byte
}

func (a ArchiveToChunker) Hash() hash.Hash {
return a.h
}

func (a ArchiveToChunker) ToChunk() (chunks.Chunk, error) {
dict := a.dictionary
data := a.chunkData
rawChunk, err := gozstd.DecompressDict(nil, data, dict)
// NM4 - calculate chunk addr for safety while testing.
newChunk := chunks.NewChunk(rawChunk)

if newChunk.Hash() != a.h {
panic("Hash Mismatch!!")
}

return newChunk, err

}

func (a ArchiveToChunker) FullCompressedChunkLen() uint32 {
//TODO Not sure what the right impl for this is.... NM4.
return uint32(len(a.chunkData)) // + dictionary???
}

func (a ArchiveToChunker) IsEmpty() bool {
//TODO implement me
return len(a.chunkData) == 0
}

func (a ArchiveToChunker) IsGhost() bool {
//TODO implement me
// NM4 - yes, need to. Or maybe not????
return false
}

var _ nbs.ToChunker = (*ArchiveToChunker)(nil)

type RangeChunkReader struct {
Path string
GetRange *GetRange
Reader io.Reader
i int
skip int
}

func (r *RangeChunkReader) ReadChunk() (nbs.CompressedChunk, error) {
// NM4 - THis is the place where we need to intercept responses and conjour the "full" chunk.
func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.HealthRecorder) (nbs.ToChunker, error) {
if r.skip > 0 {
_, err := io.CopyN(io.Discard, r.Reader, int64(r.skip))
if err != nil {
return nbs.CompressedChunk{}, err
}
r.skip = 0
}
if r.i >= len(r.GetRange.Ranges) {

idx := r.i
r.i += 1

if idx >= len(r.GetRange.Ranges) {
return nbs.CompressedChunk{}, io.EOF
}
if r.i < len(r.GetRange.Ranges)-1 {
r.skip = int(r.GetRange.GapBetween(r.i, r.i+1))
if idx < len(r.GetRange.Ranges)-1 {
r.skip = int(r.GetRange.GapBetween(idx, idx+1))
}
l := r.GetRange.Ranges[r.i].Length
h := hash.New(r.GetRange.Ranges[r.i].Hash)
r.i += 1

rang := r.GetRange.Ranges[idx]
l := rang.Length
h := hash.New(rang.Hash)

if strings.HasPrefix(h.String(), "eh9e0b3ou") {
_ = h.String()
}

buf := make([]byte, l)
_, err := io.ReadFull(r.Reader, buf)
if err != nil {
return nbs.CompressedChunk{}, err
} else {
return nbs.NewCompressedChunk(h, buf)
if rang.DictionaryLength == 0 {
// NOMS snappy compressed chunk.
return nbs.NewCompressedChunk(h, buf)
} else {
dict, err := globalDictCache.Get(r.GetRange, idx, stats, health)
if err != nil {
return nbs.CompressedChunk{}, err
}

return ArchiveToChunker{h: h, dictionary: dict, chunkData: buf}, nil
}
}
}

Expand Down
17 changes: 12 additions & 5 deletions go/libraries/doltcore/remotestorage/internal/ranges/ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type GetRange struct {
Offset uint64
Length uint32
Region *Region

// Archive file format requires the url/dictionary offset/length to be carried through to fully resolve the chunk.
// This information is not used withing the range calculations at all, as the range is not related to the chunk content.
DictionaryOffset uint64
DictionaryLength uint32
}

// A |Region| represents a continuous range of bytes within in a Url.
Expand Down Expand Up @@ -145,12 +150,14 @@ func (t *Tree) Len() int {
return t.t.Len()
}

func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32) {
func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, dictOffset uint64, dictLength uint32) {
ins := &GetRange{
Url: t.intern(url),
Hash: hash,
Offset: offset,
Length: length,
Url: t.intern(url),
Hash: hash,
Offset: offset,
Length: length,
DictionaryOffset: dictOffset,
DictionaryLength: dictLength,
}
t.t.ReplaceOrInsert(ins)

Expand Down
11 changes: 9 additions & 2 deletions go/store/datas/pull/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"

"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
"golang.org/x/sync/errgroup"

"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
Expand Down Expand Up @@ -371,8 +372,14 @@ func (p *Puller) Pull(ctx context.Context) error {
if err != nil {
return err
}
} else {
panic("TODO: handle ZStd-CompressedChunk") // NM4.
} else if _, ok := cChk.(remotestorage.ArchiveToChunker); ok {
// NM4 - Until we can write quickly to archives.....
cc := nbs.ChunkToCompressedChunk(chnk)

err = p.wr.AddCompressedChunk(ctx, cc)
if err != nil {
return err
}
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Chunks from the Archive.
ByteSpans are arbitrary offset/lengths into the file which store (1) zstd dictionary data, and (2) compressed chunk
data. Each Chunk is stored as a pair of ByteSpans (dict,data). Dictionary ByteSpans can (should) be used by multiple
Chunks, so there are more ByteSpans than Chunks. The Index is used to map Chunks to ByteSpan pairs. These pairs are
called ChunkRefs, and were store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
called ChunkRefs, and we store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
given Chunk with minimal processing at load time.
A Dolt Archive file follows the following format:
Expand Down
Loading

0 comments on commit 08e0bd9

Please sign in to comment.