Skip to content

Commit

Permalink
hexpire - HT
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Jun 13, 2024
1 parent 13f57a7 commit 9c30f2e
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 80 deletions.
6 changes: 3 additions & 3 deletions api/librdb-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ typedef struct RdbHandlersStructCallbacks {
/* Callback to handle a listpack-based list value */
RdbRes (*handleListLP)(RdbParser *p, void *userData, RdbBulk listpack);

/* Callback to handle a field-value pair within a plain-hash */
RdbRes (*handleHashPlain)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value);
/* Callback to handle a field-value pair within a plain-hash. expireAt -1 if not set. */
RdbRes (*handleHashPlain)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt);
/* Callback to handle a ziplist-based hash value */
RdbRes (*handleHashZL)(RdbParser *p, void *userData, RdbBulk ziplist);
/* Callback to handle a listpack-based hash value */
Expand Down Expand Up @@ -305,7 +305,7 @@ typedef struct RdbHandlersDataCallbacks {
/* Callback to handle an item from a list */
RdbRes (*handleListItem)(RdbParser *p, void *userData, RdbBulk item);
/* Callback to handle a field-value pair within a hash */
RdbRes (*handleHashField)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value);
RdbRes (*handleHashField)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt);
/* Callback to handle a member within a set */
RdbRes (*handleSetMember)(RdbParser *p, void *userData, RdbBulk member);
/* Callback to handle a member within a sorted set along with its score */
Expand Down
9 changes: 5 additions & 4 deletions src/ext/handlersFilter.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ static void initOpcodeToType(RdbxFilter *ctx) {
ctx->opToType[RDB_TYPE_ZSET_LISTPACK] = RDB_DATA_TYPE_ZSET;
/*hash*/
ctx->opToType[RDB_TYPE_HASH] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_METADATA] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_ZIPMAP] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_ZIPLIST] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK] = RDB_DATA_TYPE_HASH;
Expand Down Expand Up @@ -120,8 +121,8 @@ static RdbRes filterList(RdbParser *p, void *userData, RdbBulk item) {
return ((RdbxFilter *) userData)->cbReturnValue;
}

static RdbRes filterHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) {
UNUSED(p, field, value);
static RdbRes filterHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt) {
UNUSED(p, field, value, expireAt);
return ((RdbxFilter *) userData)->cbReturnValue;
}

Expand Down Expand Up @@ -187,8 +188,8 @@ static RdbRes filterHashZL(RdbParser *p, void *userData, RdbBulk ziplist) {
return ((RdbxFilter *) userData)->cbReturnValue;
}

static RdbRes filterHashPlain(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) {
UNUSED(p, field, value);
static RdbRes filterHashPlain(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt) {
UNUSED(p, field, value, expireAt);
return ((RdbxFilter *) userData)->cbReturnValue;
}

Expand Down
13 changes: 8 additions & 5 deletions src/ext/handlersToJson.c
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,10 @@ static RdbRes toJsonZset(RdbParser *p, void *userData, RdbBulk member, double sc
return RDB_OK;
}

static RdbRes toJsonHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) {
static RdbRes toJsonHash(RdbParser *p, void *userData, RdbBulk field,
RdbBulk value, int64_t expireAt)
{
UNUSED(expireAt);
RdbxToJson *ctx = userData;

if (ctx->state == R2J_IN_KEY) {
Expand Down Expand Up @@ -775,10 +778,10 @@ RdbxToJson *RDBX_createHandlersToJson(RdbParser *p, const char *filename, RdbxTo
toJsonStruct, /* handleListZL*/
toJsonStruct, /* handleListLP*/
/*hash*/
toJsonHash,
toJsonStruct, /* handleHashZL*/
toJsonStruct, /* handleHashLP*/
toJsonStruct, /* handleHashZM*/
toJsonHash, /*handleHashPlain*/
toJsonStruct, /*handleHashZL*/
toJsonStruct, /*handleHashLP*/
toJsonStruct, /*handleHashZM*/
/*set*/
toJsonSet,
toJsonStruct, /* handleSetIS*/
Expand Down
30 changes: 25 additions & 5 deletions src/ext/handlersToResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) {
return writevWrap(ctx, iov, 6, &startCmd, 1);
}

static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) {
static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt) {
char expireTimeStr[32], expireTimeLenStr[32];
struct iovec iov[10];
RdbxToResp *ctx = userData;

Expand All @@ -499,9 +500,9 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
int fieldLen = RDB_bulkLen(p, field);
int valueLen = RDB_bulkLen(p, value);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "HSET";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd hsetCmd;
hsetCmd.cmd = "HSET";
hsetCmd.key = ctx->keyCtx.key;

/* write RPUSH */
IOV_CONST(&iov[0], "*4\r\n$4\r\nHSET");
Expand All @@ -515,7 +516,26 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
IOV_LENGTH(&iov[5], valueLen, valueLenStr);
IOV_STRING(&iov[6], value, valueLen);
IOV_CONST(&iov[7], "\r\n");
return writevWrap(ctx, iov, 8, &startCmd, 1);
IF_NOT_OK_RETURN(writevWrap(ctx, iov, 8, &hsetCmd, 1));

if (expireAt == -1) return RDB_OK;

RdbxRespWriterStartCmd hpexpireatCmd;
hpexpireatCmd.cmd = "HPEXPIREAT";
hpexpireatCmd.key = ctx->keyCtx.key;
/* write HPEXPIREAT */
IOV_CONST(&iov[0], "*6\r\n$10\r\nHPEXPIREAT");
/* write key */
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen);
/* write expiration-time in msec */
IOV_LEN_AND_VAL(&iov[3], expireAt, expireTimeLenStr, expireTimeStr);
IOV_CONST(&iov[5], "$6\r\nFIELDS\r\n$1\r\n1");
/* write field */
IOV_LENGTH(&iov[6], fieldLen, fieldLenStr);
IOV_STRING(&iov[7], field, fieldLen);
IOV_CONST(&iov[8], "\r\n");
return writevWrap(ctx, iov, 9, &hpexpireatCmd, 1);
}

static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) {
Expand Down
52 changes: 40 additions & 12 deletions src/ext/readerResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ static RespRes readRespReplyLine(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
++(buffInfo->at);
/* fall-thru */
case PROC_LINE_END:
ctx->typeState = 0;
return RESP_REPLY_OK;
}
}
Expand Down Expand Up @@ -141,14 +142,15 @@ RespRes readRespReplyBulk(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
char ch;
UNUSED(buffInfo);

/* Parsing : $<length>\r\n<data>\r\n */
enum ProcessBulkReadStates {
PROC_BULK_READ_INIT = 0,
PROC_BULK_READ_LEN,
PROC_BULK_READ_LEN_CR,
PROC_BULK_READ_LEN_NL,
PROC_BULK_READ,
PROC_BULK_READ_CR,
PROC_BULK_READ_NL,
PROC_BULK_READ_LEN, /* Read bulk length */
PROC_BULK_READ_LEN_CR, /* Read CR */
PROC_BULK_READ_LEN_NL, /* Read NL */
PROC_BULK_READ, /* Read data */
PROC_BULK_READ_CR, /* Read CR */
PROC_BULK_READ_NL, /* Read NL */
PROC_BULK_READ_END,
};

Expand Down Expand Up @@ -255,6 +257,7 @@ static RespRes readRespReplyBulkArray(RespReaderCtx *ctx, RespReplyBuff *buffInf
READ_NUM_BULKS_NL,
READ_NEXT_BULK_HDR,
READ_NEXT_BULK,
READ_NEXT_LINE, /* int, double, null, bool, bignum */
READ_END,
};

Expand Down Expand Up @@ -311,12 +314,26 @@ static RespRes readRespReplyBulkArray(RespReaderCtx *ctx, RespReplyBuff *buffInf
break;

case READ_NEXT_BULK_HDR:
if (buffInfo->buff[buffInfo->at++] != '$') {
snprintf(ctx->errorMsg, sizeof(ctx->errorMsg),
"Invalid Multi-Bulk response. Failed to read Bulk header.");
return RESP_REPLY_ERR;
if (buffInfo->buff[buffInfo->at] == '$') {
buffInfo->at++;
ctx->typeArrayState = READ_NEXT_BULK;
break;
}

if ((buffInfo->buff[buffInfo->at] == ':') || /*int*/
(buffInfo->buff[buffInfo->at] == ',') || /*double*/
(buffInfo->buff[buffInfo->at] == '_') || /*null*/
(buffInfo->buff[buffInfo->at] == '#') || /*bool*/
(buffInfo->buff[buffInfo->at] == '(')) /*bignum*/
{
buffInfo->at++;
ctx->typeArrayState = READ_NEXT_LINE;
break;
}
ctx->typeArrayState = READ_NEXT_BULK; /* fall-thru */

snprintf(ctx->errorMsg, sizeof(ctx->errorMsg),
"Invalid Multi-Bulk response. Failed to read Bulk header.");
return RESP_REPLY_ERR;

case READ_NEXT_BULK:
if ( (res = readRespReplyBulk(ctx, buffInfo)) != RESP_REPLY_OK)
Expand All @@ -326,7 +343,18 @@ static RespRes readRespReplyBulkArray(RespReaderCtx *ctx, RespReplyBuff *buffInf
ctx->typeArrayState = READ_NEXT_BULK_HDR;
break;
}
ctx->typeArrayState = READ_END; /* fall-through */
ctx->typeArrayState = READ_END;
break;

case READ_NEXT_LINE:
if ( (res = readRespReplyLine(ctx, buffInfo)) != RESP_REPLY_OK)
return res;

if (--ctx->numBulksArray) {
ctx->typeArrayState = READ_NEXT_BULK_HDR;
break;
}
ctx->typeArrayState = READ_END;
break;
}

Expand Down
40 changes: 25 additions & 15 deletions src/lib/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct ParsingElementInfo peInfo[PE_MAX] = {
[PE_LIST_ZL] = {elementListZL, "elementListZL", "Parsing Ziplist"},
/* hash */
[PE_HASH] = {elementHash, "elementHash", "Parsing Hash"},
[PE_HASH_META] = {elementHash, "elementHashMeta", "Parsing Hash with expiry on fields"},
[PE_HASH_ZL] = {elementHashZL, "elementHashZL", "Parsing hash Ziplist"},
[PE_HASH_LP] = {elementHashLP, "elementHashLP", "Parsing hash Listpack"},
[PE_HASH_ZM] = {elementHashZM, "elementHashZM", "Parsing hash Zipmap"},
Expand Down Expand Up @@ -95,6 +96,7 @@ struct ParsingElementInfo peInfo[PE_MAX] = {
[PE_RAW_LIST_ZL] = {elementRawListZL, "elementRawListZL", "Parsing raw list ZL (zip list)"},
/* hash */
[PE_RAW_HASH] = {elementRawHash, "elementRawHash", "Parsing raw Hash"},
[PE_RAW_HASH_META] = {elementRawHash, "elementRawHashMeta", "Parsing raw Hash with expiry"},
[PE_RAW_HASH_ZL] = {elementRawHashZL, "elementRawHashZL", "Parsing raw hash Ziplist"},
[PE_RAW_HASH_LP] = {elementRawHashLP, "elementRawHashLP", "Parsing raw hash Listpack"},
[PE_RAW_HASH_ZM] = {elementRawHashZM, "elementRawHashZM", "Parsing raw hash Zipmap"},
Expand Down Expand Up @@ -557,6 +559,7 @@ _LIBRDB_API int RDB_handleByLevel(RdbParser *p, RdbDataType type, RdbHandlersLev
break;
case RDB_DATA_TYPE_HASH:
p->handleTypeObjByLevel[RDB_TYPE_HASH] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_METADATA] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_ZIPMAP] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_ZIPLIST] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_LISTPACK] = lvl;
Expand Down Expand Up @@ -701,12 +704,12 @@ static inline RdbStatus nextParsingElementKeyValue(RdbParser *p,
ParsingElementType peRawValue,
ParsingElementType peValue) {
p->elmCtx.key.handleByLevel = p->handleTypeObjByLevel[p->currOpcode];

/* Which level should parse this element (raw or struct/data) */
if (p->handleTypeObjByLevel[p->currOpcode] == RDB_LEVEL_RAW) {
p->elmCtx.key.valueType = peRawValue;
p->elmCtx.key.parsingElemType = peRawValue;
return nextParsingElement(p, PE_RAW_NEW_KEY);
} else {
p->elmCtx.key.valueType = peValue;
p->elmCtx.key.parsingElemType = peValue;
return nextParsingElement(p, PE_NEW_KEY);
}
}
Expand Down Expand Up @@ -1041,7 +1044,8 @@ static RdbStatus hashZiplist(RdbParser *p, BulkInfo *ziplistBulk) {
RDB_LEVEL_DATA,
rdbData.handleHashField,
embBulk1.binfo.ref,
embBulk2.binfo.ref);
embBulk2.binfo.ref,
-1 /*no expiry*/);
}
return RDB_STATUS_OK;
}
Expand Down Expand Up @@ -1096,7 +1100,8 @@ static RdbStatus hashListPack(RdbParser *p, BulkInfo *lpBulk) {
RDB_LEVEL_DATA,
rdbData.handleHashField,
embBulk1.binfo.ref,
embBulk2.binfo.ref);
embBulk2.binfo.ref,
-1 /*no expiry*/);
}
return RDB_STATUS_OK;
}
Expand Down Expand Up @@ -1139,7 +1144,8 @@ static RdbStatus hashZipMap(RdbParser *p, BulkInfo *zpBulk) {
RDB_LEVEL_DATA,
rdbData.handleHashField,
embBulk1.binfo.ref,
embBulk2.binfo.ref);
embBulk2.binfo.ref,
-1 /*no expiry*/);
}
return RDB_STATUS_OK;
}
Expand Down Expand Up @@ -1373,7 +1379,7 @@ RdbStatus elementNewKey(RdbParser *p) {
p->elmCtx.key.numItemsHint = -1;

/* Read value */
return nextParsingElement(p, p->elmCtx.key.valueType);
return nextParsingElement(p, p->elmCtx.key.parsingElemType);
}

/* load an expire-time, associated with the next key to load. */
Expand Down Expand Up @@ -1442,6 +1448,7 @@ RdbStatus elementNextRdbType(RdbParser *p) {
case RDB_TYPE_LIST_ZIPLIST: return nextParsingElementKeyValue(p, PE_RAW_LIST_ZL, PE_LIST_ZL);
/* hash */
case RDB_TYPE_HASH: return nextParsingElementKeyValue(p, PE_RAW_HASH, PE_HASH);
case RDB_TYPE_HASH_METADATA: return nextParsingElementKeyValue(p, PE_RAW_HASH_META, PE_HASH_META);
case RDB_TYPE_HASH_ZIPLIST: return nextParsingElementKeyValue(p, PE_RAW_HASH_ZL, PE_HASH_ZL);
case RDB_TYPE_HASH_LISTPACK: return nextParsingElementKeyValue(p, PE_RAW_HASH_LP, PE_HASH_LP);
case RDB_TYPE_HASH_ZIPMAP: return nextParsingElementKeyValue(p, PE_RAW_HASH_ZM, PE_HASH_ZM);
Expand All @@ -1454,9 +1461,12 @@ RdbStatus elementNextRdbType(RdbParser *p) {
case RDB_TYPE_ZSET_2: return nextParsingElementKeyValue(p, PE_RAW_ZSET, PE_ZSET);
case RDB_TYPE_ZSET_ZIPLIST: return nextParsingElementKeyValue(p, PE_RAW_ZSET_ZL, PE_ZSET_ZL);
case RDB_TYPE_ZSET_LISTPACK: return nextParsingElementKeyValue(p, PE_RAW_ZSET_LP, PE_ZSET_LP);

/* module */
case RDB_TYPE_MODULE_2: return nextParsingElementKeyValue(p, PE_RAW_MODULE, PE_MODULE);
/* stream */
case RDB_TYPE_STREAM_LISTPACKS:
case RDB_TYPE_STREAM_LISTPACKS_2:
case RDB_TYPE_STREAM_LISTPACKS_3: return nextParsingElementKeyValue(p, PE_RAW_STREAM_LP, PE_STREAM_LP);

case RDB_OPCODE_MODULE_AUX: if (p->handleTypeObjByLevel[RDB_OPCODE_MODULE_AUX] == RDB_LEVEL_RAW)
return nextParsingElement(p, PE_RAW_MODULE_AUX);
Expand All @@ -1467,11 +1477,6 @@ RdbStatus elementNextRdbType(RdbParser *p) {

case RDB_OPCODE_EOF: return nextParsingElement(p, PE_END_OF_FILE);

/* stream */
case RDB_TYPE_STREAM_LISTPACKS:
case RDB_TYPE_STREAM_LISTPACKS_2:
case RDB_TYPE_STREAM_LISTPACKS_3: return nextParsingElementKeyValue(p, PE_RAW_STREAM_LP, PE_STREAM_LP);

case RDB_OPCODE_FUNCTION:
RDB_reportError(p, RDB_ERR_PRERELEASE_FUNC_FORMAT_NOT_SUPPORTED,
"Pre-release function format not supported.");
Expand Down Expand Up @@ -1680,6 +1685,9 @@ RdbStatus elementHash(RdbParser *p) {
BulkInfo *binfoField, *binfoValue;

while(ctx->hash.visitingField < ctx->hash.numFields) {
uint64_t expireAt = 0;
if (p->parsingElement == PE_HASH_META)
IF_NOT_OK_RETURN(rdbLoadLen(p, NULL, &expireAt, NULL, NULL));
IF_NOT_OK_RETURN(rdbLoadString(p, RQ_ALLOC_APP_BULK, NULL, &binfoField));
IF_NOT_OK_RETURN(rdbLoadString(p, RQ_ALLOC_APP_BULK, NULL, &binfoValue));

Expand All @@ -1690,12 +1698,14 @@ RdbStatus elementHash(RdbParser *p) {
if (p->elmCtx.key.handleByLevel == RDB_LEVEL_STRUCT) {
CALL_HANDLERS_CB(p, NOP, RDB_LEVEL_STRUCT, rdbStruct.handleHashPlain,
binfoField->ref,
binfoValue->ref);
binfoValue->ref,
(expireAt) ? (int64_t) expireAt : -1);
}
else {
CALL_HANDLERS_CB(p, NOP, RDB_LEVEL_DATA, rdbData.handleHashField,
binfoField->ref,
binfoValue->ref);
binfoValue->ref,
(expireAt) ? (int64_t) expireAt : -1);
}
++ctx->hash.visitingField;

Expand Down
4 changes: 3 additions & 1 deletion src/lib/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ typedef enum ParsingElementType {
PE_QUICKLIST,
PE_LIST_ZL,
PE_HASH,
PE_HASH_META,
PE_HASH_ZL,
PE_HASH_LP,
PE_HASH_ZM,
Expand All @@ -163,6 +164,7 @@ typedef enum ParsingElementType {
PE_RAW_QUICKLIST,
PE_RAW_LIST_ZL,
PE_RAW_HASH,
PE_RAW_HASH_META,
PE_RAW_HASH_ZL,
PE_RAW_HASH_LP,
PE_RAW_HASH_ZM,
Expand Down Expand Up @@ -228,7 +230,7 @@ typedef struct {

typedef struct {
RdbKeyInfo info;
ParsingElementType valueType;
ParsingElementType parsingElemType;
RdbHandlersLevel handleByLevel;
int64_t numItemsHint; /* hint for the total number of items in the current parsed key. -1 if unknown */
} ElementKeyCtx;
Expand Down
Loading

0 comments on commit 9c30f2e

Please sign in to comment.