Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to hash-field-expiration RDB_TYPE v2 #48

Merged
merged 3 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ext/handlersFilter.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ static void initOpcodeToType(RdbxFilter *ctx) {
ctx->opToType[RDB_TYPE_ZSET_LISTPACK] = RDB_DATA_TYPE_ZSET;
/*hash*/
ctx->opToType[RDB_TYPE_HASH] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_METADATA_PRE_GA] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_METADATA] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_ZIPMAP] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_ZIPLIST] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK_EX_PRE_GA] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK_EX] = RDB_DATA_TYPE_HASH;
/*module*/
ctx->opToType[RDB_TYPE_MODULE_2] = RDB_DATA_TYPE_MODULE;
Expand Down
11 changes: 6 additions & 5 deletions src/lib/defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define RDB_TYPE_MODULE_PRE_GA 6 /* Used in 4.0 release candidates */
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without
the generating module being loaded. */
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */

/* Object types for encoded objects. */
#define RDB_TYPE_HASH_ZIPMAP 9
Expand All @@ -32,10 +31,12 @@
#define RDB_TYPE_STREAM_LISTPACKS_2 19
#define RDB_TYPE_SET_LISTPACK 20
#define RDB_TYPE_STREAM_LISTPACKS_3 21
#define RDB_TYPE_HASH_METADATA 22
#define RDB_TYPE_HASH_LISTPACK_EX 23
#define RDB_TYPE_MAX 24
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
#define RDB_TYPE_HASH_METADATA_PRE_GA 22 /* Hash with HFEs. Doesn't attach min TTL at start (7.4 RC) */
#define RDB_TYPE_HASH_LISTPACK_EX_PRE_GA 23 /* Hash LP with HFEs. Doesn't attach min TTL at start (7.4 RC) */
#define RDB_TYPE_HASH_METADATA 24 /* Hash with HFEs. Attach min TTL at start */
#define RDB_TYPE_HASH_LISTPACK_EX 25 /* Hash LP with HFEs. Attach min TTL at start */
#define RDB_TYPE_MAX 26


/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_SLOT_INFO 244 /* Individual slot info, such as slot id and size (cluster mode only). */
Expand Down
21 changes: 20 additions & 1 deletion src/lib/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,12 @@ _LIBRDB_API int RDB_handleByLevel(RdbParser *p, RdbDataType type, RdbHandlersLev
break;
case RDB_DATA_TYPE_HASH:
p->handleTypeObjByLevel[RDB_TYPE_HASH] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_METADATA_PRE_GA] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_METADATA] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_ZIPMAP] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_ZIPLIST] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_LISTPACK] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_LISTPACK_EX_PRE_GA] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_LISTPACK_EX] = lvl;
break;
case RDB_DATA_TYPE_MODULE:
Expand Down Expand Up @@ -1467,9 +1469,11 @@ RdbStatus elementNextRdbType(RdbParser *p) {
case RDB_TYPE_LIST_ZIPLIST: return nextParsingElementKeyValue(p, PE_RAW_LIST_ZL, PE_LIST_ZL);
/* hash */
case RDB_TYPE_HASH: return nextParsingElementKeyValue(p, PE_RAW_HASH, PE_HASH);
case RDB_TYPE_HASH_METADATA_PRE_GA: return nextParsingElementKeyValue(p, PE_RAW_HASH_META, PE_HASH_META);
case RDB_TYPE_HASH_METADATA: return nextParsingElementKeyValue(p, PE_RAW_HASH_META, PE_HASH_META);
case RDB_TYPE_HASH_ZIPLIST: return nextParsingElementKeyValue(p, PE_RAW_HASH_ZL, PE_HASH_ZL);
case RDB_TYPE_HASH_LISTPACK: return nextParsingElementKeyValue(p, PE_RAW_HASH_LP, PE_HASH_LP);
case RDB_TYPE_HASH_LISTPACK_EX_PRE_GA: return nextParsingElementKeyValue(p, PE_RAW_HASH_LP_EX, PE_HASH_LP_EX);
case RDB_TYPE_HASH_LISTPACK_EX: return nextParsingElementKeyValue(p, PE_RAW_HASH_LP_EX, PE_HASH_LP_EX);
case RDB_TYPE_HASH_ZIPMAP: return nextParsingElementKeyValue(p, PE_RAW_HASH_ZM, PE_HASH_ZM);
/* set */
Expand Down Expand Up @@ -1692,6 +1696,13 @@ RdbStatus elementHash(RdbParser *p) {

switch (ctx->state) {
case ST_HASH_HEADER:
if (p->currOpcode == RDB_TYPE_HASH_METADATA) {
/* digest min HFE expiration time. No need to pass it to handlers
each field goanna report its expiration time anyway */
moticless marked this conversation as resolved.
Show resolved Hide resolved
BulkInfo *binfoExpire;
IF_NOT_OK_RETURN(rdbLoad(p, 8, RQ_ALLOC, NULL, &binfoExpire));
}

IF_NOT_OK_RETURN(rdbLoadLen(p, NULL, &(ctx->hash.numFields), NULL, NULL));

ctx->key.numItemsHint = ctx->hash.numFields;
Expand Down Expand Up @@ -1757,6 +1768,13 @@ RdbStatus elementHashZL(RdbParser *p) {
RdbStatus elementHashLPEx(RdbParser *p) {
BulkInfo *listpackBulk;

if (p->currOpcode != RDB_TYPE_HASH_LISTPACK_EX_PRE_GA) {
/* digest min HFE expiration time. No need to pass it to handlers
each field goanna report its expiration time anyway */
moticless marked this conversation as resolved.
Show resolved Hide resolved
BulkInfo *binfoExpire;
IF_NOT_OK_RETURN(rdbLoad(p, 8, RQ_ALLOC, NULL, &binfoExpire));
}

IF_NOT_OK_RETURN(rdbLoadString(p, RQ_ALLOC_APP_BULK, NULL, &listpackBulk));

/*** ENTER SAFE STATE ***/
Expand Down Expand Up @@ -2648,7 +2666,8 @@ RdbStatus rdbLoadString(RdbParser *p, AllocTypeRq type, char *refBuf, BulkInfo *
return rdbLoadLzfString(p, type, refBuf, binfo);
default:
RDB_reportError(p, RDB_ERR_STRING_UNKNOWN_ENCODING_TYPE,
"rdbLoadString(): Unknown RDB string encoding type: %lu",len);
"rdbLoadString(): Unknown RDB string encoding type: %lu (0x%lx)",
len, len);
moticless marked this conversation as resolved.
Show resolved Hide resolved
return RDB_STATUS_ERROR;
}
}
Expand Down
67 changes: 45 additions & 22 deletions src/lib/parserRaw.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ static int listpackValidateIntegrityCb(unsigned char* str, size_t size, RdbParse
static int zipmapValidateIntegrityCb(unsigned char* str, size_t size, RdbParser *p);
static int intsetValidateIntegrityCb(unsigned char* str, size_t size, RdbParser *p);
typedef int (*singleStringTypeValidateCb)(unsigned char* str, size_t size, RdbParser *p); // return 0 for error
static RdbStatus singleStringTypeHandling(RdbParser *p, singleStringTypeValidateCb validateCb, char *callerName);
static RdbStatus singleStringTypeHandling(RdbParser *p,
singleStringTypeValidateCb validateCb,
int digestHdrSize,
char *callerName);
void moduleTypeNameByID(char *name, uint64_t moduleid);

/*** init & release ***/
Expand Down Expand Up @@ -315,7 +318,8 @@ RdbStatus elementRawString(RdbParser *p) {
break;
default:
RDB_reportError(p, RDB_ERR_STRING_UNKNOWN_ENCODING_TYPE,
"elementRawString(): Unknown RDB string encoding type: %lu", strCtx->len);
"elementRawString(): Unknown RDB string encoding type: %lu (0x%lx)",
strCtx->len, strCtx->len);
return RDB_STATUS_ERROR;
}
}
Expand Down Expand Up @@ -419,7 +423,8 @@ RdbStatus elementRawString(RdbParser *p) {
}

RDB_reportError(p, RDB_ERR_STRING_UNKNOWN_ENCODING_TYPE,
"elementRawString(): Unknown RDB string encoding type: %lu", strCtx->encoding);
"elementRawString(): Unknown RDB string encoding type: %lu (0x%lx)",
strCtx->encoding, strCtx->encoding);
return RDB_STATUS_ERROR;
}

Expand All @@ -432,12 +437,13 @@ RdbStatus elementRawString(RdbParser *p) {
}

RdbStatus elementRawListZL(RdbParser *p) {
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, "elementRawListZL");
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, 0, "elementRawListZL");
}

RdbStatus elementRawHash(RdbParser *p) {
BulkInfo *binfo;
uint64_t expireAt;
int numDigits;
int offset, processedBytes;
size_t len;
unsigned char *unusedData;

Expand All @@ -456,18 +462,25 @@ RdbStatus elementRawHash(RdbParser *p) {
switch (p->elmCtx.state) {

case ST_RAW_HASH_HEADER:
numDigits = 0;
aggMakeRoom(p, 10); /* worse case 9 bytes for len */
offset = processedBytes = 0;

aggMakeRoom(p, 20); /* > optional 8 bytes + worse case 9 bytes for len */
if (p->currOpcode == RDB_TYPE_HASH_METADATA) {
/* load min expiration time. Do nothing with it since each field
* goanna report anyway its expiration time */
moticless marked this conversation as resolved.
Show resolved Hide resolved
IF_NOT_OK_RETURN(rdbLoad(p, 8, RQ_ALLOC_REF, rawCtx->at, &binfo));
offset = processedBytes = 8;
}

IF_NOT_OK_RETURN(rdbLoadLen(p, NULL, &hashCtx->numFields,
(unsigned char *) rawCtx->at, &numDigits));
(unsigned char *) rawCtx->at + offset, &processedBytes));

/*** ENTER SAFE STATE ***/

hashCtx->visitField = 0;

IF_NOT_OK_RETURN(cbHandleBegin(p, DATA_SIZE_UNKNOWN_AHEAD));
IF_NOT_OK_RETURN(aggUpdateWritten(p, numDigits));
IF_NOT_OK_RETURN(aggUpdateWritten(p, processedBytes));

if (hashCtx->numFields == 0)
return nextParsingElement(p, PE_RAW_END_KEY); /* empty-key */
Expand All @@ -476,13 +489,13 @@ RdbStatus elementRawHash(RdbParser *p) {

case ST_RAW_HASH_READ_NEXT_EXPIRE:
if (p->parsingElement == PE_RAW_HASH_META) {
numDigits = 0;
processedBytes = 0;
aggMakeRoom(p, 32);
IF_NOT_OK_RETURN(rdbLoadLen(p, NULL, &expireAt,
(unsigned char *) rawCtx->at,
&numDigits));
&processedBytes));
/*** ENTER SAFE STATE ***/
IF_NOT_OK_RETURN(aggUpdateWritten(p, numDigits));
IF_NOT_OK_RETURN(aggUpdateWritten(p, processedBytes));
}
updateElementState(p, ST_RAW_HASH_READ_NEXT_FIELD_STR, 0); /* fall-thru */

Expand Down Expand Up @@ -518,27 +531,28 @@ RdbStatus elementRawHash(RdbParser *p) {
}

RdbStatus elementRawHashZL(RdbParser *p) {
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, "elementRawHashZL");
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, 0, "elementRawHashZL");
}

RdbStatus elementRawHashLP(RdbParser *p) {
return singleStringTypeHandling(p, listpackValidateIntegrityCb, "elementRawHashLP");
return singleStringTypeHandling(p, listpackValidateIntegrityCb, 0, "elementRawHashLP");
}

RdbStatus elementRawHashLPEx(RdbParser *p) {
return singleStringTypeHandling(p, listpackValidateIntegrityCb, "elementRawHashLPEx");
int digestMinExpireSize = (p->currOpcode != RDB_TYPE_HASH_LISTPACK_EX_PRE_GA) ? 8 : 0;
return singleStringTypeHandling(p, listpackValidateIntegrityCb, digestMinExpireSize, "elementRawHashLPEx");
}

RdbStatus elementRawHashZM(RdbParser *p) {
return singleStringTypeHandling(p, zipmapValidateIntegrityCb, "elementRawHashZM");
return singleStringTypeHandling(p, zipmapValidateIntegrityCb, 0, "elementRawHashZM");
}

RdbStatus elementRawSetIS(RdbParser *p) {
return singleStringTypeHandling(p, intsetValidateIntegrityCb, "elementRawSetIS");
return singleStringTypeHandling(p, intsetValidateIntegrityCb, 0, "elementRawSetIS");
}

RdbStatus elementRawSetLP(RdbParser *p) {
return singleStringTypeHandling(p, listpackValidateIntegrityCb, "elementRawSetLP");
return singleStringTypeHandling(p, listpackValidateIntegrityCb, 0, "elementRawSetLP");
}

RdbStatus elementRawSet(RdbParser *p) {
Expand Down Expand Up @@ -595,11 +609,11 @@ RdbStatus elementRawSet(RdbParser *p) {
}

RdbStatus elementRawZsetLP(RdbParser *p) {
return singleStringTypeHandling(p, listpackValidateIntegrityCb, "elementRawZsetLP");
return singleStringTypeHandling(p, listpackValidateIntegrityCb, 0, "elementRawZsetLP");
}

RdbStatus elementRawZsetZL(RdbParser *p) {
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, "elementRawZsetZL");
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, 0, "elementRawZsetZL");
}

RdbStatus elementRawZset(RdbParser *p) {
Expand Down Expand Up @@ -1122,8 +1136,11 @@ static int intsetValidateIntegrityCb(unsigned char* str, size_t size, RdbParser
return intsetValidateIntegrity(str, size, 1);
}

static RdbStatus singleStringTypeHandling(RdbParser *p, singleStringTypeValidateCb validateCb, char *callerName) {

static RdbStatus singleStringTypeHandling(RdbParser *p,
singleStringTypeValidateCb validateCb,
int digestHdrSize,
char *callerName) {
BulkInfo *binfo;
enum RAW_SINGLE_STRING_TYPE_STATES {
ST_RAW_SSTYPE_START=0,
ST_RAW_SSTYPE_CALL_STR, /* Call PE_RAW_STRING as sub-element */
Expand All @@ -1132,6 +1149,12 @@ static RdbStatus singleStringTypeHandling(RdbParser *p, singleStringTypeValidate

switch (p->elmCtx.state) {
case ST_RAW_SSTYPE_START:
if (digestHdrSize) {
IF_NOT_OK_RETURN(aggMakeRoom(p, digestHdrSize));
IF_NOT_OK_RETURN(rdbLoad(p, digestHdrSize, RQ_ALLOC_REF, p->rawCtx.at, &binfo));
/*** ENTER SAFE STATE ***/
IF_NOT_OK_RETURN(aggUpdateWritten(p, digestHdrSize));
}
/* take care string won't propagate for having integrity check */
IF_NOT_OK_RETURN(cbHandleBegin(p, DATA_SIZE_UNKNOWN_AHEAD));

Expand Down
2 changes: 1 addition & 1 deletion test/test_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ size_t serializeRedisReply(const redisReply *reply, char *buffer, size_t bsize)
*
* Return the response serialized
*/
char *sendRedisCmd(char *cmd, int expRetType, char *expRsp) {
char *sendRedisCmd(const char *cmd, int expRetType, char *expRsp) {
static char rspbuf[1024];

assert_int_not_equal(currRedisInst, -1);
Expand Down
2 changes: 1 addition & 1 deletion test/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const char *getTargetRedisVersion(int *major, int *minor); /* call only after se
void teardownRedisServer(void);
void cleanup_json_sign_service(void);
int isSetRedisServer(void);
char *sendRedisCmd(char *cmd, int expRetType, char *expRsp);
char *sendRedisCmd(const char *cmd, int expRetType, char *expRsp);
int isSupportRestoreModuleAux(void);

/* test groups */
Expand Down
36 changes: 15 additions & 21 deletions test/test_rdb_to_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,33 +189,27 @@ static void test_rdb_to_redis_hash(void **state) {

static void test_rdb_to_redis_hash_with_expire(void **state) {
UNUSED(state);
const char* configs[] = {
"CONFIG SET HASH-MAX-LISTPACK-ENTRIES 0", /*HT*/
"CONFIG SET HASH-MAX-LISTPACK-ENTRIES 512", /*listpack*/
};

/* hash-field-expiration available since 7.4 */
if ((serverMajorVer<7) || ((serverMajorVer==7) && (serverMinorVer<4)))
skip();

setupRedisServer("--enable-debug-command yes --dbfilename expire.rdb");

/* listpack */
sendRedisCmd("FLUSHALL", REDIS_REPLY_STATUS, NULL);
sendRedisCmd("CONFIG SET HASH-MAX-LISTPACK-ENTRIES 512", REDIS_REPLY_STATUS, NULL);
sendRedisCmd("HSET myhash f4 v1 f5 v2 f6 v3", REDIS_REPLY_INTEGER, "3");
sendRedisCmd("HPEXPIREAT myhash 70368744177663 FIELDS 2 f4 f5", REDIS_REPLY_ARRAY, "1 1");
rdb_save_librdb_reload_eq(0 /*restore*/, TMP_FOLDER("expire.rdb"));
rdb_save_librdb_reload_eq(1 /*restore*/, TMP_FOLDER("expire.rdb"));
sendRedisCmd("HPEXPIRETIME myhash FIELDS 3 f4 f5 f6", REDIS_REPLY_ARRAY,
"70368744177663 70368744177663 -1"); /* verify expected output */

/* dict (max-lp-entries=0) */
sendRedisCmd("FLUSHALL", REDIS_REPLY_STATUS, NULL);
sendRedisCmd("CONFIG SET HASH-MAX-LISTPACK-ENTRIES 0", REDIS_REPLY_STATUS, NULL);
sendRedisCmd("HSET myhash f4 v1 f5 v2 f6 v3", REDIS_REPLY_INTEGER, "3");
sendRedisCmd("HPEXPIREAT myhash 70368744177663 FIELDS 2 f4 f5", REDIS_REPLY_ARRAY, "1 1");
rdb_save_librdb_reload_eq(0 /*restore*/, TMP_FOLDER("expire.rdb"));
rdb_save_librdb_reload_eq(1 /*restore*/, TMP_FOLDER("expire.rdb"));
sendRedisCmd("HPEXPIRETIME myhash FIELDS 3 f4 f5 f6", REDIS_REPLY_ARRAY,
"70368744177663 70368744177663 -1"); /* verify expected output */

for (int i = 0; i < 2; i++) {
sendRedisCmd("FLUSHALL", REDIS_REPLY_STATUS, NULL);
sendRedisCmd(configs[i], REDIS_REPLY_STATUS, NULL);
sendRedisCmd("HSET myhash f4 v1 f5 v2 f6 v3", REDIS_REPLY_INTEGER, "3");
sendRedisCmd("HPEXPIREAT myhash 70368744177663 FIELDS 2 f4 f5",
REDIS_REPLY_ARRAY, "1 1"); /*time=0x3fffffffffff*/
rdb_save_librdb_reload_eq(0 /*restore*/, TMP_FOLDER("expire.rdb")); /// <<<< ----- problem here
//rdb_save_librdb_reload_eq(1 /*restore*/, TMP_FOLDER("expire.rdb"));
moticless marked this conversation as resolved.
Show resolved Hide resolved
sendRedisCmd("HPEXPIRETIME myhash FIELDS 3 f4 f5 f6", REDIS_REPLY_ARRAY,
"70368744177663 70368744177663 -1"); /* verify expected output */
}
teardownRedisServer();
}

Expand Down
Loading