Skip to content

Commit

Permalink
Merge branch 'hfe-integ-aof-and-replica' into hfe
Browse files Browse the repository at this point in the history
  • Loading branch information
sundb committed May 28, 2024
2 parents 8ea9240 + a5c4bac commit 4a27d61
Show file tree
Hide file tree
Showing 17 changed files with 864 additions and 285 deletions.
65 changes: 44 additions & 21 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1939,7 +1939,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
*
* The function returns 0 on error, non-zero on success. */
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
if (hi->encoding == OBJ_ENCODING_LISTPACK) {
if ((hi->encoding == OBJ_ENCODING_LISTPACK) || (hi->encoding == OBJ_ENCODING_LISTPACK_EX)) {
unsigned char *vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;
Expand All @@ -1963,37 +1963,60 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
/* Emit the commands needed to rebuild a hash object.
* The function returns 0 on error, 1 on success. */
int rewriteHashObject(rio *r, robj *key, robj *o) {
int res = 0; /*fail*/

hashTypeIterator *hi;
long long count = 0, items = hashTypeLength(o, 0);

int isHFE = hashTypeGetMinExpire(o) != EB_EXPIRE_TIME_INVALID;
hi = hashTypeInitIterator(o);
while (hashTypeNext(hi, 0) != C_ERR) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;

if (!rioWriteBulkCount(r,'*',2+cmd_items*2) ||
!rioWriteBulkString(r,"HMSET",5) ||
!rioWriteBulkObject(r,key))
{
hashTypeReleaseIterator(hi);
return 0;
if (!isHFE) {
while (hashTypeNext(hi, 0) != C_ERR) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) ||
!rioWriteBulkString(r, "HMSET", 5) ||
!rioWriteBulkObject(r, key))
goto reHashEnd;
}
}

if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) ||
!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE))
{
hashTypeReleaseIterator(hi);
return 0;
if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) ||
!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE))
goto reHashEnd;

if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
} else {
while (hashTypeNext(hi, 0) != C_ERR) {

char hmsetCmd[] = "*4\r\n$5\r\nHMSET\r\n";
if ( (!rioWrite(r, hmsetCmd, sizeof(hmsetCmd) - 1)) ||
(!rioWriteBulkObject(r, key)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) )
goto reHashEnd;

if (hi->expire_time != EB_EXPIRE_TIME_INVALID) {
char cmd[] = "*6\r\n$10\r\nHPEXPIREAT\r\n";
if ( (!rioWrite(r, cmd, sizeof(cmd) - 1)) ||
(!rioWriteBulkObject(r, key)) ||
(!rioWriteBulkLongLong(r, hi->expire_time)) ||
(!rioWriteBulkString(r, "FIELDS", 6)) ||
(!rioWriteBulkString(r, "1", 1)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) )
goto reHashEnd;
}
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}

hashTypeReleaseIterator(hi);
res = 1; /* success */

return 1;
reHashEnd:
hashTypeReleaseIterator(hi);
return res;
}

/* Helper for rewriteStreamObject() that generates a bulk string into the
Expand Down
7 changes: 7 additions & 0 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);

/* if hash with HFEs, take care to remove from global HFE DS */
if (old->type == OBJ_HASH)
hashTypeRemoveFromExpires(&db->hexpires, old);

if (server.lazyfree_lazy_server_del) {
freeObjAsync(key,old,db->id);
} else {
Expand Down Expand Up @@ -1764,11 +1769,13 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
* remain in the same DB they were. */
activedb->keys = newdb->keys;
activedb->expires = newdb->expires;
activedb->hexpires = newdb->hexpires;
activedb->avg_ttl = newdb->avg_ttl;
activedb->expires_cursor = newdb->expires_cursor;

newdb->keys = aux.keys;
newdb->expires = aux.expires;
newdb->hexpires = aux.hexpires;
newdb->avg_ttl = aux.avg_ttl;
newdb->expires_cursor = aux.expires_cursor;

Expand Down
5 changes: 5 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,18 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o)
unsigned char eledigest[20];
sds sdsele;

/* field */
memset(eledigest,0,20);
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
/* val */
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
/* hash-field expiration (HFE) */
if (hi->expire_time != EB_EXPIRE_TIME_INVALID)
xorDigest(eledigest,"!!hexpire!!",11);
xorDigest(digest,eledigest,20);
}
hashTypeReleaseIterator(hi);
Expand Down
25 changes: 15 additions & 10 deletions src/ebuckets.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
* #define EB_VALIDATE_DEBUG 1
*/

#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK)
#define EB_VALIDATE_STRUCTURE(eb, type) ebValidate(eb, type)
#else
#define EB_VALIDATE_STRUCTURE(eb, type) // Do nothing
#endif

/*** BENCHMARK
*
* To benchmark ebuckets creation and active-expire with 10 million items, apply
Expand Down Expand Up @@ -190,7 +196,7 @@ static inline uint64_t raxKey2BucketKey(unsigned char *raxKey) {
* Before: [segHdr] -> {item1,..,item16} -> [..]
* After: [segHdr] -> {newItem} -> [nextSegHdr] -> {item1,..,item16} -> [..]
*
* Take care to persist `segHdr` to be the same instance after the change.
* Taken care to persist `segHdr` to be the same instance after the change.
* This is important because the rax tree is pointing to it. */
static int ebSegAddExtended(EbucketsType *type, FirstSegHdr *firstSegHdr, eItem newItem) {
/* Allocate nextSegHdr and let it take the items of first segment header */
Expand Down Expand Up @@ -1390,9 +1396,8 @@ int ebRemove(ebuckets *eb, EbucketsType *type, eItem item) {
if (res)
type->getExpireMeta(item)->trash = 1;

#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK)
ebValidate(*eb, type);
#endif
EB_VALIDATE_STRUCTURE(*eb, type);

return res;
}

Expand Down Expand Up @@ -1435,9 +1440,9 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime) {
/* Add item to rax */
res = ebAddToRax(eb, type, item, EB_BUCKET_KEY(expireTime));
}
#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK)
ebValidate(*eb, type);
#endif

EB_VALIDATE_STRUCTURE(*eb, type);

return res;
}

Expand Down Expand Up @@ -1521,9 +1526,9 @@ void ebExpire(ebuckets *eb, EbucketsType *type, ExpireInfo *info) {
ebAdd(eb, type, updateList, ebGetMetaExpTime(mItem));
updateList = next;
}
#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK)
ebValidate(*eb, type);
#endif

EB_VALIDATE_STRUCTURE(*eb, type);

return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ebuckets.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
* The idea of it is to trim the rax tree depth, avoid having too many branches,
* and reduce frequent modifications of the tree to the minimum.
*/
#define EB_BUCKET_KEY_PRECISION 0 /* 1024msec */
#define EB_BUCKET_KEY_PRECISION 0 /* TBD: modify to 10 */

/* From expiration time to bucket-key */
#define EB_BUCKET_KEY(exptime) ((exptime) >> EB_BUCKET_KEY_PRECISION)
Expand Down
23 changes: 17 additions & 6 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -5271,7 +5271,10 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {

/* Handle XX and NX */
if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) {
int exists = hashTypeExists(key->value, field->ptr);
int isHashDeleted;
int exists = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted);
/* hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
if (((flags & REDISMODULE_HASH_XX) && !exists) ||
((flags & REDISMODULE_HASH_NX) && exists))
{
Expand All @@ -5282,7 +5285,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {

/* Handle deletion if value is REDISMODULE_HASH_DELETE. */
if (value == REDISMODULE_HASH_DELETE) {
count += hashTypeDelete(key->value, field->ptr);
count += hashTypeDelete(key->value, field->ptr, 1);
if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
continue;
}
Expand Down Expand Up @@ -5374,14 +5377,22 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) {
/* Query the hash for existence or value object. */
if (flags & REDISMODULE_HASH_EXISTS) {
existsptr = va_arg(ap,int*);
if (key->value)
*existsptr = hashTypeExists(key->value,field->ptr);
else
if (key->value) {
int isHashDeleted;
*existsptr = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted);
/* hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
} else {
*existsptr = 0;
}
} else {
int isHashDeleted;
valueptr = va_arg(ap,RedisModuleString**);
if (key->value) {
*valueptr = hashTypeGetValueObject(key->value,field->ptr);
*valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr, &isHashDeleted);

/* Currently hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
if (*valueptr) {
robj *decoded = getDecodedObject(*valueptr);
decrRefCount(*valueptr);
Expand Down
19 changes: 15 additions & 4 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -3757,7 +3757,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
* 1. Make sure there are no "holes" and all the arguments are set.
* 2. If the original argument vector was longer than the one we
* want to end with, it's up to the caller to set c->argc and
* free the no longer used objects on c->argv. */
* free the no longer used objects on c->argv.
* 3. To remove argument at i'th index, pass NULL as new value
*/
void rewriteClientCommandArgument(client *c, int i, robj *newval) {
robj *oldval;
retainOriginalCommandVector(c);
Expand All @@ -3775,9 +3777,18 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
}
oldval = c->argv[i];
if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
if (newval) c->argv_len_sum += getStringObjectLen(newval);
c->argv[i] = newval;
incrRefCount(newval);

if (newval) {
c->argv[i] = newval;
incrRefCount(newval);
c->argv_len_sum += getStringObjectLen(newval);
} else {
/* move the remaining arguments one step left */
for (int j = i+1; j < c->argc; j++) {
c->argv[j-1] = c->argv[j];
}
c->argv[--c->argc] = NULL;
}
if (oldval) decrRefCount(oldval);

/* If this is the command name make sure to fix c->cmd. */
Expand Down
Loading

0 comments on commit 4a27d61

Please sign in to comment.