Skip to content

Commit

Permalink
RESP2REDIS: Should not fail trying to load empty module (#53)
Browse files Browse the repository at this point in the history
If RDB2RESP was configured to "supportRestoreModuleAux" and generates
RESTOREMODAUX commands, currently relevant only to Redis enterprise,
then if RDB was generated by a server with some module, but user didn't
make any use of that module, attempting to play it to another server that
wasn't loaded with that module, the RDB parser will get fail. This is because
the module always store something in the AUX field, and the RDB parser will try
to load it.

In order to overcome this issue, A module that its AUX payload is less than
15 Bytes (including RDB version and checksum) counted as AUX field of an empty
Module (not in use), then the parser, when restoring the empty module, it
should ignore returned error: "-ERR Module X not found..."

Few changes were made to support it:
- Propagate restoreSize to RESP2REDIS handlers.
- ReaderResp gets now error-callback which can indicate to mask given error.
- RESP2REDIS indicates to ignore RESTOREMODAUX error message of type "-ERR Module .. not found ..."
- Added tests accordingly.

Additionally added timeout to recv() from redis to avoid blocking forever. Irrelevant to this fix.
  • Loading branch information
moticless authored Aug 11, 2024
1 parent f953088 commit efff6b5
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 77 deletions.
10 changes: 9 additions & 1 deletion api/librdb-ext-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ typedef enum {
RDBX_ERR_RESP2REDIS_FAILED_WRITE,
RDBX_ERR_RESP2REDIS_CONN_CLOSE,
RDBX_ERR_RESP2REDIS_MAX_RETRIES,
RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
} RdbxRes;

/****************************************************************
Expand Down Expand Up @@ -198,14 +199,21 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *, RdbxToRespConf *)
* <user-defined-writer>
****************************************************************/

/* On start command pass command info. NULL otherwise. */
/* As streaming RESP protocol, when starting a new command, provide details
* about the command. Otherwise, pass NULL. This information will be used to log
* and report the command in case of a failure from Redis server. */
typedef struct RdbxRespWriterStartCmd {
/* Redis Command name (Ex: "SET", "RESTORE"). Owned by the caller. It is
* constant static string and Valid for ref behind the duration of the call. */
const char *cmd;

/* If key available as part of command. Else empty string.
* Owned by the caller. */
const char *key;

/* On restore command, size of serialized data. Otherwise, set to 0. */
size_t restoreSize;

} RdbxRespWriterStartCmd;

typedef struct RdbxRespWriter {
Expand Down
72 changes: 19 additions & 53 deletions src/ext/handlersToResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) {
if (ctx->debug.flags & RFLAG_ENUM_CMD_ID) {
char keyLenStr[32], cmdIdLenStr[32], cmdIdStr[32];

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SET";
startCmd.key = KEY_CMD_ID_DBG;
RdbxRespWriterStartCmd startCmd = {"SET", KEY_CMD_ID_DBG, 0};

struct iovec iov[7];
/* write SET */
Expand Down Expand Up @@ -296,9 +294,7 @@ static inline RdbRes sendFirstRestoreFrag(RdbxToResp *ctx, RdbBulk frag, size_t
if (ctx->keyCtx.delBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE)
extra_args++;

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RESTORE";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"RESTORE", ctx->keyCtx.key, ctx->restoreCtx.restoreSize};

/* writev RESTORE */
char cmd[64];
Expand Down Expand Up @@ -326,9 +322,7 @@ static inline RdbRes sendFirstRestoreFragModuleAux(RdbxToResp *ctx, RdbBulk frag
struct iovec iov[3];
char lenStr[32];

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RESTOREMODAUX";
startCmd.key = "";
RdbxRespWriterStartCmd startCmd = {"RESTOREMODAUX", "", ctx->restoreCtx.restoreSize};

/* writev RESTOREMODAUX */
iov[0].iov_base = ctx->restoreCtx.moduleAux.cmdPrefix;
Expand All @@ -354,9 +348,7 @@ static RdbRes toRespNewDb(RdbParser *p, void *userData, int dbid) {

int cnt = ll2string(dbidStr, sizeof(dbidStr), dbid);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SELECT";
startCmd.key = "";
RdbxRespWriterStartCmd startCmd = {"SELECT", "", 0};

IOV_CONST(&iov[0], "*2\r\n$6\r\nSELECT");
IOV_LENGTH(&iov[1], cnt, cntStr);
Expand Down Expand Up @@ -394,9 +386,7 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo
struct iovec iov[4];
char keyLenStr[32];

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "DEL";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"DEL", ctx->keyCtx.key, 0};

IOV_CONST(&iov[0], "*2\r\n$3\r\nDEL");
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
Expand All @@ -415,9 +405,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) {
/* key is in db. Set its expiration time */
if (ctx->keyCtx.info.expiretime != -1) {
struct iovec iov[6];
RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "PEXPIREAT";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"PEXPIREAT", ctx->keyCtx.key, 0};

char keyLenStr[32], expireLenStr[32], expireStr[32];
/* PEXPIREAT */
Expand Down Expand Up @@ -448,9 +436,7 @@ static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) {

struct iovec iov[7];

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SET";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"SET", ctx->keyCtx.key, 0};

/* write SET */
IOV_CONST(&iov[0], "*3\r\n$3\r\nSET");
Expand All @@ -473,9 +459,7 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) {
char keyLenStr[32], valLenStr[32];
int valLen = RDB_bulkLen(p, item);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "RPUSH";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"RPUSH", ctx->keyCtx.key, 0};

/* write RPUSH */
IOV_CONST(&iov[0], "*3\r\n$5\r\nRPUSH");
Expand All @@ -500,9 +484,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
int fieldLen = RDB_bulkLen(p, field);
int valueLen = RDB_bulkLen(p, value);

RdbxRespWriterStartCmd hsetCmd;
hsetCmd.cmd = "HSET";
hsetCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd hsetCmd = {"HSET", ctx->keyCtx.key, 0};

/* write RPUSH */
IOV_CONST(&iov[0], "*4\r\n$4\r\nHSET");
Expand All @@ -520,9 +502,8 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va

if (expireAt == -1) return RDB_OK;

RdbxRespWriterStartCmd hpexpireatCmd;
hpexpireatCmd.cmd = "HPEXPIREAT";
hpexpireatCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd hpexpireatCmd = {"HPEXPIREAT", ctx->keyCtx.key, 0};

/* write HPEXPIREAT */
IOV_CONST(&iov[0], "*6\r\n$10\r\nHPEXPIREAT");
/* write key */
Expand All @@ -545,9 +526,7 @@ static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) {

int valLen = RDB_bulkLen(p, member);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "SADD";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"SADD", ctx->keyCtx.key, 0};

/* write RPUSH */
IOV_CONST(&iov[0], "*3\r\n$4\r\nSADD");
Expand All @@ -568,9 +547,7 @@ static RdbRes toRespZset(RdbParser *p, void *userData, RdbBulk member, double sc

int valLen = RDB_bulkLen(p, member);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "ZADD";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"ZADD", ctx->keyCtx.key, 0};

/* write ZADD */
IOV_CONST(&iov[0], "*4\r\n$4\r\nZADD");
Expand Down Expand Up @@ -615,9 +592,7 @@ static RdbRes toRespFunction(RdbParser *p, void *userData, RdbBulk func) {

int funcLen = RDB_bulkLen(p, func);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "FUNCTION";
startCmd.key = "";
RdbxRespWriterStartCmd startCmd = {"FUNCTION", "", 0};

if (ctx->conf.funcLibReplaceIfExist)
IOV_CONST(&iov[0], "*4\r\n$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n$7\r\nREPLACE");
Expand All @@ -644,9 +619,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta *
* for the Stream type. (We don't use the MAXLEN 0 trick from aof.c
* because of Redis Enterprise CRDT compatibility issues - Can't XSETID "back") */

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XGROUP CREATE";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"XGROUP CREATE", ctx->keyCtx.key, 0};

IOV_CONST(&iov[0], "*6\r\n$6\r\nXGROUP\r\n$6\r\nCREATE");
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
Expand All @@ -671,9 +644,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta *
int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastID.ms,meta->lastID.seq);
int maxDelEntryIdLen = snprintf(maxDelEntryId, sizeof(maxDelEntryId), "%lu-%lu", meta->maxDelEntryID.ms, meta->maxDelEntryID.seq);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XSETID";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"XSETID", ctx->keyCtx.key, 0};

if ((ctx->keyCtx.info.opcode >= _RDB_TYPE_STREAM_LISTPACKS_2) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) {
IOV_CONST(&iov[0], "*7\r\n$6\r\nXSETID");
Expand Down Expand Up @@ -711,8 +682,7 @@ static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd

/* Start of (another) stream item? */
if ((ctx->streamCtx.xaddStartEndCounter % 2) == 0) {
startCmd.cmd = "XADD";
startCmd.key = ctx->keyCtx.key;
startCmd = (RdbxRespWriterStartCmd) {"XADD", ctx->keyCtx.key, 0};
startCmdRef = &startCmd;

/* writev XADD */
Expand Down Expand Up @@ -763,9 +733,7 @@ static RdbRes toRespStreamNewCGroup(RdbParser *p, void *userData, RdbBulk grpNam

int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastId.ms,meta->lastId.seq);

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XGROUP";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = { "XGROUP", ctx->keyCtx.key, 0};

/* writev XGROUP */
if ( (meta->entriesRead>=0) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) {
Expand Down Expand Up @@ -845,9 +813,7 @@ static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, Rdb
return (RdbRes) RDBX_ERR_STREAM_INTEG_CHECK;
}

RdbxRespWriterStartCmd startCmd;
startCmd.cmd = "XCLAIM";
startCmd.key = ctx->keyCtx.key;
RdbxRespWriterStartCmd startCmd = {"XCLAIM", ctx->keyCtx.key, 0};

/* writev XCLAIM */
IOV_CONST(&iov[iovs++], "*12\r\n$6\r\nXCLAIM");
Expand Down
15 changes: 11 additions & 4 deletions src/ext/readerResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ static RespRes readRespReplyError(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
else
ctx->errorMsg[ctx->errorMsgLen - 1] = '\0';

res = RESP_REPLY_ERR;
/* Report the error. cb return 1 to propagate. 0 to mask */
if ((ctx->errCb) && (ctx->errCb(ctx->errCbCtx, ctx->errorMsg) == 0))
return RESP_REPLY_OK;

return RESP_REPLY_ERR;
}

return res;
Expand Down Expand Up @@ -450,9 +454,12 @@ static RespRes readRespReply(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
/*** non-static functions (public) ***/

void readRespInit(RespReaderCtx *ctx) {
ctx->type = 0;
ctx->errorMsgLen = 0;
ctx->countReplies = 0;
memset(ctx, 0, sizeof(RespReaderCtx));
}

void setErrorCb(RespReaderCtx *respReaderCtx, void *errorCbCtx, OnRespErrorCb cb) {
respReaderCtx->errCbCtx = errorCbCtx;
respReaderCtx->errCb = cb;
}

RespRes readRespReplies(RespReaderCtx *ctx, const char *buff, int buffLen) {
Expand Down
10 changes: 10 additions & 0 deletions src/ext/readerResp.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ typedef struct RespReplyBuff {
int at;
} RespReplyBuff;

/* cb to report on RESP error. Returns 1 to propagate. 0 to mask. */
typedef int (*OnRespErrorCb) (void *callerCtx, char *msg);

typedef struct {

/* PUBLIC: read-only */
Expand All @@ -33,8 +36,15 @@ typedef struct {
/* private bulk-array response state */
long long numBulksArray;

/* On RESP error callback */
void *errCbCtx;
OnRespErrorCb errCb;

} RespReaderCtx;

void readRespInit(RespReaderCtx *ctx);

/* Can register cb to decide whether to ignore given error or propagate it */
void setErrorCb(RespReaderCtx *respReaderCtx, void *errorCbCtx, OnRespErrorCb cb);

RespRes readRespReplies(RespReaderCtx *ctx, const char *buff, int buffLen);
Loading

0 comments on commit efff6b5

Please sign in to comment.