Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

testing retries #235

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,17 @@
ctx, cancel := context.WithTimeout(context.Background(), c.clientConfig.ClientOptions.Timeout.CustomTimeout.Duration())
defer cancel()

body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
if err != nil {
var body io.Reader
var contentType string

// Create multipart body in a function to ensure it runs again on retry
createBody := func() error {
var err error
body, contentType, err = createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
return err
}

if err := createBody(); err != nil {
log.Error("Failed to create streaming multipart request body", zap.Error(err))
return nil, err
}
Expand All @@ -125,6 +134,20 @@
for attempt := 1; attempt <= maxRetries; attempt++ {
startTime := time.Now()

// Create a new request for each retry
if attempt > 1 {
if err := createBody(); err != nil {
log.Error("Failed to recreate streaming multipart request body", zap.Error(err))

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.
return nil, err
}
req, err = http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
log.Error("Failed to create HTTP request on retry", zap.Error(err))

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.
return nil, err
}
req.Header.Set("Content-Type", contentType)
}

resp, requestErr = c.httpClient.Do(req)
duration := time.Since(startTime)

Expand Down
124 changes: 102 additions & 22 deletions httpclient/multipartrequest.go.back
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/textproto"
"os"
"path/filepath"
"sync"
"time"

"github.com/deploymenttheory/go-api-http-client/authenticationhandler"
Expand All @@ -20,6 +21,13 @@ import (
"go.uber.org/zap"
)

// UploadState represents the state of an upload operation, including the last uploaded byte.
// This struct is used to track the progress of file uploads for resumable uploads and to resume uploads from the last uploaded byte.
type UploadState struct {
LastUploadedByte int64
sync.Mutex
}

// DoMultiPartRequest creates and executes a multipart/form-data HTTP request for file uploads and form fields.
// This function handles constructing the multipart request body, setting the necessary headers, and executing the request.
// It supports custom content types and headers for each part of the multipart request, and handles authentication and
Expand Down Expand Up @@ -83,18 +91,18 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]

log.Info("Executing multipart file upload request", zap.String("method", method), zap.String("endpoint", endpoint))

// Call the helper function to create a streaming multipart request body
body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
if err != nil {
return nil, err
}

url := c.APIHandler.ConstructAPIResourceEndpoint(endpoint, log)

// Create a context with timeout
// Create a context with timeout based on the custom timeout duration
ctx, cancel := context.WithTimeout(context.Background(), c.clientConfig.ClientOptions.Timeout.CustomTimeout.Duration())
defer cancel()

body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
if err != nil {
log.Error("Failed to create streaming multipart request body", zap.Error(err))
return nil, err
}

req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
log.Error("Failed to create HTTP request", zap.Error(err))
Expand All @@ -109,26 +117,68 @@ func (c *Client) DoMultiPartRequest(method, endpoint string, files map[string][]
headerHandler.SetRequestHeaders(endpoint)
headerHandler.LogHeaders(c.clientConfig.ClientOptions.Logging.HideSensitiveData)

startTime := time.Now()
var resp *http.Response
var requestErr error

resp, err := c.httpClient.Do(req)
if err != nil {
log.Error("Failed to send request", zap.String("method", method), zap.String("endpoint", endpoint), zap.Error(err))
return nil, err
}
// Retry logic
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
startTime := time.Now()

resp, requestErr = c.httpClient.Do(req)
duration := time.Since(startTime)

duration := time.Since(startTime)
log.Debug("Request sent successfully", zap.String("method", method), zap.String("endpoint", endpoint), zap.Int("status_code", resp.StatusCode), zap.Duration("duration", duration))
if requestErr != nil {
log.Error("Failed to send request", zap.String("method", method), zap.String("endpoint", endpoint), zap.Error(requestErr))
if attempt < maxRetries {
log.Info("Retrying request", zap.Int("attempt", attempt))
time.Sleep(2 * time.Second)
continue
}
return nil, requestErr
}

log.Debug("Request sent successfully", zap.String("method", method), zap.String("endpoint", endpoint), zap.Int("status_code", resp.StatusCode), zap.Duration("duration", duration))

if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return resp, response.HandleAPISuccessResponse(resp, out, log)
}

if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return resp, response.HandleAPISuccessResponse(resp, out, log)
// If status code indicates a server error, retry
if resp.StatusCode >= 500 && attempt < maxRetries {
log.Info("Retrying request due to server error", zap.Int("status_code", resp.StatusCode), zap.Int("attempt", attempt))
time.Sleep(2 * time.Second)
continue
}

return resp, response.HandleAPIErrorResponse(resp, log)
}

return resp, response.HandleAPIErrorResponse(resp, log)
return resp, requestErr
}

// createStreamingMultipartRequestBody creates a streaming multipart request body with the provided files and form fields.
// This function constructs the body of a multipart/form-data request using an io.Pipe, allowing the request to be sent in chunks.
// It supports custom content types and headers for each part of the multipart request, and logs the process for debugging
// and monitoring purposes.

// Parameters:
// - files: A map where the key is the field name and the value is a slice of file paths to be included in the request.
// Each file path corresponds to a file that will be included in the multipart request.
// - formDataFields: A map of additional form fields to be included in the multipart request, where the key is the field name
// and the value is the field value. These are regular form fields that accompany the file uploads.
// - fileContentTypes: A map specifying the content type for each file part. The key is the field name and the value is the
// content type (e.g., "image/jpeg").
// - formDataPartHeaders: A map specifying custom headers for each part of the multipart form data. The key is the field name
// and the value is an http.Header containing the headers for that part.
// - log: An instance of a logger implementing the logger.Logger interface, used to log informational messages, warnings,
// and errors encountered during the construction of the multipart request body.

// Returns:
// - io.Reader: The constructed multipart request body reader. This reader streams the multipart form data payload ready to be sent.
// - string: The content type of the multipart request body. This includes the boundary string used by the multipart writer.
// - error: An error object indicating failure during the construction of the multipart request body. This could be due to issues
// such as file reading errors or multipart writer errors.
func createStreamingMultipartRequestBody(files map[string][]string, formDataFields map[string]string, fileContentTypes map[string]string, formDataPartHeaders map[string]http.Header, log logger.Logger) (io.Reader, string, error) {
pr, pw := io.Pipe()
writer := multipart.NewWriter(pw)
Expand Down Expand Up @@ -216,7 +266,8 @@ func addFilePart(writer *multipart.Writer, fieldName, filePath string, fileConte
}

progressLogger := logUploadProgress(file, fileSize.Size(), log)
if err := chunkFileUpload(file, encoder, log, progressLogger); err != nil {
uploadState := &UploadState{}
if err := chunkFileUpload(file, encoder, log, progressLogger, uploadState); err != nil {
log.Error("Failed to copy file content", zap.String("filePath", filePath), zap.Error(err))
return err
}
Expand Down Expand Up @@ -278,25 +329,43 @@ func setFormDataPartHeader(fieldname, filename, contentType string, customHeader

// chunkFileUpload reads the file upload into chunks and writes it to the writer.
// This function reads the file in chunks and writes it to the provided writer, allowing for progress logging during the upload.
// chunk size is set to 1024 KB (1 MB) by default.
// The chunk size is set to 8192 KB (8 MB) by default. This is a common chunk size used for file uploads to cloud storage services.

// Azure Blob Storage has a minimum chunk size of 4 MB and a maximum of 100 MB for block blobs.
// GCP Cloud Storage has a minimum chunk size of 256 KB and a maximum of 5 GB.
// AWS S3 has a minimum chunk size of 5 MB and a maximum of 5 GB.

// The function also calculates the total number of chunks and logs the chunk number during the upload process.

// Parameters:
// - file: The file to be uploaded.
// - writer: The writer to which the file content will be written.
// - log: An instance of a logger implementing the logger.Logger interface, used to log informational messages, warnings,
// and errors encountered during the file upload.
// - updateProgress: A function to update the upload progress, typically used for logging purposes.
// - uploadState: A pointer to an UploadState struct used to track the progress of the file upload for resumable uploads.

// Returns:
// - error: An error object indicating failure during the file upload. This could be due to issues such as file reading errors
// or writer errors.
func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateProgress func(int64)) error {
const chunkSize = 1024 * 1024 // 1024 bytes * 1024 (1 MB)
func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateProgress func(int64), uploadState *UploadState) error {
const chunkSize = 8 * 1024 * 1024 // 8 MB
buffer := make([]byte, chunkSize)
totalWritten := int64(0)
chunkWritten := int64(0)
fileName := filepath.Base(file.Name())

// Seek to the last uploaded byte
file.Seek(uploadState.LastUploadedByte, io.SeekStart)

// Calculate the total number of chunks
fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to get file info: %v", err)
}
totalChunks := (fileInfo.Size() + chunkSize - 1) / chunkSize
currentChunk := uploadState.LastUploadedByte / chunkSize

for {
n, err := file.Read(buffer)
if err != nil && err != io.EOF {
Expand All @@ -308,6 +377,10 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP

written, err := writer.Write(buffer[:n])
if err != nil {
// Save the state before returning the error
uploadState.Lock()
uploadState.LastUploadedByte += totalWritten
uploadState.Unlock()
return err
}

Expand All @@ -316,8 +389,11 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP
updateProgress(int64(written))

if chunkWritten >= chunkSize {
currentChunk++
log.Debug("File Upload Chunk Sent",
zap.String("file_name", fileName),
zap.Int64("chunk_number", currentChunk),
zap.Int64("total_chunks", totalChunks),
zap.Int64("kb_sent", chunkWritten/1024),
zap.Int64("total_kb_sent", totalWritten/1024))
chunkWritten = 0
Expand All @@ -326,8 +402,11 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP

// Log any remaining bytes that were written but didn't reach the log threshold
if chunkWritten > 0 {
currentChunk++
log.Debug("Final Upload Chunk Sent",
zap.String("file_name", fileName),
zap.Int64("chunk_number", currentChunk),
zap.Int64("total_chunks", totalChunks),
zap.Int64("kb_sent", chunkWritten/1024),
zap.Int64("total_kb_sent", totalWritten/1024))
}
Expand All @@ -346,6 +425,7 @@ func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateP

// Returns:
// - func(int64): A function that takes the number of bytes written as an argument and logs the upload progress.
// logUploadProgress logs the upload progress based on the percentage of the total file size.
func logUploadProgress(file *os.File, fileSize int64, log logger.Logger) func(int64) {
var uploaded int64 = 0
const logInterval = 5 // Log every 5% increment
Expand Down
Loading