Commit 89fce6e

Merge pull request #8861 from dolthub/aaron/remotestorage-cache-cleanup
[no-release-notes] go: remotestorage: chunk_store.go: Clean up ChunkCache.
reltuk authored Feb 21, 2025
2 parents ed5eaaf + d586d2a commit 89fce6e
Showing 7 changed files with 551 additions and 286 deletions.
31 changes: 15 additions & 16 deletions go/libraries/doltcore/remotestorage/chunk_cache.go
@@ -19,22 +19,21 @@ import (
"github.com/dolthub/dolt/go/store/nbs"
)

// ChunkCache is an interface used for caching chunks
// ChunkCache is an interface used for caching chunks and has presence that
// has already been fetched from remotestorage. Care should be taken when
// using ChunkCache if it is possible for the remote to GC, since in that
// case the cache could contain stale data.
type ChunkCache interface {
// Put puts a slice of chunks into the cache. Error returned if the cache capacity has been exceeded.
Put(c []nbs.ToChunker) error
// Insert some observed / fetched chunks into the cache. These
// chunks may or may not be returned in the future.
InsertChunks(cs []nbs.ToChunker)
// Get previously cached chunks, if they are still available.
GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.ToChunker

// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty.
// is put in it's place
Get(h hash.HashSet) map[hash.Hash]nbs.ToChunker

// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
Has(h hash.HashSet) (absent hash.HashSet)

// PutChunk puts a single chunk in the cache. Returns an error if the cache capacity has been exceeded.
PutChunk(chunk nbs.ToChunker) error

// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker
// Insert all hashes in |h| as existing in the remote.
InsertHas(h hash.HashSet)
// Returns the absent set from |h|, filtering it by records
// which are known to be present in the remote based on
// previous |InsertHas| calls.
GetCachedHas(h hash.HashSet) (absent hash.HashSet)
}
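
For orientation, the new interface cleanly separates chunk caching (InsertChunks/GetCachedChunks) from has-presence caching (InsertHas/GetCachedHas), and neither write path can fail. A minimal map-backed sketch of an implementation follows; this is illustrative only, and the package's actual mapChunkCache may differ in locking, bounding, and eviction:

package remotestorage

import (
	"sync"

	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// simpleChunkCache is a hypothetical, unbounded implementation of
// ChunkCache; the real mapChunkCache in this package may differ.
type simpleChunkCache struct {
	mu     sync.Mutex
	chunks map[hash.Hash]nbs.ToChunker
	has    hash.HashSet
}

func newSimpleChunkCache() *simpleChunkCache {
	return &simpleChunkCache{
		chunks: make(map[hash.Hash]nbs.ToChunker),
		has:    make(hash.HashSet),
	}
}

// InsertChunks records fetched chunks; they may or may not be returned later.
func (c *simpleChunkCache) InsertChunks(cs []nbs.ToChunker) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, cc := range cs {
		c.chunks[cc.Hash()] = cc
	}
}

// GetCachedChunks returns only the requested chunks that are still cached.
func (c *simpleChunkCache) GetCachedChunks(hs hash.HashSet) map[hash.Hash]nbs.ToChunker {
	c.mu.Lock()
	defer c.mu.Unlock()
	res := make(map[hash.Hash]nbs.ToChunker, len(hs))
	for h := range hs {
		if cc, ok := c.chunks[h]; ok {
			res[h] = cc
		}
	}
	return res
}

// InsertHas records hashes known to exist in the remote.
func (c *simpleChunkCache) InsertHas(hs hash.HashSet) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for h := range hs {
		c.has.Insert(h)
	}
}

// GetCachedHas filters |hs| down to the hashes not known to be present.
func (c *simpleChunkCache) GetCachedHas(hs hash.HashSet) (absent hash.HashSet) {
	c.mu.Lock()
	defer c.mu.Unlock()
	absent = make(hash.HashSet, len(hs))
	for h := range hs {
		if _, ok := c.has[h]; !ok {
			absent.Insert(h)
		}
	}
	return absent
}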
161 changes: 71 additions & 90 deletions go/libraries/doltcore/remotestorage/chunk_store.go
@@ -46,7 +46,7 @@ import (
"github.com/dolthub/dolt/go/store/types"
)

var ErrCacheCapacityExceeded = errors.New("too much data: the cache capacity has been reached")
var ErrWriteBufferCapacityExceeded = errors.New("too much data: the write buffer capacity has been reached")

var ErrUploadFailed = errors.New("upload failed")

@@ -123,6 +123,7 @@ type DoltChunkStore struct {
 	root        hash.Hash
 	csClient    remotesapi.ChunkStoreServiceClient
 	finalizer   func() error
+	wb          WriteBuffer
 	cache       ChunkCache
 	metadata    *remotesapi.GetRepoMetadataResponse
 	nbf         *types.NomsBinFormat
@@ -172,6 +173,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
 		csClient:    csClient,
 		finalizer:   func() error { return nil },
 		cache:       newMapChunkCache(),
+		wb:          newMapWriteBuffer(),
 		metadata:    metadata,
 		nbf:         nbf,
 		httpFetcher: globalHttpFetcher,
@@ -185,79 +187,46 @@
 	return cs, nil
 }
 
+func (dcs *DoltChunkStore) clone() *DoltChunkStore {
+	ret := *dcs
+	ret.repoToken = new(atomic.Value)
+	return &ret
+}
+
 func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore {
-	return &DoltChunkStore{
-		repoId:      dcs.repoId,
-		repoPath:    dcs.repoPath,
-		repoToken:   new(atomic.Value),
-		host:        dcs.host,
-		root:        dcs.root,
-		csClient:    dcs.csClient,
-		finalizer:   dcs.finalizer,
-		cache:       dcs.cache,
-		metadata:    dcs.metadata,
-		nbf:         dcs.nbf,
-		httpFetcher: fetcher,
-		params:      dcs.params,
-		stats:       dcs.stats,
-	}
+	ret := dcs.clone()
+	ret.httpFetcher = fetcher
+	return ret
 }
+
+func (dcs *DoltChunkStore) WithNoopWriteBuffer() *DoltChunkStore {
+	ret := dcs.clone()
+	ret.wb = noopWriteBuffer{}
+	return ret
+}
+
+func (dcs *DoltChunkStore) WithWriteBuffer(wb WriteBuffer) *DoltChunkStore {
+	ret := dcs.clone()
+	ret.wb = wb
+	return ret
+}
 
 func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
-	return &DoltChunkStore{
-		repoId:      dcs.repoId,
-		repoPath:    dcs.repoPath,
-		repoToken:   new(atomic.Value),
-		host:        dcs.host,
-		root:        dcs.root,
-		csClient:    dcs.csClient,
-		finalizer:   dcs.finalizer,
-		cache:       noopChunkCache,
-		metadata:    dcs.metadata,
-		nbf:         dcs.nbf,
-		httpFetcher: dcs.httpFetcher,
-		params:      dcs.params,
-		stats:       dcs.stats,
-		logger:      dcs.logger,
-	}
+	ret := dcs.clone()
+	ret.cache = noopChunkCache
+	return ret
 }
 
 func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
-	return &DoltChunkStore{
-		repoId:      dcs.repoId,
-		repoPath:    dcs.repoPath,
-		repoToken:   new(atomic.Value),
-		host:        dcs.host,
-		root:        dcs.root,
-		csClient:    dcs.csClient,
-		finalizer:   dcs.finalizer,
-		cache:       cache,
-		metadata:    dcs.metadata,
-		nbf:         dcs.nbf,
-		httpFetcher: dcs.httpFetcher,
-		params:      dcs.params,
-		stats:       dcs.stats,
-		logger:      dcs.logger,
-	}
+	ret := dcs.clone()
+	ret.cache = cache
+	return ret
 }
 
 func (dcs *DoltChunkStore) WithNetworkRequestParams(params NetworkRequestParams) *DoltChunkStore {
-	return &DoltChunkStore{
-		repoId:      dcs.repoId,
-		repoPath:    dcs.repoPath,
-		repoToken:   new(atomic.Value),
-		host:        dcs.host,
-		root:        dcs.root,
-		csClient:    dcs.csClient,
-		finalizer:   dcs.finalizer,
-		cache:       dcs.cache,
-		metadata:    dcs.metadata,
-		nbf:         dcs.nbf,
-		httpFetcher: dcs.httpFetcher,
-		params:      params,
-		stats:       dcs.stats,
-		logger:      dcs.logger,
-	}
+	ret := dcs.clone()
+	ret.params = params
+	return ret
 }
 
 func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger) {
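
The With* wrappers above now share the single clone helper instead of hand-copied struct literals; the old WithHTTPFetcher literal, notably, had drifted and silently dropped the logger field. Because each wrapper returns a fresh copy with its own repoToken atomic.Value, derived stores compose by chaining. A hypothetical usage, where fetcher is any HTTPFetcher:

	cs := dcs.WithHTTPFetcher(fetcher).WithNoopChunkCache().WithNoopWriteBuffer()

The result shares the parent's clients and configuration but caches nothing and buffers no writes.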
@@ -344,19 +313,18 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
 	ctx, span := tracer.Start(ctx, "remotestorage.GetManyCompressed")
 	defer span.End()
 
-	hashToChunk := dcs.cache.Get(hashes)
+	hashToChunk := dcs.cache.GetCachedChunks(hashes)
+	dcs.wb.AddBufferedChunks(hashes, hashToChunk)
 
 	span.SetAttributes(attribute.Int("num_hashes", len(hashes)), attribute.Int("cache_hits", len(hashToChunk)))
 	atomic.AddUint32(&dcs.stats.Hits, uint32(len(hashToChunk)))
 
 	notCached := make([]hash.Hash, 0, len(hashes))
 	for h := range hashes {
-		c := hashToChunk[h]
-
-		if c == nil || c.IsEmpty() {
-			notCached = append(notCached, h)
-		} else {
+		if c, ok := hashToChunk[h]; ok {
 			found(ctx, c)
+		} else {
+			notCached = append(notCached, h)
 		}
 	}
 
@@ -633,9 +601,7 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash
 			}
 			// Don't forward on empty/not found chunks.
 			if !cc.IsEmpty() {
-				if err := dcs.cache.PutChunk(cc); err != nil {
-					return err
-				}
+				dcs.cache.InsertChunks([]nbs.ToChunker{cc})
 				found(egCtx, cc)
 			}
 		}
@@ -663,8 +629,8 @@ const maxHasManyBatchSize = 16 * 1024
 // absent from the store.
 func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
 	// get the set of hashes that isn't already in the cache
-	notCached := dcs.cache.Has(hashes)
-
+	notCached := dcs.cache.GetCachedHas(hashes)
+	dcs.wb.RemovePresentChunks(notCached)
 	if len(notCached) == 0 {
 		return notCached, nil
 	}
Expand All @@ -673,7 +639,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 	hashSl, byteSl := HashSetToSlices(notCached)
 
 	absent := make(hash.HashSet)
-	var found []nbs.ToChunker
+	found := make(hash.HashSet)
 	var err error
 
 	batchItr(len(hashSl), maxHasManyBatchSize, func(st, end int) (stop bool) {
@@ -714,8 +680,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 				absent[currHash] = struct{}{}
 				j++
 			} else {
-				c := nbs.ChunkToCompressedChunk(chunks.NewChunkWithHash(currHash, []byte{}))
-				found = append(found, c)
+				found.Insert(currHash)
 			}
 		}
 
Expand All @@ -731,9 +696,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 	}
 
 	if len(found) > 0 {
-		if err := dcs.cache.Put(found); err != nil {
-			return hash.HashSet{}, err
-		}
+		dcs.cache.InsertHas(found)
 	}
 
 	return absent, nil
@@ -767,7 +730,8 @@ func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chu
 	}
 
 	cc := nbs.ChunkToCompressedChunk(c)
-	if err := dcs.cache.Put([]nbs.ToChunker{cc}); err != nil {
+	err = dcs.wb.Put(cc)
+	if err != nil {
 		return err
 	}
 	return nil
@@ -850,8 +814,26 @@ func (dcs *DoltChunkStore) loadRoot(ctx context.Context) error {
 // persisted root hash from last to current (or keeps it the same).
 // If last doesn't match the root in persistent storage, returns false.
 func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
-	hashToChunkCount, err := dcs.uploadChunks(ctx)
+	toUpload := dcs.wb.GetAllForWrite()
+	var resp *remotesapi.CommitResponse
+	defer func() {
+		// We record success based on the CommitResponse
+		// |Success| field, which is only |true| when the call
+		// successfully updated the root hash of the
+		// remote. With the current API, we cannot distinguish
+		// the case where the commit failed because |last| was
+		// stale but the provided chunks were still
+		// successfully added to the remote. If the write is
+		// retried in such a case, we will currently write the
+		// chunks to the remote again.
+		if resp != nil {
+			dcs.wb.WriteCompleted(resp.Success)
+		} else {
+			dcs.wb.WriteCompleted(false)
+		}
+	}()
+
+	hashToChunkCount, err := dcs.uploadChunks(ctx, toUpload)
 	if err != nil {
 		return false, err
 	}
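
The WriteBuffer that Commit drains here is introduced elsewhere in this PR, and its definition is not shown on this page. Reconstructed purely from the call sites visible in this diff, its shape is presumably close to the sketch below; the method names match the calls above, but the comments and exact signatures are inferred:

	// Inferred from wb.Put, wb.AddBufferedChunks, wb.RemovePresentChunks,
	// wb.GetAllForWrite, and wb.WriteCompleted in this diff; the actual
	// interface in the PR may differ.
	type WriteBuffer interface {
		// Buffer a compressed chunk for upload at the next Commit. May
		// return ErrWriteBufferCapacityExceeded.
		Put(cc nbs.CompressedChunk) error
		// Copy buffered chunks named in |hashes| into |res|, so reads
		// observe not-yet-committed writes.
		AddBufferedChunks(hashes hash.HashSet, res map[hash.Hash]nbs.ToChunker)
		// Remove from |absent| any hashes currently in the buffer.
		RemovePresentChunks(absent hash.HashSet)
		// Take the buffered contents for upload by Commit.
		GetAllForWrite() map[hash.Hash]nbs.CompressedChunk
		// Report the upload outcome; per the comment in Commit above, a
		// failed write keeps the chunks buffered so a retry resends them.
		WriteCompleted(success bool)
	}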
Expand All @@ -873,7 +855,7 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
 			NbsVersion: nbs.StorageVersion,
 		},
 	}
-	resp, err := dcs.csClient.Commit(ctx, req)
+	resp, err = dcs.csClient.Commit(ctx, req)
 	if err != nil {
 		return false, NewRpcError(err, "Commit", dcs.host, req)
 	}
@@ -911,18 +893,18 @@ func (dcs *DoltChunkStore) Close() error {
 	return dcs.finalizer()
 }
 
-// getting this working using the simplest approach first
-func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int, error) {
-	hashToChunk := dcs.cache.GetAndClearChunksToFlush()
-
+// Uploads all chunks in |hashToChunk| to the remote store and returns
+// the manifest entries that correspond to the new table files. Used
+// by |Commit|. Typically |hashToChunk| will have come from our |wb|
+// |writeBuffer|.
+func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[hash.Hash]nbs.CompressedChunk) (map[hash.Hash]int, error) {
 	if len(hashToChunk) == 0 {
 		return map[hash.Hash]int{}, nil
 	}
 
 	chnks := make([]chunks.Chunk, 0, len(hashToChunk))
 	for _, chable := range hashToChunk {
 		ch, err := chable.ToChunk()
-
 		if err != nil {
 			return nil, err
 		}
@@ -951,7 +933,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
 	}
 
 	for h, contentHash := range hashToContentHash {
-		// Can parallelize this in the future if needed
 		err := dcs.uploadTableFileWithRetries(ctx, h, uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
 			data := hashToData[h]
 			return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
