Skip to content

Commit

Permalink
RESP2REDIS: Should not fail trying to load empty module
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Aug 8, 2024
1 parent f953088 commit 73e9680
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 18 deletions.
6 changes: 6 additions & 0 deletions api/librdb-ext-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ typedef enum {
RDBX_ERR_RESP2REDIS_FAILED_WRITE,
RDBX_ERR_RESP2REDIS_CONN_CLOSE,
RDBX_ERR_RESP2REDIS_MAX_RETRIES,
RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
} RdbxRes;

/****************************************************************
Expand Down Expand Up @@ -203,9 +204,14 @@ typedef struct RdbxRespWriterStartCmd {
/* Redis Command name (Ex: "SET", "RESTORE"). Owned by the caller. It is
* constant static string and Valid for ref behind the duration of the call. */
const char *cmd;

/* If key available as part of command. Else empty string.
* Owned by the caller. */
const char *key;

/* On restore command, size of serialized data. Otherwise, set to 0. */
size_t restoreSize;

} RdbxRespWriterStartCmd;

typedef struct RdbxRespWriter {
Expand Down
19 changes: 19 additions & 0 deletions src/ext/handlersToResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) {
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SET";
startCmd.key = KEY_CMD_ID_DBG;
startCmd.restoreSize = 0;

struct iovec iov[7];
/* write SET */
Expand Down Expand Up @@ -299,6 +300,7 @@ static inline RdbRes sendFirstRestoreFrag(RdbxToResp *ctx, RdbBulk frag, size_t
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RESTORE";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = ctx->restoreCtx.restoreSize;

/* writev RESTORE */
char cmd[64];
Expand Down Expand Up @@ -329,6 +331,7 @@ static inline RdbRes sendFirstRestoreFragModuleAux(RdbxToResp *ctx, RdbBulk frag
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RESTOREMODAUX";
startCmd.key = "";
startCmd.restoreSize = ctx->restoreCtx.restoreSize;

/* writev RESTOREMODAUX */
iov[0].iov_base = ctx->restoreCtx.moduleAux.cmdPrefix;
Expand Down Expand Up @@ -357,6 +360,7 @@ static RdbRes toRespNewDb(RdbParser *p, void *userData, int dbid) {
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SELECT";
startCmd.key = "";
startCmd.restoreSize = 0;

IOV_CONST(&iov[0], "*2\r\n$6\r\nSELECT");
IOV_LENGTH(&iov[1], cnt, cntStr);
Expand Down Expand Up @@ -397,6 +401,7 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "DEL";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

IOV_CONST(&iov[0], "*2\r\n$3\r\nDEL");
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
Expand All @@ -418,6 +423,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) {
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "PEXPIREAT";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

char keyLenStr[32], expireLenStr[32], expireStr[32];
/* PEXPIREAT */
Expand Down Expand Up @@ -451,6 +457,7 @@ static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) {
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SET";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

/* write SET */
IOV_CONST(&iov[0], "*3\r\n$3\r\nSET");
Expand All @@ -476,6 +483,7 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) {
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RPUSH";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

/* write RPUSH */
IOV_CONST(&iov[0], "*3\r\n$5\r\nRPUSH");
Expand Down Expand Up @@ -503,6 +511,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
RdbxRespWriterStartCmd hsetCmd;
hsetCmd.cmd = "HSET";
hsetCmd.key = ctx->keyCtx.key;
hsetCmd.restoreSize = 0;

/* write RPUSH */
IOV_CONST(&iov[0], "*4\r\n$4\r\nHSET");
Expand All @@ -523,6 +532,8 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
RdbxRespWriterStartCmd hpexpireatCmd;
hpexpireatCmd.cmd = "HPEXPIREAT";
hpexpireatCmd.key = ctx->keyCtx.key;
hpexpireatCmd.restoreSize = 0;

/* write HPEXPIREAT */
IOV_CONST(&iov[0], "*6\r\n$10\r\nHPEXPIREAT");
/* write key */
Expand All @@ -548,6 +559,7 @@ static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) {
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SADD";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

/* write RPUSH */
IOV_CONST(&iov[0], "*3\r\n$4\r\nSADD");
Expand All @@ -571,6 +583,7 @@ static RdbRes toRespZset(RdbParser *p, void *userData, RdbBulk member, double sc
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "ZADD";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

/* write ZADD */
IOV_CONST(&iov[0], "*4\r\n$4\r\nZADD");
Expand Down Expand Up @@ -618,6 +631,7 @@ static RdbRes toRespFunction(RdbParser *p, void *userData, RdbBulk func) {
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "FUNCTION";
startCmd.key = "";
startCmd.restoreSize = 0;

if (ctx->conf.funcLibReplaceIfExist)
IOV_CONST(&iov[0], "*4\r\n$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n$7\r\nREPLACE");
Expand Down Expand Up @@ -647,6 +661,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta *
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XGROUP CREATE";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

IOV_CONST(&iov[0], "*6\r\n$6\r\nXGROUP\r\n$6\r\nCREATE");
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
Expand Down Expand Up @@ -674,6 +689,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta *
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XSETID";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

if ((ctx->keyCtx.info.opcode >= _RDB_TYPE_STREAM_LISTPACKS_2) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) {
IOV_CONST(&iov[0], "*7\r\n$6\r\nXSETID");
Expand Down Expand Up @@ -713,6 +729,7 @@ static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd
if ((ctx->streamCtx.xaddStartEndCounter % 2) == 0) {
startCmd.cmd = "XADD";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;
startCmdRef = &startCmd;

/* writev XADD */
Expand Down Expand Up @@ -766,6 +783,7 @@ static RdbRes toRespStreamNewCGroup(RdbParser *p, void *userData, RdbBulk grpNam
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XGROUP";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

/* writev XGROUP */
if ( (meta->entriesRead>=0) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) {
Expand Down Expand Up @@ -848,6 +866,7 @@ static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, Rdb
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XCLAIM";
startCmd.key = ctx->keyCtx.key;
startCmd.restoreSize = 0;

/* writev XCLAIM */
IOV_CONST(&iov[iovs++], "*12\r\n$6\r\nXCLAIM");
Expand Down
15 changes: 11 additions & 4 deletions src/ext/readerResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ static RespRes readRespReplyError(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
else
ctx->errorMsg[ctx->errorMsgLen - 1] = '\0';

res = RESP_REPLY_ERR;
/* Report the error. cb return 1 to propagate. 0 to mask */
if ((ctx->errCb) && (ctx->errCb(ctx->errCbCtx, ctx->errorMsg) == 0))
return RESP_REPLY_OK;

return RESP_REPLY_ERR;
}

return res;
Expand Down Expand Up @@ -450,9 +454,12 @@ static RespRes readRespReply(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
/*** non-static functions (public) ***/

void readRespInit(RespReaderCtx *ctx) {
ctx->type = 0;
ctx->errorMsgLen = 0;
ctx->countReplies = 0;
memset(ctx, 0, sizeof(RespReaderCtx));
}

void setErrorCb(RespReaderCtx *respReaderCtx, void *errorCbCtx, OnRespErrorCb cb) {
respReaderCtx->errCbCtx = errorCbCtx;
respReaderCtx->errCb = cb;
}

RespRes readRespReplies(RespReaderCtx *ctx, const char *buff, int buffLen) {
Expand Down
10 changes: 10 additions & 0 deletions src/ext/readerResp.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ typedef struct RespReplyBuff {
int at;
} RespReplyBuff;

/* cb to report on RESP error. Returns 1 to propagate. 0 to mask. */
typedef int (*OnRespErrorCb) (void *callerCtx, char *msg);

typedef struct {

/* PUBLIC: read-only */
Expand All @@ -33,8 +36,15 @@ typedef struct {
/* private bulk-array response state */
long long numBulksArray;

/* On RESP error callback */
void *errCbCtx;
OnRespErrorCb errCb;

} RespReaderCtx;

void readRespInit(RespReaderCtx *ctx);

/* Can register cb to decide whether to ignore given error or propagate it */
void setErrorCb(RespReaderCtx *respReaderCtx, void *errorCbCtx, OnRespErrorCb cb);

RespRes readRespReplies(RespReaderCtx *ctx, const char *buff, int buffLen);
54 changes: 50 additions & 4 deletions src/ext/respToRedisLoader.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ struct RdbxRespToRedisLoader {
const char *cmd[NUM_RECORDED_CMDS];
/* strncpy() of the key sent */
char key[NUM_RECORDED_CMDS][RECORDED_KEY_MAX_LEN];
/* if restore cmd, then serialized size. Otherwise, set to 0 */
size_t restoreSize[NUM_RECORDED_CMDS];
} pendingCmds;

RespReaderCtx respReader;
Expand All @@ -42,16 +44,41 @@ struct RdbxRespToRedisLoader {
int fdOwner; /* Set to 1 if this entity created the socket, and it is the one to release. */
};

static void onReadRepliesError(RdbxRespToRedisLoader *ctx) {
RespReaderCtx *respReader = &ctx->respReader;
/* cb to report RESP error. Returns 1 to propagate. 0 to mask. */
static int onReadRepliesErrorCb(void *context, char *msg) {
RdbxRespToRedisLoader *ctx = context;

int currIdx = ctx->respReader.countReplies % NUM_RECORDED_CMDS;

/*
* librdb should not fail trying to load empty module
*
* If RDB2RESP was configured to "supportRestoreModuleAux" and generates
* RESTOREMODAUX commands (currently Redis enterprise only), then if RDB was
* generated by a server with some module, but user didn't make any use of that
* module, attempting to play it to another server that wasn't loaded with that
* module, the RDB parser will fail. This is because the module always store
* something in the AUX field, and the RDB parser will try to load it.
*
* In order to overcome this issue, A module that its AUX payload is less than
* 15 Bytes (including RDB version and checksum) counted as AUX field of an empty
* Module (not in use), then the parser, when restoring the empty module, it
* should ignore returned error: "-ERR Module X not found..."
*/
if ((strcmp(ctx->pendingCmds.cmd[currIdx], "RESTOREMODAUX")==0) &&
(ctx->pendingCmds.restoreSize[currIdx] < 15) &&
(strncmp(msg, "ERR Module", 10) == 0) && /* error starts with "-ERR Module" */
(strstr(msg, "not found"))) /* error includes "not found" */
return 0; /* mask error */

RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP_WRITE,
"\nerror from dst '-%s' on key '%s' on command '%s' (RESP Command #%zu)\n",
respReader->errorMsg,
msg,
ctx->pendingCmds.key[currIdx],
ctx->pendingCmds.cmd[currIdx],
ctx->respReader.countReplies);

return 1; /* propagate error */
}

/* Read 'numToRead' replies from the socket.
Expand All @@ -66,6 +93,7 @@ static void onReadRepliesError(RdbxRespToRedisLoader *ctx) {
*
* Return 0 for success, 1 otherwise. */
static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError) {
int noDataEv = 0;
char buff[REPLY_BUFF_SIZE];

RespReaderCtx *respReader = &ctx->respReader;
Expand All @@ -78,7 +106,6 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError)
if (bytesReceived > 0) {
/* Data was received, process it */
if (unlikely(RESP_REPLY_ERR == readRespReplies(respReader, buff, bytesReceived))) {
onReadRepliesError(ctx);
return 1;
}
continue;
Expand All @@ -94,6 +121,12 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError)
"Connection closed by the remote side");
return 1;
} else {
if ((!noDataEv) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
noDataEv = 1;
RDB_log(ctx->p, RDB_LOG_WRN, "No data available from redis-server");
continue; /* Try one more time (Timeout is 60sec) */
}

RDB_reportError(ctx->p,
(RdbRes) RDBX_ERR_RESP2REDIS_FAILED_READ,
"Failed to recv() from Redis server. errno=%d: %s",
Expand All @@ -114,6 +147,7 @@ static inline void recordCommandSent(RdbxRespToRedisLoader *ctx,RdbxRespWriterSt
ctx->pendingCmds.cmd[recordCmdEntry] = cmd->cmd;
strncpy(ctx->pendingCmds.key[recordCmdEntry], cmd->key, RECORDED_KEY_MAX_LEN-1);
ctx->pendingCmds.key[recordCmdEntry][RECORDED_KEY_MAX_LEN-1] = '\0';
ctx->pendingCmds.restoreSize[recordCmdEntry] = cmd->restoreSize;
}

/* Write the vector of data to the socket with writev() sys-call.
Expand Down Expand Up @@ -212,6 +246,7 @@ static RdbRes redisAuthCustomized(RdbxRespToRedisLoader *ctx, RdbxRedisAuth *aut
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "<AUTH_CUSTOMIZED_CMD>";
startCmd.key = "";
startCmd.restoreSize = 0;

/* allocate iovec (2 for header and trailer. 3 for each argument) */
struct iovec *iov = (struct iovec *)malloc((auth->cmd.argc * 3 + 2) * sizeof(struct iovec));
Expand Down Expand Up @@ -261,6 +296,7 @@ static RdbRes redisAuth(RdbxRespToRedisLoader *ctx, RdbxRedisAuth *auth) {
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "AUTH";
startCmd.key = "";
startCmd.restoreSize = 0;

struct iovec iov[10];
if (auth->user) {
Expand Down Expand Up @@ -312,6 +348,7 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p,
ctx->pendingCmds.num = 0;
ctx->pendingCmds.pipelineDepth = PIPELINE_DEPTH_DEF;
readRespInit(&ctx->respReader);
setErrorCb(&ctx->respReader, ctx, onReadRepliesErrorCb);

if (auth && (redisAuth(ctx, auth) != RDB_OK))
return NULL;
Expand Down Expand Up @@ -351,6 +388,15 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p,
goto createErr;
}

/* Set the recv() timeout. Avoid blocking forever. */
struct timeval timeout = { .tv_sec = 60, .tv_usec = 0 };
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
"Failed to set socket receive timeout. errno=%d: %s",
errno, strerror(errno));
goto createErr;
}

RdbxRespToRedisLoader *res = RDBX_createRespToRedisFd(p, rdbToResp, auth, sockfd);

if (!res) goto createErr;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ struct ParsingElementInfo peInfo[PE_MAX] = {
[PE_FUNCTION] = {elementFunction, "elementFunction", "Parsing Function"},
/* module */
[PE_MODULE] = {elementModule, "elementModule", "Parsing silently Module element"},
[PE_MODULE_AUX] = {elementModule, "elementModule", "Parsing silently Module Auxiliary data"},
[PE_MODULE_AUX] = {elementModule, "elementModule(aux)", "Parsing silently Module Auxiliary data"},
/* stream */
[PE_STREAM_LP] = {elementStreamLP, "elementStreamLP", "Parsing stream Listpack"},

Expand Down
2 changes: 1 addition & 1 deletion src/lib/parserRaw.c
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ static inline RdbStatus cbHandleEnd(RdbParser *p) {

/* report entire/leftover to cb handlers */
for(int j = 0 ; j <= ctx->curBulkIndex ; ++j)
cbHandleFrag(p, ctx->bulkArray + j);
IF_NOT_OK_RETURN(cbHandleFrag(p, ctx->bulkArray + j));

aggFlushBulks(p);

Expand Down
Binary file added test/dumps/module_aux_empty.rdb
Binary file not shown.
Loading

0 comments on commit 73e9680

Please sign in to comment.