Skip to content

Commit

Permalink
Merge branch 'main' into support-rdb-v12
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Apr 16, 2024
2 parents 1698435 + 2ffa3fb commit c64a509
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 33 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ name: CI
on:
push:
pull_request:
schedule:
- cron: '0 0 * * 0' # Run every Sunday at midnight UTC

jobs:
build:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion runtests
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash -ex
#!/bin/bash -e

VALGRIND=0
REDIS_FOLDER=""
Expand Down
9 changes: 7 additions & 2 deletions src/cli/rdb-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ int readCommonOptions(RdbParser *p, int argc, char* argv[], Options *options, in
return at;
}

void closeLogFileOnExit() {
if (logfile != NULL)
fclose(logfile);
}

int main(int argc, char **argv)
{
Options options;
Expand Down Expand Up @@ -396,6 +401,8 @@ int main(int argc, char **argv)
return RDB_ERR_GENERAL;
}

atexit(closeLogFileOnExit);

/* create the parser and attach it a file reader */
RdbParser *parser = RDB_createParserRdb(NULL);
RDB_setLogLevel(parser, RDB_LOG_INF);
Expand Down Expand Up @@ -425,7 +432,5 @@ int main(int argc, char **argv)

RDB_deleteParser(parser);

fclose(logfile);

return 0;
}
56 changes: 41 additions & 15 deletions src/ext/respToRedisLoader.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,25 @@ static void onReadRepliesError(RdbxRespToRedisLoader *ctx) {
ctx->respReader.countReplies);
}

/* Read 'numToRead' replies from the socket. * Return 0 for success, 1 otherwise. */
static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) {
/* Read 'numToRead' replies from the socket.
*
* numToRead - minimum number of replies to read from the socket before
* returning.
* sendError - if set, an error occurred while writing to the server. In
* this case the function will try to read replies from the
* server. Maybe one of the replies will contain an error message
* that explains why write got failed. Whether error message is
* received or not, the function will return to the original issue.
*
* Return 0 for success, 1 otherwise. */
static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError) {
char buff[REPLY_BUFF_SIZE];

RespReaderCtx *respReader = &ctx->respReader;
size_t countRepliesBefore = respReader->countReplies;
size_t repliesExpected = respReader->countReplies + numToRead;

while (respReader->countReplies < repliesExpected) {
while ((respReader->countReplies < repliesExpected) || (sendError)) {
int bytesReceived = recv(ctx->fd, buff, sizeof(buff), 0);

if (bytesReceived > 0) {
Expand All @@ -71,12 +81,23 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) {
onReadRepliesError(ctx);
return 1;
}
continue;
}

/* handle error */

if (sendError)
return 0; /* Failed read error message from dst. Back to original issue. */

} else if (bytesReceived == 0) {
RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE, "Connection closed by the remote side");
if (bytesReceived == 0) {
RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE,
"Connection closed by the remote side");
return 1;
} else {
RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_READ, "Failed to recv() from Redis server. (errno=%d)", errno);
RDB_reportError(ctx->p,
(RdbRes) RDBX_ERR_RESP2REDIS_FAILED_READ,
"Failed to recv() from Redis server. errno=%d: %s",
errno, strerror(errno));
return 1;
}
}
Expand Down Expand Up @@ -106,29 +127,30 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt,
RdbxRespToRedisLoader *ctx = context;

if (unlikely(ctx->pendingCmds.num == ctx->pendingCmds.pipelineDepth)) {
if (readReplies(ctx, 1 /* at least one */))
if (readReplies(ctx, 1 /* at least one */, 0))
return 1;
}

if (startCmd) recordCommandSent(ctx, startCmd);

while (1)
{
writeResult = writev(ctx->fd, iov, iovCnt);
struct msghdr msg = { .msg_iov = iov, .msg_iovlen = iovCnt };
writeResult = sendmsg(ctx->fd, &msg, MSG_NOSIGNAL /*Ignore SIGPIPE signal*/);

/* check for error */
if (unlikely(writeResult == -1)) {
if (errno == EINTR) {
if ((retries++) >= MAX_EINTR_RETRY) {
RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_WRITE,
"Failed to write socket. Exceeded EINTR retry limit");
return 1;
break;
}
continue;
} else {
RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_WRITE,
"Failed to write socket (errno=%d)", errno);
return 1;
break;
}
}

Expand All @@ -140,24 +162,28 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt,
}

/* if managed to send all iov entries */
if (likely(iovCnt == 0))
break;
if (likely(iovCnt == 0)) {
ctx->pendingCmds.num += endCmd;
return 0;
}

/* Update pointed iov entry. Only partial of its data sent */
iov->iov_len -= writeResult;
iov->iov_base = (char *) iov->iov_base + writeResult;
}

ctx->pendingCmds.num += endCmd;
return 0;
/* Error occurred. Try to receive error msg from dst, which might explain
why write got failed */
readReplies(ctx, 0, 1/*sendError*/);
return 1;
}

/* Flush the pending commands by reading the remaining replies.
* Return 0 for success, 1 otherwise. */
static int redisLoaderFlush(void *context) {
RdbxRespToRedisLoader *ctx = context;
if (ctx->pendingCmds.num)
return readReplies(ctx, ctx->pendingCmds.num);
return readReplies(ctx, ctx->pendingCmds.num, 0);
return 0;
}

Expand Down
33 changes: 25 additions & 8 deletions src/lib/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ _LIBRDB_API RdbParser *RDB_createParserRdb(RdbMemAlloc *memAlloc) {
p->reader = NULL;
p->cache = NULL;
p->errorMsg[0] = '\0';
p->errorMsgAt = 0;
p->appCbCtx.numBulks = 0;
p->loggerCb = loggerCbDefault;
p->logLevel = RDB_LOG_DBG;
Expand Down Expand Up @@ -276,8 +277,10 @@ _LIBRDB_API void RDB_deleteParser(RdbParser *p) {
}

_LIBRDB_API RdbStatus RDB_parse(RdbParser *p) {
if (p->state == RDB_STATE_CONFIGURING)
if (p->state == RDB_STATE_CONFIGURING) {
IF_NOT_OK_RETURN(finalizeConfig(p, 0));
p->state = RDB_STATE_RUNNING;
}

/* nothing special to do after pause */
if (p->state == RDB_STATE_PAUSED)
Expand All @@ -288,8 +291,10 @@ _LIBRDB_API RdbStatus RDB_parse(RdbParser *p) {

_LIBRDB_API RdbStatus RDB_parseBuff(RdbParser *p, unsigned char *buff, size_t size, int isEOF) {

if (p->state == RDB_STATE_CONFIGURING)
if (p->state == RDB_STATE_CONFIGURING) {
IF_NOT_OK_RETURN(finalizeConfig(p, 1));
p->state = RDB_STATE_RUNNING;
}

if (p->state != RDB_STATE_PAUSED)
{
Expand Down Expand Up @@ -447,28 +452,41 @@ _LIBRDB_API RdbRes RDB_getErrorCode(RdbParser *p) {
}

_LIBRDB_API void RDB_reportError(RdbParser *p, RdbRes e, const char *msg, ...) {
int nchars = 0;
p->errorCode = e;
/* Record errorCode only of the first error */
if (p->errorCode == RDB_OK)
p->errorCode = e;

if (msg == NULL) {
p->errorMsg[0] = '\0';
return;
}

/* RDB_OK & RDB_OK_DONT_PROPAGATE - not a real errors to report */
assert (e != RDB_OK && e != RDB_OK_DONT_PROPAGATE);

/* If error message is too long, then trim it in order to record this last message */
if (p->errorMsgAt > LAST_ERR_MSG_OFFSET) {
p->errorMsgAt = LAST_ERR_MSG_OFFSET;
p->errorMsgAt += snprintf(p->errorMsg + p->errorMsgAt,
MAX_ERROR_MSG - p->errorMsgAt,
"\n... last recorded error message: ...\n");
}
p->errorMsgAt += snprintf(p->errorMsg + p->errorMsgAt, MAX_ERROR_MSG - p->errorMsgAt, "[errcode=%d] ", e);

if (p->state == RDB_STATE_RUNNING) {
nchars = snprintf(p->errorMsg, MAX_ERROR_MSG, "[%s::State=%d] ",
p->errorMsgAt += snprintf(p->errorMsg + p->errorMsgAt,
MAX_ERROR_MSG - p->errorMsgAt, "[%s::State=%d] ",
peInfo[p->parsingElement].funcname,
p->elmCtx.state);
}

va_list args;
va_start(args, msg);
vsnprintf(p->errorMsg + nchars, MAX_ERROR_MSG - nchars, msg, args);
p->errorMsgAt += vsnprintf(p->errorMsg + p->errorMsgAt, MAX_ERROR_MSG - p->errorMsgAt, msg, args);
va_end(args);

if (p->errorMsgAt >= MAX_ERROR_MSG) return;
p->errorMsgAt += snprintf(p->errorMsg + p->errorMsgAt, MAX_ERROR_MSG - p->errorMsgAt, "\n");

RDB_log(p, RDB_LOG_ERR, "%s", p->errorMsg);
}

Expand Down Expand Up @@ -796,7 +814,6 @@ static RdbStatus finalizeConfig(RdbParser *p, int isParseFromBuff) {

resolveMultipleLevelsRegistration(p);

p->state = RDB_STATE_RUNNING;
RDB_log(p, RDB_LOG_INF, "Start processing RDB source");
return RDB_STATUS_OK;
}
Expand Down
12 changes: 9 additions & 3 deletions src/lib/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
#include "defines.h"
#include "../../api/librdb-api.h"

#define MAX_ERROR_MSG 1024

/* Max error message length. Chain one or more recorded error messages. */
#define MAX_ERROR_MSG 2048
/* When reaching configured offset, keep overwrite the last error message */
#define LAST_ERR_MSG_OFFSET 1600

#define MAX_APP_BULKS 2
#define NOP /*no-op*/
#define IF_NOT_OK_RETURN(cmd) do {RdbStatus s; s = cmd; if (unlikely(s!=RDB_STATUS_OK)) return s;} while (0)
Expand Down Expand Up @@ -289,8 +294,7 @@ typedef struct ElementCtx {
/* The parser can handle one level of nested parsing-elements (PE), whereby a PE
* may be called by another PE and control is returned to the caller once the
* parsing of sub-element is complete. Currently, this functionality is only
* utilized by the raw list PE, which calls upon the raw string PE to parse
* individual string elements. */
* utilized to call raw string PE to parse individual string elements. */
typedef struct ParsingSubElement {

/* let callee knows which element and state to callback once done */
Expand Down Expand Up @@ -373,7 +377,9 @@ struct RdbParser {

/*** error reporting ***/
RdbRes errorCode;

char errorMsg[MAX_ERROR_MSG];
int errorMsgAt;

/*** read RDB from reader VS read RDB from buffer ***/

Expand Down
5 changes: 4 additions & 1 deletion test/test_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ void cleanTmpFolder(void) {
const char *folder_path = "./test/tmp";

DIR *dir = opendir(folder_path);
assert_true(dir != NULL);
if (dir == NULL) {
printf("Failed to open directory: %s\n", folder_path);
assert_true(0);
}

struct dirent *entry;
while ((entry = readdir(dir)) != NULL) {
Expand Down
2 changes: 2 additions & 0 deletions test/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@ void setEnvVar (const char *name, const char *val);
char *substring(char *str, size_t len, char *substr);
void assert_file_payload(const char *filename, char *expData, int expLen, MatchType matchType, int expMatch);

void dummyLogger(RdbLogLevel l, const char *msg);

int printHexDump(const char *addr, size_t len, char *obuf, int obuflen);
47 changes: 45 additions & 2 deletions test/test_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ static void test_createReader_missingFile(void **state) {
assert_int_equal(err, RDB_ERR_FAILED_OPEN_RDB_FILE);

/* verify returned error string */
assert_string_equal(RDB_getErrorMessage(parser),
"Failed to open RDB file `./test/dumps/non_exist_file.rdb`: No such file or directory\n");
assert_true(strstr(RDB_getErrorMessage(parser), "Failed to open RDB file"));
RDB_deleteParser(parser);
}

Expand Down Expand Up @@ -136,6 +135,49 @@ static void test_examples(void **state) {
runSystemCmd("make example > /dev/null ");
}

RdbRes handle_start_rdb_report_long_errors(RdbParser *p, void *userData, int rdbVersion) {
UNUSED(userData, rdbVersion);
for (int i = 2 ; i < 1000; i++)
RDB_reportError(p, (RdbRes) i, "Error Report number:%d", i);
return 1001; /* This value will be eventually returned as the error code */
}

static void test_report_long_error(void **state) {
RdbStatus status;
UNUSED(state);
void *user_data = NULL;

RdbHandlersRawCallbacks cb = { .handleStartRdb = handle_start_rdb_report_long_errors };
RdbParser *parser = RDB_createParserRdb(NULL);
RDB_setLogger(parser, dummyLogger);
assert_non_null(RDBX_createReaderFile(parser, "./test/dumps/quicklist2_v11.rdb"));
assert_non_null(RDB_createHandlersRaw(parser, &cb, user_data, NULL));


while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA);
assert_int_equal(status, RDB_STATUS_ERROR);
const char *returned = RDB_getErrorMessage(parser);
const char *expected =
"[errcode=2] [elementRdbHeader::State=0] Error Report number:2\n"
"[errcode=3] [elementRdbHeader::State=0] Error Report number:3\n[errcode=4] [elementRdbHeader::State=0] Error Report number:4\n"
"[errcode=5] [elementRdbHeader::State=0] Error Report number:5\n[errcode=6] [elementRdbHeader::State=0] Error Report number:6\n"
"[errcode=7] [elementRdbHeader::State=0] Error Report number:7\n[errcode=8] [elementRdbHeader::State=0] Error Report number:8\n"
"[errcode=9] [elementRdbHeader::State=0] Error Report number:9\n[errcode=10] [elementRdbHeader::State=0] Error Report number:10\n"
"[errcode=11] [elementRdbHeader::State=0] Error Report number:11\n[errcode=12] [elementRdbHeader::State=0] Error Report number:12\n"
"[errcode=13] [elementRdbHeader::State=0] Error Report number:13\n[errcode=14] [elementRdbHeader::State=0] Error Report number:14\n"
"[errcode=15] [elementRdbHeader::State=0] Error Report number:15\n[errcode=16] [elementRdbHeader::State=0] Error Report number:16\n"
"[errcode=17] [elementRdbHeader::State=0] Error Report number:17\n[errcode=18] [elementRdbHeader::State=0] Error Report number:18\n"
"[errcode=19] [elementRdbHeader::State=0] Error Report number:19\n[errcode=20] [elementRdbHeader::State=0] Error Report number:20\n"
"[errcode=21] [elementRdbHeader::State=0] Error Report number:21\n[errcode=22] [elementRdbHeader::State=0] Error Report number:22\n"
"[errcode=23] [elementRdbHeader::State=0] Error Report number:23\n[errcode=24] [elementRdbHeader::State=0] Error Report number:24\n"
"[errcode=25] [elementRdbHeader::State=0] Error Report number:25\n[errcode=26] [elementRdbHeader::State=0] Error Report number:26\n"
"[errcode=27] [el\n... last recorded error message: ...\n[errcode=999] [elementRdbHeader::State=0] Error Report number:999\n";

assert_string_equal(returned, expected);
assert_int_equal(RDB_getErrorCode(parser), 1001);
RDB_deleteParser(parser);
}

static void printResPicture(int result) {
if (result)
printf(" x_x\n"
Expand Down Expand Up @@ -179,6 +221,7 @@ int group_misc(void) {
cmocka_unit_test(test_empty_rdb),
cmocka_unit_test(test_mixed_levels_registration),
cmocka_unit_test(test_checksum),
cmocka_unit_test(test_report_long_error),
cmocka_unit_test(test_not_support_future_rdb_version),
};
return cmocka_run_group_tests(tests, NULL, NULL);
Expand Down

0 comments on commit c64a509

Please sign in to comment.