Skip to content

Commit

Permalink
Fix and refactor RDB load HFEs
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed May 29, 2024
1 parent e16b27d commit 6074be9
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 223 deletions.
13 changes: 11 additions & 2 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ void restoreCommand(client *c) {

rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,key->ptr,c->db,0,NULL)) == NULL))
((obj = rdbLoadObject(type,&payload,key->ptr,c->db,NULL)) == NULL))
{
addReplyError(c,"Bad data format");
return;
Expand All @@ -265,7 +265,16 @@ void restoreCommand(client *c) {
}

/* Create the key and set the TTL if any */
dbAdd(c->db,key,obj);
dictEntry *de = dbAdd(c->db,key,obj);

/* register hash in global HFE DS if needed.
*
* For that purpose rdbLoadObject() took care to write minimum expiration
* time as leftover value in the ExpireMeta (Marked as trash) of the object.
* That is why we indicate 0 as the expiration time here. */
if (obj->type == OBJ_HASH)
hashTypeAddToExpires(c->db, dictGetKey(de), obj, 0);

if (ttl) {
setExpire(c,c->db,key,ttl);
if (!absttl) {
Expand Down
3 changes: 3 additions & 0 deletions src/mstr.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ static inline int mstrIsMetaAttached(mstr s) { return s[-1] & MSTR_META_MASK; }
/* return whether if a specific flag-index is set */
static inline int mstrGetFlag(mstr s, int flagIdx) { return *mstrFlagsRef(s) & (1 << flagIdx); }

/* DEBUG */
void mstrPrint(mstr s, struct mstrKind *kind, int verbose);

/* See comment above about MSTR-ALIGNMENT(2) */
static_assert(sizeof(struct mstrhdr5 ) % 2 == 1, "must be odd");
static_assert(sizeof(struct mstrhdr8 ) % 2 == 1, "must be odd");
Expand Down
174 changes: 81 additions & 93 deletions src/rdb.c

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@
#define RDB_MODULE_OPCODE_STRING 5 /* String. */

/* rdbLoad...() functions flags. */
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_HFLD (1<<3)
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_HFLD (1<<3)
#define RDB_LOAD_HFLD_TTL (1<<4)

/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0 /* No special RDB loading or saving. */
Expand Down Expand Up @@ -142,7 +143,7 @@ int rdbSaveToFile(const char *filename);
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb *db, int rdbflags, int *error);
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb *db, int *error);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime,int dbid);
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
Expand Down
2 changes: 1 addition & 1 deletion src/redis-check-rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
rdbstate.keys++;
/* Read value */
rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE;
if ((val = rdbLoadObject(type,&rdb,key->ptr,NULL,0,NULL)) == NULL) goto eoferr;
if ((val = rdbLoadObject(type,&rdb,key->ptr,NULL,NULL)) == NULL) goto eoferr;
/* Check if the key already expired. */
if (expiretime != -1 && expiretime < now)
rdbstate.already_expired++;
Expand Down
2 changes: 0 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2713,7 +2713,6 @@ void initServer(void) {
server.rdb_save_time_start = -1;
server.rdb_last_load_keys_expired = 0;
server.rdb_last_load_keys_loaded = 0;
server.rdb_last_load_hash_fields_expired = 0;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
Expand Down Expand Up @@ -5778,7 +5777,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"rdb_last_cow_size:%zu\r\n", server.stat_rdb_cow_bytes,
"rdb_last_load_keys_expired:%lld\r\n", server.rdb_last_load_keys_expired,
"rdb_last_load_keys_loaded:%lld\r\n", server.rdb_last_load_keys_loaded,
"rdb_last_load_hash_fields_expired:%lld\r\n", server.rdb_last_load_hash_fields_expired,
"aof_enabled:%d\r\n", server.aof_state != AOF_OFF,
"aof_rewrite_in_progress:%d\r\n", server.child_type == CHILD_TYPE_AOF,
"aof_rewrite_scheduled:%d\r\n", server.aof_rewrite_scheduled,
Expand Down
19 changes: 16 additions & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1807,7 +1807,6 @@ struct redisServer {
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */
long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */
long long rdb_last_load_hash_fields_expired; /* number of expired hash fields when loading RDB */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
Expand Down Expand Up @@ -3172,6 +3171,20 @@ typedef struct listpackEx {
are ordered by ttl. */
} listpackEx;

/* Each dict of hash object that has fields with time-Expiration will have the
* following metadata attached to dict header */
typedef struct dictExpireMetadata {
ExpireMeta expireMeta; /* embedded ExpireMeta in dict.
To be used in order to register the hash in the
global ebuckets (i.e db->hexpires) with next,
minimum, hash-field to expire */
ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */
sds key; /* reference to the key, same one that stored in
db->dict. Will be used from active-expiration flow
for notification and deletion of the object, if
needed. */
} dictExpireMetadata;

/* Hash data type */
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
Expand Down Expand Up @@ -3208,12 +3221,12 @@ unsigned char *hashTypeListpackGetLp(robj *o);
uint64_t hashTypeGetMinExpire(robj *o);
void hashTypeUpdateKeyRef(robj *o, sds newkey);
ebuckets *hashTypeGetDictMetaHFE(dict *d);
int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at);
uint64_t hashTypeGetMinExpire(robj *keyObj);
uint64_t hashTypeGetNextTimeToExpire(robj *o);
void initDictExpireMetadata(sds key, robj *o);
struct listpackEx *listpackExCreate(void);
void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt);
void listpackExAddNew(robj *o, char *field, size_t flen,
char *value, size_t vlen, uint64_t expireAt);

/* Hash-Field data type (of t_hash.c) */
hfield hfieldNew(const void *field, size_t fieldlen, int withExpireMeta);
Expand Down
83 changes: 24 additions & 59 deletions src/t_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,6 @@ EbucketsType hashFieldExpireBucketsType = {
.itemsAddrAreOdd = 1, /* Addresses of hfield (mstr) are odd!! */
};

/* Each dict of hash object that has fields with time-Expiration will have the
* following metadata attached to dict header */
typedef struct dictExpireMetadata {
ExpireMeta expireMeta; /* embedded ExpireMeta in dict.
To be used in order to register the hash in the
global ebuckets (i.e db->hexpires) with next,
minimum, hash-field to expire */
ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */
sds key; /* reference to the key, same one that stored in
db->dict. Will be used from active-expiration flow
for notification and deletion of the object, if
needed. */
} dictExpireMetadata;

/* ActiveExpireCtx passed to hashTypeActiveExpire() */
typedef struct ActiveExpireCtx {
uint32_t fieldsToExpireQuota;
Expand Down Expand Up @@ -489,10 +475,11 @@ static void listpackExAddInternal(robj *o, listpackEntry ent[3]) {
}

/* Add new field ordered by expire time. */
void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt) {
void listpackExAddNew(robj *o, char *field, size_t flen,
char *value, size_t vlen, uint64_t expireAt) {
listpackEntry ent[3] = {
{.sval = (unsigned char*) field, .slen = sdslen(field)},
{.sval = (unsigned char*) value, .slen = sdslen(value)},
{.sval = (unsigned char*) field, .slen = flen},
{.sval = (unsigned char*) value, .slen = vlen},
{.lval = expireAt}
};

Expand Down Expand Up @@ -1064,33 +1051,6 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal,
return res;
}

/*
* hashTypeSetExRdb provide a simplified API for setting fields & expiry by RDB load
*
* It is the duty of RDB reading process to track minimal expiration time of the
* fields and eventually call hashTypeAddToExpires() to update global HFE DS with
* next expiration time.
*
* To just add a field with no expiry, use hashTypeSet instead.
*/
int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at) {
/* Dummy struct to be used in hashTypeSetEx() */
HashTypeSetEx setEx = {
.fieldSetCond = FIELD_DONT_OVRWRT, /* Shouldn't be any duplication */
.expireSetCond = HFE_NX, /* Should set expiry once each field */
.minExpire = EB_EXPIRE_TIME_INVALID, /* Won't be used. Accounting made by RDB already */
.key = NULL, /* Not going to call hashTypeSetExDone() */
.hashObj = o,
.minExpireFields = EB_EXPIRE_TIME_INVALID, /* Not needed by RDB */
.c = NULL, /* No notification required */
.cmd = NULL, /* No notification required */
};

HashTypeSet setKeyVal = {.value = value, .flags = 0};
SetExRes res = hashTypeSetEx(db, o, field, &setKeyVal, expire_at, (expire_at) ? &setEx : NULL);
return (res == HSETEX_OK || res == HSET_UPDATE) ? C_OK : C_ERR;
}

void initDictExpireMetadata(sds key, robj *o) {
dict *ht = o->ptr;

Expand Down Expand Up @@ -1278,7 +1238,8 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS

if (!fptr) {
if (setParams) {
listpackExAddNew(o, field, setParams->value,
listpackExAddNew(o, field, sdslen(field),
setParams->value, sdslen(setParams->value),
exParams ? expireAt : HASH_LP_NO_TTL);
} else {
res = HSETEX_NO_FIELD;
Expand Down Expand Up @@ -2011,29 +1972,33 @@ uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o) {

/* Add hash to global HFE DS and update key for notifications.
*
* key - must be the same instance that is stored in db->dict
* key - must be the same key instance that is persisted in db->dict
* expireTime - expiration in msec.
* If eq. 0 then the hash will be added to the global HFE DS with
* the minimum expiration time that is already written in advance
* to attached metadata (which considered as trash as long as it is
* not attached to global HFE DS).
*
* Precondition: It is a hash of type listpackex or HT with HFE metadata.
*/
void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTime) {
if (expireTime == EB_EXPIRE_TIME_INVALID)
if (expireTime > EB_EXPIRE_TIME_MAX)
return;

if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) {
listpackEx *lpt = hashObj->ptr;
lpt->key = key;
expireTime = (expireTime) ? expireTime : ebGetMetaExpTime(&lpt->meta);
ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime);
return;
} else if (hashObj->encoding == OBJ_ENCODING_HT) {
dict *d = hashObj->ptr;
if (isDictWithMetaHFE(d)) {
dictExpireMetadata *meta = (dictExpireMetadata *) dictMetadata(d);
expireTime = (expireTime) ? expireTime : ebGetMetaExpTime(&meta->expireMeta);
meta->key = key;
ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime);
}
}
serverAssert(hashObj->encoding == OBJ_ENCODING_HT);

serverAssert(isDictWithMetaHFE(hashObj->ptr));

/* Update hash with key for notifications */
dict *d = hashObj->ptr;
dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d);
dictExpireMeta->key = key;

/* Add hash to global HFE DS */
ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime);
}

/* DB active expire and update hashes with time-expiration on fields.
Expand Down
1 change: 0 additions & 1 deletion tests/integration/psync2-master-restart.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ start_server {} {
assert {[status $master sync_full] == 1}
assert {[status $master rdb_last_load_keys_expired] == 2048}

assert {[status $master rdb_last_load_hash_fields_expired] == 2}
assert {[status $replica sync_full] == 1}

set digest [$master debug digest]
Expand Down
Loading

0 comments on commit 6074be9

Please sign in to comment.