diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 156c769..d927559 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,11 +3,12 @@ name: CI on: push: pull_request: + schedule: + - cron: '0 0 * * 0' # Run every Sunday at midnight UTC jobs: build: runs-on: ubuntu-latest - steps: - name: Checkout uses: actions/checkout@v3 diff --git a/runtests b/runtests index 9a7bf75..1cb620d 100755 --- a/runtests +++ b/runtests @@ -1,4 +1,4 @@ -#!/bin/bash -ex +#!/bin/bash -e VALGRIND=0 REDIS_FOLDER="" diff --git a/src/cli/rdb-cli.c b/src/cli/rdb-cli.c index 6ff59ef..00a8173 100644 --- a/src/cli/rdb-cli.c +++ b/src/cli/rdb-cli.c @@ -365,6 +365,11 @@ int readCommonOptions(RdbParser *p, int argc, char* argv[], Options *options, in return at; } +void closeLogFileOnExit() { + if (logfile != NULL) + fclose(logfile); +} + int main(int argc, char **argv) { Options options; @@ -396,6 +401,8 @@ int main(int argc, char **argv) return RDB_ERR_GENERAL; } + atexit(closeLogFileOnExit); + /* create the parser and attach it a file reader */ RdbParser *parser = RDB_createParserRdb(NULL); RDB_setLogLevel(parser, RDB_LOG_INF); @@ -425,7 +432,5 @@ int main(int argc, char **argv) RDB_deleteParser(parser); - fclose(logfile); - return 0; } diff --git a/src/ext/respToRedisLoader.c b/src/ext/respToRedisLoader.c index 410091d..e30b3a9 100644 --- a/src/ext/respToRedisLoader.c +++ b/src/ext/respToRedisLoader.c @@ -54,15 +54,25 @@ static void onReadRepliesError(RdbxRespToRedisLoader *ctx) { ctx->respReader.countReplies); } -/* Read 'numToRead' replies from the socket. * Return 0 for success, 1 otherwise. */ -static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) { +/* Read 'numToRead' replies from the socket. + * + * numToRead - minimum number of replies to read from the socket before + * returning. + * sendError - if set, an error occurred while writing to the server. In + * this case the function will try to read replies from the + * server. Maybe one of the replies will contain an error message + * that explains why write got failed. Whether error message is + * received or not, the function will return to the original issue. + * + * Return 0 for success, 1 otherwise. */ +static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError) { char buff[REPLY_BUFF_SIZE]; RespReaderCtx *respReader = &ctx->respReader; size_t countRepliesBefore = respReader->countReplies; size_t repliesExpected = respReader->countReplies + numToRead; - while (respReader->countReplies < repliesExpected) { + while ((respReader->countReplies < repliesExpected) || (sendError)) { int bytesReceived = recv(ctx->fd, buff, sizeof(buff), 0); if (bytesReceived > 0) { @@ -71,12 +81,23 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) { onReadRepliesError(ctx); return 1; } + continue; + } + + /* handle error */ + + if (sendError) + return 0; /* Failed read error message from dst. Back to original issue. */ - } else if (bytesReceived == 0) { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE, "Connection closed by the remote side"); + if (bytesReceived == 0) { + RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE, + "Connection closed by the remote side"); return 1; } else { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_READ, "Failed to recv() from Redis server. (errno=%d)", errno); + RDB_reportError(ctx->p, + (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_READ, + "Failed to recv() from Redis server. errno=%d: %s", + errno, strerror(errno)); return 1; } } @@ -106,7 +127,7 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, RdbxRespToRedisLoader *ctx = context; if (unlikely(ctx->pendingCmds.num == ctx->pendingCmds.pipelineDepth)) { - if (readReplies(ctx, 1 /* at least one */)) + if (readReplies(ctx, 1 /* at least one */, 0)) return 1; } @@ -114,7 +135,8 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, while (1) { - writeResult = writev(ctx->fd, iov, iovCnt); + struct msghdr msg = { .msg_iov = iov, .msg_iovlen = iovCnt }; + writeResult = sendmsg(ctx->fd, &msg, MSG_NOSIGNAL /*Ignore SIGPIPE signal*/); /* check for error */ if (unlikely(writeResult == -1)) { @@ -122,13 +144,13 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, if ((retries++) >= MAX_EINTR_RETRY) { RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_WRITE, "Failed to write socket. Exceeded EINTR retry limit"); - return 1; + break; } continue; } else { RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_WRITE, "Failed to write socket (errno=%d)", errno); - return 1; + break; } } @@ -140,16 +162,20 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, } /* if managed to send all iov entries */ - if (likely(iovCnt == 0)) - break; + if (likely(iovCnt == 0)) { + ctx->pendingCmds.num += endCmd; + return 0; + } /* Update pointed iov entry. Only partial of its data sent */ iov->iov_len -= writeResult; iov->iov_base = (char *) iov->iov_base + writeResult; } - ctx->pendingCmds.num += endCmd; - return 0; + /* Error occurred. Try to receive error msg from dst, which might explain + why write got failed */ + readReplies(ctx, 0, 1/*sendError*/); + return 1; } /* Flush the pending commands by reading the remaining replies. @@ -157,7 +183,7 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, static int redisLoaderFlush(void *context) { RdbxRespToRedisLoader *ctx = context; if (ctx->pendingCmds.num) - return readReplies(ctx, ctx->pendingCmds.num); + return readReplies(ctx, ctx->pendingCmds.num, 0); return 0; } diff --git a/src/lib/parser.c b/src/lib/parser.c index 76d6210..3e9c606 100644 --- a/src/lib/parser.c +++ b/src/lib/parser.c @@ -207,6 +207,7 @@ _LIBRDB_API RdbParser *RDB_createParserRdb(RdbMemAlloc *memAlloc) { p->reader = NULL; p->cache = NULL; p->errorMsg[0] = '\0'; + p->errorMsgAt = 0; p->appCbCtx.numBulks = 0; p->loggerCb = loggerCbDefault; p->logLevel = RDB_LOG_DBG; @@ -276,8 +277,10 @@ _LIBRDB_API void RDB_deleteParser(RdbParser *p) { } _LIBRDB_API RdbStatus RDB_parse(RdbParser *p) { - if (p->state == RDB_STATE_CONFIGURING) + if (p->state == RDB_STATE_CONFIGURING) { IF_NOT_OK_RETURN(finalizeConfig(p, 0)); + p->state = RDB_STATE_RUNNING; + } /* nothing special to do after pause */ if (p->state == RDB_STATE_PAUSED) @@ -288,8 +291,10 @@ _LIBRDB_API RdbStatus RDB_parse(RdbParser *p) { _LIBRDB_API RdbStatus RDB_parseBuff(RdbParser *p, unsigned char *buff, size_t size, int isEOF) { - if (p->state == RDB_STATE_CONFIGURING) + if (p->state == RDB_STATE_CONFIGURING) { IF_NOT_OK_RETURN(finalizeConfig(p, 1)); + p->state = RDB_STATE_RUNNING; + } if (p->state != RDB_STATE_PAUSED) { @@ -447,28 +452,41 @@ _LIBRDB_API RdbRes RDB_getErrorCode(RdbParser *p) { } _LIBRDB_API void RDB_reportError(RdbParser *p, RdbRes e, const char *msg, ...) { - int nchars = 0; - p->errorCode = e; + /* Record errorCode only of the first error */ + if (p->errorCode == RDB_OK) + p->errorCode = e; if (msg == NULL) { - p->errorMsg[0] = '\0'; return; } /* RDB_OK & RDB_OK_DONT_PROPAGATE - not a real errors to report */ assert (e != RDB_OK && e != RDB_OK_DONT_PROPAGATE); + /* If error message is too long, then trim it in order to record this last message */ + if (p->errorMsgAt > LAST_ERR_MSG_OFFSET) { + p->errorMsgAt = LAST_ERR_MSG_OFFSET; + p->errorMsgAt += snprintf(p->errorMsg + p->errorMsgAt, + MAX_ERROR_MSG - p->errorMsgAt, + "\n... last recorded error message: ...\n"); + } + p->errorMsgAt += snprintf(p->errorMsg + p->errorMsgAt, MAX_ERROR_MSG - p->errorMsgAt, "[errcode=%d] ", e); + if (p->state == RDB_STATE_RUNNING) { - nchars = snprintf(p->errorMsg, MAX_ERROR_MSG, "[%s::State=%d] ", + p->errorMsgAt += snprintf(p->errorMsg + p->errorMsgAt, + MAX_ERROR_MSG - p->errorMsgAt, "[%s::State=%d] ", peInfo[p->parsingElement].funcname, p->elmCtx.state); } va_list args; va_start(args, msg); - vsnprintf(p->errorMsg + nchars, MAX_ERROR_MSG - nchars, msg, args); + p->errorMsgAt += vsnprintf(p->errorMsg + p->errorMsgAt, MAX_ERROR_MSG - p->errorMsgAt, msg, args); va_end(args); + if (p->errorMsgAt >= MAX_ERROR_MSG) return; + p->errorMsgAt += snprintf(p->errorMsg + p->errorMsgAt, MAX_ERROR_MSG - p->errorMsgAt, "\n"); + RDB_log(p, RDB_LOG_ERR, "%s", p->errorMsg); } @@ -796,7 +814,6 @@ static RdbStatus finalizeConfig(RdbParser *p, int isParseFromBuff) { resolveMultipleLevelsRegistration(p); - p->state = RDB_STATE_RUNNING; RDB_log(p, RDB_LOG_INF, "Start processing RDB source"); return RDB_STATUS_OK; } diff --git a/src/lib/parser.h b/src/lib/parser.h index a2a2bbf..1059dde 100644 --- a/src/lib/parser.h +++ b/src/lib/parser.h @@ -6,7 +6,12 @@ #include "defines.h" #include "../../api/librdb-api.h" -#define MAX_ERROR_MSG 1024 + +/* Max error message length. Chain one or more recorded error messages. */ +#define MAX_ERROR_MSG 2048 +/* When reaching configured offset, keep overwrite the last error message */ +#define LAST_ERR_MSG_OFFSET 1600 + #define MAX_APP_BULKS 2 #define NOP /*no-op*/ #define IF_NOT_OK_RETURN(cmd) do {RdbStatus s; s = cmd; if (unlikely(s!=RDB_STATUS_OK)) return s;} while (0) @@ -289,8 +294,7 @@ typedef struct ElementCtx { /* The parser can handle one level of nested parsing-elements (PE), whereby a PE * may be called by another PE and control is returned to the caller once the * parsing of sub-element is complete. Currently, this functionality is only - * utilized by the raw list PE, which calls upon the raw string PE to parse - * individual string elements. */ + * utilized to call raw string PE to parse individual string elements. */ typedef struct ParsingSubElement { /* let callee knows which element and state to callback once done */ @@ -373,7 +377,9 @@ struct RdbParser { /*** error reporting ***/ RdbRes errorCode; + char errorMsg[MAX_ERROR_MSG]; + int errorMsgAt; /*** read RDB from reader VS read RDB from buffer ***/ diff --git a/test/test_common.c b/test/test_common.c index c371e1c..7476daa 100644 --- a/test/test_common.c +++ b/test/test_common.c @@ -105,7 +105,10 @@ void cleanTmpFolder(void) { const char *folder_path = "./test/tmp"; DIR *dir = opendir(folder_path); - assert_true(dir != NULL); + if (dir == NULL) { + printf("Failed to open directory: %s\n", folder_path); + assert_true(0); + } struct dirent *entry; while ((entry = readdir(dir)) != NULL) { diff --git a/test/test_common.h b/test/test_common.h index 50b13c4..a5d486a 100644 --- a/test/test_common.h +++ b/test/test_common.h @@ -76,4 +76,6 @@ void setEnvVar (const char *name, const char *val); char *substring(char *str, size_t len, char *substr); void assert_file_payload(const char *filename, char *expData, int expLen, MatchType matchType, int expMatch); +void dummyLogger(RdbLogLevel l, const char *msg); + int printHexDump(const char *addr, size_t len, char *obuf, int obuflen); diff --git a/test/test_main.c b/test/test_main.c index c497dc3..5c2b280 100644 --- a/test/test_main.c +++ b/test/test_main.c @@ -18,8 +18,7 @@ static void test_createReader_missingFile(void **state) { assert_int_equal(err, RDB_ERR_FAILED_OPEN_RDB_FILE); /* verify returned error string */ - assert_string_equal(RDB_getErrorMessage(parser), - "Failed to open RDB file `./test/dumps/non_exist_file.rdb`: No such file or directory\n"); + assert_true(strstr(RDB_getErrorMessage(parser), "Failed to open RDB file")); RDB_deleteParser(parser); } @@ -136,6 +135,49 @@ static void test_examples(void **state) { runSystemCmd("make example > /dev/null "); } +RdbRes handle_start_rdb_report_long_errors(RdbParser *p, void *userData, int rdbVersion) { + UNUSED(userData, rdbVersion); + for (int i = 2 ; i < 1000; i++) + RDB_reportError(p, (RdbRes) i, "Error Report number:%d", i); + return 1001; /* This value will be eventually returned as the error code */ +} + +static void test_report_long_error(void **state) { + RdbStatus status; + UNUSED(state); + void *user_data = NULL; + + RdbHandlersRawCallbacks cb = { .handleStartRdb = handle_start_rdb_report_long_errors }; + RdbParser *parser = RDB_createParserRdb(NULL); + RDB_setLogger(parser, dummyLogger); + assert_non_null(RDBX_createReaderFile(parser, "./test/dumps/quicklist2_v11.rdb")); + assert_non_null(RDB_createHandlersRaw(parser, &cb, user_data, NULL)); + + + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + assert_int_equal(status, RDB_STATUS_ERROR); + const char *returned = RDB_getErrorMessage(parser); + const char *expected = + "[errcode=2] [elementRdbHeader::State=0] Error Report number:2\n" + "[errcode=3] [elementRdbHeader::State=0] Error Report number:3\n[errcode=4] [elementRdbHeader::State=0] Error Report number:4\n" + "[errcode=5] [elementRdbHeader::State=0] Error Report number:5\n[errcode=6] [elementRdbHeader::State=0] Error Report number:6\n" + "[errcode=7] [elementRdbHeader::State=0] Error Report number:7\n[errcode=8] [elementRdbHeader::State=0] Error Report number:8\n" + "[errcode=9] [elementRdbHeader::State=0] Error Report number:9\n[errcode=10] [elementRdbHeader::State=0] Error Report number:10\n" + "[errcode=11] [elementRdbHeader::State=0] Error Report number:11\n[errcode=12] [elementRdbHeader::State=0] Error Report number:12\n" + "[errcode=13] [elementRdbHeader::State=0] Error Report number:13\n[errcode=14] [elementRdbHeader::State=0] Error Report number:14\n" + "[errcode=15] [elementRdbHeader::State=0] Error Report number:15\n[errcode=16] [elementRdbHeader::State=0] Error Report number:16\n" + "[errcode=17] [elementRdbHeader::State=0] Error Report number:17\n[errcode=18] [elementRdbHeader::State=0] Error Report number:18\n" + "[errcode=19] [elementRdbHeader::State=0] Error Report number:19\n[errcode=20] [elementRdbHeader::State=0] Error Report number:20\n" + "[errcode=21] [elementRdbHeader::State=0] Error Report number:21\n[errcode=22] [elementRdbHeader::State=0] Error Report number:22\n" + "[errcode=23] [elementRdbHeader::State=0] Error Report number:23\n[errcode=24] [elementRdbHeader::State=0] Error Report number:24\n" + "[errcode=25] [elementRdbHeader::State=0] Error Report number:25\n[errcode=26] [elementRdbHeader::State=0] Error Report number:26\n" + "[errcode=27] [el\n... last recorded error message: ...\n[errcode=999] [elementRdbHeader::State=0] Error Report number:999\n"; + + assert_string_equal(returned, expected); + assert_int_equal(RDB_getErrorCode(parser), 1001); + RDB_deleteParser(parser); +} + static void printResPicture(int result) { if (result) printf(" x_x\n" @@ -179,6 +221,7 @@ int group_misc(void) { cmocka_unit_test(test_empty_rdb), cmocka_unit_test(test_mixed_levels_registration), cmocka_unit_test(test_checksum), + cmocka_unit_test(test_report_long_error), cmocka_unit_test(test_not_support_future_rdb_version), }; return cmocka_run_group_tests(tests, NULL, NULL);