Skip to content

Commit

Permalink
Add support to hash-field-expiration (7.4) (#47)
Browse files Browse the repository at this point in the history
* RDB_TYPE_HASH_METADATA - While RDB_TYPE_HASH keep key-value pairs to rdb file,
  this one keep key-value-expiry tuples such that the value of expiry cannot be 
  bigger than 2^48-1.
* RDB_TYPE_HASH_LISTPACK_EX - While RDB_TYPE_HASH_LISTPACK_EX keep in listpack 
  pairs of key-value, this one keep key-value-expiry tuples such that the value 
  of expiry cannot be bigger than 2^48-1.
  • Loading branch information
moticless authored Jun 17, 2024
1 parent 46049a1 commit 9d584ea
Show file tree
Hide file tree
Showing 21 changed files with 410 additions and 125 deletions.
8 changes: 5 additions & 3 deletions api/librdb-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,14 @@ typedef struct RdbHandlersStructCallbacks {
/* Callback to handle a listpack-based list value */
RdbRes (*handleListLP)(RdbParser *p, void *userData, RdbBulk listpack);

/* Callback to handle a field-value pair within a plain-hash */
RdbRes (*handleHashPlain)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value);
/* Callback to handle a field-value pair within a plain-hash. expireAt -1 if not set. */
RdbRes (*handleHashPlain)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt);
/* Callback to handle a ziplist-based hash value */
RdbRes (*handleHashZL)(RdbParser *p, void *userData, RdbBulk ziplist);
/* Callback to handle a listpack-based hash value */
RdbRes (*handleHashLP)(RdbParser *p, void *userData, RdbBulk listpack);
/* Callback to handle a listpackex-based hash (with expiry on fields) */
RdbRes (*handleHashLPEx)(RdbParser *p, void *userData, RdbBulk listpackEx);
/* Callback to handle a zipmap-based hash value */
RdbRes (*handleHashZM)(RdbParser *p, void *userData, RdbBulk zipmap);

Expand Down Expand Up @@ -305,7 +307,7 @@ typedef struct RdbHandlersDataCallbacks {
/* Callback to handle an item from a list */
RdbRes (*handleListItem)(RdbParser *p, void *userData, RdbBulk item);
/* Callback to handle a field-value pair within a hash */
RdbRes (*handleHashField)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value);
RdbRes (*handleHashField)(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt);
/* Callback to handle a member within a set */
RdbRes (*handleSetMember)(RdbParser *p, void *userData, RdbBulk member);
/* Callback to handle a member within a sorted set along with its score */
Expand Down
16 changes: 12 additions & 4 deletions src/ext/handlersFilter.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ static void initOpcodeToType(RdbxFilter *ctx) {
ctx->opToType[RDB_TYPE_ZSET_LISTPACK] = RDB_DATA_TYPE_ZSET;
/*hash*/
ctx->opToType[RDB_TYPE_HASH] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_METADATA] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_ZIPMAP] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_ZIPLIST] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK_EX] = RDB_DATA_TYPE_HASH;
/*module*/
ctx->opToType[RDB_TYPE_MODULE_2] = RDB_DATA_TYPE_MODULE;
ctx->opToType[RDB_OPCODE_MODULE_AUX] = RDB_DATA_TYPE_MODULE;
Expand Down Expand Up @@ -120,8 +122,8 @@ static RdbRes filterList(RdbParser *p, void *userData, RdbBulk item) {
return ((RdbxFilter *) userData)->cbReturnValue;
}

static RdbRes filterHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) {
UNUSED(p, field, value);
static RdbRes filterHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt) {
UNUSED(p, field, value, expireAt);
return ((RdbxFilter *) userData)->cbReturnValue;
}

Expand Down Expand Up @@ -177,6 +179,11 @@ static RdbRes filterHashLP(RdbParser *p, void *userData, RdbBulk listpack) {
return ((RdbxFilter *) userData)->cbReturnValue;
}

static RdbRes filterHashLPEx(RdbParser *p, void *userData, RdbBulk listpackEx) {
UNUSED(p, listpackEx);
return ((RdbxFilter *) userData)->cbReturnValue;
}

static RdbRes filterHashZM(RdbParser *p, void *userData, RdbBulk zipmap) {
UNUSED(p, zipmap);
return ((RdbxFilter *) userData)->cbReturnValue;
Expand All @@ -187,8 +194,8 @@ static RdbRes filterHashZL(RdbParser *p, void *userData, RdbBulk ziplist) {
return ((RdbxFilter *) userData)->cbReturnValue;
}

static RdbRes filterHashPlain(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) {
UNUSED(p, field, value);
static RdbRes filterHashPlain(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt) {
UNUSED(p, field, value, expireAt);
return ((RdbxFilter *) userData)->cbReturnValue;
}

Expand Down Expand Up @@ -310,6 +317,7 @@ static void defaultFilterStructCb(RdbHandlersStructCallbacks *structCb) {
filterHashPlain, /*handleHashPlain*/
filterHashZL, /*handleHashZL*/
filterHashLP, /*handleHashLP*/
filterHashLPEx, /*handleHashLPEx*/
filterHashZM, /*handleHashZM*/
filterSetPlain, /*handleSetPlain*/
filterSetIS, /*handleSetIS*/
Expand Down
14 changes: 9 additions & 5 deletions src/ext/handlersToJson.c
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,10 @@ static RdbRes toJsonZset(RdbParser *p, void *userData, RdbBulk member, double sc
return RDB_OK;
}

static RdbRes toJsonHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) {
static RdbRes toJsonHash(RdbParser *p, void *userData, RdbBulk field,
RdbBulk value, int64_t expireAt)
{
UNUSED(expireAt);
RdbxToJson *ctx = userData;

if (ctx->state == R2J_IN_KEY) {
Expand Down Expand Up @@ -775,10 +778,11 @@ RdbxToJson *RDBX_createHandlersToJson(RdbParser *p, const char *filename, RdbxTo
toJsonStruct, /* handleListZL*/
toJsonStruct, /* handleListLP*/
/*hash*/
toJsonHash,
toJsonStruct, /* handleHashZL*/
toJsonStruct, /* handleHashLP*/
toJsonStruct, /* handleHashZM*/
toJsonHash, /*handleHashPlain*/
toJsonStruct, /*handleHashZL*/
toJsonStruct, /*handleHashLP*/
toJsonStruct, /*handleHashLPEx*/
toJsonStruct, /*handleHashZM*/
/*set*/
toJsonSet,
toJsonStruct, /* handleSetIS*/
Expand Down
30 changes: 25 additions & 5 deletions src/ext/handlersToResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) {
return writevWrap(ctx, iov, 6, &startCmd, 1);
}

static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) {
static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value, int64_t expireAt) {
char expireTimeStr[32], expireTimeLenStr[32];
struct iovec iov[10];
RdbxToResp *ctx = userData;

Expand All @@ -499,9 +500,9 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
int fieldLen = RDB_bulkLen(p, field);
int valueLen = RDB_bulkLen(p, value);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "HSET";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd hsetCmd;
hsetCmd.cmd = "HSET";
hsetCmd.key = ctx->keyCtx.key;

/* write RPUSH */
IOV_CONST(&iov[0], "*4\r\n$4\r\nHSET");
Expand All @@ -515,7 +516,26 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
IOV_LENGTH(&iov[5], valueLen, valueLenStr);
IOV_STRING(&iov[6], value, valueLen);
IOV_CONST(&iov[7], "\r\n");
return writevWrap(ctx, iov, 8, &startCmd, 1);
IF_NOT_OK_RETURN(writevWrap(ctx, iov, 8, &hsetCmd, 1));

if (expireAt == -1) return RDB_OK;

RdbxRespWriterStartCmd hpexpireatCmd;
hpexpireatCmd.cmd = "HPEXPIREAT";
hpexpireatCmd.key = ctx->keyCtx.key;
/* write HPEXPIREAT */
IOV_CONST(&iov[0], "*6\r\n$10\r\nHPEXPIREAT");
/* write key */
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen);
/* write expiration-time in msec */
IOV_LEN_AND_VAL(&iov[3], expireAt, expireTimeLenStr, expireTimeStr);
IOV_CONST(&iov[5], "$6\r\nFIELDS\r\n$1\r\n1");
/* write field */
IOV_LENGTH(&iov[6], fieldLen, fieldLenStr);
IOV_STRING(&iov[7], field, fieldLen);
IOV_CONST(&iov[8], "\r\n");
return writevWrap(ctx, iov, 9, &hpexpireatCmd, 1);
}

static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) {
Expand Down
52 changes: 40 additions & 12 deletions src/ext/readerResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ static RespRes readRespReplyLine(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
++(buffInfo->at);
/* fall-thru */
case PROC_LINE_END:
ctx->typeState = 0;
return RESP_REPLY_OK;
}
}
Expand Down Expand Up @@ -141,14 +142,15 @@ RespRes readRespReplyBulk(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
char ch;
UNUSED(buffInfo);

/* Parsing : $<length>\r\n<data>\r\n */
enum ProcessBulkReadStates {
PROC_BULK_READ_INIT = 0,
PROC_BULK_READ_LEN,
PROC_BULK_READ_LEN_CR,
PROC_BULK_READ_LEN_NL,
PROC_BULK_READ,
PROC_BULK_READ_CR,
PROC_BULK_READ_NL,
PROC_BULK_READ_LEN, /* Read bulk length */
PROC_BULK_READ_LEN_CR, /* Read CR */
PROC_BULK_READ_LEN_NL, /* Read NL */
PROC_BULK_READ, /* Read data */
PROC_BULK_READ_CR, /* Read CR */
PROC_BULK_READ_NL, /* Read NL */
PROC_BULK_READ_END,
};

Expand Down Expand Up @@ -255,6 +257,7 @@ static RespRes readRespReplyBulkArray(RespReaderCtx *ctx, RespReplyBuff *buffInf
READ_NUM_BULKS_NL,
READ_NEXT_BULK_HDR,
READ_NEXT_BULK,
READ_NEXT_LINE, /* int, double, null, bool, bignum */
READ_END,
};

Expand Down Expand Up @@ -311,12 +314,26 @@ static RespRes readRespReplyBulkArray(RespReaderCtx *ctx, RespReplyBuff *buffInf
break;

case READ_NEXT_BULK_HDR:
if (buffInfo->buff[buffInfo->at++] != '$') {
snprintf(ctx->errorMsg, sizeof(ctx->errorMsg),
"Invalid Multi-Bulk response. Failed to read Bulk header.");
return RESP_REPLY_ERR;
if (buffInfo->buff[buffInfo->at] == '$') {
buffInfo->at++;
ctx->typeArrayState = READ_NEXT_BULK;
break;
}

if ((buffInfo->buff[buffInfo->at] == ':') || /*int*/
(buffInfo->buff[buffInfo->at] == ',') || /*double*/
(buffInfo->buff[buffInfo->at] == '_') || /*null*/
(buffInfo->buff[buffInfo->at] == '#') || /*bool*/
(buffInfo->buff[buffInfo->at] == '(')) /*bignum*/
{
buffInfo->at++;
ctx->typeArrayState = READ_NEXT_LINE;
break;
}
ctx->typeArrayState = READ_NEXT_BULK; /* fall-thru */

snprintf(ctx->errorMsg, sizeof(ctx->errorMsg),
"Invalid Multi-Bulk response. Failed to read Bulk header.");
return RESP_REPLY_ERR;

case READ_NEXT_BULK:
if ( (res = readRespReplyBulk(ctx, buffInfo)) != RESP_REPLY_OK)
Expand All @@ -326,7 +343,18 @@ static RespRes readRespReplyBulkArray(RespReaderCtx *ctx, RespReplyBuff *buffInf
ctx->typeArrayState = READ_NEXT_BULK_HDR;
break;
}
ctx->typeArrayState = READ_END; /* fall-through */
ctx->typeArrayState = READ_END;
break;

case READ_NEXT_LINE:
if ( (res = readRespReplyLine(ctx, buffInfo)) != RESP_REPLY_OK)
return res;

if (--ctx->numBulksArray) {
ctx->typeArrayState = READ_NEXT_BULK_HDR;
break;
}
ctx->typeArrayState = READ_END;
break;
}

Expand Down
Loading

0 comments on commit 9d584ea

Please sign in to comment.