Skip to content

Commit

Permalink
Merge pull request #16 from forta-network/ali/optimize-flushes
Browse files Browse the repository at this point in the history
Optimize R2 flush part implementation
  • Loading branch information
aomerk authored Jan 11, 2024
2 parents dd203e8 + 6bd514f commit c2d10dd
Showing 1 changed file with 30 additions and 6 deletions.
36 changes: 30 additions & 6 deletions drivers/r2/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1236,14 +1236,28 @@ func (w *writer) flushPart() error {
// nothing to write
return nil
}
if len(w.pendingPart) < int(w.driver.ChunkSize) {
// closing with a small pending part
// combine ready and pending to avoid writing a small part
w.readyPart = append(w.readyPart, w.pendingPart...)
w.pendingPart = nil

// Check if the ready part is less than the chunk size
if len(w.readyPart) < int(w.driver.ChunkSize) {
// If there's enough in the pending part to fill the ready part up to the chunk size
if len(w.pendingPart) + len(w.readyPart) >= int(w.driver.ChunkSize) {
fillSize := int(w.driver.ChunkSize) - len(w.readyPart)
w.readyPart = append(w.readyPart, w.pendingPart[:fillSize]...)
w.pendingPart = w.pendingPart[fillSize:]
}
}

ctx := context.Background()
// Check if the ready part is the right size to upload
// ready part must be equal to chunk size,
// except for the last part where it can be equal or smaller than the chunk size
isReadyToUpload := len(w.readyPart) == int(w.driver.ChunkSize) || (len(w.readyPart) < int(w.driver.ChunkSize) && len(w.pendingPart) == 0)
if !isReadyToUpload {
// If the ready part is not yet full and it's not the last part, wait for more data
return nil
}

// Upload the part
partNumber := aws.Int32(int32(len(w.parts) + 1))
resp, err := w.driver.R2.UploadPart(ctx, &s3.UploadPartInput{
Bucket: aws.String(w.driver.Bucket),
Expand All @@ -1255,12 +1269,22 @@ func (w *writer) flushPart() error {
if err != nil {
return err
}

// Append the uploaded part's info to the parts slice
w.parts = append(w.parts, types.Part{
ETag: resp.ETag,
PartNumber: partNumber,
Size: aws.Int64(int64(len(w.readyPart))),
})

// Prepare for the next part
w.readyPart = w.pendingPart
w.pendingPart = nil
return w.flushPart()

// If there's more to upload, continue
if len(w.readyPart) > 0 {
return w.flushPart()
}

return nil
}

0 comments on commit c2d10dd

Please sign in to comment.