diff --git a/api/librdb-api.h b/api/librdb-api.h index 9a890a1..abe90be 100644 --- a/api/librdb-api.h +++ b/api/librdb-api.h @@ -45,6 +45,7 @@ typedef enum RdbRes { RDB_ERR_GENERAL, RDB_ERR_FAIL_ALLOC, + RDB_ERR_INVALID_CONFIGURATION, RDB_ERR_FAILED_CREATE_PARSER, RDB_ERR_FAILED_OPEN_LOG_FILE, RDB_ERR_FAILED_READ_RDB_FILE, @@ -380,13 +381,21 @@ _LIBRDB_API void RDB_dontPropagate(RdbParser *p); * Parser setters & getters ****************************************************************/ -_LIBRDB_API void RDB_setMaxRawLenHandling(RdbParser *p, size_t size); _LIBRDB_API void RDB_setDeepIntegCheck(RdbParser *p, int deep); _LIBRDB_API size_t RDB_getBytesProcessed(RdbParser *p); _LIBRDB_API RdbState RDB_getState(RdbParser *p); _LIBRDB_API int RDB_getNumHandlers(RdbParser *p, RdbHandlersLevel lvl); _LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p); +/* There could be relatively large strings stored within Redis, which are + * subsequently also present in the RDB. This is especially true for collections + * of strings. In situations like this, if the parser is configured to read + * raw data (using RDB_createHandlersRaw), it could potentially lead to memory + * problems in data path. By establishing a MaxRawSize threshold, the size of + * raw data can be restricted, and if this threshold is exceeded, the parser will + * terminate its operation. The default threshold is unlimited. */ +_LIBRDB_API void RDB_setMaxRawSize(RdbParser *p, size_t maxSize); + /* logger */ _LIBRDB_API void RDB_setLogLevel(RdbParser *p, RdbLogLevel l); _LIBRDB_API void RDB_setLogger(RdbParser *p, RdbLoggerCB f); diff --git a/api/librdb-ext-api.h b/api/librdb-ext-api.h index 0a7ee5b..5180fb3 100644 --- a/api/librdb-ext-api.h +++ b/api/librdb-ext-api.h @@ -15,8 +15,7 @@ typedef struct RdbxReaderFileDesc RdbxReaderFileDesc; typedef struct RdbxFilterKey RdbxFilterKey; typedef struct RdbxToJson RdbxToJson; typedef struct RdbxToResp RdbxToResp; -typedef struct RdbxRespWriter RdbxRespWriter; -typedef struct RdbxRespToTcpLoader RdbxRespToTcpLoader; +typedef struct RdbxRespToRedisLoader RdbxRespToRedisLoader; /**************************************************************** * Error codes @@ -24,6 +23,7 @@ typedef struct RdbxRespToTcpLoader RdbxRespToTcpLoader; typedef enum { RDBX_ERR_READER_FILE_GENERAL_ERROR = _RDB_ERR_EXTENSION_FIRST, + RDBX_ERR_RESP_FAILED_ALLOC, /* rdb2json errors */ RDBX_ERR_FAILED_OPEN_FILE, @@ -33,20 +33,17 @@ typedef enum { RDBX_ERR_FILTER_FAILED_COMPILE_REGEX, /* rdb2resp errors */ - RDBX_ERR_RESP_INVALID_CONN_TYPE, - RDBX_ERR_RESP_FAILED_ALLOC, - RDBX_ERR_RESP_INIT_CONN_ERROR, /* resp writer/loader */ RDBX_ERR_RESP_WRITE, RDBX_ERR_RESP_READ, - RDBX_ERR_RESP2TCP_CREATE_SOCKET, - RDBX_ERR_RESP2TCP_INVALID_ADDRESS, - RDBX_ERR_RESP2TCP_FAILED_CONNECT, - RDBX_ERR_RESP2TCP_FAILED_READ, - RDBX_ERR_RESP2TCP_FAILED_WRITE, - RDBX_ERR_RESP2TCP_CONN_CLOSE, - RDBX_ERR_RESP2TCP_MAX_RETRIES, + RDBX_ERR_RESP2REDIS_CREATE_SOCKET, + RDBX_ERR_RESP2REDIS_INVALID_ADDRESS, + RDBX_ERR_RESP2REDIS_FAILED_CONNECT, + RDBX_ERR_RESP2REDIS_FAILED_READ, + RDBX_ERR_RESP2REDIS_FAILED_WRITE, + RDBX_ERR_RESP2REDIS_CONN_CLOSE, + RDBX_ERR_RESP2REDIS_MAX_RETRIES, } RdbxRes; /**************************************************************** @@ -129,19 +126,20 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *, RdbxToRespConf *) * * Create instance for writing RDB to RESP stream. * - * Used by: RDBX_createRespToTcpLoader + * Used by: RDBX_createRespToRedisTcp + * RDBX_createRespToRedisFd * RDBX_createRespFileWriter * ****************************************************************/ -struct RdbxRespWriter { +typedef struct RdbxRespWriter { void *ctx; void (*delete)(void *ctx); /* return 0 on success. Otherwise 1 */ - int (*writev) (void *ctx, const struct iovec *ioVec, int count, int startCmd, int endCmd); + int (*writev) (void *ctx, struct iovec *ioVec, int count, int startCmd, int endCmd); int (*flush) (void *ctx); -}; +} RdbxRespWriter; _LIBRDB_API void RDBX_attachRespWriter(RdbxToResp *rdbToResp, RdbxRespWriter *writer); @@ -158,15 +156,19 @@ _LIBRDB_API RdbxRespFileWriter *RDBX_createRespFileWriter(RdbParser *p, /**************************************************************** * Create RESP to Redis TCP connection * - * If provided path is NULL then write stdout * Can configure pipeline depth of transmitted RESP commands. Set * to 0 if to use default. ****************************************************************/ -_LIBRDB_API RdbxRespToTcpLoader *RDBX_createRespToTcpLoader(RdbParser *p, +_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p, RdbxToResp *rdbToResp, - const char* hostname, - int port, - int pipelineDepth); + const char *hostname, + int port); + +_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p, + RdbxToResp *rdbToResp, + int fd); + +_LIBRDB_API void RDBX_setPipelineDepth(RdbxRespToRedisLoader *r2r, int depth); #ifdef __cplusplus } diff --git a/src/cli/rdb-cli.c b/src/cli/rdb-cli.c index 076b076..a7662aa 100644 --- a/src/cli/rdb-cli.c +++ b/src/cli/rdb-cli.c @@ -159,8 +159,7 @@ static RdbRes formatRedis(RdbParser *parser, char *input, int argc, char **argv) if ((rdbToResp = RDBX_createHandlersToResp(parser, &conf)) == NULL) return RDB_ERR_GENERAL; - - if (RDBX_createRespToTcpLoader(parser, rdbToResp, hostname, port, pipeDepthVal) == NULL) + if (RDBX_createRespToRedisTcp(parser, rdbToResp, hostname, port) == NULL) return RDB_ERR_GENERAL; return RDB_OK; diff --git a/src/ext/handlersToJson.c b/src/ext/handlersToJson.c index 4ad25fd..fe8a3ad 100644 --- a/src/ext/handlersToJson.c +++ b/src/ext/handlersToJson.c @@ -96,6 +96,7 @@ static RdbxToJson *initRdbToJsonCtx(RdbParser *p, const char *filename, RdbxToJs /* init RdbToJson context */ RdbxToJson *ctx = RDB_alloc(p, sizeof(RdbxToJson)); + memset(ctx, 0, sizeof(RdbxToJson)); ctx->filename = RDB_alloc(p, strlen(filename)+1); strcpy(ctx->filename, filename); ctx->outfile = f; @@ -192,11 +193,11 @@ static RdbRes toJsonNewDb(RdbParser *p, void *userData, int db) { RdbxToJson *ctx = userData; if (ctx->state == R2J_IDLE) { - if (!ctx->conf.flatten) ouput_fprintf(ctx, "[{\n"); + if (!ctx->conf.flatten) ouput_fprintf(ctx, "{\n"); } else if (ctx->state == R2J_IN_DB) { /* output json part */ if (!ctx->conf.flatten) { - ouput_fprintf(ctx, "},{\n"); + ouput_fprintf(ctx, "\n},{\n"); } else { ouput_fprintf(ctx, ",\n"); } @@ -213,20 +214,35 @@ static RdbRes toJsonNewDb(RdbParser *p, void *userData, int db) { return RDB_OK; } +static RdbRes toJsonNewRdb(RdbParser *p, void *userData, int rdbVersion) { + UNUSED(rdbVersion); + RdbxToJson *ctx = userData; + + if (ctx->state != R2J_IDLE) { + RDB_reportError(p, (RdbRes) RDBX_ERR_R2J_INVALID_STATE, + "toJsonNewRdb(): Invalid state value: %d", ctx->state); + return (RdbRes) RDBX_ERR_R2J_INVALID_STATE; + } + + if (!ctx->conf.flatten) ouput_fprintf(ctx, "["); + + return RDB_OK; +} + static RdbRes toJsonEndRdb(RdbParser *p, void *userData) { RdbxToJson *ctx = userData; - if (ctx->state != R2J_IN_DB) { + if (ctx->state == R2J_IDLE) { + RDB_log(p, RDB_LOG_WRN, "RDB is empty."); + } else if (ctx->state == R2J_IN_DB) { + if (!ctx->conf.flatten) ouput_fprintf(ctx, "\n}"); + } else { RDB_reportError(p, (RdbRes) RDBX_ERR_R2J_INVALID_STATE, "toJsonEndRdb(): Invalid state value: %d", ctx->state); return (RdbRes) RDBX_ERR_R2J_INVALID_STATE; } - /* output json part */ - if (!ctx->conf.flatten) - ouput_fprintf(ctx, "\n}]\n"); - else - ouput_fprintf(ctx, "\n"); + if (!ctx->conf.flatten) ouput_fprintf(ctx, "]\n"); /* update new state */ ctx->state = R2J_IDLE; @@ -384,6 +400,7 @@ RdbxToJson *RDBX_createHandlersToJson(RdbParser *p, const char *filename, RdbxTo callbacks.common.handleNewKey = toJsonNewKey; callbacks.common.handleEndKey = toJsonEndKey; callbacks.common.handleNewDb = toJsonNewDb; + callbacks.common.handleStartRdb = toJsonNewRdb; callbacks.common.handleEndRdb = toJsonEndRdb; if (ctx->conf.level == RDB_LEVEL_DATA) { diff --git a/src/ext/handlersToResp.c b/src/ext/handlersToResp.c index ab6c319..b22dc0c 100644 --- a/src/ext/handlersToResp.c +++ b/src/ext/handlersToResp.c @@ -6,11 +6,13 @@ #include "../../deps/redis/endianconv.h" #include -#define RETURN_ON_WRITE_ERR(cmd) do {\ - if (unlikely(0 == (cmd))) return (RdbRes) RDBX_ERR_RESP_WRITE; \ - } while(0); +#define _RDB_TYPE_STRING 0 -#define WRITE_CONST_STR(wr, str, endCmd) (wr)->write((wr)->ctx, str, sizeof(str) - 1, endCmd) +typedef enum DelKeyBeforeWrite { + DEL_KEY_BEFORE_NONE, + DEL_KEY_BEFORE_BY_DEL_CMD, + DEL_KEY_BEFORE_BY_RESTORE_REPLACE, /* RESTORE supported */ +} DelKeyBeforeWrite; void setIov(struct iovec *iov, const char *s, size_t l) { iov->iov_base = (void *) s; @@ -45,6 +47,7 @@ struct RdbxToResp { uint64_t crc; int srcRdbVer; + DelKeyBeforeWrite delKeyBeforeWrite; }; static void deleteRdbToRespCtx(RdbParser *p, void *context) { @@ -107,22 +110,32 @@ static int rdbVerFromRedisVer(const char *ver) { } static void resolveSupportRestore(RdbParser *p, RdbxToResp *ctx, int srcRdbVer) { - int isRestore = ctx->conf.supportRestore; int dstRdbVer = ctx->conf.restore.dstRdbVersion; ctx->srcRdbVer = srcRdbVer; - if (isRestore) { + ctx->delKeyBeforeWrite = (ctx->conf.delKeyBeforeWrite) ? DEL_KEY_BEFORE_BY_DEL_CMD : DEL_KEY_BEFORE_NONE; + + if (ctx->conf.supportRestore) { /* if not configured destination RDB version, then resolve it from * configured destination Redis version */ if (!dstRdbVer) dstRdbVer = rdbVerFromRedisVer(ctx->conf.restore.dstRedisVersion); - if (dstRdbVer < srcRdbVer) - isRestore = 0; + if (dstRdbVer < srcRdbVer) { + RDB_log(p, RDB_LOG_WRN, + "Cannot support RESTORE. SRC version (=%d) is higher than DST version (%d)", + srcRdbVer, dstRdbVer); + ctx->conf.supportRestore = 0; + } else { + if (ctx->conf.delKeyBeforeWrite) { + RDB_log(p, RDB_LOG_INF, "As RESTORE is supported, configuration del-key-before-write will be ignored."); + ctx->delKeyBeforeWrite = DEL_KEY_BEFORE_BY_RESTORE_REPLACE; + } + } } - RdbHandlersLevel lvl = (isRestore) ? RDB_LEVEL_RAW : RDB_LEVEL_DATA; + RdbHandlersLevel lvl = (ctx->conf.supportRestore) ? RDB_LEVEL_RAW : RDB_LEVEL_DATA; for (int i = 0; i < RDB_DATA_TYPE_MAX; ++i) { RDB_handleByLevel(p, (RdbDataType) i, lvl, 0); } @@ -131,7 +144,7 @@ static void resolveSupportRestore(RdbParser *p, RdbxToResp *ctx, int srcRdbVer) RDB_handleByLevel(p, RDB_DATA_TYPE_MODULE, RDB_LEVEL_RAW, 0); } -static inline RdbRes writevWrap(RdbxToResp *ctx, const struct iovec *iov, int cnt, int startCmd, int endCmd) { +static inline RdbRes writevWrap(RdbxToResp *ctx, struct iovec *iov, int cnt, int startCmd, int endCmd) { RdbxRespWriter *writer = &ctx->respWriter; if (unlikely(writer->writev(writer->ctx, iov, cnt, startCmd, endCmd))) { RdbRes errCode = RDB_getErrorCode(ctx->parser); @@ -173,7 +186,6 @@ static RdbRes toRespStartRdb(RdbParser *p, void *userData, int rdbVersion) { return RDB_OK; } -/* TODO: support option rdb2resp del key before write */ /* TODO: support expiry */ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo *info) { UNUSED(info); @@ -188,6 +200,18 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo ctx->keyCtx.keyLen = RDB_bulkLen(p, key); if ((ctx->keyCtx.key = RDB_bulkClone(p, key)) == NULL) return RDB_ERR_FAIL_ALLOC; + + /* apply del-key-before-write if configured, unless it is 'SET' command where + * the key is overridden if it already exists, without encountering any problems. */ + if ((ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_DEL_CMD) && (info->opcode != _RDB_TYPE_STRING)) { + struct iovec iov[4]; + char keyLenStr[32]; + IOV_CONST_STR(&iov[0], "*2\r\n$3\r\nDEL\r\n$"); + iov_stringLen(&iov[1], ctx->keyCtx.keyLen, keyLenStr); + IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen); + IOV_CONST_STR(&iov[3], "\r\n"); + return writevWrap(ctx, iov, 4, 1, 1); + } return RDB_OK; } @@ -207,7 +231,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) { static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) { RdbxToResp *ctx = userData; - char keyLenStr[64], valLenStr[64]; + char keyLenStr[32], valLenStr[32]; int valLen = RDB_bulkLen(p, string); /*** fillup iovec ***/ @@ -231,7 +255,7 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) { /*** fillup iovec ***/ - char keyLenStr[64], valLenStr[64]; + char keyLenStr[32], valLenStr[32]; int valLen = RDB_bulkLen(p, item); struct iovec iov[7]; @@ -253,7 +277,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va /*** fillup iovec ***/ - char keyLenStr[64], fieldLenStr[64], valueLenStr[64]; + char keyLenStr[32], fieldLenStr[32], valueLenStr[32]; int fieldLen = RDB_bulkLen(p, field); int valueLen = RDB_bulkLen(p, value); @@ -277,10 +301,8 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) { RdbxToResp *ctx = userData; + char keyLenStr[32], valLenStr[32]; - /*** fillup iovec ***/ - - char keyLenStr[64], valLenStr[64]; int valLen = RDB_bulkLen(p, member); struct iovec iov[7]; @@ -301,8 +323,16 @@ static RdbRes toRespEndRdb(RdbParser *p, void *userData) { UNUSED(p); RdbxToResp *ctx = userData; RdbxRespWriter *writer = &ctx->respWriter; - writer->flush(writer->ctx); - return RDB_OK; + if (likely(writer->flush(writer->ctx) == 0)) + return RDB_OK; + + if (RDB_getErrorCode(p) != RDB_OK) + return RDB_getErrorCode(p); + + /* writer didn't take care to report an error */ + RDB_log(p, RDB_LOG_WRN, "Writer returned error indication but didn't RDB_reportError()"); + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_WRITE, "RESP writer returned error on flush()"); + return (RdbRes) RDBX_ERR_RESP_WRITE; } /*** Handling raw ***/ @@ -317,6 +347,7 @@ static RdbRes toRespRawBegin(RdbParser *p, void *userData, size_t size) { } static RdbRes toRespRawFrag(RdbParser *p, void *userData, RdbBulk frag) { + char keyLenStr[32], totalLenStr[32]; UNUSED(p); RdbxToResp *ctx = userData; struct iovec iov[10]; @@ -326,10 +357,11 @@ static RdbRes toRespRawFrag(RdbParser *p, void *userData, RdbBulk frag) { ctx->crc = crc64(ctx->crc, (unsigned char *) frag , fragLen); if (likely(!(ctx->rawCtx.sentFirstFrag))) { - char keyLenStr[64], totalLenStr[64]; - ctx->rawCtx.sentFirstFrag = 1; - IOV_CONST_STR(&iov[iovs++], "*4\r\n$7\r\nRESTORE\r\n$"); /* RESTORE */ + if (ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE) + IOV_CONST_STR(&iov[iovs++], "*5\r\n$7\r\nRESTORE\r\n$"); /* RESTORE-REPLACE */ + else + IOV_CONST_STR(&iov[iovs++], "*4\r\n$7\r\nRESTORE\r\n$"); /* RESTORE */ iov_stringLen(&iov[iovs++], ctx->keyCtx.keyLen, keyLenStr); /* write key len */ IOV_STRING(&iov[iovs++], ctx->keyCtx.key, ctx->keyCtx.keyLen); /* write key */ IOV_CONST_STR(&iov[iovs++], "\r\n$1\r\n0\r\n$"); /* newline + write TTL */ @@ -353,10 +385,14 @@ static RdbRes toRespRawFragEnd(RdbParser *p, void *userData) { memrev64ifbe(crc); memcpy(footer + 2, crc, 8); - struct iovec iov[] = { - {footer, 10}, - {"\r\n", 2} - }; + struct iovec iov[2]; + int iovs = 0; + + IOV_STRING(&iov[iovs++], footer, 10); + if (ctx->delKeyBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE) + IOV_CONST_STR(&iov[iovs++], "\r\n$7\r\nREPLACE\r\n"); + else + IOV_CONST_STR(&iov[iovs++], "\r\n"); return writevWrap(ctx, iov, 2, 0, 1); } diff --git a/src/ext/readerFile.c b/src/ext/readerFile.c index a6c13f3..ba322f4 100644 --- a/src/ext/readerFile.c +++ b/src/ext/readerFile.c @@ -1,5 +1,6 @@ #include #include +#include #include "common.h" struct RdbxReaderFile { @@ -25,18 +26,22 @@ static void deleteReaderFile(RdbParser *p, void *rdata) { static RdbStatus readFile(void *data, void *buf, size_t len) { RdbxReaderFile *readerFile = data; - size_t newLen = fread(buf, sizeof(char), len, readerFile->file); - if (ferror( readerFile->file) != 0) { - RDB_reportError(readerFile->parser, RDB_ERR_FAILED_READ_RDB_FILE, NULL); - return RDB_STATUS_ERROR; - } + size_t readLen = fread(buf, sizeof(char), len, readerFile->file); - if (newLen != len) { - RDB_reportError(readerFile->parser, RDB_ERR_FAILED_PARTIAL_READ_RDB_FILE, NULL); - return RDB_STATUS_ERROR; - } + if (likely(readLen == len)) + return RDB_STATUS_OK; - return RDB_STATUS_OK; + if (feof(readerFile->file)) { + RDB_reportError(readerFile->parser, RDB_ERR_FAILED_PARTIAL_READ_RDB_FILE, + "Encountered an unexpected RDB end-of-file. Parsing halted."); + } else if (ferror(readerFile->file)) { + RDB_reportError(readerFile->parser, RDB_ERR_FAILED_READ_RDB_FILE, + "An error occurred while attempting to read the RDB file (errno=%d).", errno); + } else { /* readLen < len */ + RDB_reportError(readerFile->parser, RDB_ERR_FAILED_READ_RDB_FILE, + "The amount of data read from the RDB file was less than expected. Reached EOF."); + } + return RDB_STATUS_ERROR; } RdbxReaderFile *RDBX_createReaderFile(RdbParser *p, const char *filename) { diff --git a/src/ext/respToRedisLoader.c b/src/ext/respToRedisLoader.c new file mode 100644 index 0000000..a67f16b --- /dev/null +++ b/src/ext/respToRedisLoader.c @@ -0,0 +1,240 @@ +#include "common.h" +#include +#include +#include +#include +#include +#include +#include "readerResp.h" + +#ifdef USE_OPENSSL +#include +#include +#endif + +#define PIPELINE_DEPTH_DEF 200 /* Default Number of pending cmds before waiting for response(s) */ +#define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */ + +#define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */ +#define RECORDED_DATA_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */ + +#define REPLY_BUFF_SIZE 4096 /* reply buffer size */ + +#define MAX_EAGAIN_RETRY 3 + + +struct RdbxRespToRedisLoader { + + struct { + int num; + int pipelineDepth; + char cmdPrefix[NUM_RECORDED_CMDS][RECORDED_DATA_MAX_LEN]; + int cmdAt; + } pendingCmds; + + RespReaderCtx respReader; + RdbParser *p; + int fd; +}; + +/* Read 'numToRead' replies from the socket. + * Return 0 for success, 1 otherwise. */ +static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) { + char buff[REPLY_BUFF_SIZE]; + + RespReaderCtx *respReader = &ctx->respReader; + size_t countRepliesBefore = respReader->countReplies; + size_t repliesExpected = respReader->countReplies + numToRead; + + while (respReader->countReplies < repliesExpected) { + int bytesReceived = recv(ctx->fd, buff, sizeof(buff), 0); + + if (bytesReceived > 0) { + /* Data was received, process it */ + if (RESP_REPLY_ERR == readRespReplies(respReader, buff, bytesReceived)) { + char *failedRecord = ctx->pendingCmds.cmdPrefix[ctx->respReader.countReplies % NUM_RECORDED_CMDS]; + RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP_WRITE, + "\nReceived Server error: \"%s\"\nFailed on command [#%d]:\n%s\n", + respReader->errorMsg, + ctx->respReader.countReplies, + failedRecord); + return 1; + } + + } else if (bytesReceived == 0) { + RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE, "Connection closed by the remote side"); + return 1; + } else { + RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_READ, "Failed to recv() from Redis server. Exit."); + return 1; + } + } + + ctx->pendingCmds.num -= (respReader->countReplies - countRepliesBefore); + return 0; +} + +/* For debugging, record the command into the cyclic array before sending it */ +static inline void recordNewCmd(RdbxRespToRedisLoader *ctx, const struct iovec *cmd_iov, int iovcnt) { + int recordCmdEntry = (ctx->respReader.countReplies + ctx->pendingCmds.num) % NUM_RECORDED_CMDS; + char *recordCmdPrefixAt = ctx->pendingCmds.cmdPrefix[recordCmdEntry]; + + int copiedBytes = 0, bytesToCopy = RECORDED_DATA_MAX_LEN - 1; + + const struct iovec* currentIov = cmd_iov; + for (int i = 0; i < iovcnt && bytesToCopy; ++i) { + int slice = (currentIov->iov_len >= ((size_t)bytesToCopy)) ? bytesToCopy : (int) currentIov->iov_len; + + for (int j = 0 ; j < slice ; ) + recordCmdPrefixAt[copiedBytes++] = ((char *)currentIov->iov_base)[j++]; + + bytesToCopy -= slice; + ++currentIov; + } + recordCmdPrefixAt[copiedBytes] = '\0'; +} + +/* Write the vector of data to the socket with writev() sys-call. + * Return 0 for success, 1 otherwise. */ +static int redisLoaderWritev(void *context, struct iovec *iov, int count, int startCmd, int endCmd) { + int origCount = count; + ssize_t writeResult; + int retries = 0; + + RdbxRespToRedisLoader *ctx = context; + + if (unlikely(ctx->pendingCmds.num == ctx->pendingCmds.pipelineDepth)) { + if (readReplies(ctx, 1 /* at least one */)) + return 1; + } + + if (startCmd) + recordNewCmd(ctx, iov, count); + + while (1) + { + writeResult = writev(ctx->fd, iov, count); + + /* check for error */ + if (unlikely(writeResult == -1)) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if ((retries++) >= MAX_EAGAIN_RETRY) { + RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_WRITE, + "Failed to write socket. Exceeded EAGAIN retry limit"); + return 1; + } + usleep(1000 * retries); /* Backoff and Retries */ + continue; + } else { + RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_WRITE, + "Failed to write socket (errno=%d)", errno); + printf("count=%d origCount=%d\n",count, origCount);for (int i = 0 ; i < count ; ++i) { + printf ("iov[%d]: base=%p len=%lu\n", i, iov[i].iov_base, iov[i].iov_len ); + } + return 1; + } + } + + /* crunch iov entries that were transmitted entirely */ + while ((count) && (iov->iov_len <= (size_t) writeResult)) { + writeResult -= iov->iov_len; + ++iov; + --count; + } + + /* if managed to send all iov entries */ + if (likely(count == 0)) + break; + + /* Update pointed iov entry. Only partial of its data sent */ + iov->iov_len -= writeResult; + iov->iov_base = (char *) iov->iov_base + writeResult; + } + + ctx->pendingCmds.num += endCmd; + return 0; +} + + +/* Flush the pending commands by reading the remaining replies. + * Return 0 for success, 1 otherwise. */ +static int redisLoaderFlush(void *context) { + RdbxRespToRedisLoader *ctx = context; + if (ctx->pendingCmds.num) + return readReplies(ctx, ctx->pendingCmds.num); + return 0; +} + +/* Delete the context and perform cleanup. */ +static void redisLoaderDelete(void *context) { + struct RdbxRespToRedisLoader *ctx = context; + + /* not required to flush on termination */ + + shutdown(ctx->fd, SHUT_WR); /* graceful shutdown */ + close(ctx->fd); + RDB_free(ctx->p, ctx); +} + +_LIBRDB_API void RDBX_setPipelineDepth(RdbxRespToRedisLoader *r2r, int depth) { + r2r->pendingCmds.pipelineDepth = (depth <= 0 || depth>PIPELINE_DEPTH_MAX) ? PIPELINE_DEPTH_DEF : depth; +} + +_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p, + RdbxToResp *rdbToResp, + int fd) +{ + RdbxRespToRedisLoader *ctx; + if ((ctx = RDB_alloc(p, sizeof(RdbxRespToRedisLoader))) == NULL) { + close(fd); + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC, + "Failed to allocate struct RdbxRespToRedisLoader"); + return NULL; + } + + /* init RdbxRespToRedisLoader context */ + memset(ctx, 0, sizeof(RdbxRespToRedisLoader)); + ctx->p = p; + ctx->fd = fd; + ctx->pendingCmds.num = 0; + ctx->pendingCmds.pipelineDepth = PIPELINE_DEPTH_DEF; + readRespInit(&ctx->respReader); + + /* Set 'this' writer to rdbToResp */ + RdbxRespWriter inst = {ctx, redisLoaderDelete, redisLoaderWritev, redisLoaderFlush}; + RDBX_attachRespWriter(rdbToResp, &inst); + return ctx; +} + +_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p, + RdbxToResp *rdbToResp, + const char *hostname, + int port) { + int sockfd; + + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CREATE_SOCKET, "Failed to create tcp socket"); + return NULL; + } + + struct sockaddr_in server_addr; + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + if (inet_pton(AF_INET, hostname, &(server_addr.sin_addr)) <= 0) { + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_INVALID_ADDRESS, + "Invalid tcp address (hostname=%s, port=%d)", hostname, port); + close(sockfd); + return NULL; + } + + if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) { + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_INVALID_ADDRESS, + "Invalid tcp address (hostname=%s, port=%d)", hostname, port); + close(sockfd); + return NULL; + } + + return RDBX_createRespToRedisFd(p, rdbToResp, sockfd); +} \ No newline at end of file diff --git a/src/ext/respToTcpLoader.c b/src/ext/respToTcpLoader.c deleted file mode 100644 index 5d08cbc..0000000 --- a/src/ext/respToTcpLoader.c +++ /dev/null @@ -1,187 +0,0 @@ -#include "common.h" -#include -#include -#include -#include -#include -#include -#include "readerResp.h" - -#ifdef USE_OPENSSL -#include -#include -#endif - -#define PIPELINE_DEPTH_DEFAULT 200 /* Default Number of pending cmds before waiting for response(s) */ -#define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */ - -#define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */ -#define RECORDED_DATA_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */ - -#define REPLY_BUFF_SIZE 4096 /* reply buffer size */ - - -struct RdbxRespToTcpLoader { - - struct { - int num; - int pipelineDepth; - char cmdPrefix[NUM_RECORDED_CMDS][RECORDED_DATA_MAX_LEN]; - int cmdAt; - } pendingCmds; - - RespReaderCtx respReader; - RdbParser *p; - int fd; -}; - -/* Read 'numToRead' replies from the TCP socket. - * Return 0 for success, 1 otherwise. */ -static int readReplies(RdbxRespToTcpLoader *ctx, int numToRead) { - char buff[REPLY_BUFF_SIZE]; - - RespReaderCtx *respReader = &ctx->respReader; - size_t countRepliesBefore = respReader->countReplies; - size_t repliesExpected = respReader->countReplies + numToRead; - - while (respReader->countReplies < repliesExpected) { - int rd = recv(ctx->fd, buff, sizeof(buff), 0); - - if (rd > 0) { - /* Data was received, process it */ - if (RESP_REPLY_ERR == readRespReplies(respReader, buff, rd)) { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP_READ, respReader->errorMsg); - return 1; - } - - } else if (rd == 0) { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2TCP_CONN_CLOSE, "Connection closed by the remote side"); - return 1; - } else { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2TCP_FAILED_READ, "Failed to recv() from Redis server. Exit."); - return 1; - } - } - - ctx->pendingCmds.num -= (respReader->countReplies - countRepliesBefore); - return 0; -} - -/* For debugging, record the command into the cyclic array before sending it */ -/* TODO: on error response, report which command caused the failure */ -static inline void recordNewCmd(RdbxRespToTcpLoader *ctx, const struct iovec *cmd_iov, int iovcnt) { - int recordCmdEntry = (ctx->respReader.countReplies + ctx->pendingCmds.num) % NUM_RECORDED_CMDS; - char *recordCmdPrefixAt = ctx->pendingCmds.cmdPrefix[recordCmdEntry]; - - int copiedBytes = 0, bytesToCopy = RECORDED_DATA_MAX_LEN - 1; - - const struct iovec* currentIov = cmd_iov; - for (int i = 0; i < iovcnt && bytesToCopy; ++i) { - int slice = (currentIov->iov_len >= ((size_t)bytesToCopy)) ? bytesToCopy : (int) currentIov->iov_len; - - for (int j = 0 ; j < slice ; ) - recordCmdPrefixAt[copiedBytes++] = ((char *)currentIov->iov_base)[j++]; - - bytesToCopy -= slice; - ++currentIov; - } - recordCmdPrefixAt[copiedBytes] = '\0'; -} - -/* Write the vector of data to the TCP socket with writev() sys-call. - * Return 0 for success, 1 otherwise. */ -static int tcpLoaderWritev(void *context, const struct iovec *iov, int count, int startCmd, int endCmd) { - UNUSED(startCmd, endCmd); - - RdbxRespToTcpLoader *ctx = context; - - if (unlikely(ctx->pendingCmds.num == ctx->pendingCmds.pipelineDepth)) { - if (readReplies(ctx, 1 /* at least one */)) - return 1; - } - - if (startCmd) - recordNewCmd(ctx, iov, count); - - if (unlikely(writev(ctx->fd, iov, count) == -1)) { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2TCP_FAILED_WRITE, "Failed to write tcp socket"); - return 1; - } - - ctx->pendingCmds.num += endCmd; - return 0; -} - -/* Flush the pending commands by reading the remaining replies. - * Return 0 for success, 1 otherwise. */ -static int tcpLoaderFlush(void *context) { - RdbxRespToTcpLoader *ctx = context; - if (ctx->pendingCmds.num) - return readReplies(ctx, ctx->pendingCmds.num); - return 0; -} - -/* Delete the TCP loader context and perform cleanup. */ -static void tcpLoaderDelete(void *context) { - struct RdbxRespToTcpLoader *ctx = context; - tcpLoaderFlush(ctx); - shutdown(ctx->fd, SHUT_WR); /* graceful shutdown */ - close(ctx->fd); - RDB_free(ctx->p, ctx); -} - -/* Create and initialize the RdbxRespToTcpLoader context. - * Return a pointer to the created context on success, or NULL on failure. */ -_LIBRDB_API RdbxRespToTcpLoader *RDBX_createRespToTcpLoader(RdbParser *p, - RdbxToResp *rdbToResp, - const char *hostname, - int port, - int pipelineDepth) { - RdbxRespToTcpLoader *ctx; - - if (pipelineDepth <= 0 || pipelineDepth>PIPELINE_DEPTH_MAX) - pipelineDepth = PIPELINE_DEPTH_DEFAULT; - - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd == -1) { - RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2TCP_CREATE_SOCKET, "Failed to create tcp socket"); - return NULL; - } - - struct sockaddr_in server_addr; - memset(&server_addr, 0, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(port); - if (inet_pton(AF_INET, hostname, &(server_addr.sin_addr)) <= 0) { - RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2TCP_INVALID_ADDRESS, - "Invalid tcp address (hostname=%s, port=%d)", hostname, port); - close(sockfd); - return NULL; - } - - if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) { - RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2TCP_INVALID_ADDRESS, - "Invalid tcp address (hostname=%s, port=%d)", hostname, port); - close(sockfd); - return NULL; - } - - if ((ctx = RDB_alloc(p, sizeof(RdbxRespToTcpLoader))) == NULL) { - close(sockfd); - RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC, "Failed to allocate struct RdbxRespToTcpLoader"); - return NULL; - } - - memset(ctx, 0, sizeof(RdbxRespToTcpLoader)); - ctx->p = p; - ctx->fd = sockfd; - ctx->pendingCmds.num = 0; - ctx->pendingCmds.pipelineDepth = pipelineDepth; - readRespInit(&ctx->respReader); - - /* Attach this writer to rdbToResp */ - RdbxRespWriter inst = {ctx, tcpLoaderDelete, tcpLoaderWritev, tcpLoaderFlush}; - RDBX_attachRespWriter(rdbToResp, &inst); - - return ctx; -} diff --git a/src/ext/respWriter.c b/src/ext/respWriter.c index 71b2df6..282e158 100644 --- a/src/ext/respWriter.c +++ b/src/ext/respWriter.c @@ -10,7 +10,7 @@ struct RdbxRespFileWriter { }; /* return 0 for success. 1 Otherwise. */ -static int respFileWritev(void *context, const struct iovec *iov, int count, int startCmd, int endCmd) { +static int respFileWritev(void *context, struct iovec *iov, int count, int startCmd, int endCmd) { UNUSED(startCmd); struct RdbxRespFileWriter *ctx = context; ctx->cmdCount += endCmd; diff --git a/src/lib/parser.c b/src/lib/parser.c index 13aa14b..e6ebcbc 100644 --- a/src/lib/parser.c +++ b/src/lib/parser.c @@ -179,7 +179,7 @@ _LIBRDB_API RdbParser *RDB_createParserRdb(RdbMemAlloc *memAlloc) { p->appCbCtx.numBulks = 0; p->loggerCb = loggerCbDefault; p->logLevel = RDB_LOG_DBG; - p->maxRawLen = SIZE_MAX; + p->maxRawSize = SIZE_MAX; p->errorCode = RDB_OK; p->handlers[RDB_LEVEL_RAW] = NULL; p->handlers[RDB_LEVEL_STRUCT] = NULL; @@ -364,8 +364,8 @@ _LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p) { p->ignoreChecksum = 1; } -_LIBRDB_API void RDB_setMaxRawLenHandling(RdbParser *p, size_t size) { - p->maxRawLen = size; +_LIBRDB_API void RDB_setMaxRawSize(RdbParser *p, size_t size) { + p->maxRawSize = size; } _LIBRDB_API void RDB_log(RdbParser *p, RdbLogLevel lvl, const char *format, ...) { @@ -718,12 +718,12 @@ static RdbStatus finalizeConfig(RdbParser *p, int isParseFromBuff) { } static void printParserState(RdbParser *p) { - printf ("Parser error message:%s\n", RDB_getErrorMessage(p)); - printf ("Parser error code:%d\n", RDB_getErrorCode(p)); - printf ("Parser element func name: %s\n", peInfo[p->parsingElement].funcname); - printf ("Parser element func description: %s\n", peInfo[p->parsingElement].funcname); - printf ("Parser element state:%d\n", p->elmCtx.state); - bulkPoolPrintDbg(p); + RDB_log(p, RDB_LOG_ERR, "Parser error message: %s", RDB_getErrorMessage(p)); + RDB_log(p, RDB_LOG_ERR, "Parser error code: %d", RDB_getErrorCode(p)); + RDB_log(p, RDB_LOG_ERR, "Parser element func name: %s", peInfo[p->parsingElement].funcname); + RDB_log(p, RDB_LOG_ERR, "Parser element func description: %s", peInfo[p->parsingElement].funcname); + RDB_log(p, RDB_LOG_ERR, "Parser element state:%d", p->elmCtx.state); + //bulkPoolPrintDbg(p); } static void loggerCbDefault(RdbLogLevel l, const char *msg) { @@ -1792,10 +1792,13 @@ static RdbStatus readRdbFromReader(RdbParser *p, size_t len, AllocTypeRq type, c bytesToFill -= overflow; p->bytesRead -= overflow; - res = p->reader->readFunc(p->reader->readerData, - ((char *) (*binfo)->ref) + (*binfo)->written, bytesToFill); - (*binfo)->written += bytesToFill; - if (res != RDB_STATUS_OK) goto not_ok; + /* Verify bigger than 0 (non-standard readers might fail on it) */ + if (likely(bytesToFill)) { + res = p->reader->readFunc(p->reader->readerData, + ((char *) (*binfo)->ref) + (*binfo)->written, bytesToFill); + (*binfo)->written += bytesToFill; + if (res != RDB_STATUS_OK) goto not_ok; + } return RDB_STATUS_PAUSED; } else { /* fill up entire item */ diff --git a/src/lib/parser.h b/src/lib/parser.h index d2867a5..0a4a645 100644 --- a/src/lib/parser.h +++ b/src/lib/parser.h @@ -283,7 +283,7 @@ struct RdbParser { int ignoreChecksum; RdbLoggerCB loggerCb; RdbLogLevel logLevel; - size_t maxRawLen; + size_t maxRawSize; /*** context ***/ ElementCtx elmCtx; /* parsing-element context */ diff --git a/src/lib/parserRaw.c b/src/lib/parserRaw.c index ba517b5..091b25c 100644 --- a/src/lib/parserRaw.c +++ b/src/lib/parserRaw.c @@ -718,7 +718,7 @@ static RdbStatus aggMakeRoom(RdbParser *p, size_t numBytesRq) { if (likely(freeRoomLeft >= numBytesRq)) return RDB_STATUS_OK; - if (unlikely(p->maxRawLen < ctx->totalSize + numBytesRq)) { + if (unlikely(p->maxRawSize < ctx->totalSize + numBytesRq)) { RDB_reportError(p, RDB_ERR_MAX_RAW_LEN_EXCEEDED_FOR_KEY, "Maximum raw length exceeded for key (len=%lu)", ctx->totalSize + numBytesRq); return RDB_STATUS_ERROR; diff --git a/test/dumps/100_lists.rdb b/test/dumps/100_lists.rdb new file mode 100644 index 0000000..3bfe528 Binary files /dev/null and b/test/dumps/100_lists.rdb differ diff --git a/test/dumps/empty.rdb b/test/dumps/empty.rdb new file mode 100644 index 0000000..5576bc6 Binary files /dev/null and b/test/dumps/empty.rdb differ diff --git a/test/test_common.h b/test/test_common.h index fc949fc..a41aaf4 100644 --- a/test/test_common.h +++ b/test/test_common.h @@ -32,7 +32,7 @@ void setupRedisServer(); void teardownRedisServer(); /* test groups */ -int group_rdb_to_loader(); +int group_rdb_to_redis(); int group_test_rdb_cli(); int group_rdb_to_resp(); int group_main(); diff --git a/test/test_main.c b/test/test_main.c index 8d9b885..ee5a527 100644 --- a/test/test_main.c +++ b/test/test_main.c @@ -23,6 +23,27 @@ static void test_createReader_missingFile(void **state) { RDB_deleteParser(parser); } +static void test_empty_rdb(void **state) { + UNUSED(state); + + const char *rdbfile = DUMP_FOLDER("empty.rdb"); + const char *jsonfile = TMP_FOLDER("empty.json"); + + RdbStatus status; + RdbParser *parser = RDB_createParserRdb(NULL); + RDB_setLogLevel(parser, RDB_LOG_ERR); + assert_non_null(RDBX_createReaderFile(parser, rdbfile)); + RdbxToJsonConf r2jConf = {RDB_LEVEL_DATA, RDBX_CONV_JSON_ENC_PLAIN, 0, 1}; + assert_non_null(RDBX_createHandlersToJson(parser, + jsonfile, + &r2jConf)); + + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + assert_int_equal( status, RDB_STATUS_OK); + + RDB_deleteParser(parser); +} + static void test_createHandlersRdbToJson_and_2_FilterKey(void **state) { UNUSED(state); @@ -112,6 +133,7 @@ int group_main(void) { /* Insert here your test functions */ const struct CMUnitTest tests[] = { cmocka_unit_test(test_createReader_missingFile), + cmocka_unit_test(test_empty_rdb), cmocka_unit_test(test_createHandlersRdbToJson_and_2_FilterKey), cmocka_unit_test(test_mixed_levels_registration), }; @@ -155,7 +177,7 @@ int main(int argc, char *argv[]) { RUN_TEST_GROUP(group_mem_management); RUN_TEST_GROUP(group_bulk_ops); RUN_TEST_GROUP(group_pause); - RUN_TEST_GROUP(group_rdb_to_loader); /*external*/ + RUN_TEST_GROUP(group_rdb_to_redis); /*external*/ RUN_TEST_GROUP(group_test_rdb_cli); /*external*/ @@ -166,7 +188,7 @@ int main(int argc, char *argv[]) { RUN_TEST_GROUP(group_rdb_to_json); RUN_TEST_GROUP(group_mem_management); RUN_TEST_GROUP(group_bulk_ops); - RUN_TEST_GROUP(group_rdb_to_loader); /*external*/ + RUN_TEST_GROUP(group_rdb_to_redis); /*external*/ RUN_TEST_GROUP(group_test_rdb_cli); /*external*/ printf("\n*************** END TESTING *******************\n"); diff --git a/test/test_rdb_cli.c b/test/test_rdb_cli.c index 2f714a3..acab1f8 100644 --- a/test/test_rdb_cli.c +++ b/test/test_rdb_cli.c @@ -59,7 +59,7 @@ static void test_rdb_cli_json(void **state) { assert_json_equal(DUMP_FOLDER("multiple_lists_strings_data.json"), "./test/tmp/out.json", 0); } -static void test_rdb_cli_resp_tcp(void **state) { +static void test_rdb_cli_resp_to_redis(void **state) { UNUSED(state); test_rdb_cli_resp_common(DUMP_FOLDER("multiple_lists_strings.rdb")); } @@ -74,7 +74,7 @@ int group_test_rdb_cli(void) { const struct CMUnitTest tests[] = { cmocka_unit_test(test_rdb_cli_json), - cmocka_unit_test_setup(test_rdb_cli_resp_tcp, setupTest), + cmocka_unit_test_setup(test_rdb_cli_resp_to_redis, setupTest), }; setupRedisServer(); diff --git a/test/test_rdb_to_loader.c b/test/test_rdb_to_loader.c deleted file mode 100644 index 2755dbb..0000000 --- a/test/test_rdb_to_loader.c +++ /dev/null @@ -1,196 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include "test_common.h" - -static int setupTest(void **state) { - UNUSED(state); - runSystemCmd("%s/redis-cli -p %d flushall > /dev/null", redisInstallFolder, redisPort); - runSystemCmd("%s/redis-cli -p %d save > /dev/null", redisInstallFolder, redisPort); - return 0; -} - -/* - * Testing RESP TCP against live server: - * 1. Run RDB to Json (out1.json) - * 2. Run RDB against Redis and save DUMP-RDB - * 3. From DUMP-RDB generate Json (out2.json) - * 4. assert_json_equal(out1.json , out2.json) - * - * The test will run twice against: - * A. old Redis target (no RESTORE) - * B. new Redis target (RESTORE) - * Note: This test cannot tell if actually run RESTORE command in the background. - * test_rdb_to_resp.c verifies that RESTORE command is used only when it should. - */ -static void test_rdb_to_loader_common(const char *rdbfile, int pipelineDepth, int ignoreListOrder) { - RdbParser *parser; - RdbStatus status; - - /* test one time without RESTORE, Playing against old version. - * and one time with RESTORE, Playing against new version. */ - for (int isRestore = 0 ; isRestore <= 1 ; ++isRestore) { - - int targetRdbVer = (isRestore == 0) ? 1 : 100; /* old-target (not RESTORE) VS. new-target (RESTORE) */ - - runSystemCmd("%s/redis-cli -p %d flushall > /dev/null", redisInstallFolder, redisPort); - - RdbxToRespConf rdb2respConf = { - .supportRestore = 1, - .restore.dstRdbVersion = targetRdbVer, - }; - RdbxToJsonConf rdb2jsonConf = {RDB_LEVEL_DATA, RDBX_CONV_JSON_ENC_PLAIN, 1, 1}; - - /* RDB to JSON */ - parser = RDB_createParserRdb(NULL); - RDB_setLogLevel(parser, RDB_LOG_ERR); - assert_non_null(RDBX_createReaderFile(parser, rdbfile)); - assert_non_null(RDBX_createHandlersToJson(parser, - TMP_FOLDER("out1.json"), - &rdb2jsonConf)); - while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); - assert_int_equal(status, RDB_STATUS_OK); - RDB_deleteParser(parser); - - /* RDB to TCP */ - RdbxToResp *rdbToResp; - parser = RDB_createParserRdb(NULL); - RDB_setLogLevel(parser, RDB_LOG_ERR); - assert_non_null(RDBX_createReaderFile(parser, rdbfile)); - assert_non_null(rdbToResp = RDBX_createHandlersToResp(parser, &rdb2respConf)); - assert_non_null(RDBX_createRespToTcpLoader(parser, rdbToResp, "127.0.0.1", redisPort, pipelineDepth)); - RDB_setLogLevel(parser, RDB_LOG_ERR); - while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); - assert_int_equal(status, RDB_STATUS_OK); - RDB_deleteParser(parser); - - /* DUMP-RDB from Redis */ - runSystemCmd("%s/redis-cli -p %d save > /dev/null", redisInstallFolder, redisPort); - - /* DUMP-RDB to JSON */ - parser = RDB_createParserRdb(NULL); - RDB_setLogLevel(parser, RDB_LOG_ERR); - assert_non_null(RDBX_createReaderFile(parser, TMP_FOLDER("dump.rdb"))); - assert_non_null(RDBX_createHandlersToJson(parser, - TMP_FOLDER("out2.json"), - &rdb2jsonConf)); - while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); - assert_int_equal(status, RDB_STATUS_OK); - RDB_deleteParser(parser); - - /* Json (from DUMP-RDB) vs. expected Json */ - assert_json_equal(TMP_FOLDER("out1.json"), TMP_FOLDER("out2.json"), ignoreListOrder); - } -} - -static void test_rdb_to_loader_single_string(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("single_key.rdb"), 0, 0); -} - -static void test_rdb_to_loader_single_list(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("quicklist2_v11.rdb"), 0, 0); -} - -static void test_rdb_to_loader_multiple_lists_strings(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("multiple_lists_strings.rdb"), 0, 0); -} - -static void test_rdb_to_loader_multiple_lists_strings_pipeline_depth_1(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("multiple_lists_strings.rdb"), 1, 0); -} - -static void test_rdb_to_loader_plain_list(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("plain_list_v6.rdb"), 1, 0); -} - -static void test_rdb_to_loader_quicklist(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("quicklist.rdb"), 1, 0); -} - -static void test_rdb_to_loader_single_ziplist(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("ziplist_v3.rdb"), 1, 0); -} - -static void test_rdb_to_loader_plain_hash(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("plain_hash_v3.rdb"), 1, 0); -} - -static void test_rdb_to_loader_hash_zl(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("hash_zl_v6.rdb"), 1, 0); -} - -static void test_rdb_to_loader_hash_lp(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("hash_lp_v11.rdb"), 1, 0); -} - -static void test_rdb_to_loader_hash_zm(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("hash_zm_v2.rdb"), 1, 0); -} - -static void test_rdb_to_loader_plain_set(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("plain_set_v6.rdb"), 1, 1); -} - -static void test_rdb_to_loader_set_is(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("set_is_v11.rdb"), 1, 1); -} - -static void test_rdb_to_loader_set_lp(void **state) { - UNUSED(state); - test_rdb_to_loader_common(DUMP_FOLDER("set_lp_v11.rdb"), 1, 1); -} - -/*************************** group_rdb_to_loader *******************************/ -int group_rdb_to_loader() { - - if (!redisInstallFolder) { - printf("[ SKIPPED ] (Redis installation folder is not configured)\n"); - return 0; - } - - const struct CMUnitTest tests[] = { - /* string */ - cmocka_unit_test_setup(test_rdb_to_loader_single_string, setupTest), - /* list */ - cmocka_unit_test_setup(test_rdb_to_loader_single_list, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_plain_list, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_quicklist, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_single_ziplist, setupTest), - /* hash */ - cmocka_unit_test_setup(test_rdb_to_loader_plain_hash, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_hash_zl, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_hash_lp, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_hash_zm, setupTest), - /* set */ - cmocka_unit_test_setup(test_rdb_to_loader_plain_set, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_set_is, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_set_lp, setupTest), - - /* misc */ - cmocka_unit_test_setup(test_rdb_to_loader_multiple_lists_strings, setupTest), - cmocka_unit_test_setup(test_rdb_to_loader_multiple_lists_strings_pipeline_depth_1, setupTest), - - }; - - setupRedisServer(); - int res = cmocka_run_group_tests(tests, NULL, NULL); - teardownRedisServer(); - - return res; -} \ No newline at end of file diff --git a/test/test_rdb_to_redis.c b/test/test_rdb_to_redis.c new file mode 100644 index 0000000..7bcfc8b --- /dev/null +++ b/test/test_rdb_to_redis.c @@ -0,0 +1,247 @@ +#include +#include +#include +#include +#include +#include +#include "test_common.h" + +static int setupTest(void **state) { + UNUSED(state); + runSystemCmd("%s/redis-cli -p %d flushall > /dev/null", redisInstallFolder, redisPort); + runSystemCmd("%s/redis-cli -p %d save > /dev/null", redisInstallFolder, redisPort); + return 0; +} + +/* + * Testing RESP against live server: + * 1. Run RDB to Json (out1.json) + * 2. Run RDB against Redis and save DUMP-RDB + * 3. From DUMP-RDB generate Json (out2.json) + * 4. assert_json_equal(out1.json , out2.json) + * + * The test will run twice against: + * A. old Redis target (no RESTORE) + * B. new Redis target (RESTORE) + * Note: This test cannot tell if actually run RESTORE command in the background. + * test_rdb_to_resp.c verifies that RESTORE command is used only when it should. + */ +static void test_rdb_to_redis_common(const char *rdbfile, int pipelineDepth, int ignoreListOrder) { + RdbParser *parser; + RdbStatus status; + + /* test one time without RESTORE, Playing against old version. + * and one time with RESTORE, Playing against new version. */ + for (int isRestore = 0 ; isRestore <= 1 ; ++isRestore) { + + int targetRdbVer = (isRestore == 0) ? 1 : 100; /* old-target (not RESTORE) VS. new-target (RESTORE) */ + + runSystemCmd("%s/redis-cli -p %d flushall > /dev/null", redisInstallFolder, redisPort); + + RdbxToRespConf rdb2respConf = { + .supportRestore = 1, + .restore.dstRdbVersion = targetRdbVer, + }; + RdbxToJsonConf rdb2jsonConf = {RDB_LEVEL_DATA, RDBX_CONV_JSON_ENC_PLAIN, 1, 1}; + + /* RDB to JSON */ + parser = RDB_createParserRdb(NULL); + RDB_setLogLevel(parser, RDB_LOG_ERR); + assert_non_null(RDBX_createReaderFile(parser, rdbfile)); + assert_non_null(RDBX_createHandlersToJson(parser, + TMP_FOLDER("out1.json"), + &rdb2jsonConf)); + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + assert_int_equal(status, RDB_STATUS_OK); + RDB_deleteParser(parser); + + /* RDB to TCP */ + RdbxToResp *rdbToResp; + parser = RDB_createParserRdb(NULL); + RDB_setLogLevel(parser, RDB_LOG_ERR); + assert_non_null(RDBX_createReaderFile(parser, rdbfile)); + assert_non_null(rdbToResp = RDBX_createHandlersToResp(parser, &rdb2respConf)); + + RdbxRespToRedisLoader *r2r = RDBX_createRespToRedisTcp(parser, + rdbToResp, + "127.0.0.1", + redisPort); + assert_non_null(r2r); + RDBX_setPipelineDepth(r2r, pipelineDepth); + RDB_setLogLevel(parser, RDB_LOG_ERR); + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + assert_int_equal(status, RDB_STATUS_OK); + RDB_deleteParser(parser); + + /* DUMP-RDB from Redis */ + runSystemCmd("%s/redis-cli -p %d save > /dev/null", redisInstallFolder, redisPort); + + /* DUMP-RDB to JSON */ + parser = RDB_createParserRdb(NULL); + RDB_setLogLevel(parser, RDB_LOG_ERR); + assert_non_null(RDBX_createReaderFile(parser, TMP_FOLDER("dump.rdb"))); + assert_non_null(RDBX_createHandlersToJson(parser, + TMP_FOLDER("out2.json"), + &rdb2jsonConf)); + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + assert_int_equal(status, RDB_STATUS_OK); + RDB_deleteParser(parser); + + /* Json (from DUMP-RDB) vs. expected Json */ + assert_json_equal(TMP_FOLDER("out1.json"), TMP_FOLDER("out2.json"), ignoreListOrder); + } +} + +static void test_rdb_to_redis_single_string(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("single_key.rdb"), 0, 0); +} + +static void test_rdb_to_redis_single_list(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("quicklist2_v11.rdb"), 0, 0); +} + +static void test_rdb_to_redis_multiple_lists_strings(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("multiple_lists_strings.rdb"), 0, 0); +} + +static void test_rdb_to_redis_multiple_lists_strings_pipeline_depth_1(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("multiple_lists_strings.rdb"), 1, 0); +} + +static void test_rdb_to_redis_plain_list(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("plain_list_v6.rdb"), 1, 0); +} + +static void test_rdb_to_redis_quicklist(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("quicklist.rdb"), 1, 0); +} + +static void test_rdb_to_redis_single_ziplist(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("ziplist_v3.rdb"), 1, 0); +} + +static void test_rdb_to_redis_plain_hash(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("plain_hash_v3.rdb"), 1, 0); +} + +static void test_rdb_to_redis_hash_zl(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("hash_zl_v6.rdb"), 1, 0); +} + +static void test_rdb_to_redis_hash_lp(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("hash_lp_v11.rdb"), 1, 0); +} + +static void test_rdb_to_redis_hash_zm(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("hash_zm_v2.rdb"), 1, 0); +} + +static void test_rdb_to_redis_plain_set(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("plain_set_v6.rdb"), 1, 1); +} + +static void test_rdb_to_redis_set_is(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("set_is_v11.rdb"), 1, 1); +} + +static void test_rdb_to_redis_set_lp(void **state) { + UNUSED(state); + test_rdb_to_redis_common(DUMP_FOLDER("set_lp_v11.rdb"), 1, 1); +} + +/* iff 'delKeyBeforeWrite' is not set, then the parser will return an error on + * loading 100_lists.rdb ("mylist1 mylist2 ... mylist100") on key 'mylist62' + * Because key `mylist62` created earlier with a string value. */ +static void test_rdb_to_redis_del_before_write(void **state) { + UNUSED(state); + RdbParser *parser; + RdbStatus status; + for (int delKeyBeforeWrite = 0 ; delKeyBeforeWrite <= 1 ; ++delKeyBeforeWrite) { + RdbxToRespConf rdb2respConf = { + .delKeyBeforeWrite = delKeyBeforeWrite, + .supportRestore = 1, + .restore.dstRdbVersion = 100}; + + runSystemCmd("%s/redis-cli -p %d set mylist62 1 > /dev/null", redisInstallFolder, redisPort); + /* RDB to TCP */ + RdbxToResp *rdbToResp; + parser = RDB_createParserRdb(NULL); + RDB_setLogLevel(parser, RDB_LOG_ERR); + assert_non_null(RDBX_createReaderFile(parser, DUMP_FOLDER("100_lists.rdb"))); + assert_non_null(rdbToResp = RDBX_createHandlersToResp(parser, &rdb2respConf)); + + assert_non_null(RDBX_createRespToRedisTcp(parser, + rdbToResp, + "127.0.0.1", + redisPort)); + + RDB_setLogLevel(parser, RDB_LOG_ERR); + + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + + if (status == RDB_STATUS_OK) + assert_int_equal(delKeyBeforeWrite, 1); + else { + assert_int_equal(delKeyBeforeWrite, 0); + /* verify returned error code. Verify error message. */ + RdbRes err = RDB_getErrorCode(parser); + assert_int_equal(err, RDBX_ERR_RESP_WRITE); + assert_non_null(strstr(RDB_getErrorMessage(parser), "mylist62")); + } + + RDB_deleteParser(parser); + } +} + +/*************************** group_rdb_to_redis *******************************/ +int group_rdb_to_redis() { + + if (!redisInstallFolder) { + printf("[ SKIPPED ] (Redis installation folder is not configured)\n"); + return 0; + } + + const struct CMUnitTest tests[] = { + /* string */ + cmocka_unit_test_setup(test_rdb_to_redis_single_string, setupTest), + /* list */ + cmocka_unit_test_setup(test_rdb_to_redis_single_list, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_plain_list, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_quicklist, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_single_ziplist, setupTest), + /* hash */ + cmocka_unit_test_setup(test_rdb_to_redis_plain_hash, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_hash_zl, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_hash_lp, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_hash_zm, setupTest), + /* set */ + cmocka_unit_test_setup(test_rdb_to_redis_plain_set, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_set_is, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_set_lp, setupTest), + + /* misc */ + cmocka_unit_test_setup(test_rdb_to_redis_multiple_lists_strings, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_multiple_lists_strings_pipeline_depth_1, setupTest), + cmocka_unit_test_setup(test_rdb_to_redis_del_before_write, setupTest), + + }; + + setupRedisServer(); + int res = cmocka_run_group_tests(tests, NULL, NULL); + teardownRedisServer(); + + return res; +} \ No newline at end of file diff --git a/test/test_rdb_to_resp.c b/test/test_rdb_to_resp.c index 41995de..733e768 100644 --- a/test/test_rdb_to_resp.c +++ b/test/test_rdb_to_resp.c @@ -78,6 +78,7 @@ static void runWithAndWithoutRestore(const char *rdbfile) { memset(&r2rConf, 0, sizeof(r2rConf)); r2rConf.supportRestore = 1; + r2rConf.delKeyBeforeWrite = 0; /* expect not use RESTORE */ r2rConf.restore.dstRdbVersion = 1;