Skip to content

Commit

Permalink
RDB load expired fields sync to replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed May 26, 2024
1 parent 425190e commit 088430d
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o)
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
/* field expire */
/* hash-field expiration (HFE) */
if (hi->expire_time != EB_EXPIRE_TIME_INVALID)
xorDigest(eledigest,"!!hexpire!!",11);
xorDigest(digest,eledigest,20);
Expand Down
2 changes: 1 addition & 1 deletion src/ebuckets.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ static inline uint64_t raxKey2BucketKey(unsigned char *raxKey) {
* Before: [segHdr] -> {item1,..,item16} -> [..]
* After: [segHdr] -> {newItem} -> [nextSegHdr] -> {item1,..,item16} -> [..]
*
* Take care to persist `segHdr` to be the same instance after the change.
* Taken care to persist `segHdr` to be the same instance after the change.
* This is important because the rax tree is pointing to it. */
static int ebSegAddExtended(EbucketsType *type, FirstSegHdr *firstSegHdr, eItem newItem) {
/* Allocate nextSegHdr and let it take the items of first segment header */
Expand Down
2 changes: 1 addition & 1 deletion src/ebuckets.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
* The idea of it is to trim the rax tree depth, avoid having too many branches,
* and reduce frequent modifications of the tree to the minimum.
*/
#define EB_BUCKET_KEY_PRECISION 0 /* 1024msec */
#define EB_BUCKET_KEY_PRECISION 0 /* TBD: modify to 10 */

/* From expiration time to bucket-key */
#define EB_BUCKET_KEY(exptime) ((exptime) >> EB_BUCKET_KEY_PRECISION)
Expand Down
4 changes: 2 additions & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -5375,13 +5375,13 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) {
if (flags & REDISMODULE_HASH_EXISTS) {
existsptr = va_arg(ap,int*);
if (key->value)
*existsptr = hashTypeExists(key->db, key->value,field->ptr);
*existsptr = hashTypeExists(key->db,key->value,field->ptr);
else
*existsptr = 0;
} else {
valueptr = va_arg(ap,RedisModuleString**);
if (key->value) {
*valueptr = hashTypeGetValueObject(key->db, key->value,field->ptr);
*valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr);
if (*valueptr) {
robj *decoded = getDecodedObject(*valueptr);
decrRefCount(*valueptr);
Expand Down
78 changes: 41 additions & 37 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2236,7 +2236,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,

size_t fieldLen;
sds value, field;
uint64_t expire, minExpire = EB_EXPIRE_TIME_INVALID;
uint64_t expireAt, minExpire = EB_EXPIRE_TIME_INVALID;
mstime_t now = mstime();
dict *dupSearchDict = NULL;
ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL);
Expand Down Expand Up @@ -2266,14 +2266,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
len--;

/* read the TTL */
if (rdbLoadLenByRef(rdb, NULL, &expire) == -1) {
if (rdbLoadLenByRef(rdb, NULL, &expireAt) == -1) {
serverLog(LL_WARNING, "failed reading hash TTL");
decrRefCount(o);
if (dupSearchDict != NULL) dictRelease(dupSearchDict);
return NULL;
}
if (expire > EB_EXPIRE_TIME_MAX) {
rdbReportCorruptRDB("invalid expire time: %llu", (unsigned long long)expire);
if (expireAt > EB_EXPIRE_TIME_MAX) {
rdbReportCorruptRDB("invalid expireAt time: %llu", (unsigned long long)expireAt);
decrRefCount(o);
return NULL;
}
Expand Down Expand Up @@ -2306,19 +2306,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
* Expired hash fields on the master are silently discarded.
* Note that if all fields in a hash has expired, the hash would not be
* created in memory (because it is created on the first valid field), and
* thus the key would be discarded as an "empty key" */
if (expire != 0 && iAmMaster() && ((mstime_t)expire < now) && /* note: expire was saved to RDB as unix-time in milliseconds */
* thus the key would be discarded as an "expired hash" */
if (expireAt != 0 && iAmMaster() && ((mstime_t)expireAt < now) &&
!(rdbflags & RDBFLAGS_AOF_PREAMBLE))
{
/* TODO: consider replication (like in rdbLoadAddKeyToDb) */
if (rdbflags & RDBFLAGS_FEED_REPL && db) {
/* Caller should have created replication backlog,
* and now this path only works when rebooting,
* so we don't have replicas yet. */
serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0);
robj keyobj, fieldObj;
initStaticStringObject(keyobj,key);
initStaticStringObject(fieldObj, field);
robj *argv[3];
argv[0] = shared.hdel;
argv[1] = &keyobj;
argv[2] = &fieldObj;
replicationFeedSlaves(server.slaves,db->id,argv,3);
}

server.rdb_last_load_hash_fields_expired++;
sdsfree(field);
sdsfree(value);
continue;
}

/* keep the nearest expiration to connect listpack object to db expiry */
if ((expire != 0) && (expire < minExpire)) minExpire = expire;
if ((expireAt != 0) && (expireAt < minExpire)) minExpire = expireAt;

/* store the values read - either to listpack or dict */
if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
Expand All @@ -2340,7 +2354,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
/* check if the values can be saved to listpack (or should convert to dict encoding) */
if (sdslen(field) > server.hash_max_listpack_value ||
sdslen(value) > server.hash_max_listpack_value ||
!lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expire)))
!lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expireAt)))
{
/* convert to hash */
hashTypeConvert(o, OBJ_ENCODING_HT, hexpires);
Expand All @@ -2358,7 +2372,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,

/* don't add the values to the new hash: the next if will catch and the values will be added there */
} else {
listpackExAddNew(o, field, value, expire);
listpackExAddNew(o, field, value, expireAt);
sdsfree(field);
sdsfree(value);
}
Expand All @@ -2367,10 +2381,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
if (o->encoding == OBJ_ENCODING_HT) {
/* WA for check-rdb mode, when there's no DB so can't attach expired items to ebuckets,
* or when no expiry was not set for this field */
if ((db == NULL) || (expire == 0)) {
if ((db == NULL) || (expireAt == 0)) {
hashTypeSet(db, o, field, value, 0);
} else {
if (hashTypeSetExRdb(db, o, field, value, expire) != C_OK) {
if (hashTypeSetExRdb(db, o, field, value, expireAt) != C_OK) {
serverLog(LL_WARNING, "failed adding hash field %s to key %s", field, key);
decrRefCount(o);
if (dupSearchDict != NULL) dictRelease(dupSearchDict);
Expand All @@ -2389,7 +2403,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
/* check for empty key (if all fields were expired) */
if (hashTypeLength(o, 0) == 0) {
decrRefCount(o);
goto emptykey;
goto expiredHash;
}
if ((db != NULL) && (minExpire != EB_EXPIRE_TIME_INVALID))
hashTypeAddToExpires(db, key, o, minExpire);
Expand Down Expand Up @@ -2719,31 +2733,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
/* for TTL listpack, find the minimum expiry */
uint64_t minExpire = hashTypeGetNextTimeToExpire(o);

/* check if need to convert to dict encoding */
if ((db != NULL) &&
(hashTypeLength(o, 0) > server.hash_max_listpack_entries)) /* TODO: each field length is not verified against server.hash_max_listpack_value */
{
hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires);
/*
* hashTypeAddToExpires is presumably called from within
* the convert function (from listpackEx to dict), BUT,
* this call depends on the lpt->meta field to be updated,
* which is not the case here as hashTypeAddToExpires was
* not yet called for the listpack (which is what updating
* its meta).
* Instead, this "manual" call is added here.
* Another approach would be to have the conversion function
* find the minExpire by itself when iterating on the listpack
* instead of relying on the meta and use this value for the
* final ebAdd call.
*/
/* Convert listpac to hash table without register in global HFE DS
* since the listpack is not connected yet to the DB */
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/);

/* Only now register in global HFE DS, if db provided */
if (db != NULL)
hashTypeAddToExpires(db, key, o, minExpire);
} else if (rdbtype == RDB_TYPE_HASH_LISTPACK_EX) {
/* connect the listpack to the DB-global expiry data structure */
if ((minExpire != EB_EXPIRE_TIME_INVALID) && (db != NULL)) { /* DB can be NULL when checking rdb */
hashTypeAddToExpires(db, key, o, minExpire);
}
}

break;
default:
/* totally unreachable */
Expand Down Expand Up @@ -3134,6 +3132,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
emptykey:
if (error) *error = RDB_LOAD_ERR_EMPTY_KEY;
return NULL;
expiredHash:
if (error) *error = RDB_LOAD_ERR_EXPIRED_HASH;
return NULL;
}

/* Mark that we are loading in the global state and setup the fields
Expand Down Expand Up @@ -3563,6 +3564,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if(empty_keys_skipped++ < 10)
serverLog(LL_NOTICE, "rdbLoadObject skipping empty key: %s", key);
sdsfree(key);
} else if (error == RDB_LOAD_ERR_EXPIRED_HASH) {
/* Valid flow. Continue. */
sdsfree(key);
} else {
sdsfree(key);
goto eoferr;
Expand Down
5 changes: 3 additions & 2 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@

/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */
#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */
#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */
#define RDB_LOAD_ERR_EXPIRED_HASH 2 /* Expired hash since all its fields are expired */
#define RDB_LOAD_ERR_OTHER 3 /* Any other errors */

ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len);
int rdbSaveType(rio *rdb, unsigned char type);
Expand Down
8 changes: 5 additions & 3 deletions src/t_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field) {
unsigned int vlen;
long long vll;

if (hashTypeGetValue(db, o,field,&vstr,&vlen,&vll) == C_ERR) return NULL;
if (hashTypeGetValue(db,o,field,&vstr,&vlen,&vll) == C_ERR) return NULL;
if (vstr) return createStringObject((char*)vstr,vlen);
else return createStringObjectFromLongLong(vll);
}
Expand Down Expand Up @@ -1676,7 +1676,7 @@ void hashTypeConvertListpackEx(robj *o, int enc, ebuckets *hexpires) {
}
}

/* NOTE: hexpires can be NULL (Won't attempt to register in global HFE DS) */
/* NOTE: hexpires can be NULL (Won't register in global HFE DS) */
void hashTypeConvert(robj *o, int enc, ebuckets *hexpires) {
if (o->encoding == OBJ_ENCODING_LISTPACK) {
hashTypeConvertListpack(o, enc);
Expand Down Expand Up @@ -4025,7 +4025,9 @@ void hsetfCommand(client *c) {
return;

hashObj = lookupKeyWrite(c->db, c->argv[1]);
if (!hashObj) {
if (hashObj) {
if (checkType(c, hashObj, OBJ_HASH)) return;
} else {
/* Don't create the object if command has DC or DCF arguments */
if (flags & HFE_CMD_DC || flags & HFE_CMD_DCF) {
addReplyOrErrorObject(c, shared.null[c->resp]);
Expand Down
11 changes: 11 additions & 0 deletions tests/integration/psync2-master-restart.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ start_server {} {
$master set $j somevalue px 10
}

##### hash-field-expiration
# Hashes of type OBJ_ENCODING_LISTPACK_EX won't be discarded during
# RDB load, even if they are expired.
$master hsetf myhash1 PX 10 FVS 3 f1 v1 f2 v2 f3 v3
# Hashes of type RDB_TYPE_HASH_METADATA will be discarded during RDB load.
$master config set hash-max-listpack-entries 0
$master hsetf myhash2 PX 10 FVS 2 f1 v1 f2 v2
$master config set hash-max-listpack-entries 1

after 20

wait_for_condition 500 100 {
Expand Down Expand Up @@ -209,6 +218,8 @@ start_server {} {
assert {[status $master sync_partial_ok] == 0}
assert {[status $master sync_full] == 1}
assert {[status $master rdb_last_load_keys_expired] == 2048}

assert {[status $master rdb_last_load_hash_fields_expired] == 2}
assert {[status $replica sync_full] == 1}

set digest [$master debug digest]
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/type/hash-field-expire.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1286,12 +1286,12 @@ start_server {tags {"external:skip needs:debug"}} {
r config resetstat
r del myhash
r hset myhash f1 v1 f2 v2 f3 v3 f4 v4 f5 v5
r hpexpire myhash 100 1 f1 f2 f3
r hpexpire myhash 100 FIELDS 3 f1 f2 f3

assert_match [get_hashes_with_expiry_fields r] 1
r hset myhash2 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5
assert_match [get_hashes_with_expiry_fields r] 1
r hpexpire myhash2 100 1 f1 f2 f3
r hpexpire myhash2 100 FIELDS 3 f1 f2 f3
assert_match [get_hashes_with_expiry_fields r] 2

wait_for_condition 50 50 {
Expand Down

0 comments on commit 088430d

Please sign in to comment.