Merge pull request #235 from deploymenttheory/dev-dw
testing retries
ShocOne authored Jun 7, 2024
2 parents 778ea08 + 076b699 commit 704c3ac
Showing 2 changed files with 127 additions and 24 deletions.
@@ -97,8 +97,17 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]
ctx, cancel := context.WithTimeout(context.Background(), c.clientConfig.ClientOptions.Timeout.CustomTimeout.Duration())
defer cancel()

body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
if err != nil {
var body io.Reader
var contentType string

// Create multipart body in a function to ensure it runs again on retry
createBody := func() error {
var err error
body, contentType, err = createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
return err
}

if err := createBody(); err != nil {
log.Error("Failed to create streaming multipart request body", zap.Error(err))
return nil, err
}
@@ -125,6 +134,20 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]
for attempt := 1; attempt <= maxRetries; attempt++ {
startTime := time.Now()

// Create a new request for each retry
if attempt > 1 {
if err := createBody(); err != nil {
log.Error("Failed to recreate streaming multipart request body", zap.Error(err))
return nil, err
}
req, err = http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
log.Error("Failed to create HTTP request on retry", zap.Error(err))
return nil, err
}
req.Header.Set("Content-Type", contentType)
}

resp, requestErr = c.httpClient.Do(req)
duration := time.Since(startTime)

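A note on why the body is rebuilt: the streaming multipart body is an io.Reader, and the first attempt consumes it, so a retry cannot simply resend the same *http.Request. The sketch below shows the same pattern in isolation; it buffers the body with bytes.Buffer for brevity (the client itself streams via io.Pipe), and buildBody/doWithRetry are illustrative names, not part of this repository.

package retrysketch

import (
    "bytes"
    "fmt"
    "io"
    "mime/multipart"
    "net/http"
    "time"
)

// buildBody rebuilds the multipart payload from scratch so every attempt
// starts from an unconsumed reader.
func buildBody(fields map[string]string) (io.Reader, string, error) {
    var buf bytes.Buffer
    w := multipart.NewWriter(&buf)
    for name, value := range fields {
        if err := w.WriteField(name, value); err != nil {
            return nil, "", err
        }
    }
    if err := w.Close(); err != nil {
        return nil, "", err
    }
    return &buf, w.FormDataContentType(), nil
}

// doWithRetry recreates both the body and the request on every attempt,
// mirroring the change above.
func doWithRetry(client *http.Client, method, url string, fields map[string]string, maxRetries int) (*http.Response, error) {
    var lastErr error
    for attempt := 1; attempt <= maxRetries; attempt++ {
        body, contentType, err := buildBody(fields)
        if err != nil {
            return nil, err
        }
        req, err := http.NewRequest(method, url, body)
        if err != nil {
            return nil, err
        }
        req.Header.Set("Content-Type", contentType)

        resp, err := client.Do(req)
        if err == nil {
            return resp, nil
        }
        lastErr = err
        if attempt < maxRetries {
            time.Sleep(2 * time.Second) // fixed delay, as in the retry loop above
        }
    }
    return nil, fmt.Errorf("all %d attempts failed: %w", maxRetries, lastErr)
}
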
124 changes: 102 additions & 22 deletions httpclient/multipartrequest.go.back
@@ -10,6 +10,7 @@ import (
"net/textproto"
"os"
"path/filepath"
"sync"
"time"

"github.com/deploymenttheory/go-api-http-client/authenticationhandler"
@@ -20,6 +21,13 @@ import (
"go.uber.org/zap"
)

// UploadState represents the state of an upload operation, including the last uploaded byte.
// This struct is used to track the progress of file uploads for resumable uploads and to resume uploads from the last uploaded byte.
type UploadState struct {
LastUploadedByte int64
sync.Mutex
}
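
A hedged usage note: the embedded sync.Mutex makes LastUploadedByte safe to update from whichever goroutine observes a failure, and io.SeekStart lets the next attempt continue from that offset. A minimal illustrative fragment (resumeFrom is not part of the commit; it assumes the os and io imports already present in this file):

// resumeFrom repositions the file at the last byte recorded in state so a
// subsequent chunkFileUpload call continues instead of restarting.
func resumeFrom(file *os.File, state *UploadState) error {
    state.Lock()
    offset := state.LastUploadedByte
    state.Unlock()

    _, err := file.Seek(offset, io.SeekStart)
    return err
}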

// DoMultiPartRequest creates and executes a multipart/form-data HTTP request for file uploads and form fields.
// This function handles constructing the multipart request body, setting the necessary headers, and executing the request.
// It supports custom content types and headers for each part of the multipart request, and handles authentication and
@@ -83,18 +91,18 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]

log.Info("Executing multipart file upload request", zap.String("method", method), zap.String("endpoint", endpoint))

// Call the helper function to create a streaming multipart request body
body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
if err != nil {
return nil, err
}

url := c.APIHandler.ConstructAPIResourceEndpoint(endpoint, log)

// Create a context with timeout
// Create a context with timeout based on the custom timeout duration
ctx, cancel := context.WithTimeout(context.Background(), c.clientConfig.ClientOptions.Timeout.CustomTimeout.Duration())
defer cancel()

body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
if err != nil {
log.Error("Failed to create streaming multipart request body", zap.Error(err))
return nil, err
}

req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
log.Error("Failed to create HTTP request", zap.Error(err))
@@ -109,26 +117,68 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]
headerHandler.SetRequestHeaders(endpoint)
headerHandler.LogHeaders(c.clientConfig.ClientOptions.Logging.HideSensitiveData)

startTime := time.Now()
var resp *http.Response
var requestErr error

resp, err := c.httpClient.Do(req)
if err != nil {
log.Error("Failed to send request", zap.String("method", method), zap.String("endpoint", endpoint), zap.Error(err))
return nil, err
}
// Retry logic
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
startTime := time.Now()

resp, requestErr = c.httpClient.Do(req)
duration := time.Since(startTime)

duration := time.Since(startTime)
log.Debug("Request sent successfully", zap.String("method", method), zap.String("endpoint", endpoint), zap.Int("status_code", resp.StatusCode), zap.Duration("duration", duration))
if requestErr != nil {
log.Error("Failed to send request", zap.String("method", method), zap.String("endpoint", endpoint), zap.Error(requestErr))
if attempt < maxRetries {
log.Info("Retrying request", zap.Int("attempt", attempt))
time.Sleep(2 * time.Second)
continue
}
return nil, requestErr
}

log.Debug("Request sent successfully", zap.String("method", method), zap.String("endpoint", endpoint), zap.Int("status_code", resp.StatusCode), zap.Duration("duration", duration))

if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return resp, response.HandleAPISuccessResponse(resp, out, log)
}

if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return resp, response.HandleAPISuccessResponse(resp, out, log)
// If status code indicates a server error, retry
if resp.StatusCode >= 500 && attempt < maxRetries {
log.Info("Retrying request due to server error", zap.Int("status_code", resp.StatusCode), zap.Int("attempt", attempt))
time.Sleep(2 * time.Second)
continue
}

return resp, response.HandleAPIErrorResponse(resp, log)
}

return resp, response.HandleAPIErrorResponse(resp, log)
return resp, requestErr
}
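
The loop retries two situations only: a transport error from httpClient.Do and a 5xx status, each after a fixed two-second sleep; 2xx returns immediately and any other status goes straight to the error handler. The same decision can be read as a small predicate (shouldRetry is illustrative, not part of this package):

// shouldRetry reports whether another attempt is worthwhile: transport
// errors and server-side (5xx) responses are retried, everything else is final.
func shouldRetry(resp *http.Response, requestErr error, attempt, maxRetries int) bool {
    if attempt >= maxRetries {
        return false
    }
    if requestErr != nil {
        return true // network/transport failure
    }
    return resp.StatusCode >= 500
}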

// createStreamingMultipartRequestBody creates a streaming multipart request body with the provided files and form fields.
// This function constructs the body of a multipart/form-data request using an io.Pipe, allowing the request to be sent in chunks.
// It supports custom content types and headers for each part of the multipart request, and logs the process for debugging
// and monitoring purposes.

// Parameters:
// - files: A map where the key is the field name and the value is a slice of file paths to be included in the request.
// Each file path corresponds to a file that will be included in the multipart request.
// - formDataFields: A map of additional form fields to be included in the multipart request, where the key is the field name
// and the value is the field value. These are regular form fields that accompany the file uploads.
// - fileContentTypes: A map specifying the content type for each file part. The key is the field name and the value is the
// content type (e.g., "image/jpeg").
// - formDataPartHeaders: A map specifying custom headers for each part of the multipart form data. The key is the field name
// and the value is an http.Header containing the headers for that part.
// - log: An instance of a logger implementing the logger.Logger interface, used to log informational messages, warnings,
// and errors encountered during the construction of the multipart request body.

// Returns:
// - io.Reader: The constructed multipart request body reader. This reader streams the multipart form data payload ready to be sent.
// - string: The content type of the multipart request body. This includes the boundary string used by the multipart writer.
// - error: An error object indicating failure during the construction of the multipart request body. This could be due to issues
// such as file reading errors or multipart writer errors.
func createStreamingMultipartRequestBody(files map[string][]string, formDataFields map[string]string, fileContentTypes map[string]string, formDataPartHeaders map[string]http.Header, log logger.Logger) (io.Reader, string, error) {
pr, pw := io.Pipe()
writer := multipart.NewWriter(pw)
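
For orientation, the io.Pipe approach works because the multipart parts are written in a separate goroutine: the returned *io.PipeReader yields data only as the HTTP transport reads it, so the payload never needs to be buffered in full. A stripped-down sketch of that pattern (the single form field is illustrative):

package pipesketch

import (
    "io"
    "mime/multipart"
)

// streamBody returns a reader that produces a multipart payload lazily.
// The goroutine writes into pw; the HTTP client drains pr.
func streamBody() (io.Reader, string) {
    pr, pw := io.Pipe()
    writer := multipart.NewWriter(pw)

    go func() {
        // CloseWithError hands any failure to the reader side, so an
        // in-flight request fails instead of hanging.
        if err := writer.WriteField("comment", "streamed, not buffered"); err != nil {
            pw.CloseWithError(err)
            return
        }
        pw.CloseWithError(writer.Close())
    }()

    return pr, writer.FormDataContentType()
}
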
@@ -216,7 +266,8 @@ func addFilePart(writer *multipart.Writer, fieldName, filePath string, fileConte
}

progressLogger := logUploadProgress(file, fileSize.Size(), log)
if err := chunkFileUpload(file, encoder, log, progressLogger); err != nil {
uploadState := &UploadState{}
if err := chunkFileUpload(file, encoder, log, progressLogger, uploadState); err != nil {
log.Error("Failed to copy file content", zap.String("filePath", filePath), zap.Error(err))
return err
}
@@ -278,25 +329,43 @@ func setFormDataPartHeader(fieldname, filename, contentType string, customHeader

// chunkFileUpload reads the file upload into chunks and writes it to the writer.
// This function reads the file in chunks and writes it to the provided writer, allowing for progress logging during the upload.
// chunk size is set to 1024 KB (1 MB) by default.
// The chunk size is set to 8192 KB (8 MB) by default. This is a common chunk size used for file uploads to cloud storage services.

// Azure Blob Storage has a minimum chunk size of 4 MB and a maximum of 100 MB for block blobs.
// GCP Cloud Storage has a minimum chunk size of 256 KB and a maximum of 5 GB.
// AWS S3 has a minimum chunk size of 5 MB and a maximum of 5 GB.

// The function also calculates the total number of chunks and logs the chunk number during the upload process.

// Parameters:
// - file: The file to be uploaded.
// - writer: The writer to which the file content will be written.
// - log: An instance of a logger implementing the logger.Logger interface, used to log informational messages, warnings,
// and errors encountered during the file upload.
// - updateProgress: A function to update the upload progress, typically used for logging purposes.
// - uploadState: A pointer to an UploadState struct used to track the progress of the file upload for resumable uploads.

// Returns:
// - error: An error object indicating failure during the file upload. This could be due to issues such as file reading errors
// or writer errors.
func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateProgress func(int64)) error {
const chunkSize = 1024 * 1024 // 1024 bytes * 1024 (1 MB)
func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateProgress func(int64), uploadState *UploadState) error {
const chunkSize = 8 * 1024 * 1024 // 8 MB
buffer := make([]byte, chunkSize)
totalWritten := int64(0)
chunkWritten := int64(0)
fileName := filepath.Base(file.Name())

// Seek to the last uploaded byte
file.Seek(uploadState.LastUploadedByte, io.SeekStart)

// Calculate the total number of chunks
fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to get file info: %v", err)
}
totalChunks := (fileInfo.Size() + chunkSize - 1) / chunkSize
currentChunk := uploadState.LastUploadedByte / chunkSize

for {
n, err := file.Read(buffer)
if err != nil && err != io.EOF {
@@ -308,6 +377,10 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP

written, err := writer.Write(buffer[:n])
if err != nil {
// Save the state before returning the error
uploadState.Lock()
uploadState.LastUploadedByte += totalWritten
uploadState.Unlock()
return err
}

@@ -316,8 +389,11 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP
updateProgress(int64(written))

if chunkWritten >= chunkSize {
currentChunk++
log.Debug("File Upload Chunk Sent",
zap.String("file_name", fileName),
zap.Int64("chunk_number", currentChunk),
zap.Int64("total_chunks", totalChunks),
zap.Int64("kb_sent", chunkWritten/1024),
zap.Int64("total_kb_sent", totalWritten/1024))
chunkWritten = 0
@@ -326,8 +402,11 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP

// Log any remaining bytes that were written but didn't reach the log threshold
if chunkWritten > 0 {
currentChunk++
log.Debug("Final Upload Chunk Sent",
zap.String("file_name", fileName),
zap.Int64("chunk_number", currentChunk),
zap.Int64("total_chunks", totalChunks),
zap.Int64("kb_sent", chunkWritten/1024),
zap.Int64("total_kb_sent", totalWritten/1024))
}
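
The chunk bookkeeping is plain integer arithmetic: (fileInfo.Size() + chunkSize - 1) / chunkSize is a ceiling division, and the counter resumes at LastUploadedByte / chunkSize. A worked example with made-up numbers (the helpers below are illustrative, not part of the file):

const exampleChunkSize = 8 * 1024 * 1024 // 8 MB, matching chunkFileUpload

// totalChunks mirrors the ceiling division above: a 20 MB file yields 3 chunks (8 + 8 + 4 MB).
func totalChunks(fileSize int64) int64 {
    return (fileSize + exampleChunkSize - 1) / exampleChunkSize
}

// resumeChunk mirrors the resumed counter: at LastUploadedByte = 16 MB it returns 2,
// so the next chunk is logged as chunk 3 of 3.
func resumeChunk(lastUploadedByte int64) int64 {
    return lastUploadedByte / exampleChunkSize
}
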
@@ -346,6 +425,7 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP

// Returns:
// - func(int64): A function that takes the number of bytes written as an argument and logs the upload progress.
// logUploadProgress logs the upload progress based on the percentage of the total file size.
func logUploadProgress(file *os.File, fileSize int64, log logger.Logger) func(int64) {
var uploaded int64 = 0
const logInterval = 5 // Log every 5% increment
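
The closure only reports when the upload crosses the next 5% boundary, which keeps log volume independent of chunk size. A standalone version of the same idea (printing rather than using the zap-backed logger, assuming fmt is imported and totalSize is non-zero) might look like:

// progressFunc returns a callback that reports progress each time another
// 5% of totalSize has been written since the last report.
func progressFunc(totalSize int64) func(int64) {
    var uploaded, lastLoggedPercent int64
    const logInterval = 5 // percent

    return func(bytesWritten int64) {
        uploaded += bytesWritten
        percent := uploaded * 100 / totalSize
        if percent >= lastLoggedPercent+logInterval {
            lastLoggedPercent = percent
            fmt.Printf("upload progress: %d%% (%d of %d bytes)\n", percent, uploaded, totalSize)
        }
    }
}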
