Skip to content

Commit

Permalink
Merge pull request #232 from snowflakedb/vreddy-Athena-SNOW-195522-sc…
Browse files Browse the repository at this point in the history
…ale-test

The Athena customer is running 1000 concurrent PUT processes, each uploading 800 files.
In some instances we have seen failed uploads, and adding a retry at this level resolved them.
  • Loading branch information
sfc-gh-sshankar authored Oct 13, 2020
2 parents 7748054 + 0e5be6a commit b078274
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 40 deletions.
84 changes: 45 additions & 39 deletions cpp/FileTransferAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
#include "logger/SFLogger.hpp"
#include "snowflake/platform.h"
#include <chrono>

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using ::std::string;
using ::std::vector;
using ::Snowflake::Client::RemoteStorageRequestOutcome;
Expand Down Expand Up @@ -398,48 +402,50 @@ RemoteStorageRequestOutcome Snowflake::Client::FileTransferAgent::uploadSingleFi
m_FileMetadataInitializer.initEncryptionMetadata(fileMetadata);
CXX_LOG_TRACE("Encryption metadata init done");

std::basic_iostream<char> *srcFileStream;
::std::fstream fs;

if (m_uploadStream)
{
srcFileStream = m_uploadStream;
}
else
{
try {
fs = ::std::fstream(fileMetadata->srcFileToUpload.c_str(),
::std::ios_base::in |
::std::ios_base::binary);
RemoteStorageRequestOutcome outcome = RemoteStorageRequestOutcome::SUCCESS;
RetryContext putRetryCtx(fileMetadata->srcFileName);
do
{
//Sleeps only when its a retry
putRetryCtx.waitForNextRetry();
std::basic_iostream<char> *srcFileStream;
::std::fstream fs;

if (m_uploadStream) {
srcFileStream = m_uploadStream;
} else {
try {
fs = ::std::fstream(fileMetadata->srcFileToUpload.c_str(),
::std::ios_base::in |
::std::ios_base::binary);
}
catch (...) {
std::string err = "Could not open source file " + fileMetadata->srcFileToUpload;
throw SnowflakeTransferException(TransferError::FAILED_TO_TRANSFER, err.c_str());
}
srcFileStream = &fs;
}
catch(...)

Crypto::CipherIOStream inputEncryptStream(*srcFileStream,
Crypto::CryptoOperation::ENCRYPT,
fileMetadata->encryptionMetadata.fileKey,
fileMetadata->encryptionMetadata.iv,
FILE_ENCRYPTION_BLOCK_SIZE);

fileMetadata->recordPutGetTimestamp(FileMetadata::PUT_START);
// upload stream
outcome = client->upload(fileMetadata, &inputEncryptStream);
fileMetadata->recordPutGetTimestamp(FileMetadata::PUT_END);
CXX_LOG_DEBUG("File upload done.");
if (fs.is_open())
{
std::string err= "Could not open source file " + fileMetadata->srcFileToUpload ;
throw SnowflakeTransferException(TransferError::FAILED_TO_TRANSFER, err.c_str());
fs.close();
}
srcFileStream = &fs;
}

Crypto::CipherIOStream inputEncryptStream(*srcFileStream,
Crypto::CryptoOperation::ENCRYPT,
fileMetadata->encryptionMetadata.fileKey,
fileMetadata->encryptionMetadata.iv,
FILE_ENCRYPTION_BLOCK_SIZE);

fileMetadata->recordPutGetTimestamp(FileMetadata::PUT_START);
// upload stream
RemoteStorageRequestOutcome outcome = client->upload(fileMetadata,
&inputEncryptStream);
fileMetadata->recordPutGetTimestamp(FileMetadata::PUT_END);
CXX_LOG_DEBUG("File upload done.");
if (fs.is_open())
{
fs.close();
}

m_executionResults->SetTransferOutCome(outcome, resultIndex);
fileMetadata->recordPutGetTimestamp(FileMetadata::PUTGET_END);
fileMetadata->printPutGetTimestamp();
m_executionResults->SetTransferOutCome(outcome, resultIndex);
fileMetadata->recordPutGetTimestamp(FileMetadata::PUTGET_END);
fileMetadata->printPutGetTimestamp();
} while (putRetryCtx.isRetryable(outcome));
CXX_LOG_DEBUG("Exit UploadSingleFile");
return outcome;
}
Expand Down
101 changes: 101 additions & 0 deletions cpp/FileTransferAgent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
#include "FileMetadata.hpp"
#include "FileMetadataInitializer.hpp"
#include "snowflake/platform.h"
#include <algorithm>
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

#define FILE_ENCRYPTION_BLOCK_SIZE 128

Expand All @@ -27,6 +33,101 @@ class IStorageClient;

class FileTransferExecutionResult;

// Number of milliseconds in one second; RetryContext multiplies its second-based settings by this to store them in milliseconds.
constexpr unsigned long MILLI_SECONDS_IN_SECOND = 1000;

/**
 * Tracks the retry state for the upload (put) of a single file.
 *
 * Intended usage is a do/while loop: call waitForNextRetry() at the top of
 * every iteration (it does not sleep on the first call) and isRetryable()
 * with the outcome of the attempt as the loop condition.
 *
 * Retrying stops when the put succeeded, the token expired, the number of
 * attempts exceeds the maximum retry count, or the total elapsed time
 * reaches the timeout.
 */
class RetryContext
{
public:
  /**
   * @param fileName name of the file being put; copied and used only in
   *                 log messages.
   */
  RetryContext(const std::string &fileName):
    // Initializers are listed in member declaration order.
    m_retryCount(0),
    m_maxRetryCount(10),
    m_minSleepTimeInMs(3 * MILLI_SECONDS_IN_SECOND),   // 3 seconds
    m_maxSleepTimeInMs(180 * MILLI_SECONDS_IN_SECOND), // 180 seconds is the max sleep time
    m_timeoutInMs(600 * MILLI_SECONDS_IN_SECOND),      // timeout 600 seconds
    m_startTime((unsigned long)time(NULL)),
    m_putFileName(fileName)
  {
  }

  /**
   * It is retryable if put file status is failed
   * And retry count is in the limits
   * And total elapsed time is less than the timeout value specified.
   *
   * @param putStatus: Put upload status.
   * @return whether to retry or not.
   */
  bool isRetryable(RemoteStorageRequestOutcome putStatus)
  {
    // If file upload is successful in a retry, log it.
    if (putStatus == RemoteStorageRequestOutcome::SUCCESS && m_retryCount > 1)
    {
      CXX_LOG_DEBUG("After %lu retry put %s successfully uploaded.", m_retryCount - 1, m_putFileName.c_str());
    }

    // Anything other than SUCCESS or TOKEN_EXPIRED is treated as retryable.
    const bool isPutInRetryableState =
      (putStatus != RemoteStorageRequestOutcome::SUCCESS) &&
      (putStatus != RemoteStorageRequestOutcome::TOKEN_EXPIRED);

    // time() has one second resolution. Guard against the wall clock moving
    // backwards so the unsigned subtraction cannot wrap around.
    const unsigned long now = (unsigned long)time(NULL);
    const unsigned long elapsedSeconds = (now > m_startTime) ? (now - m_startTime) : 0;

    // The timeout is stored in milliseconds, so compare in matching units.
    // For whole seconds, (elapsedSeconds * 1000 < m_timeoutInMs) is equivalent
    // to (elapsedSeconds < ceil(m_timeoutInMs / 1000)); the latter cannot
    // overflow for large elapsed times.
    const unsigned long timeoutInSeconds =
      (m_timeoutInMs + MILLI_SECONDS_IN_SECOND - 1) / MILLI_SECONDS_IN_SECOND;

    return isPutInRetryableState &&
           m_retryCount <= m_maxRetryCount &&
           elapsedSeconds < timeoutInSeconds;
  }

  /**
   * Gets the next sleep time (in milliseconds) and sleeps for it, then
   * increments the attempt counter. The first call does not sleep because
   * it precedes the initial put rather than a retry.
   */
  void waitForNextRetry()
  {
    const unsigned long sleepTime = retrySleepTimeInMs();
    if (sleepTime > 0) // Sleep only in the retries.
    {
#ifdef _WIN32
      Sleep(sleepTime); // Sleep() takes milliseconds on Windows.
#else
      // POSIX permits usleep() to fail with EINVAL for intervals of one
      // second or more, so sleep the whole seconds with sleep() and only
      // the sub-second remainder with usleep().
      const unsigned long wholeSeconds = sleepTime / MILLI_SECONDS_IN_SECOND;
      const unsigned long remainderInMs = sleepTime % MILLI_SECONDS_IN_SECOND;
      if (wholeSeconds > 0)
      {
        sleep((unsigned int)wholeSeconds);
      }
      if (remainderInMs > 0)
      {
        usleep((unsigned int)(remainderInMs * 1000)); // milliseconds -> microseconds
      }
#endif
      CXX_LOG_DEBUG("Retry count %lu, Retrying after %lu milli seconds put file %s.", m_retryCount, sleepTime, m_putFileName.c_str());
    }
    ++m_retryCount;
  }

private:
  unsigned long m_retryCount;       // number of attempts started so far (0 before the initial put)
  unsigned long m_maxRetryCount;    // upper bound checked against m_retryCount in isRetryable()
  unsigned long m_minSleepTimeInMs; // backoff base, in milliseconds
  unsigned long m_maxSleepTimeInMs; // backoff cap, in milliseconds
  unsigned long m_timeoutInMs;      // overall time budget, in milliseconds
  unsigned long m_startTime;        // construction time, in seconds as returned by time()
  std::string m_putFileName;        // file name used in log messages

  /**
   * When retryCount is 0 its the initial try for put and not a retry so return 0
   * XP's backoff strategy (exponential backoff time with jitter).
   * start sleep time 3 second
   * max sleep time is 180 second,
   * Jitter factor is 0.5.
   * For example, the expected_sleep_time is 3, 6, 12, 24, etc
   * The sleep time is (expected_sleep_time/2 + a random number between [0, expected_sleep_time/2))
   * expected_sleep_time = 6
   * return's (6/2 + (rand() % 3) )
   * @return returns sleep time in milli seconds.
   */
  unsigned long retrySleepTimeInMs()
  {
    if (m_retryCount == 0)
    {
      return 0; // When its initial put (and not a retry)
    }

    // expected = min(minSleep * 2^(retryCount - 1), maxSleep), computed with
    // integer doubling that saturates at the cap instead of overflowing.
    unsigned long expectedSleepTimeInMs = m_minSleepTimeInMs;
    for (unsigned long i = 1;
         i < m_retryCount && expectedSleepTimeInMs < m_maxSleepTimeInMs;
         ++i)
    {
      expectedSleepTimeInMs = (expectedSleepTimeInMs > m_maxSleepTimeInMs / 2)
                                ? m_maxSleepTimeInMs
                                : expectedSleepTimeInMs * 2;
    }
    expectedSleepTimeInMs = (std::min)(expectedSleepTimeInMs, m_maxSleepTimeInMs);

    const unsigned long halfSleepTimeInMs = expectedSleepTimeInMs / 2;

    // Avoid a modulo by zero when the expected sleep time is below 2 ms.
    // NOTE(review): rand() is not seeded in this class; if it is not seeded
    // elsewhere, concurrent processes draw the same jitter sequence - confirm.
    const unsigned long jitterInMs =
      (halfSleepTimeInMs > 0) ? ((unsigned long)rand() % halfSleepTimeInMs) : 0;

    return halfSleepTimeInMs + jitterInMs;
  }
};

/**
* This is the main class to external component (c api or ODBC)
* External component should implement IStatement interface to submit put
Expand Down
2 changes: 1 addition & 1 deletion include/snowflake/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
#ifndef SNOWFLAKE_CLIENT_VERSION_H
#define SNOWFLAKE_CLIENT_VERSION_H

#define SF_API_VERSION "0.5.9"
#define SF_API_VERSION "0.5.10"

#endif /* SNOWFLAKE_CLIENT_VERSION_H */
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ SET(TESTS_CXX
test_unit_file_metadata_init
test_unit_file_type_detect
test_unit_stream_splitter
test_unit_put_retry
test_unit_thread_pool
test_unit_base64
#test_cpp_select1
Expand Down
Loading

0 comments on commit b078274

Please sign in to comment.