Skip to content

Commit

Permalink
PR fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Oct 12, 2023
1 parent 828c2fd commit f17a7ec
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 113 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
make test
working-directory: ${{github.workspace}}

- name: Build and test librdb vs. redis-unstable
- name: Test librdb vs. redis-unstable
run: |
pushd ~/redis
git checkout unstable
Expand Down Expand Up @@ -88,7 +88,7 @@ jobs:
make test
working-directory: ${{github.workspace}}

- name: Build and test librdb vs. redis-6.2
- name: Test librdb vs. redis-6.2
run: |
# clang is more strict to shared-obj versioning. Satisfy its needs.
pushd ~/redis
Expand All @@ -97,7 +97,6 @@ jobs:
make -C ~/redis/tests/modules
popd
export LIBRDB_REDIS_FOLDER=~/redis/src
make test
make clean debug test
working-directory: ${{github.workspace}}

104 changes: 52 additions & 52 deletions src/ext/handlersToResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ struct RdbxToResp {

struct {
int sentFirstFrag;
size_t rawSize;
size_t restoreSize;
int isModuleAux;
uint64_t crc;
struct {
char cmdPrefix[128];
int cmdlen;
} moduleAux;
} rawCtx;
} restoreCtx;

int srcRdbVer;
int dstRdbVer;
Expand Down Expand Up @@ -286,13 +286,13 @@ static inline RdbRes writevWrap(RdbxToResp *ctx, struct iovec *iov, int cnt, int
return RDB_OK;
}

static inline RdbRes sendFirstRawFrag(RdbxToResp *ctx, RdbBulk frag, size_t fragLen) {
static inline RdbRes sendFirstRestoreFrag(RdbxToResp *ctx, RdbBulk frag, size_t fragLen) {
long long expireTime = 0;
char expireTimeStr[32], expireTimeLenStr[32], keyLenStr[32], lenStr[32];
struct iovec iov[10];
int extra_args = 0, iovs = 0;

/* this logic must be exactly the same as in toRespRawFragEnd() */
/* this logic must be exactly the same as in toRespRestoreFragEnd() */
if (ctx->targetRedisVerVal >= VER_VAL(5, 0))
{
if (ctx->keyCtx.info.expiretime != -1) {
Expand Down Expand Up @@ -324,17 +324,17 @@ static inline RdbRes sendFirstRawFrag(RdbxToResp *ctx, RdbBulk frag, size_t frag
IOV_CONST(&iov[iovs++], "\r\n$1\r\n0\r\n$");
}

IOV_VALUE(&iov[iovs++], ctx->rawCtx.rawSize + 10, lenStr); /* write raw len + trailer */
IOV_STRING(&iov[iovs++], frag, fragLen); /* write first frag */
IOV_VALUE(&iov[iovs++], ctx->restoreCtx.restoreSize + 10, lenStr); /* write restore len + trailer */
IOV_STRING(&iov[iovs++], frag, fragLen); /* write first frag */
return writevWrap(ctx, iov, iovs, 1, 0);
}

static inline RdbRes sendFirstRawFragModuleAux(RdbxToResp *ctx, RdbBulk frag, size_t fragLen) {
static inline RdbRes sendFirstRestoreFragModuleAux(RdbxToResp *ctx, RdbBulk frag, size_t fragLen) {
struct iovec iov[3];
char lenStr[32];
iov[0].iov_base = ctx->rawCtx.moduleAux.cmdPrefix;
iov[0].iov_len = ctx->rawCtx.moduleAux.cmdlen;
IOV_LENGTH(&iov[1], ctx->rawCtx.rawSize + 10, lenStr); /* write raw len + trailer */
iov[0].iov_base = ctx->restoreCtx.moduleAux.cmdPrefix;
iov[0].iov_len = ctx->restoreCtx.moduleAux.cmdlen;
IOV_LENGTH(&iov[1], ctx->restoreCtx.restoreSize + 10, lenStr); /* write restore len + trailer */
IOV_STRING(&iov[2], frag, fragLen); /* write first frag */
return writevWrap(ctx, iov, 3, 1, 0);
}
Expand Down Expand Up @@ -783,21 +783,21 @@ static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, Rdb
return writevWrap(ctx, iov, iovs, 1, 1);
}

/*** Handling raw ***/
/* Callback on start of serializing module aux data (alternative to toRespRawBegin).
* Following this call, one or more calls will be made to toRespRawFrag() to
* stream fragments of the serialized data. And at the end toRespRawFragEnd()
/*** Handling raw (RESTORE) ***/
/* Callback on start of serializing module aux data (alternative to toRespRestoreBegin).
* Following this call, one or more calls will be made to toRespRestoreFrag() to
* stream fragments of the serialized data. And at the end toRespRestoreFragEnd()
* will be called */
static RdbRes toRespRawBeginModuleAux(RdbParser *p, void *userData, RdbBulk name, int encver, int when, size_t rawSize) {
static RdbRes toRespRestoreBeginModuleAux(RdbParser *p, void *userData, RdbBulk name, int encver, int when, size_t rawSize) {
char encstr[10];
UNUSED(p);

/* reset rawCtx */
/* reset restoreCtx */
RdbxToResp *ctx = userData;
ctx->rawCtx.rawSize = rawSize;
ctx->rawCtx.sentFirstFrag = 0;
ctx->rawCtx.isModuleAux = 1;
ctx->rawCtx.crc = 0;
ctx->restoreCtx.restoreSize = rawSize;
ctx->restoreCtx.sentFirstFrag = 0;
ctx->restoreCtx.isModuleAux = 1;
ctx->restoreCtx.crc = 0;

/* if target doesn't support module-aux, then skip it */
if (!ctx->conf.supportRestoreModuleAux)
Expand All @@ -806,70 +806,70 @@ static RdbRes toRespRawBeginModuleAux(RdbParser *p, void *userData, RdbBulk name
/* Build the cmd instead of keeping the values and build it later */
size_t enclen = snprintf(encstr, sizeof(encstr), "%d", encver);
const char* whenstr = (when==_REDISMODULE_AUX_BEFORE_RDB) ? "before" :"after";
ctx->rawCtx.moduleAux.cmdlen = snprintf(ctx->rawCtx.moduleAux.cmdPrefix,
sizeof(ctx->rawCtx.moduleAux.cmdPrefix),
"*5\r\n$13\r\nRESTOREMODAUX\r\n$%zu\r\n%s\r\n$%zu\r\n%s\r\n$%zu\r\n%s",
strlen(name), name, enclen, encstr, strlen(whenstr), whenstr);
ctx->restoreCtx.moduleAux.cmdlen = snprintf(ctx->restoreCtx.moduleAux.cmdPrefix,
sizeof(ctx->restoreCtx.moduleAux.cmdPrefix),
"*5\r\n$13\r\nRESTOREMODAUX\r\n$%zu\r\n%s\r\n$%zu\r\n%s\r\n$%zu\r\n%s",
strlen(name), name, enclen, encstr, strlen(whenstr), whenstr);
return RDB_OK;
}

/* Callback on start of serializing value of a key. Following this call, one
* or more calls will be made to toRespRawFrag() to stream fragments of the
* serialized data. And at the end toRespRawFragEnd() will be called */
static RdbRes toRespRawBegin(RdbParser *p, void *userData, size_t size) {
* or more calls will be made to toRespRestoreFrag() to stream fragments of the
* serialized data. And at the end toRespRestoreFragEnd() will be called */
static RdbRes toRespRestoreBegin(RdbParser *p, void *userData, size_t size) {
UNUSED(p);
RdbxToResp *ctx = userData;

/* reset rawCtx */
ctx->rawCtx.rawSize = size;
ctx->rawCtx.sentFirstFrag = 0;
ctx->rawCtx.isModuleAux = 0;
ctx->rawCtx.crc = 0;
/* reset restoreCtx */
ctx->restoreCtx.restoreSize = size;
ctx->restoreCtx.sentFirstFrag = 0;
ctx->restoreCtx.isModuleAux = 0;
ctx->restoreCtx.crc = 0;
return RDB_OK;
}

/* Callback for fragments of a serialized value associated with a new key or module
* auxiliary data. This callback is invoked after toRespRawBegin() or
* toRespRawBeginModuleAux(), and it may be called multiple times until the
* serialization is complete. Finally, toRespRawFragEnd() will be called to signal
* auxiliary data. This callback is invoked after toRespRestoreBegin() or
* toRespRestoreBeginModuleAux(), and it may be called multiple times until the
* serialization is complete. Finally, toRespRestoreFragEnd() will be called to signal
* the registered handlers for the completion of the operation. */
static RdbRes toRespRawFrag(RdbParser *p, void *userData, RdbBulk frag) {
static RdbRes toRespRestoreFrag(RdbParser *p, void *userData, RdbBulk frag) {
UNUSED(p);
RdbxToResp *ctx = userData;
struct iovec iov[10];
int iovs = 0;

/* if processing module-aux but target doesn't support, then skip it */
if ( (ctx->rawCtx.isModuleAux) && (!ctx->conf.supportRestoreModuleAux))
if ((ctx->restoreCtx.isModuleAux) && (!ctx->conf.supportRestoreModuleAux))
return RDB_OK;

size_t fragLen = RDB_bulkLen(p, frag);
ctx->rawCtx.crc = crc64(ctx->rawCtx.crc, (unsigned char *) frag , fragLen);
ctx->restoreCtx.crc = crc64(ctx->restoreCtx.crc, (unsigned char *) frag , fragLen);

/* if first frag, handled differently */
if (likely(!(ctx->rawCtx.sentFirstFrag))) {
ctx->rawCtx.sentFirstFrag = 1;
if (ctx->rawCtx.isModuleAux)
return sendFirstRawFragModuleAux(ctx, frag, fragLen);
if (likely(!(ctx->restoreCtx.sentFirstFrag))) {
ctx->restoreCtx.sentFirstFrag = 1;
if (ctx->restoreCtx.isModuleAux)
return sendFirstRestoreFragModuleAux(ctx, frag, fragLen);
else
return sendFirstRawFrag(ctx, frag, fragLen);
return sendFirstRestoreFrag(ctx, frag, fragLen);
}

IOV_STRING(&iov[iovs++], frag, fragLen);
return writevWrap(ctx, iov, iovs, 1, 0);
}

/* This call will be followed one or more calls to toRespRawFrag() which indicates
/* This call will be followed one or more calls to toRespRestoreFrag() which indicates
* for completion of streaming of fragments of serialized value of a new key or
* module-aux data. */
static RdbRes toRespRawFragEnd(RdbParser *p, void *userData) {
static RdbRes toRespRestoreFragEnd(RdbParser *p, void *userData) {
UNUSED(p);
char cmd[1024]; /* degenerate usage of iov. All copied strings are small */
RdbxToResp *ctx = userData;
uint64_t *crc = &(ctx->rawCtx.crc);
uint64_t *crc = &(ctx->restoreCtx.crc);

/* if processing module-aux but target doesn't support, then skip it */
if ( (ctx->rawCtx.isModuleAux) && (!ctx->conf.supportRestoreModuleAux))
if ((ctx->restoreCtx.isModuleAux) && (!ctx->conf.supportRestoreModuleAux))
return RDB_OK;

/* Add RDB version 2 bytes. If it is module */
Expand Down Expand Up @@ -900,7 +900,7 @@ static RdbRes toRespRawFragEnd(RdbParser *p, void *userData) {
else
len += snprintf(cmd+len, sizeof(cmd)-len, "\r\n");

/* This logic must be exactly the same as in toRespRawFrag() */
/* This logic must be exactly the same as in toRespRestoreFrag() */
if (likely(ctx->targetRedisVerVal >= VER_VAL(5, 0))) {

/* Add ABSTTL */
Expand Down Expand Up @@ -980,10 +980,10 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *p, RdbxToRespConf *

toRespNewKey,
toRespEndKey,
toRespRawBeginModuleAux,
toRespRawBegin,
toRespRawFrag,
toRespRawFragEnd,
toRespRestoreBeginModuleAux,
toRespRestoreBegin,
toRespRestoreFrag,
toRespRestoreFragEnd,
};
RDB_createHandlersRaw(p, &rawCb, ctx, deleteRdbToRespCtx);

Expand Down
1 change: 1 addition & 0 deletions src/ext/readerResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ static inline void unused(void *dummy, ...) { (void)(dummy);}

#define MAX_RSP_BULK_SIZE 1024*1024

/* This limit doesn't really exist in redis but at client side (hiredis) */
#define MAX_ARRAY_ELEMENTS ((1LL<<32) - 1)

typedef enum RespReplyType {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ DEBUG = -g3
LIBS =

ifeq ($(LIBRDB_DEBUG), 1)
CFLAGS += -DLIBRDB_DEBUG=1
CFLAGS += -DLIBRDB_DEBUG=1
else
CFLAGS += -DNDEBUG=1
endif
Expand Down
Loading

0 comments on commit f17a7ec

Please sign in to comment.