diff --git a/src/cluster.c b/src/cluster.c index ebab7a3d8c1..5c50aa6ea30 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -239,7 +239,7 @@ void restoreCommand(client *c) { rioInitWithBuffer(&payload,c->argv[3]->ptr); if (((type = rdbLoadObjectType(&payload)) == -1) || - ((obj = rdbLoadObject(type,&payload,key->ptr,c->db,0,NULL)) == NULL)) + ((obj = rdbLoadObject(type,&payload,key->ptr,c->db,NULL)) == NULL)) { addReplyError(c,"Bad data format"); return; @@ -265,7 +265,16 @@ void restoreCommand(client *c) { } /* Create the key and set the TTL if any */ - dbAdd(c->db,key,obj); + dictEntry *de = dbAdd(c->db,key,obj); + + /* register hash in global HFE DS if needed. + * + * For that purpose rdbLoadObject() took care to write minimum expiration + * time as leftover value in the ExpireMeta (Marked as trash) of the object. + * That is why we indicate 0 as the expiration time here. */ + if (obj->type == OBJ_HASH) + hashTypeAddToExpires(c->db, dictGetKey(de), obj, 0); + if (ttl) { setExpire(c,c->db,key,ttl); if (!absttl) { diff --git a/src/mstr.h b/src/mstr.h index fa7d4b21410..1613a637ec6 100644 --- a/src/mstr.h +++ b/src/mstr.h @@ -209,6 +209,9 @@ static inline int mstrIsMetaAttached(mstr s) { return s[-1] & MSTR_META_MASK; } /* return whether if a specific flag-index is set */ static inline int mstrGetFlag(mstr s, int flagIdx) { return *mstrFlagsRef(s) & (1 << flagIdx); } +/* DEBUG */ +void mstrPrint(mstr s, struct mstrKind *kind, int verbose); + /* See comment above about MSTR-ALIGNMENT(2) */ static_assert(sizeof(struct mstrhdr5 ) % 2 == 1, "must be odd"); static_assert(sizeof(struct mstrhdr8 ) % 2 == 1, "must be odd"); diff --git a/src/rdb.c b/src/rdb.c index 7ec20dfd35b..5b03082871d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -270,7 +270,7 @@ int rdbEncodeInteger(long long value, unsigned char *enc) { void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { int plainFlag = flags & RDB_LOAD_PLAIN; int sdsFlag = flags & RDB_LOAD_SDS; - int hfldFlag = flags & RDB_LOAD_HFLD; + int hfldFlag = flags & (RDB_LOAD_HFLD|RDB_LOAD_HFLD_TTL); int encode = flags & RDB_LOAD_ENC; unsigned char enc[4]; long long val; @@ -305,7 +305,7 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { } else if (sdsFlag) { p = sdsnewlen(SDS_NOINIT,len); } else { /* hfldFlag */ - p = hfieldNew(NULL, len, 0); + p = hfieldNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1); } memcpy(p,buf,len); return p; @@ -377,7 +377,7 @@ ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) { void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { int plainFlag = flags & RDB_LOAD_PLAIN; int sdsFlag = flags & RDB_LOAD_SDS; - int hfldFlag = flags & RDB_LOAD_HFLD; + int hfldFlag = flags & (RDB_LOAD_HFLD | RDB_LOAD_HFLD_TTL); int robjFlag = (!(plainFlag || sdsFlag || hfldFlag)); /* not plain/sds/hfld */ uint64_t len, clen; @@ -397,7 +397,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { } else if (sdsFlag || robjFlag) { val = sdstrynewlen(SDS_NOINIT,len); } else { /* hfldFlag */ - val = hfieldTryNew(NULL, len, 0); + val = hfieldTryNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1); } if (!val) { @@ -505,6 +505,7 @@ ssize_t rdbSaveStringObject(rio *rdb, robj *obj) { * instead of a Redis object with an sds in it. * RDB_LOAD_SDS: Return an SDS string instead of a Redis object. * RDB_LOAD_HFLD: Return a hash field object (mstr) + * RDB_LOAD_HFLD_TTL: Return a hash field with TTL metadata reserved * * On I/O error NULL is returned. */ @@ -512,7 +513,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { void *buf; int plainFlag = flags & RDB_LOAD_PLAIN; int sdsFlag = flags & RDB_LOAD_SDS; - int hfldFlag = flags & RDB_LOAD_HFLD; + int hfldFlag = flags & (RDB_LOAD_HFLD|RDB_LOAD_HFLD_TTL); int robjFlag = (!(plainFlag || sdsFlag || hfldFlag)); /* not plain/sds/hfld */ int isencoded; @@ -555,7 +556,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { } else if (sdsFlag) { buf = sdstrynewlen(SDS_NOINIT,len); } else { /* hfldFlag */ - buf = hfieldTryNew(NULL, len, 0); + buf = hfieldTryNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1); } if (!buf) { serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); @@ -1890,8 +1891,8 @@ int lpValidateIntegrityAndDups(unsigned char *lp, size_t size, int deep, int tup * On success a newly allocated object is returned, otherwise NULL. * When the function returns NULL and if 'error' is not NULL, the * integer pointed by 'error' is set to the type of error that occurred */ -robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, - int *error) { +robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int *error) { + uint64_t minExpiredField = EB_EXPIRE_TIME_INVALID; robj *o = NULL, *ele, *dec; uint64_t len; unsigned int i; @@ -2110,7 +2111,6 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, sds value; hfield field; dict *dupSearchDict = NULL; - ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL); len = rdbLoadLen(rdb, NULL); if (len == RDB_LENERR) return NULL; @@ -2120,7 +2120,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* Too many entries? Use a hash table right from the start. */ if (len > server.hash_max_listpack_entries) - hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); + hashTypeConvert(o, OBJ_ENCODING_HT, NULL); else if (deep_integrity_validation) { /* In this mode, we need to guarantee that the server won't crash * later when the ziplist is converted to a dict. @@ -2164,7 +2164,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, sdslen(value) > server.hash_max_listpack_value || !lpSafeToAdd(o->ptr, hfieldlen(field) + sdslen(value))) { - hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); + hashTypeConvert(o, OBJ_ENCODING_HT, NULL); dictUseStoredKeyApi((dict *)o->ptr, 1); ret = dictAdd((dict*)o->ptr, field, value); dictUseStoredKeyApi((dict *)o->ptr, 0); @@ -2233,13 +2233,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* All pairs should be read by now */ serverAssert(len == 0); } else if (rdbtype == RDB_TYPE_HASH_METADATA) { - size_t fieldLen; sds value, field; - uint64_t expireAt, minExpire = EB_EXPIRE_TIME_INVALID; - mstime_t now = mstime(); + uint64_t expireAt; dict *dupSearchDict = NULL; - ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL); len = rdbLoadLen(rdb, NULL); if (len == RDB_LENERR) return NULL; @@ -2248,11 +2245,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, o = createHashObject(); /* Too many entries? Use a hash table right from the start. */ if (len > server.hash_max_listpack_entries) { - hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); + hashTypeConvert(o, OBJ_ENCODING_HT, NULL); dictTypeAddMeta((dict**)&o->ptr, &mstrHashDictTypeWithHFE); initDictExpireMetadata(key, o); } else { - hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, hexpires); + hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, NULL); if (deep_integrity_validation) { /* In this mode, we need to guarantee that the server won't crash * later when the listpack is converted to a dict. @@ -2278,8 +2275,13 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, return NULL; } - /* read the field name */ - if ((field = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, &fieldLen)) == NULL) { + /* if needed create field with TTL metadata */ + if (expireAt !=0) + field = rdbGenericLoadStringObject(rdb, RDB_LOAD_HFLD_TTL, &fieldLen); + else + field = rdbGenericLoadStringObject(rdb, RDB_LOAD_HFLD, &fieldLen); + + if (field == NULL) { serverLog(LL_WARNING, "failed reading hash field"); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); @@ -2291,55 +2293,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, serverLog(LL_WARNING, "failed reading hash value"); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); - sdsfree(field); + hfieldFree(field); return NULL; } - /* Check if the hash field already expired. This function is used when - * loading an RDB file from disk, either at startup, or when an RDB was - * received from the master. In the latter case, the master is - * responsible for hash field expiry. If we would expire hash fields here, - * the snapshot taken by the master may not be reflected on the slave. - * Similarly, if the base AOF is RDB format, we want to load all - * the hash fields there are, since the log of operations in the incr AOF - * is assumed to work in the exact keyspace state. - * Expired hash fields on the master are silently discarded. - * Note that if all fields in a hash has expired, the hash would not be - * created in memory (because it is created on the first valid field), and - * thus the key would be discarded as an "expired hash" */ - if (expireAt != 0 && iAmMaster() && ((mstime_t)expireAt < now) && - !(rdbflags & RDBFLAGS_AOF_PREAMBLE)) - { - /* TODO: aggregate HDELs and send them to replicas. */ - if (rdbflags & RDBFLAGS_FEED_REPL && db) { - /* Caller should have created replication backlog, - * and now this path only works when rebooting, - * so we don't have replicas yet. */ - serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0); - robj keyobj, fieldObj; - initStaticStringObject(keyobj,key); - initStaticStringObject(fieldObj, field); - robj *argv[3]; - argv[0] = shared.hdel; - argv[1] = &keyobj; - argv[2] = &fieldObj; - replicationFeedSlaves(server.slaves,db->id,argv,3); - } - - server.rdb_last_load_hash_fields_expired++; - sdsfree(field); - sdsfree(value); - continue; - } - /* keep the nearest expiration to connect listpack object to db expiry */ - if ((expireAt != 0) && (expireAt < minExpire)) minExpire = expireAt; + if ((expireAt != 0) && (expireAt < minExpiredField)) minExpiredField = expireAt; /* store the values read - either to listpack or dict */ if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { /* integrity - check for key duplication (if required) */ if (dupSearchDict) { - sds field_dup = sdsnewlen(field, sdslen(field)); + sds field_dup = sdsnewlen(field, hfieldlen(field)); if (dictAdd(dupSearchDict, field_dup, NULL) != DICT_OK) { rdbReportCorruptRDB("Hash with dup elements"); @@ -2347,18 +2312,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, decrRefCount(o); sdsfree(field_dup); sdsfree(value); - sdsfree(field); + hfieldFree(field); return NULL; } } /* check if the values can be saved to listpack (or should convert to dict encoding) */ - if (sdslen(field) > server.hash_max_listpack_value || + if (hfieldlen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value || - !lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expireAt))) + !lpSafeToAdd(((listpackEx*)o->ptr)->lp, hfieldlen(field) + sdslen(value) + lpEntrySizeInteger(expireAt))) { /* convert to hash */ - hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); + hashTypeConvert(o, OBJ_ENCODING_HT, NULL); if (len > DICT_HT_INITIAL_SIZE) { /* TODO: this is NOT the original len, but this is also the case for simple hash, is this a bug? */ if (dictTryExpand(o->ptr, len) != DICT_OK) { @@ -2366,36 +2331,40 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); sdsfree(value); - sdsfree(field); + hfieldFree(field); return NULL; } } /* don't add the values to the new hash: the next if will catch and the values will be added there */ } else { - listpackExAddNew(o, field, value, expireAt); - sdsfree(field); + listpackExAddNew(o, field, hfieldlen(field), + value, sdslen(value), expireAt); + hfieldFree(field); sdsfree(value); } } if (o->encoding == OBJ_ENCODING_HT) { - /* WA for check-rdb mode, when there's no DB so can't attach expired items to ebuckets, - * or when no expiry was not set for this field */ - if ((db == NULL) || (expireAt == 0)) { - hashTypeSet(db, o, field, value, 0); - } else { - if (hashTypeSetExRdb(db, o, field, value, expireAt) != C_OK) { - serverLog(LL_WARNING, "failed adding hash field %s to key %s", field, key); - decrRefCount(o); - if (dupSearchDict != NULL) dictRelease(dupSearchDict); - sdsfree(value); - sdsfree(field); - return NULL; - } + /* Add pair to hash table */ + dict *d = o->ptr; + dictUseStoredKeyApi(d, 1); + int ret = dictAdd(d, field, value); + dictUseStoredKeyApi(d, 0); + + /* Attach expiry to the hash field and register in hash private HFE DS */ + if ((ret != DICT_ERR) && expireAt) { + dictExpireMetadata *m = (dictExpireMetadata *) dictMetadata(d); + ret = ebAdd(&m->hfe, &hashFieldExpireBucketsType, field, expireAt); + } + + if (ret == DICT_ERR) { + rdbReportCorruptRDB("Duplicate hash fields detected"); + sdsfree(value); + hfieldFree(field); + decrRefCount(o); + return NULL; } - sdsfree(field); - sdsfree(value); } } @@ -2406,8 +2375,6 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, decrRefCount(o); goto expiredHash; } - if ((db != NULL) && (minExpire != EB_EXPIRE_TIME_INVALID)) - hashTypeAddToExpires(db, key, o, minExpire); } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST || rdbtype == RDB_TYPE_LIST_QUICKLIST_2) { if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; if (len == 0) goto emptykey; @@ -2732,17 +2699,13 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, } /* for TTL listpack, find the minimum expiry */ - uint64_t minExpire = hashTypeGetNextTimeToExpire(o); + minExpiredField = hashTypeGetNextTimeToExpire(o); /* Convert listpack to hash table without registering in global HFE DS, * if has HFEs, since the listpack is not connected yet to the DB */ if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/); - /* Only now register in global HFE DS, if db provided */ - if (db != NULL) - hashTypeAddToExpires(db, key, o, minExpire); - break; default: /* totally unreachable */ @@ -3127,6 +3090,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, rdbReportReadError("Unknown RDB encoding type %d",rdbtype); return NULL; } + + /* If loaded hash fields with expiration, + * write it to the hash ExpireMeta but don't register it in global + * HFE DS! If needed the caller of rdbLoadObject() is the one to read + * out of the object ExpireMeta the minimum expiration time and register + * it in global HFE DS */ + if (minExpiredField != EB_EXPIRE_TIME_INVALID) { + if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + listpackEx *lpt = o->ptr; + ebSetMetaExpTime(&lpt->meta, minExpiredField); + } else if (o->encoding == OBJ_ENCODING_HT) { + dict *d = o->ptr; + dictExpireMetadata *m = (dictExpireMetadata *) dictMetadata(d); + ebSetMetaExpTime(&m->expireMeta, minExpiredField); + } + } + if (error) *error = 0; return o; @@ -3546,7 +3526,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) goto eoferr; /* Read value */ - val = rdbLoadObject(type,rdb,key,db,rdbflags,&error); + val = rdbLoadObject(type,rdb,key,db,&error); /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was @@ -3612,6 +3592,14 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } } + /* register hash in global HFE DS if needed. + * + * For that purpose rdbLoadObject() took care to write minimum expiration + * time as leftover value in the ExpireMeta (Marked as trash) of the object. + * That is why we indicate 0 as the expiration time here. */ + if (val->type == OBJ_HASH) + hashTypeAddToExpires(db, key, val, 0); + /* Set the expire time if needed */ if (expiretime != -1) { setExpire(NULL,db,&keyobj,expiretime); @@ -3657,12 +3645,12 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (empty_keys_skipped) { serverLog(LL_NOTICE, - "Done loading RDB, keys loaded: %lld, keys expired: %lld, hash fields expired: %lld, empty keys skipped: %lld.", - server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, server.rdb_last_load_hash_fields_expired, empty_keys_skipped); + "Done loading RDB, keys loaded: %lld, keys expired: %lld, empty keys skipped: %lld.", + server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, empty_keys_skipped); } else { serverLog(LL_NOTICE, - "Done loading RDB, keys loaded: %lld, keys expired: %lld, hash fields expired: %lld.", - server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, server.rdb_last_load_hash_fields_expired); + "Done loading RDB, keys loaded: %lld, keys expired: %lld.", + server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired); } return C_OK; diff --git a/src/rdb.h b/src/rdb.h index 73a6a5b67c1..5b45d3ea2a3 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -103,11 +103,12 @@ #define RDB_MODULE_OPCODE_STRING 5 /* String. */ /* rdbLoad...() functions flags. */ -#define RDB_LOAD_NONE 0 -#define RDB_LOAD_ENC (1<<0) -#define RDB_LOAD_PLAIN (1<<1) -#define RDB_LOAD_SDS (1<<2) -#define RDB_LOAD_HFLD (1<<3) +#define RDB_LOAD_NONE 0 +#define RDB_LOAD_ENC (1<<0) +#define RDB_LOAD_PLAIN (1<<1) +#define RDB_LOAD_SDS (1<<2) +#define RDB_LOAD_HFLD (1<<3) +#define RDB_LOAD_HFLD_TTL (1<<4) /* flags on the purpose of rdb save or load */ #define RDBFLAGS_NONE 0 /* No special RDB loading or saving. */ @@ -142,7 +143,7 @@ int rdbSaveToFile(const char *filename); int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags); ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid); size_t rdbSavedObjectLen(robj *o, robj *key, int dbid); -robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb *db, int rdbflags, int *error); +robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb *db, int *error); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime,int dbid); ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index edadb7dd7c0..953f830bcad 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -331,7 +331,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { rdbstate.keys++; /* Read value */ rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE; - if ((val = rdbLoadObject(type,&rdb,key->ptr,NULL,0,NULL)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,&rdb,key->ptr,NULL,NULL)) == NULL) goto eoferr; /* Check if the key already expired. */ if (expiretime != -1 && expiretime < now) rdbstate.already_expired++; diff --git a/src/server.c b/src/server.c index fa02678d11f..236d9d7672e 100644 --- a/src/server.c +++ b/src/server.c @@ -2713,7 +2713,6 @@ void initServer(void) { server.rdb_save_time_start = -1; server.rdb_last_load_keys_expired = 0; server.rdb_last_load_keys_loaded = 0; - server.rdb_last_load_hash_fields_expired = 0; server.dirty = 0; resetServerStats(); /* A few stats we don't want to reset: server startup time, and peak mem. */ @@ -5778,7 +5777,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "rdb_last_cow_size:%zu\r\n", server.stat_rdb_cow_bytes, "rdb_last_load_keys_expired:%lld\r\n", server.rdb_last_load_keys_expired, "rdb_last_load_keys_loaded:%lld\r\n", server.rdb_last_load_keys_loaded, - "rdb_last_load_hash_fields_expired:%lld\r\n", server.rdb_last_load_hash_fields_expired, "aof_enabled:%d\r\n", server.aof_state != AOF_OFF, "aof_rewrite_in_progress:%d\r\n", server.child_type == CHILD_TYPE_AOF, "aof_rewrite_scheduled:%d\r\n", server.aof_rewrite_scheduled, diff --git a/src/server.h b/src/server.h index 3d082957a35..e5116d4c9a1 100644 --- a/src/server.h +++ b/src/server.h @@ -1807,7 +1807,6 @@ struct redisServer { long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */ long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */ - long long rdb_last_load_hash_fields_expired; /* number of expired hash fields when loading RDB */ struct saveparam *saveparams; /* Save points array for RDB */ int saveparamslen; /* Number of saving points */ char *rdb_filename; /* Name of RDB file */ @@ -3172,6 +3171,20 @@ typedef struct listpackEx { are ordered by ttl. */ } listpackEx; +/* Each dict of hash object that has fields with time-Expiration will have the + * following metadata attached to dict header */ +typedef struct dictExpireMetadata { + ExpireMeta expireMeta; /* embedded ExpireMeta in dict. + To be used in order to register the hash in the + global ebuckets (i.e db->hexpires) with next, + minimum, hash-field to expire */ + ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */ + sds key; /* reference to the key, same one that stored in + db->dict. Will be used from active-expiration flow + for notification and deletion of the object, if + needed. */ +} dictExpireMetadata; + /* Hash data type */ #define HASH_SET_TAKE_FIELD (1<<0) #define HASH_SET_TAKE_VALUE (1<<1) @@ -3208,12 +3221,12 @@ unsigned char *hashTypeListpackGetLp(robj *o); uint64_t hashTypeGetMinExpire(robj *o); void hashTypeUpdateKeyRef(robj *o, sds newkey); ebuckets *hashTypeGetDictMetaHFE(dict *d); -int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at); uint64_t hashTypeGetMinExpire(robj *keyObj); uint64_t hashTypeGetNextTimeToExpire(robj *o); void initDictExpireMetadata(sds key, robj *o); struct listpackEx *listpackExCreate(void); -void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt); +void listpackExAddNew(robj *o, char *field, size_t flen, + char *value, size_t vlen, uint64_t expireAt); /* Hash-Field data type (of t_hash.c) */ hfield hfieldNew(const void *field, size_t fieldlen, int withExpireMeta); diff --git a/src/t_hash.c b/src/t_hash.c index 68635f33108..b4fa034fd16 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -108,20 +108,6 @@ EbucketsType hashFieldExpireBucketsType = { .itemsAddrAreOdd = 1, /* Addresses of hfield (mstr) are odd!! */ }; -/* Each dict of hash object that has fields with time-Expiration will have the - * following metadata attached to dict header */ -typedef struct dictExpireMetadata { - ExpireMeta expireMeta; /* embedded ExpireMeta in dict. - To be used in order to register the hash in the - global ebuckets (i.e db->hexpires) with next, - minimum, hash-field to expire */ - ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */ - sds key; /* reference to the key, same one that stored in - db->dict. Will be used from active-expiration flow - for notification and deletion of the object, if - needed. */ -} dictExpireMetadata; - /* ActiveExpireCtx passed to hashTypeActiveExpire() */ typedef struct ActiveExpireCtx { uint32_t fieldsToExpireQuota; @@ -489,10 +475,11 @@ static void listpackExAddInternal(robj *o, listpackEntry ent[3]) { } /* Add new field ordered by expire time. */ -void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt) { +void listpackExAddNew(robj *o, char *field, size_t flen, + char *value, size_t vlen, uint64_t expireAt) { listpackEntry ent[3] = { - {.sval = (unsigned char*) field, .slen = sdslen(field)}, - {.sval = (unsigned char*) value, .slen = sdslen(value)}, + {.sval = (unsigned char*) field, .slen = flen}, + {.sval = (unsigned char*) value, .slen = vlen}, {.lval = expireAt} }; @@ -1064,33 +1051,6 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, return res; } -/* - * hashTypeSetExRdb provide a simplified API for setting fields & expiry by RDB load - * - * It is the duty of RDB reading process to track minimal expiration time of the - * fields and eventually call hashTypeAddToExpires() to update global HFE DS with - * next expiration time. - * - * To just add a field with no expiry, use hashTypeSet instead. - */ -int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at) { - /* Dummy struct to be used in hashTypeSetEx() */ - HashTypeSetEx setEx = { - .fieldSetCond = FIELD_DONT_OVRWRT, /* Shouldn't be any duplication */ - .expireSetCond = HFE_NX, /* Should set expiry once each field */ - .minExpire = EB_EXPIRE_TIME_INVALID, /* Won't be used. Accounting made by RDB already */ - .key = NULL, /* Not going to call hashTypeSetExDone() */ - .hashObj = o, - .minExpireFields = EB_EXPIRE_TIME_INVALID, /* Not needed by RDB */ - .c = NULL, /* No notification required */ - .cmd = NULL, /* No notification required */ - }; - - HashTypeSet setKeyVal = {.value = value, .flags = 0}; - SetExRes res = hashTypeSetEx(db, o, field, &setKeyVal, expire_at, (expire_at) ? &setEx : NULL); - return (res == HSETEX_OK || res == HSET_UPDATE) ? C_OK : C_ERR; -} - void initDictExpireMetadata(sds key, robj *o) { dict *ht = o->ptr; @@ -1278,7 +1238,8 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS if (!fptr) { if (setParams) { - listpackExAddNew(o, field, setParams->value, + listpackExAddNew(o, field, sdslen(field), + setParams->value, sdslen(setParams->value), exParams ? expireAt : HASH_LP_NO_TTL); } else { res = HSETEX_NO_FIELD; @@ -2011,29 +1972,33 @@ uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o) { /* Add hash to global HFE DS and update key for notifications. * - * key - must be the same instance that is stored in db->dict + * key - must be the same key instance that is persisted in db->dict + * expireTime - expiration in msec. + * If eq. 0 then the hash will be added to the global HFE DS with + * the minimum expiration time that is already written in advance + * to attached metadata (which considered as trash as long as it is + * not attached to global HFE DS). + * + * Precondition: It is a hash of type listpackex or HT with HFE metadata. */ void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTime) { - if (expireTime == EB_EXPIRE_TIME_INVALID) + if (expireTime > EB_EXPIRE_TIME_MAX) return; if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { listpackEx *lpt = hashObj->ptr; lpt->key = key; + expireTime = (expireTime) ? expireTime : ebGetMetaExpTime(&lpt->meta); ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime); - return; + } else if (hashObj->encoding == OBJ_ENCODING_HT) { + dict *d = hashObj->ptr; + if (isDictWithMetaHFE(d)) { + dictExpireMetadata *meta = (dictExpireMetadata *) dictMetadata(d); + expireTime = (expireTime) ? expireTime : ebGetMetaExpTime(&meta->expireMeta); + meta->key = key; + ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime); + } } - serverAssert(hashObj->encoding == OBJ_ENCODING_HT); - - serverAssert(isDictWithMetaHFE(hashObj->ptr)); - - /* Update hash with key for notifications */ - dict *d = hashObj->ptr; - dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); - dictExpireMeta->key = key; - - /* Add hash to global HFE DS */ - ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime); } /* DB active expire and update hashes with time-expiration on fields. diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl index d2908ede8fa..fa798b2ac92 100644 --- a/tests/integration/psync2-master-restart.tcl +++ b/tests/integration/psync2-master-restart.tcl @@ -221,7 +221,6 @@ start_server {} { assert {[status $master sync_full] == 1} assert {[status $master rdb_last_load_keys_expired] == 2048} - assert {[status $master rdb_last_load_hash_fields_expired] == 2} assert {[status $replica sync_full] == 1} set digest [$master debug digest] diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index 827ecee3377..9db78168996 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -421,14 +421,20 @@ set server_path [tmpdir "server.partial-hfield-exp-test"] # verifies writing and reading hash key with expiring and persistent fields start_server [list overrides [list "dir" $server_path]] { foreach {type lp_entries} {listpack 512 dict 0} { - test "hash field expiration save and load rdb one expired field, ($type)" { + test "HFE - save and load expired fields, expired soon after, or long after ($type)" { r config set hash-max-listpack-entries $lp_entries r FLUSHALL - r HMSET key a 1 b 2 c 3 d 4 - r HEXPIREAT key 2524600800 FIELDS 2 a b - r HPEXPIRE key 100 FIELDS 1 d + r HMSET key a 1 b 2 c 3 d 4 e 5 + # expected to be expired long after restart + r HEXPIREAT key 2524600800 FIELDS 1 a + # expected long TTL value (6 bytes) is saved and loaded correctly + r HPEXPIREAT key 188900976391764 FIELDS 1 b + # expected to be already expired after restart + r HPEXPIRE key 80 FIELDS 1 d + # expected to be expired soon after restart + r HPEXPIRE key 200 FIELDS 1 e r save # sleep 101 ms to make sure d will expire after restart @@ -437,14 +443,14 @@ start_server [list overrides [list "dir" $server_path]] { wait_done_loading r assert_equal [lsort [r hgetall key]] "1 2 3 a b c" - assert_equal [r hexpiretime key FIELDS 3 a b c] {2524600800 2524600800 -1} + assert_equal [r hpexpiretime key FIELDS 3 a b c] {2524600800000 188900976391764 -1} assert_equal [s rdb_last_load_keys_loaded] 1 - # hash keys saved in listpack encoding are loaded as a blob, - # so individual field expiry is not verified on load - if {$type eq "dict"} { - assert_equal [s rdb_last_load_hash_fields_expired] 1 + + # wait until expired_hash_fields equals 2 + wait_for_condition 10 100 { + [s expired_hash_fields] == 2 } else { - assert_equal [s rdb_last_load_hash_fields_expired] 0 + fail "Value of expired_hash_fields is not as expected" } } } @@ -455,7 +461,7 @@ set server_path [tmpdir "server.all-hfield-exp-test"] # verifies writing hash with several expired keys, and active-expiring it on load start_server [list overrides [list "dir" $server_path]] { foreach {type lp_entries} {listpack 512 dict 0} { - test "hash field expiration save and load rdb all fields expired, ($type)" { + test "HFE - save and load rdb all fields expired, ($type)" { r config set hash-max-listpack-entries $lp_entries r FLUSHALL @@ -470,53 +476,15 @@ start_server [list overrides [list "dir" $server_path]] { restart_server 0 true false wait_done_loading r - # hash keys saved as listpack-encoded are saved and loaded as a blob - # so individual field validation is not checked during load. - # Therefore, if the key was saved as dict it is expected that - # all 4 fields were expired during load, and thus the key was - # "declared" an empty key. - # On the other hand, if the key was saved as listpack, it is - # expected that no field was expired on load and the key was loaded, - # even though all its fields are actually expired. - if {$type eq "dict"} { - assert_equal [s rdb_last_load_keys_loaded] 0 - assert_equal [s rdb_last_load_hash_fields_expired] 4 - } else { - assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 0 - } + # it is expected that no field was expired on load and the key was + # loaded, even though all its fields are actually expired. + assert_equal [s rdb_last_load_keys_loaded] 1 - # in listpack encoding, the fields (and key) will be expired by - # lazy expiry assert_equal [r hgetall key] {} } } } -set server_path [tmpdir "server.long-ttl-test"] - -# verifies a long TTL value (6 bytes) is saved and loaded correctly -start_server [list overrides [list "dir" $server_path]] { - foreach {type lp_entries} {listpack 512 dict 0} { - test "hash field expiration save and load rdb long TTL, ($type)" { - r config set hash-max-listpack-entries $lp_entries - - r FLUSHALL - - r HSET key a 1 - # set expiry to 0xabcdef987654 (6 bytes) - r HPEXPIREAT key 188900976391764 FIELDS 1 a - - r save - restart_server 0 true false - wait_done_loading r - - assert_equal [r hget key a ] 1 - assert_equal [r hpexpiretime key FIELDS 1 a] {188900976391764} - } - } -} - set server_path [tmpdir "server.listpack-to-dict-test"] test "save listpack, load dict" { @@ -540,7 +508,6 @@ test "save listpack, load dict" { # first verify d was not expired during load (no expiry when loading # a hash that was saved listpack-encoded) assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 0 # d should be lazy expired in hgetall assert_equal [lsort [r hgetall key]] "1 2 3 a b c" @@ -570,7 +537,6 @@ test "save dict, load listpack" { # verify d was expired during load assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 1 assert_equal [lsort [r hgetall key]] "1 2 3 a b c" assert_match "*encoding:listpack*" [r debug object key] @@ -602,7 +568,6 @@ foreach {type lp_entries} {listpack 512 dict 0} { } assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 0 # hgetall might lazy expire fields, so it's only called after the stat asserts assert_equal [lsort [r hgetall key]] "1 2 5 6 a b e f" @@ -632,7 +597,6 @@ foreach {type lp_entries} {listpack 512 dict 0} { after 500 assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 0 assert_equal [s expired_hash_fields] 0 # hgetall will lazy expire fields, so it's only called after the stat asserts