Skip to content

Commit

Permalink
go: remotestorage: chunk cache: PR feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
reltuk committed Feb 20, 2025
1 parent 7dadc7c commit d586d2a
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 44 deletions.
43 changes: 20 additions & 23 deletions go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,22 +224,9 @@ func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
}

// WithNetworkRequestParams returns a copy of this DoltChunkStore (via
// clone) that uses |params| for its network request configuration. The
// receiver itself is not modified.
func (dcs *DoltChunkStore) WithNetworkRequestParams(params NetworkRequestParams) *DoltChunkStore {
	res := dcs.clone()
	res.params = params
	return res
}

func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger) {
Expand Down Expand Up @@ -327,7 +314,7 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
defer span.End()

hashToChunk := dcs.cache.GetCachedChunks(hashes)
dcs.wb.AddPendingChunks(hashes, hashToChunk)
dcs.wb.AddBufferedChunks(hashes, hashToChunk)

span.SetAttributes(attribute.Int("num_hashes", len(hashes)), attribute.Int("cache_hits", len(hashToChunk)))
atomic.AddUint32(&dcs.stats.Hits, uint32(len(hashToChunk)))
Expand Down Expand Up @@ -828,9 +815,22 @@ func (dcs *DoltChunkStore) loadRoot(ctx context.Context) error {
// If last doesn't match the root in persistent storage, returns false.
func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
toUpload := dcs.wb.GetAllForWrite()
var success bool
var resp *remotesapi.CommitResponse
defer func() {
dcs.wb.WriteCompleted(success)
// We record success based on the CommitResponse
// |Success| field, which is only |true| when the call
// successfully updated the root hash of the
// remote. With the current API, we cannot distinguish
// the case where the commit failed because |last| was
// stale but the provided chunks were still
// successfully added to the remote. If the write is
// retried in such a case, we will currently write the
// chunks to the remote again.
if resp != nil {
dcs.wb.WriteCompleted(resp.Success)
} else {
dcs.wb.WriteCompleted(false)
}
}()

hashToChunkCount, err := dcs.uploadChunks(ctx, toUpload)
Expand All @@ -855,7 +855,7 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
NbsVersion: nbs.StorageVersion,
},
}
resp, err := dcs.csClient.Commit(ctx, req)
resp, err = dcs.csClient.Commit(ctx, req)
if err != nil {
return false, NewRpcError(err, "Commit", dcs.host, req)
}
Expand All @@ -864,9 +864,6 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
return false, NewRpcError(err, "Commit", dcs.host, req)
}

// We only delete the chunks that we wrote to the remote from
// our write buffer if our commit was successful.
success = resp.Success
return resp.Success, dcs.refreshRepoMetadata(ctx)
}

Expand Down
40 changes: 24 additions & 16 deletions go/libraries/doltcore/remotestorage/writebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,36 @@ import (
)

// WriteBuffer accumulates chunks written to a DoltChunkStore until they
// are flushed to the remote by a write attempt.
type WriteBuffer interface {
	// Put adds a compressed chunk to the write buffer. Buffered chunks
	// are returned from future calls to |GetAllForWrite| until a write
	// is successful.
	Put(nbs.CompressedChunk) error

	// GetAllForWrite returns the current set of buffered chunks. After
	// it returns, concurrent calls to other methods may block until
	// |WriteCompleted| is called. Every call to |GetAllForWrite| must
	// be followed by a call to |WriteCompleted| once the write attempt
	// is finished.
	GetAllForWrite() map[hash.Hash]nbs.CompressedChunk

	// WriteCompleted records success or failure of the write attempt
	// begun by the preceding |GetAllForWrite|. On success the written
	// chunks are now in the upstream and are cleared from the write
	// buffer; on failure they are retained so the write can be retried.
	WriteCompleted(success bool)

	// AddBufferedChunks supports ChunkStore clients reading their own
	// writes before a commit: any buffered chunk whose address appears
	// in |query| is added to |result|. On the HasMany path,
	// remotestorage should instead remove present chunks from the
	// absent set of the HasMany response.
	AddBufferedChunks(query hash.HashSet, result map[hash.Hash]nbs.ToChunker)

	// RemovePresentChunks removes the addresses of any buffered chunks
	// from |hashes|. Used to filter the |absent| response of a HasMany
	// call so that buffered chunks are not considered absent.
	RemovePresentChunks(hashes hash.HashSet)
}

type noopWriteBuffer struct {
Expand All @@ -64,7 +72,7 @@ func (noopWriteBuffer) WriteCompleted(success bool) {
panic("call to WriteCompleted on a noopWriteBuffer")
}

func (noopWriteBuffer) AddPendingChunks(hash.HashSet, map[hash.Hash]nbs.ToChunker) {
func (noopWriteBuffer) AddBufferedChunks(hash.HashSet, map[hash.Hash]nbs.ToChunker) {
}

func (noopWriteBuffer) RemovePresentChunks(hash.HashSet) {
Expand Down Expand Up @@ -123,7 +131,7 @@ func (b *mapWriteBuffer) WriteCompleted(success bool) {
b.cond.Broadcast()
}

func (b *mapWriteBuffer) AddPendingChunks(hs hash.HashSet, res map[hash.Hash]nbs.ToChunker) {
func (b *mapWriteBuffer) AddBufferedChunks(hs hash.HashSet, res map[hash.Hash]nbs.ToChunker) {
b.mu.Lock()
defer b.mu.Unlock()
for h := range hs {
Expand All @@ -134,22 +142,22 @@ func (b *mapWriteBuffer) AddPendingChunks(hs hash.HashSet, res map[hash.Hash]nbs
}
}

func (b *mapWriteBuffer) RemovePresentChunks(absent hash.HashSet) {
func (b *mapWriteBuffer) RemovePresentChunks(hashes hash.HashSet) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.chunks) < len(absent) {
if len(b.chunks) < len(hashes) {
for h := range b.chunks {
absent.Remove(h)
hashes.Remove(h)
}
} else {
var toRemove []hash.Hash
for h := range absent {
for h := range hashes {
if _, ok := b.chunks[h]; ok {
toRemove = append(toRemove, h)
}
}
for _, h := range toRemove {
absent.Remove(h)
hashes.Remove(h)
}
}
}
10 changes: 5 additions & 5 deletions go/libraries/doltcore/remotestorage/writebuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestNoopWriteBuffer(t *testing.T) {
assert.Panics(t, func() {
cache.WriteCompleted(false)
})
cache.AddPendingChunks(make(hash.HashSet), make(map[hash.Hash]nbs.ToChunker))
cache.AddBufferedChunks(make(hash.HashSet), make(map[hash.Hash]nbs.ToChunker))
cache.RemovePresentChunks(make(hash.HashSet))
}

Expand All @@ -54,7 +54,7 @@ func TestMapWriteBuffer(t *testing.T) {
query.Insert(hash.Of(bs[:]))
}
res := make(map[hash.Hash]nbs.ToChunker)
cache.AddPendingChunks(query, res)
cache.AddBufferedChunks(query, res)
assert.Len(t, res, 0)

// Insert some chunks.
Expand All @@ -66,12 +66,12 @@ func TestMapWriteBuffer(t *testing.T) {
cache.Put(nbs.ChunkToCompressedChunk(chk))
inserted.Insert(chk.Hash())
}
cache.AddPendingChunks(query, res)
cache.AddBufferedChunks(query, res)
assert.Len(t, res, 0)
for h := range inserted {
query.Insert(h)
}
cache.AddPendingChunks(query, res)
cache.AddBufferedChunks(query, res)
assert.Len(t, res, 8)

cache.RemovePresentChunks(query)
Expand All @@ -84,7 +84,7 @@ func TestMapWriteBuffer(t *testing.T) {
toWrite := cache.GetAllForWrite()
assert.Len(t, toWrite, 8)
res = make(map[hash.Hash]nbs.ToChunker)
cache.AddPendingChunks(query, res)
cache.AddBufferedChunks(query, res)
assert.Len(t, res, 8)
cache.RemovePresentChunks(query)
assert.Len(t, query, 64)
Expand Down

0 comments on commit d586d2a

Please sign in to comment.