diff --git a/src/ext/respToRedisLoader.c b/src/ext/respToRedisLoader.c index 410091d..e30b3a9 100644 --- a/src/ext/respToRedisLoader.c +++ b/src/ext/respToRedisLoader.c @@ -54,15 +54,25 @@ static void onReadRepliesError(RdbxRespToRedisLoader *ctx) { ctx->respReader.countReplies); } -/* Read 'numToRead' replies from the socket. * Return 0 for success, 1 otherwise. */ -static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) { +/* Read 'numToRead' replies from the socket. + * + * numToRead - minimum number of replies to read from the socket before + * returning. + * sendError - if set, an error occurred while writing to the server. In + * this case the function will try to read replies from the + * server. Maybe one of the replies will contain an error message + * that explains why write got failed. Whether error message is + * received or not, the function will return to the original issue. + * + * Return 0 for success, 1 otherwise. */ +static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError) { char buff[REPLY_BUFF_SIZE]; RespReaderCtx *respReader = &ctx->respReader; size_t countRepliesBefore = respReader->countReplies; size_t repliesExpected = respReader->countReplies + numToRead; - while (respReader->countReplies < repliesExpected) { + while ((respReader->countReplies < repliesExpected) || (sendError)) { int bytesReceived = recv(ctx->fd, buff, sizeof(buff), 0); if (bytesReceived > 0) { @@ -71,12 +81,23 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) { onReadRepliesError(ctx); return 1; } + continue; + } + + /* handle error */ + + if (sendError) + return 0; /* Failed read error message from dst. Back to original issue. */ - } else if (bytesReceived == 0) { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE, "Connection closed by the remote side"); + if (bytesReceived == 0) { + RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE, + "Connection closed by the remote side"); return 1; } else { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_READ, "Failed to recv() from Redis server. (errno=%d)", errno); + RDB_reportError(ctx->p, + (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_READ, + "Failed to recv() from Redis server. errno=%d: %s", + errno, strerror(errno)); return 1; } } @@ -106,7 +127,7 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, RdbxRespToRedisLoader *ctx = context; if (unlikely(ctx->pendingCmds.num == ctx->pendingCmds.pipelineDepth)) { - if (readReplies(ctx, 1 /* at least one */)) + if (readReplies(ctx, 1 /* at least one */, 0)) return 1; } @@ -114,7 +135,8 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, while (1) { - writeResult = writev(ctx->fd, iov, iovCnt); + struct msghdr msg = { .msg_iov = iov, .msg_iovlen = iovCnt }; + writeResult = sendmsg(ctx->fd, &msg, MSG_NOSIGNAL /*Ignore SIGPIPE signal*/); /* check for error */ if (unlikely(writeResult == -1)) { @@ -122,13 +144,13 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, if ((retries++) >= MAX_EINTR_RETRY) { RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_WRITE, "Failed to write socket. Exceeded EINTR retry limit"); - return 1; + break; } continue; } else { RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_WRITE, "Failed to write socket (errno=%d)", errno); - return 1; + break; } } @@ -140,16 +162,20 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, } /* if managed to send all iov entries */ - if (likely(iovCnt == 0)) - break; + if (likely(iovCnt == 0)) { + ctx->pendingCmds.num += endCmd; + return 0; + } /* Update pointed iov entry. Only partial of its data sent */ iov->iov_len -= writeResult; iov->iov_base = (char *) iov->iov_base + writeResult; } - ctx->pendingCmds.num += endCmd; - return 0; + /* Error occurred. Try to receive error msg from dst, which might explain + why write got failed */ + readReplies(ctx, 0, 1/*sendError*/); + return 1; } /* Flush the pending commands by reading the remaining replies. @@ -157,7 +183,7 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, static int redisLoaderFlush(void *context) { RdbxRespToRedisLoader *ctx = context; if (ctx->pendingCmds.num) - return readReplies(ctx, ctx->pendingCmds.num); + return readReplies(ctx, ctx->pendingCmds.num, 0); return 0; }