Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix recv() 120s Timeout on EAGAIN by Retrying Indefinitely #69

Merged
merged 2 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/librdb-ext-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ typedef enum {
RDBX_ERR_RESP_INVALID_TARGET_VERSION,
RDBX_ERR_RESP_READ,
RDBX_ERR_RESP2REDIS_CREATE_SOCKET,
RDBX_ERR_RESP2REDIS_CONF_NONBLOCK_SOCKET,
RDBX_ERR_RESP2REDIS_CONF_BLOCK_SOCKET,
RDBX_ERR_RESP2REDIS_INVALID_ADDRESS,
RDBX_ERR_RESP2REDIS_FAILED_CONNECT,
RDBX_ERR_RESP2REDIS_FAILED_READ,
RDBX_ERR_RESP2REDIS_FAILED_WRITE,
RDBX_ERR_RESP2REDIS_CONN_CLOSE,
RDBX_ERR_RESP2REDIS_MAX_RETRIES,
RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
RDBX_ERR_RESP2REDIS_AUTH_FAILED,
} RdbxRes;

/****************************************************************
Expand Down
4 changes: 3 additions & 1 deletion src/cli/rdb-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ static void logger(RdbLogLevel l, const char *msg) {
[RDB_LOG_DBG] = "DEBUG :",
};

if (logfile != NULL)
if (logfile != NULL) {
fprintf(logfile, "%s %s\n", logLevelStr[l], msg);
fflush(logfile);
}

if (l == RDB_LOG_ERR)
printf("%s %s\n", logLevelStr[l], msg);
Expand Down
92 changes: 56 additions & 36 deletions src/ext/respToRedisLoader.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include "extCommon.h"
#include "readerResp.h"

Expand All @@ -14,16 +15,16 @@
#include <openssl/err.h>
#endif

#define PIPELINE_DEPTH_DEF 200 /* Default Number of pending cmds before waiting for response(s) */
#define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */
#define PIPELINE_DEPTH_DEF 200 /* Default Number of pending cmds before waiting for response(s) */
#define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */

#define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */
#define RECORDED_KEY_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */
#define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */
#define RECORDED_KEY_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */

#define REPLY_BUFF_SIZE 1024 /* reply buffer size */

#define MAX_EINTR_RETRY 3
#define REPLY_BUFF_SIZE 1024 /* reply buffer size */

#define MAX_EINTR_RETRY 5
#define RECV_CMD_TIMEOUT_SEC 10 /* recv() command timeout in seconds */

struct RdbxRespToRedisLoader {

Expand Down Expand Up @@ -86,22 +87,22 @@ static int onReadRepliesErrorCb(void *context, char *msg) {
*
* numToRead - minimum number of replies to read from the socket before
* returning.
* sendError - if set, an error occurred while writing to the server. In
* sentError - if set, an error occurred while writing to the server. In
* this case the function will try to read replies from the
* server. Maybe one of the replies will contain an error message
* that explains why write got failed. Whether error message is
* received or not, the function will return to the original issue.
*
* Return 0 for success, 1 otherwise. */
static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError) {
int noDataEv = 0;
static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sentError) {
int retries = 0;
char buff[REPLY_BUFF_SIZE];

RespReaderCtx *respReader = &ctx->respReader;
size_t countRepliesBefore = respReader->countReplies;
size_t repliesExpected = respReader->countReplies + numToRead;

while ((respReader->countReplies < repliesExpected) || (sendError)) {
while ((respReader->countReplies < repliesExpected) || (sentError)) {
int bytesReceived = recv(ctx->fd, buff, sizeof(buff), 0);

if (bytesReceived > 0) {
Expand All @@ -114,18 +115,25 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError)

/* handle error */

if (sendError)
return 0; /* Failed read error message from dst. Back to original issue. */
if (sentError)
return 0; /* Done lookup for error message. Return to original issue */

if (bytesReceived == 0) {
RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE,
"Connection closed by the remote side");
return 1;
} else {
if ((!noDataEv) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
noDataEv = 1;
RDB_log(ctx->p, RDB_LOG_WRN, "No data available from redis-server");
continue; /* Try one more time (Timeout is 60sec) */
if (errno == EAGAIN || errno == EWOULDBLOCK) {
retries++;
RDB_log(ctx->p, RDB_LOG_INF,
"No reply from redis-server for %d seconds",
retries * RECV_CMD_TIMEOUT_SEC);

/* Parser got external error? Currently Used only for testing */
if (RDB_getErrorCode(ctx->p) != RDB_OK)
return 1;

continue;
}

RDB_reportError(ctx->p,
Expand Down Expand Up @@ -209,7 +217,7 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt,

/* Error occurred. Try to receive error msg from dst, which might explain
why write got failed */
readReplies(ctx, 0, 1/*sendError*/);
readReplies(ctx, 0, 1/*sentError*/);
return 1;
}

Expand Down Expand Up @@ -324,18 +332,35 @@ _LIBRDB_API void RDBX_setPipelineDepth(RdbxRespToRedisLoader *r2r, int depth) {
r2r->pendingCmds.pipelineDepth = (depth <= 0 || depth>PIPELINE_DEPTH_MAX) ? PIPELINE_DEPTH_DEF : depth;
}

/* Create a loader from an existing file descriptor */
_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p,
RdbxToResp *rdbToResp,
RdbxRedisAuth *auth,
int fd) {
RdbxRespToRedisLoader *ctx;
if ((ctx = RDB_alloc(p, sizeof(RdbxRespToRedisLoader))) == NULL) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC,
"Failed to allocate struct RdbxRespToRedisLoader");
/* Ensure the socket is in blocking mode */
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1 || fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == -1) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CONF_BLOCK_SOCKET,
"Failed to configure for blocking mode. errno=%d: %s",
errno, strerror(errno));
return NULL;
}

/* Set receive timeout (blocking, but with a limit) */
struct timeval timeout = { .tv_sec = RECV_CMD_TIMEOUT_SEC, .tv_usec = 0 };
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
"Failed to configure for blocking mode. errno=%d: %s",
errno, strerror(errno));
return NULL;
}

RdbxRespToRedisLoader *ctx = RDB_alloc(p, sizeof(RdbxRespToRedisLoader));
if (!ctx) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC, "Failed to allocate struct RdbxRespToRedisLoader");
return NULL;
}

/* init RdbxRespToRedisLoader context */
memset(ctx, 0, sizeof(RdbxRespToRedisLoader));
ctx->p = p;
ctx->fd = fd;
Expand All @@ -345,23 +370,27 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p,
readRespInit(&ctx->respReader);
setErrorCb(&ctx->respReader, ctx, onReadRepliesErrorCb);

if (auth && (redisAuth(ctx, auth) != RDB_OK))
if (auth && (redisAuth(ctx, auth) != RDB_OK)) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_AUTH_FAILED, "Redis authentication failed.");
RDB_free(p, ctx);
return NULL;
}

/* Set 'this' writer to rdbToResp */
/* Set writer to rdbToResp */
RdbxRespWriter inst = {ctx, redisLoaderDelete, redisLoaderWritev, redisLoaderFlush};
RDBX_attachRespWriter(rdbToResp, &inst);
return ctx;
}

/* Create a loader and establish a TCP connection */
_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p,
RdbxToResp *rdbToResp,
RdbxRedisAuth *auth,
const char *hostname,
int port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CREATE_SOCKET, "Failed to create tcp socket");
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CREATE_SOCKET, "Failed to create TCP socket");
return NULL;
}

Expand All @@ -377,21 +406,12 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p,
}

if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_INVALID_ADDRESS,
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_CONNECT,
"Failed to connect(hostname=%s, port=%d) => errno=%d",
hostname, port, errno);
goto createErr;
}

/* Set the recv() timeout. Avoid blocking forever. */
struct timeval timeout = { .tv_sec = 60, .tv_usec = 0 };
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
"Failed to set socket receive timeout. errno=%d: %s",
errno, strerror(errno));
goto createErr;
}

RdbxRespToRedisLoader *res = RDBX_createRespToRedisFd(p, rdbToResp, auth, sockfd);

if (!res) goto createErr;
Expand Down
85 changes: 80 additions & 5 deletions test/test_rdb_to_redis.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <arpa/inet.h>
#include <assert.h>
#include "test_common.h"

int serverMajorVer, serverMinorVer;
Expand Down Expand Up @@ -303,7 +303,7 @@ static void test_rdb_to_redis_function(void **state) {
}

/* test relied on rdbtest module within redis repo, if available */
void test_rdb_to_redis_module(void **state) {
static void test_rdb_to_redis_module(void **state) {
UNUSED(state);

/* Skip test if testrdb is not loaded */
Expand Down Expand Up @@ -360,7 +360,7 @@ static void test_rdb_to_redis_module_aux_empty(void **state) {
rdb_to_tcp(DUMP_FOLDER("module_aux_empty.rdb"), 1, 1, NULL);
}

void test_rdb_to_redis_stream(void **state) {
static void test_rdb_to_redis_stream(void **state) {
UNUSED(state);
test_rdb_to_redis_common(DUMP_FOLDER("stream_v11.rdb"), 1, NULL, NULL);
}
Expand Down Expand Up @@ -523,6 +523,80 @@ static void test_rdb_to_redis_func_lib_replace_if_exist(void **state) {
}
}

/* Create dummy TCP server that doesn't respond to the client and verify that
 * the parser retries after TIMEOUT_SECONDS. Not part of CI since it takes too long */
/* Number of "No reply from redis-server for" log messages still to be seen
 * before dummyTcpTimeoutLogger() injects an error. Set by
 * test_rdb_tcp_timeout() before parsing starts. */
int countdownRetries;
/* Parser under test. Kept at file scope because the logger callback receives
 * no user context and needs it to call RDB_reportError(). */
RdbParser *parser;
/* Logger callback for test_rdb_tcp_timeout().
 *
 * Counts down once for every log message containing
 * "No reply from redis-server for". When the counter reaches zero it reports
 * a dummy error (12345678) on the global parser so the test can terminate.
 *
 * l   - log level of the message (ignored).
 * msg - NUL-terminated log message text. */
void dummyTcpTimeoutLogger(RdbLogLevel l, const char *msg) {
    UNUSED(l);

    /* Anything other than the timeout notice is irrelevant to the countdown */
    if (strstr(msg, "No reply from redis-server for") == NULL) {
        return;
    }

    countdownRetries -= 1;
    if (countdownRetries != 0) {
        return;
    }

    RDB_reportError(parser, (RdbRes)12345678, "Inject error to end the test");
}
/* Run the RDB-to-Redis loader against a silent TCP peer and check retry timing.
 *
 * A local listening socket accepts the loader's connection but never replies.
 * Each "No reply from redis-server for" log message is counted by
 * dummyTcpTimeoutLogger(); after `test_retries` of them the logger injects
 * error 12345678, which ends parsing. The test then checks that the elapsed
 * wall-clock time is about RECV_TIMEOUT_SECONDS * test_retries (+/- 2 seconds).
 *
 * state - cmocka test state (unused). */
void test_rdb_tcp_timeout(void **state) {
    UNUSED(state);
    /* Expected to mirror the loader's recv() timeout (RECV_CMD_TIMEOUT_SEC) */
    const int RECV_TIMEOUT_SECONDS = 10;
    int test_retries = 3; /* limit test to finite number of retries */
    int server_fd, client_fd;
    /* Zero-initialize so padding (sin_zero) is not passed to bind() as garbage */
    struct sockaddr_in server_addr = {0}, client_addr = {0};
    socklen_t client_len = sizeof(client_addr);

    /* expected to retry 3 times before ending the test */
    countdownRetries = test_retries;

    /* Dummy TCP server that only receives messages but does not respond */
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    assert_true(server_fd >= 0);

    int opt = 1;
    assert_int_equal(setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), 0);
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(0); /* port 0: let the OS pick a free port */
    assert_int_equal(bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)), 0);

    /* Read back the port actually assigned by the OS */
    socklen_t addr_len = sizeof(server_addr);
    assert_int_equal(getsockname(server_fd, (struct sockaddr *)&server_addr, &addr_len), 0);
    int assigned_port = ntohs(server_addr.sin_port);
    assert_int_equal(listen(server_fd, 1), 0);
    printf("Dummy TCP server started, waiting for client to connect...\n");

    parser = RDB_createParserRdb(NULL);
    RDB_setLogLevel(parser, RDB_LOG_INF);
    RDB_setLogger(parser, dummyTcpTimeoutLogger);
    assert_non_null(RDBX_createReaderFile(parser, DUMP_FOLDER("single_key.rdb")));

    RdbxToRespConf rdb2respConf = {
        .supportRestore = 1,
        .dstRedisVersion = getTargetRedisVersion(NULL, NULL),
        .supportRestoreModuleAux = isSupportRestoreModuleAux()
    };

    RdbxToResp *rdbToResp;
    assert_non_null(rdbToResp = RDBX_createHandlersToResp(parser, &rdb2respConf));
    assert_non_null(RDBX_createRespToRedisTcp(parser, rdbToResp, NULL, "127.0.0.1", assigned_port));

    /* The loader's connect() already completed against the listen backlog */
    client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
    assert_true(client_fd >= 0);

    /* Start the timer to measure timeout and run parser */
    time_t start_time = time(NULL);
    while (RDB_parse(parser) == RDB_STATUS_WAIT_MORE_DATA) {
        continue;
    }

    /* Verify dummy error code */
    assert_int_equal(RDB_getErrorCode(parser), 12345678);

    /* Measure elapsed time and verify it's within the expected range */
    time_t elapsedTime = time(NULL) - start_time;
    int expectedTime = RECV_TIMEOUT_SECONDS * test_retries;
    /* time_t has no portable printf specifier; cast to match %ld */
    printf("Elapsed time: %ld, expected time: %d\n", (long) elapsedTime, expectedTime);
    assert_in_range(elapsedTime, expectedTime - 2, expectedTime + 2);

    RDB_deleteParser(parser);
    close(client_fd);
    close(server_fd);
}

/*************************** group_rdb_to_redis *******************************/
int group_rdb_to_redis(void) {

Expand Down Expand Up @@ -580,6 +654,7 @@ int group_rdb_to_redis(void) {
cmocka_unit_test_setup(test_rdb_to_redis_multiple_dbs, setupTest),
cmocka_unit_test_setup(test_rdb_to_redis_function, setupTest),
cmocka_unit_test_setup(test_rdb_to_redis_func_lib_replace_if_exist, setupTest),
//cmocka_unit_test_setup(test_rdb_tcp_timeout, setupTest), /* too long to run */
};

int res = cmocka_run_group_tests(tests, NULL, NULL);
Expand Down