Skip to content

Commit

Permalink
Refine logic of del-key-before-write
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless authored and moticless committed Aug 17, 2023
1 parent 27ba6f8 commit 4bb3338
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 37 deletions.
10 changes: 9 additions & 1 deletion api/librdb-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,21 @@ _LIBRDB_API void RDB_dontPropagate(RdbParser *p);
* Parser setters & getters
****************************************************************/

_LIBRDB_API void RDB_setMaxRawLenHandling(RdbParser *p, size_t size);
_LIBRDB_API void RDB_setDeepIntegCheck(RdbParser *p, int deep);
_LIBRDB_API size_t RDB_getBytesProcessed(RdbParser *p);
_LIBRDB_API RdbState RDB_getState(RdbParser *p);
_LIBRDB_API int RDB_getNumHandlers(RdbParser *p, RdbHandlersLevel lvl);
_LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p);

/* There could be relatively large strings stored within Redis, which are
* subsequently also present in the RDB. This is especially true for collections
* of strings. In situations like this, if the parser is configured to read
* raw data (using RDB_createHandlersRaw), it could potentially lead to memory
* problems in data path. By establishing a MaxRawSize threshold, the size of
* raw data can be restricted, and if this threshold is exceeded, the parser will
* terminate its operation. The default threshold is unlimited. */
_LIBRDB_API void RDB_setMaxRawSize(RdbParser *p, size_t maxSize);

/* logger */
_LIBRDB_API void RDB_setLogLevel(RdbParser *p, RdbLogLevel l);
_LIBRDB_API void RDB_setLogger(RdbParser *p, RdbLoggerCB f);
Expand Down
86 changes: 61 additions & 25 deletions src/ext/handlersToResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
#include "../../deps/redis/endianconv.h"
#include <sys/uio.h>

#define RETURN_ON_WRITE_ERR(cmd) do {\
if (unlikely(0 == (cmd))) return (RdbRes) RDBX_ERR_RESP_WRITE; \
} while(0);
#define _RDB_TYPE_STRING 0

#define WRITE_CONST_STR(wr, str, endCmd) (wr)->write((wr)->ctx, str, sizeof(str) - 1, endCmd)
typedef enum DelKeyBeforeWrite {
DEL_KEY_BEFORE_NONE,
DEL_KEY_BEFORE_BY_DEL_CMD,
DEL_KEY_BEFORE_BY_RESTORE_REPLACE, /* RESTORE supported */
} DelKeyBeforeWrite;

void setIov(struct iovec *iov, const char *s, size_t l) {
iov->iov_base = (void *) s;
Expand Down Expand Up @@ -45,6 +47,7 @@ struct RdbxToResp {

uint64_t crc;
int srcRdbVer;
DelKeyBeforeWrite delKeyBeforeWrite;
};

static void deleteRdbToRespCtx(RdbParser *p, void *context) {
Expand Down Expand Up @@ -107,22 +110,32 @@ static int rdbVerFromRedisVer(const char *ver) {
}

static void resolveSupportRestore(RdbParser *p, RdbxToResp *ctx, int srcRdbVer) {
int isRestore = ctx->conf.supportRestore;
int dstRdbVer = ctx->conf.restore.dstRdbVersion;

ctx->srcRdbVer = srcRdbVer;

if (isRestore) {
ctx->delKeyBeforeWrite = (ctx->conf.delKeyBeforeWrite) ? DEL_KEY_BEFORE_BY_DEL_CMD : DEL_KEY_BEFORE_NONE;

if (ctx->conf.supportRestore) {
/* if not configured destination RDB version, then resolve it from
* configured destination Redis version */
if (!dstRdbVer)
dstRdbVer = rdbVerFromRedisVer(ctx->conf.restore.dstRedisVersion);

if (dstRdbVer < srcRdbVer)
isRestore = 0;
if (dstRdbVer < srcRdbVer) {
RDB_log(p, RDB_LOG_WRN,
"Cannot support RESTORE. SRC version (=%d) is higher than DST version (%d)",
srcRdbVer, dstRdbVer);
ctx->conf.supportRestore = 0;
} else {
if (ctx->conf.delKeyBeforeWrite) {
RDB_log(p, RDB_LOG_INF, "As RESTORE is supported, configuration del-key-before-write will be ignored.");
ctx->delKeyBeforeWrite = DEL_KEY_BEFORE_BY_RESTORE_REPLACE;
}
}
}

RdbHandlersLevel lvl = (isRestore) ? RDB_LEVEL_RAW : RDB_LEVEL_DATA;
RdbHandlersLevel lvl = (ctx->conf.supportRestore) ? RDB_LEVEL_RAW : RDB_LEVEL_DATA;
for (int i = 0; i < RDB_DATA_TYPE_MAX; ++i) {
RDB_handleByLevel(p, (RdbDataType) i, lvl, 0);
}
Expand Down Expand Up @@ -173,7 +186,6 @@ static RdbRes toRespStartRdb(RdbParser *p, void *userData, int rdbVersion) {
return RDB_OK;
}

/* TODO: support option rdb2resp del key before write */
/* TODO: support expiry */
static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo *info) {
UNUSED(info);
Expand All @@ -188,6 +200,18 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo
ctx->keyCtx.keyLen = RDB_bulkLen(p, key);
if ((ctx->keyCtx.key = RDB_bulkClone(p, key)) == NULL)
return RDB_ERR_FAIL_ALLOC;

/* apply del-key-before-write if configured, unless it is 'SET' command where
* the key is overridden if it already exists, without encountering any problems. */
if ((ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_DEL_CMD) && (info->opcode != _RDB_TYPE_STRING)) {
struct iovec iov[4];
char keyLenStr[32];
IOV_CONST_STR(&iov[0], "*2\r\n$3\r\nDEL\r\n$");
iov_stringLen(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen);
IOV_CONST_STR(&iov[3], "\r\n");
return writevWrap(ctx, iov, 4, 1, 1);
}
return RDB_OK;
}

Expand All @@ -207,7 +231,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) {
static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) {
RdbxToResp *ctx = userData;

char keyLenStr[64], valLenStr[64];
char keyLenStr[32], valLenStr[32];
int valLen = RDB_bulkLen(p, string);

/*** fillup iovec ***/
Expand All @@ -231,7 +255,7 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) {

/*** fillup iovec ***/

char keyLenStr[64], valLenStr[64];
char keyLenStr[32], valLenStr[32];
int valLen = RDB_bulkLen(p, item);

struct iovec iov[7];
Expand All @@ -253,7 +277,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va

/*** fillup iovec ***/

char keyLenStr[64], fieldLenStr[64], valueLenStr[64];
char keyLenStr[32], fieldLenStr[32], valueLenStr[32];
int fieldLen = RDB_bulkLen(p, field);
int valueLen = RDB_bulkLen(p, value);

Expand All @@ -277,10 +301,8 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va

static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) {
RdbxToResp *ctx = userData;
char keyLenStr[32], valLenStr[32];

/*** fillup iovec ***/

char keyLenStr[64], valLenStr[64];
int valLen = RDB_bulkLen(p, member);

struct iovec iov[7];
Expand All @@ -301,8 +323,16 @@ static RdbRes toRespEndRdb(RdbParser *p, void *userData) {
UNUSED(p);
RdbxToResp *ctx = userData;
RdbxRespWriter *writer = &ctx->respWriter;
writer->flush(writer->ctx);
return RDB_OK;
if (likely(writer->flush(writer->ctx) == 0))
return RDB_OK;

if (RDB_getErrorCode(p) != RDB_OK)
return RDB_getErrorCode(p);

/* writer didn't take care to report an error */
RDB_log(p, RDB_LOG_WRN, "Writer returned error indication but didn't RDB_reportError()");
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_WRITE, "RESP writer returned error on flush()");
return (RdbRes) RDBX_ERR_RESP_WRITE;
}

/*** Handling raw ***/
Expand All @@ -317,6 +347,7 @@ static RdbRes toRespRawBegin(RdbParser *p, void *userData, size_t size) {
}

static RdbRes toRespRawFrag(RdbParser *p, void *userData, RdbBulk frag) {
char keyLenStr[32], totalLenStr[32];
UNUSED(p);
RdbxToResp *ctx = userData;
struct iovec iov[10];
Expand All @@ -326,10 +357,11 @@ static RdbRes toRespRawFrag(RdbParser *p, void *userData, RdbBulk frag) {
ctx->crc = crc64(ctx->crc, (unsigned char *) frag , fragLen);

if (likely(!(ctx->rawCtx.sentFirstFrag))) {
char keyLenStr[64], totalLenStr[64];

ctx->rawCtx.sentFirstFrag = 1;
IOV_CONST_STR(&iov[iovs++], "*4\r\n$7\r\nRESTORE\r\n$"); /* RESTORE */
if (ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE)
IOV_CONST_STR(&iov[iovs++], "*5\r\n$7\r\nRESTORE\r\n$"); /* RESTORE-REPLACE */
else
IOV_CONST_STR(&iov[iovs++], "*4\r\n$7\r\nRESTORE\r\n$"); /* RESTORE */
iov_stringLen(&iov[iovs++], ctx->keyCtx.keyLen, keyLenStr); /* write key len */
IOV_STRING(&iov[iovs++], ctx->keyCtx.key, ctx->keyCtx.keyLen); /* write key */
IOV_CONST_STR(&iov[iovs++], "\r\n$1\r\n0\r\n$"); /* newline + write TTL */
Expand All @@ -353,10 +385,14 @@ static RdbRes toRespRawFragEnd(RdbParser *p, void *userData) {
memrev64ifbe(crc);
memcpy(footer + 2, crc, 8);

struct iovec iov[] = {
{footer, 10},
{"\r\n", 2}
};
struct iovec iov[2];
int iovs = 0;

IOV_STRING(&iov[iovs++], footer, 10);
if (ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE)
IOV_CONST_STR(&iov[iovs++], "\r\n$7\r\nREPLACE\r\n");
else
IOV_CONST_STR(&iov[iovs++], "\r\n");
return writevWrap(ctx, iov, 2, 0, 1);
}

Expand Down
18 changes: 9 additions & 9 deletions src/lib/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ _LIBRDB_API RdbParser *RDB_createParserRdb(RdbMemAlloc *memAlloc) {
p->appCbCtx.numBulks = 0;
p->loggerCb = loggerCbDefault;
p->logLevel = RDB_LOG_DBG;
p->maxRawLen = SIZE_MAX;
p->maxRawSize = SIZE_MAX;
p->errorCode = RDB_OK;
p->handlers[RDB_LEVEL_RAW] = NULL;
p->handlers[RDB_LEVEL_STRUCT] = NULL;
Expand Down Expand Up @@ -364,8 +364,8 @@ _LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p) {
p->ignoreChecksum = 1;
}

_LIBRDB_API void RDB_setMaxRawLenHandling(RdbParser *p, size_t size) {
p->maxRawLen = size;
_LIBRDB_API void RDB_setMaxRawSize(RdbParser *p, size_t size) {
p->maxRawSize = size;
}

_LIBRDB_API void RDB_log(RdbParser *p, RdbLogLevel lvl, const char *format, ...) {
Expand Down Expand Up @@ -718,12 +718,12 @@ static RdbStatus finalizeConfig(RdbParser *p, int isParseFromBuff) {
}

static void printParserState(RdbParser *p) {
printf ("Parser error message:%s\n", RDB_getErrorMessage(p));
printf ("Parser error code:%d\n", RDB_getErrorCode(p));
printf ("Parser element func name: %s\n", peInfo[p->parsingElement].funcname);
printf ("Parser element func description: %s\n", peInfo[p->parsingElement].funcname);
printf ("Parser element state:%d\n", p->elmCtx.state);
bulkPoolPrintDbg(p);
RDB_log(p, RDB_LOG_ERR, "Parser error message: %s", RDB_getErrorMessage(p));
RDB_log(p, RDB_LOG_ERR, "Parser error code: %d", RDB_getErrorCode(p));
RDB_log(p, RDB_LOG_ERR, "Parser element func name: %s", peInfo[p->parsingElement].funcname);
RDB_log(p, RDB_LOG_ERR, "Parser element func description: %s", peInfo[p->parsingElement].funcname);
RDB_log(p, RDB_LOG_ERR, "Parser element state:%d", p->elmCtx.state);
//bulkPoolPrintDbg(p);
}

static void loggerCbDefault(RdbLogLevel l, const char *msg) {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ struct RdbParser {
int ignoreChecksum;
RdbLoggerCB loggerCb;
RdbLogLevel logLevel;
size_t maxRawLen;
size_t maxRawSize;

/*** context ***/
ElementCtx elmCtx; /* parsing-element context */
Expand Down
2 changes: 1 addition & 1 deletion src/lib/parserRaw.c
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ static RdbStatus aggMakeRoom(RdbParser *p, size_t numBytesRq) {
if (likely(freeRoomLeft >= numBytesRq))
return RDB_STATUS_OK;

if (unlikely(p->maxRawLen < ctx->totalSize + numBytesRq)) {
if (unlikely(p->maxRawSize < ctx->totalSize + numBytesRq)) {
RDB_reportError(p, RDB_ERR_MAX_RAW_LEN_EXCEEDED_FOR_KEY, "Maximum raw length exceeded for key (len=%lu)",
ctx->totalSize + numBytesRq);
return RDB_STATUS_ERROR;
Expand Down
Binary file added test/dumps/100_lists.rdb
Binary file not shown.
45 changes: 45 additions & 0 deletions test/test_rdb_to_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,50 @@ static void test_rdb_to_redis_set_lp(void **state) {
test_rdb_to_redis_common(DUMP_FOLDER("set_lp_v11.rdb"), 1, 1);
}

/* iff 'delKeyBeforeWrite' is not set, then the parser will return an error on
* loading 100_lists.rdb ("mylist1 mylist2 ... mylist100") on key 'mylist62'
* Because key `mylist62` created earlier with a string value. */
static void test_rdb_to_redis_del_before_write(void **state) {
UNUSED(state);
RdbParser *parser;
RdbStatus status;
for (int delKeyBeforeWrite = 0 ; delKeyBeforeWrite <= 1 ; ++delKeyBeforeWrite) {
RdbxToRespConf rdb2respConf = {
.delKeyBeforeWrite = delKeyBeforeWrite,
.supportRestore = 1,
.restore.dstRdbVersion = 100};

runSystemCmd("%s/redis-cli -p %d set mylist62 1 > /dev/null", redisInstallFolder, redisPort);
/* RDB to TCP */
RdbxToResp *rdbToResp;
parser = RDB_createParserRdb(NULL);
RDB_setLogLevel(parser, RDB_LOG_ERR);
assert_non_null(RDBX_createReaderFile(parser, DUMP_FOLDER("100_lists.rdb")));
assert_non_null(rdbToResp = RDBX_createHandlersToResp(parser, &rdb2respConf));

assert_non_null(RDBX_createRespToRedisTcp(parser,
rdbToResp,
"127.0.0.1",
redisPort));

RDB_setLogLevel(parser, RDB_LOG_ERR);

while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA);

if (status == RDB_STATUS_OK)
assert_int_equal(delKeyBeforeWrite, 1);
else {
assert_int_equal(delKeyBeforeWrite, 0);
/* verify returned error code. Verify error message. */
RdbRes err = RDB_getErrorCode(parser);
assert_int_equal(err, RDBX_ERR_RESP_WRITE);
assert_non_null(strstr(RDB_getErrorMessage(parser), "mylist62"));
}

RDB_deleteParser(parser);
}
}

/*************************** group_rdb_to_redis *******************************/
int group_rdb_to_redis() {

Expand Down Expand Up @@ -191,6 +235,7 @@ int group_rdb_to_redis() {
/* misc */
cmocka_unit_test_setup(test_rdb_to_redis_multiple_lists_strings, setupTest),
cmocka_unit_test_setup(test_rdb_to_redis_multiple_lists_strings_pipeline_depth_1, setupTest),
cmocka_unit_test_setup(test_rdb_to_redis_del_before_write, setupTest),

};

Expand Down
1 change: 1 addition & 0 deletions test/test_rdb_to_resp.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ static void runWithAndWithoutRestore(const char *rdbfile) {

memset(&r2rConf, 0, sizeof(r2rConf));
r2rConf.supportRestore = 1;
r2rConf.delKeyBeforeWrite = 0;

/* expect not use RESTORE */
r2rConf.restore.dstRdbVersion = 1;
Expand Down

0 comments on commit 4bb3338

Please sign in to comment.