diff --git a/api/librdb-api.h b/api/librdb-api.h index 7b0ae11..abe90be 100644 --- a/api/librdb-api.h +++ b/api/librdb-api.h @@ -381,13 +381,21 @@ _LIBRDB_API void RDB_dontPropagate(RdbParser *p); * Parser setters & getters ****************************************************************/ -_LIBRDB_API void RDB_setMaxRawLenHandling(RdbParser *p, size_t size); _LIBRDB_API void RDB_setDeepIntegCheck(RdbParser *p, int deep); _LIBRDB_API size_t RDB_getBytesProcessed(RdbParser *p); _LIBRDB_API RdbState RDB_getState(RdbParser *p); _LIBRDB_API int RDB_getNumHandlers(RdbParser *p, RdbHandlersLevel lvl); _LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p); +/* There could be relatively large strings stored within Redis, which are + * subsequently also present in the RDB. This is especially true for collections + * of strings. In situations like this, if the parser is configured to read + * raw data (using RDB_createHandlersRaw), it could potentially lead to memory + * problems in data path. By establishing a MaxRawSize threshold, the size of + * raw data can be restricted, and if this threshold is exceeded, the parser will + * terminate its operation. The default threshold is unlimited. */ +_LIBRDB_API void RDB_setMaxRawSize(RdbParser *p, size_t maxSize); + /* logger */ _LIBRDB_API void RDB_setLogLevel(RdbParser *p, RdbLogLevel l); _LIBRDB_API void RDB_setLogger(RdbParser *p, RdbLoggerCB f); diff --git a/src/ext/handlersToResp.c b/src/ext/handlersToResp.c index 691c1a5..b22dc0c 100644 --- a/src/ext/handlersToResp.c +++ b/src/ext/handlersToResp.c @@ -6,11 +6,13 @@ #include "../../deps/redis/endianconv.h" #include -#define RETURN_ON_WRITE_ERR(cmd) do {\ - if (unlikely(0 == (cmd))) return (RdbRes) RDBX_ERR_RESP_WRITE; \ - } while(0); +#define _RDB_TYPE_STRING 0 -#define WRITE_CONST_STR(wr, str, endCmd) (wr)->write((wr)->ctx, str, sizeof(str) - 1, endCmd) +typedef enum DelKeyBeforeWrite { + DEL_KEY_BEFORE_NONE, + DEL_KEY_BEFORE_BY_DEL_CMD, + DEL_KEY_BEFORE_BY_RESTORE_REPLACE, /* RESTORE supported */ +} DelKeyBeforeWrite; void setIov(struct iovec *iov, const char *s, size_t l) { iov->iov_base = (void *) s; @@ -45,6 +47,7 @@ struct RdbxToResp { uint64_t crc; int srcRdbVer; + DelKeyBeforeWrite delKeyBeforeWrite; }; static void deleteRdbToRespCtx(RdbParser *p, void *context) { @@ -107,22 +110,32 @@ static int rdbVerFromRedisVer(const char *ver) { } static void resolveSupportRestore(RdbParser *p, RdbxToResp *ctx, int srcRdbVer) { - int isRestore = ctx->conf.supportRestore; int dstRdbVer = ctx->conf.restore.dstRdbVersion; ctx->srcRdbVer = srcRdbVer; - if (isRestore) { + ctx->delKeyBeforeWrite = (ctx->conf.delKeyBeforeWrite) ? DEL_KEY_BEFORE_BY_DEL_CMD : DEL_KEY_BEFORE_NONE; + + if (ctx->conf.supportRestore) { /* if not configured destination RDB version, then resolve it from * configured destination Redis version */ if (!dstRdbVer) dstRdbVer = rdbVerFromRedisVer(ctx->conf.restore.dstRedisVersion); - if (dstRdbVer < srcRdbVer) - isRestore = 0; + if (dstRdbVer < srcRdbVer) { + RDB_log(p, RDB_LOG_WRN, + "Cannot support RESTORE. SRC version (=%d) is higher than DST version (%d)", + srcRdbVer, dstRdbVer); + ctx->conf.supportRestore = 0; + } else { + if (ctx->conf.delKeyBeforeWrite) { + RDB_log(p, RDB_LOG_INF, "As RESTORE is supported, configuration del-key-before-write will be ignored."); + ctx->delKeyBeforeWrite = DEL_KEY_BEFORE_BY_RESTORE_REPLACE; + } + } } - RdbHandlersLevel lvl = (isRestore) ? RDB_LEVEL_RAW : RDB_LEVEL_DATA; + RdbHandlersLevel lvl = (ctx->conf.supportRestore) ? RDB_LEVEL_RAW : RDB_LEVEL_DATA; for (int i = 0; i < RDB_DATA_TYPE_MAX; ++i) { RDB_handleByLevel(p, (RdbDataType) i, lvl, 0); } @@ -173,7 +186,6 @@ static RdbRes toRespStartRdb(RdbParser *p, void *userData, int rdbVersion) { return RDB_OK; } -/* TODO: support option rdb2resp del key before write */ /* TODO: support expiry */ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo *info) { UNUSED(info); @@ -188,6 +200,18 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo ctx->keyCtx.keyLen = RDB_bulkLen(p, key); if ((ctx->keyCtx.key = RDB_bulkClone(p, key)) == NULL) return RDB_ERR_FAIL_ALLOC; + + /* apply del-key-before-write if configured, unless it is 'SET' command where + * the key is overridden if it already exists, without encountering any problems. */ + if ((ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_DEL_CMD) && (info->opcode != _RDB_TYPE_STRING)) { + struct iovec iov[4]; + char keyLenStr[32]; + IOV_CONST_STR(&iov[0], "*2\r\n$3\r\nDEL\r\n$"); + iov_stringLen(&iov[1], ctx->keyCtx.keyLen, keyLenStr); + IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen); + IOV_CONST_STR(&iov[3], "\r\n"); + return writevWrap(ctx, iov, 4, 1, 1); + } return RDB_OK; } @@ -207,7 +231,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) { static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) { RdbxToResp *ctx = userData; - char keyLenStr[64], valLenStr[64]; + char keyLenStr[32], valLenStr[32]; int valLen = RDB_bulkLen(p, string); /*** fillup iovec ***/ @@ -231,7 +255,7 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) { /*** fillup iovec ***/ - char keyLenStr[64], valLenStr[64]; + char keyLenStr[32], valLenStr[32]; int valLen = RDB_bulkLen(p, item); struct iovec iov[7]; @@ -253,7 +277,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va /*** fillup iovec ***/ - char keyLenStr[64], fieldLenStr[64], valueLenStr[64]; + char keyLenStr[32], fieldLenStr[32], valueLenStr[32]; int fieldLen = RDB_bulkLen(p, field); int valueLen = RDB_bulkLen(p, value); @@ -277,10 +301,8 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) { RdbxToResp *ctx = userData; + char keyLenStr[32], valLenStr[32]; - /*** fillup iovec ***/ - - char keyLenStr[64], valLenStr[64]; int valLen = RDB_bulkLen(p, member); struct iovec iov[7]; @@ -301,8 +323,16 @@ static RdbRes toRespEndRdb(RdbParser *p, void *userData) { UNUSED(p); RdbxToResp *ctx = userData; RdbxRespWriter *writer = &ctx->respWriter; - writer->flush(writer->ctx); - return RDB_OK; + if (likely(writer->flush(writer->ctx) == 0)) + return RDB_OK; + + if (RDB_getErrorCode(p) != RDB_OK) + return RDB_getErrorCode(p); + + /* writer didn't take care to report an error */ + RDB_log(p, RDB_LOG_WRN, "Writer returned error indication but didn't RDB_reportError()"); + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_WRITE, "RESP writer returned error on flush()"); + return (RdbRes) RDBX_ERR_RESP_WRITE; } /*** Handling raw ***/ @@ -317,6 +347,7 @@ static RdbRes toRespRawBegin(RdbParser *p, void *userData, size_t size) { } static RdbRes toRespRawFrag(RdbParser *p, void *userData, RdbBulk frag) { + char keyLenStr[32], totalLenStr[32]; UNUSED(p); RdbxToResp *ctx = userData; struct iovec iov[10]; @@ -326,10 +357,11 @@ static RdbRes toRespRawFrag(RdbParser *p, void *userData, RdbBulk frag) { ctx->crc = crc64(ctx->crc, (unsigned char *) frag , fragLen); if (likely(!(ctx->rawCtx.sentFirstFrag))) { - char keyLenStr[64], totalLenStr[64]; - ctx->rawCtx.sentFirstFrag = 1; - IOV_CONST_STR(&iov[iovs++], "*4\r\n$7\r\nRESTORE\r\n$"); /* RESTORE */ + if (ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE) + IOV_CONST_STR(&iov[iovs++], "*5\r\n$7\r\nRESTORE\r\n$"); /* RESTORE-REPLACE */ + else + IOV_CONST_STR(&iov[iovs++], "*4\r\n$7\r\nRESTORE\r\n$"); /* RESTORE */ iov_stringLen(&iov[iovs++], ctx->keyCtx.keyLen, keyLenStr); /* write key len */ IOV_STRING(&iov[iovs++], ctx->keyCtx.key, ctx->keyCtx.keyLen); /* write key */ IOV_CONST_STR(&iov[iovs++], "\r\n$1\r\n0\r\n$"); /* newline + write TTL */ @@ -353,10 +385,14 @@ static RdbRes toRespRawFragEnd(RdbParser *p, void *userData) { memrev64ifbe(crc); memcpy(footer + 2, crc, 8); - struct iovec iov[] = { - {footer, 10}, - {"\r\n", 2} - }; + struct iovec iov[2]; + int iovs = 0; + + IOV_STRING(&iov[iovs++], footer, 10); + if (ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE) + IOV_CONST_STR(&iov[iovs++], "\r\n$7\r\nREPLACE\r\n"); + else + IOV_CONST_STR(&iov[iovs++], "\r\n"); return writevWrap(ctx, iov, 2, 0, 1); } diff --git a/src/lib/parser.c b/src/lib/parser.c index 4cd4001..e6ebcbc 100644 --- a/src/lib/parser.c +++ b/src/lib/parser.c @@ -179,7 +179,7 @@ _LIBRDB_API RdbParser *RDB_createParserRdb(RdbMemAlloc *memAlloc) { p->appCbCtx.numBulks = 0; p->loggerCb = loggerCbDefault; p->logLevel = RDB_LOG_DBG; - p->maxRawLen = SIZE_MAX; + p->maxRawSize = SIZE_MAX; p->errorCode = RDB_OK; p->handlers[RDB_LEVEL_RAW] = NULL; p->handlers[RDB_LEVEL_STRUCT] = NULL; @@ -364,8 +364,8 @@ _LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p) { p->ignoreChecksum = 1; } -_LIBRDB_API void RDB_setMaxRawLenHandling(RdbParser *p, size_t size) { - p->maxRawLen = size; +_LIBRDB_API void RDB_setMaxRawSize(RdbParser *p, size_t size) { + p->maxRawSize = size; } _LIBRDB_API void RDB_log(RdbParser *p, RdbLogLevel lvl, const char *format, ...) { @@ -718,12 +718,12 @@ static RdbStatus finalizeConfig(RdbParser *p, int isParseFromBuff) { } static void printParserState(RdbParser *p) { - printf ("Parser error message:%s\n", RDB_getErrorMessage(p)); - printf ("Parser error code:%d\n", RDB_getErrorCode(p)); - printf ("Parser element func name: %s\n", peInfo[p->parsingElement].funcname); - printf ("Parser element func description: %s\n", peInfo[p->parsingElement].funcname); - printf ("Parser element state:%d\n", p->elmCtx.state); - bulkPoolPrintDbg(p); + RDB_log(p, RDB_LOG_ERR, "Parser error message: %s", RDB_getErrorMessage(p)); + RDB_log(p, RDB_LOG_ERR, "Parser error code: %d", RDB_getErrorCode(p)); + RDB_log(p, RDB_LOG_ERR, "Parser element func name: %s", peInfo[p->parsingElement].funcname); + RDB_log(p, RDB_LOG_ERR, "Parser element func description: %s", peInfo[p->parsingElement].funcname); + RDB_log(p, RDB_LOG_ERR, "Parser element state:%d", p->elmCtx.state); + //bulkPoolPrintDbg(p); } static void loggerCbDefault(RdbLogLevel l, const char *msg) { diff --git a/src/lib/parser.h b/src/lib/parser.h index d2867a5..0a4a645 100644 --- a/src/lib/parser.h +++ b/src/lib/parser.h @@ -283,7 +283,7 @@ struct RdbParser { int ignoreChecksum; RdbLoggerCB loggerCb; RdbLogLevel logLevel; - size_t maxRawLen; + size_t maxRawSize; /*** context ***/ ElementCtx elmCtx; /* parsing-element context */ diff --git a/src/lib/parserRaw.c b/src/lib/parserRaw.c index ba517b5..091b25c 100644 --- a/src/lib/parserRaw.c +++ b/src/lib/parserRaw.c @@ -718,7 +718,7 @@ static RdbStatus aggMakeRoom(RdbParser *p, size_t numBytesRq) { if (likely(freeRoomLeft >= numBytesRq)) return RDB_STATUS_OK; - if (unlikely(p->maxRawLen < ctx->totalSize + numBytesRq)) { + if (unlikely(p->maxRawSize < ctx->totalSize + numBytesRq)) { RDB_reportError(p, RDB_ERR_MAX_RAW_LEN_EXCEEDED_FOR_KEY, "Maximum raw length exceeded for key (len=%lu)", ctx->totalSize + numBytesRq); return RDB_STATUS_ERROR; diff --git a/test/dumps/100_lists.rdb b/test/dumps/100_lists.rdb new file mode 100644 index 0000000..3bfe528 Binary files /dev/null and b/test/dumps/100_lists.rdb differ diff --git a/test/test_rdb_to_redis.c b/test/test_rdb_to_redis.c index 359061b..7bcfc8b 100644 --- a/test/test_rdb_to_redis.c +++ b/test/test_rdb_to_redis.c @@ -162,6 +162,50 @@ static void test_rdb_to_redis_set_lp(void **state) { test_rdb_to_redis_common(DUMP_FOLDER("set_lp_v11.rdb"), 1, 1); } +/* iff 'delKeyBeforeWrite' is not set, then the parser will return an error on + * loading 100_lists.rdb ("mylist1 mylist2 ... mylist100") on key 'mylist62' + * Because key `mylist62` created earlier with a string value. */ +static void test_rdb_to_redis_del_before_write(void **state) { + UNUSED(state); + RdbParser *parser; + RdbStatus status; + for (int delKeyBeforeWrite = 0 ; delKeyBeforeWrite <= 1 ; ++delKeyBeforeWrite) { + RdbxToRespConf rdb2respConf = { + .delKeyBeforeWrite = delKeyBeforeWrite, + .supportRestore = 1, + .restore.dstRdbVersion = 100}; + + runSystemCmd("%s/redis-cli -p %d set mylist62 1 > /dev/null", redisInstallFolder, redisPort); + /* RDB to TCP */ + RdbxToResp *rdbToResp; + parser = RDB_createParserRdb(NULL); + RDB_setLogLevel(parser, RDB_LOG_ERR); + assert_non_null(RDBX_createReaderFile(parser, DUMP_FOLDER("100_lists.rdb"))); + assert_non_null(rdbToResp = RDBX_createHandlersToResp(parser, &rdb2respConf)); + + assert_non_null(RDBX_createRespToRedisTcp(parser, + rdbToResp, + "127.0.0.1", + redisPort)); + + RDB_setLogLevel(parser, RDB_LOG_ERR); + + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + + if (status == RDB_STATUS_OK) + assert_int_equal(delKeyBeforeWrite, 1); + else { + assert_int_equal(delKeyBeforeWrite, 0); + /* verify returned error code. Verify error message. */ + RdbRes err = RDB_getErrorCode(parser); + assert_int_equal(err, RDBX_ERR_RESP_WRITE); + assert_non_null(strstr(RDB_getErrorMessage(parser), "mylist62")); + } + + RDB_deleteParser(parser); + } +} + /*************************** group_rdb_to_redis *******************************/ int group_rdb_to_redis() { @@ -191,6 +235,7 @@ int group_rdb_to_redis() { /* misc */ cmocka_unit_test_setup(test_rdb_to_redis_multiple_lists_strings, setupTest), cmocka_unit_test_setup(test_rdb_to_redis_multiple_lists_strings_pipeline_depth_1, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_del_before_write, setupTest), }; diff --git a/test/test_rdb_to_resp.c b/test/test_rdb_to_resp.c index 41995de..733e768 100644 --- a/test/test_rdb_to_resp.c +++ b/test/test_rdb_to_resp.c @@ -78,6 +78,7 @@ static void runWithAndWithoutRestore(const char *rdbfile) { memset(&r2rConf, 0, sizeof(r2rConf)); r2rConf.supportRestore = 1; + r2rConf.delKeyBeforeWrite = 0; /* expect not use RESTORE */ r2rConf.restore.dstRdbVersion = 1;