diff --git a/src/aof.c b/src/aof.c index 9632d9c5b33..17e72febb0f 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1939,7 +1939,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { * * The function returns 0 on error, non-zero on success. */ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { - if (hi->encoding == OBJ_ENCODING_LISTPACK) { + if ((hi->encoding == OBJ_ENCODING_LISTPACK) || (hi->encoding == OBJ_ENCODING_LISTPACK_EX)) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; @@ -1963,37 +1963,60 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { /* Emit the commands needed to rebuild a hash object. * The function returns 0 on error, 1 on success. */ int rewriteHashObject(rio *r, robj *key, robj *o) { + int res = 0; /*fail*/ + hashTypeIterator *hi; long long count = 0, items = hashTypeLength(o, 0); + int isHFE = hashTypeGetMinExpire(o) != EB_EXPIRE_TIME_INVALID; hi = hashTypeInitIterator(o); - while (hashTypeNext(hi, 0) != C_ERR) { - if (count == 0) { - int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? - AOF_REWRITE_ITEMS_PER_CMD : items; - if (!rioWriteBulkCount(r,'*',2+cmd_items*2) || - !rioWriteBulkString(r,"HMSET",5) || - !rioWriteBulkObject(r,key)) - { - hashTypeReleaseIterator(hi); - return 0; + if (!isHFE) { + while (hashTypeNext(hi, 0) != C_ERR) { + if (count == 0) { + int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? + AOF_REWRITE_ITEMS_PER_CMD : items; + if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) || + !rioWriteBulkString(r, "HMSET", 5) || + !rioWriteBulkObject(r, key)) + goto reHashEnd; } - } - if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) || - !rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) - { - hashTypeReleaseIterator(hi); - return 0; + if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) || + !rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) + goto reHashEnd; + + if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + } else { + while (hashTypeNext(hi, 0) != C_ERR) { + + char hmsetCmd[] = "*4\r\n$5\r\nHMSET\r\n"; + if ( (!rioWrite(r, hmsetCmd, sizeof(hmsetCmd) - 1)) || + (!rioWriteBulkObject(r, key)) || + (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) || + (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) ) + goto reHashEnd; + + if (hi->expire_time != EB_EXPIRE_TIME_INVALID) { + char cmd[] = "*6\r\n$10\r\nHPEXPIREAT\r\n"; + if ( (!rioWrite(r, cmd, sizeof(cmd) - 1)) || + (!rioWriteBulkObject(r, key)) || + (!rioWriteBulkLongLong(r, hi->expire_time)) || + (!rioWriteBulkString(r, "FIELDS", 6)) || + (!rioWriteBulkString(r, "1", 1)) || + (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) ) + goto reHashEnd; + } } - if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; - items--; } - hashTypeReleaseIterator(hi); + res = 1; /* success */ - return 1; +reHashEnd: + hashTypeReleaseIterator(hi); + return res; } /* Helper for rewriteStreamObject() that generates a bulk string into the diff --git a/src/db.c b/src/db.c index 1c8557af9a4..328e37a3158 100644 --- a/src/db.c +++ b/src/db.c @@ -276,6 +276,11 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt old = dictGetVal(de); } kvstoreDictSetVal(db->keys, slot, de, val); + + /* if hash with HFEs, take care to remove from global HFE DS */ + if (old->type == OBJ_HASH) + hashTypeRemoveFromExpires(&db->hexpires, old); + if (server.lazyfree_lazy_server_del) { freeObjAsync(key,old,db->id); } else { @@ -1764,11 +1769,13 @@ void swapMainDbWithTempDb(redisDb *tempDb) { * remain in the same DB they were. */ activedb->keys = newdb->keys; activedb->expires = newdb->expires; + activedb->hexpires = newdb->hexpires; activedb->avg_ttl = newdb->avg_ttl; activedb->expires_cursor = newdb->expires_cursor; newdb->keys = aux.keys; newdb->expires = aux.expires; + newdb->hexpires = aux.hexpires; newdb->avg_ttl = aux.avg_ttl; newdb->expires_cursor = aux.expires_cursor; diff --git a/src/debug.c b/src/debug.c index 84e96aa1420..4b5f730610f 100644 --- a/src/debug.c +++ b/src/debug.c @@ -204,13 +204,18 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) unsigned char eledigest[20]; sds sdsele; + /* field */ memset(eledigest,0,20); sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY); mixDigest(eledigest,sdsele,sdslen(sdsele)); sdsfree(sdsele); + /* val */ sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); mixDigest(eledigest,sdsele,sdslen(sdsele)); sdsfree(sdsele); + /* hash-field expiration (HFE) */ + if (hi->expire_time != EB_EXPIRE_TIME_INVALID) + xorDigest(eledigest,"!!hexpire!!",11); xorDigest(digest,eledigest,20); } hashTypeReleaseIterator(hi); diff --git a/src/ebuckets.c b/src/ebuckets.c index 7235862850c..387aef88c93 100644 --- a/src/ebuckets.c +++ b/src/ebuckets.c @@ -23,6 +23,12 @@ * #define EB_VALIDATE_DEBUG 1 */ +#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) +#define EB_VALIDATE_STRUCTURE(eb, type) ebValidate(eb, type) +#else +#define EB_VALIDATE_STRUCTURE(eb, type) // Do nothing +#endif + /*** BENCHMARK * * To benchmark ebuckets creation and active-expire with 10 million items, apply @@ -190,7 +196,7 @@ static inline uint64_t raxKey2BucketKey(unsigned char *raxKey) { * Before: [segHdr] -> {item1,..,item16} -> [..] * After: [segHdr] -> {newItem} -> [nextSegHdr] -> {item1,..,item16} -> [..] * - * Take care to persist `segHdr` to be the same instance after the change. + * Taken care to persist `segHdr` to be the same instance after the change. * This is important because the rax tree is pointing to it. */ static int ebSegAddExtended(EbucketsType *type, FirstSegHdr *firstSegHdr, eItem newItem) { /* Allocate nextSegHdr and let it take the items of first segment header */ @@ -1390,9 +1396,8 @@ int ebRemove(ebuckets *eb, EbucketsType *type, eItem item) { if (res) type->getExpireMeta(item)->trash = 1; -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + EB_VALIDATE_STRUCTURE(*eb, type); + return res; } @@ -1435,9 +1440,9 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime) { /* Add item to rax */ res = ebAddToRax(eb, type, item, EB_BUCKET_KEY(expireTime)); } -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + + EB_VALIDATE_STRUCTURE(*eb, type); + return res; } @@ -1521,9 +1526,9 @@ void ebExpire(ebuckets *eb, EbucketsType *type, ExpireInfo *info) { ebAdd(eb, type, updateList, ebGetMetaExpTime(mItem)); updateList = next; } -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + + EB_VALIDATE_STRUCTURE(*eb, type); + return; } diff --git a/src/ebuckets.h b/src/ebuckets.h index f3033733061..66954131a02 100644 --- a/src/ebuckets.h +++ b/src/ebuckets.h @@ -139,7 +139,7 @@ * The idea of it is to trim the rax tree depth, avoid having too many branches, * and reduce frequent modifications of the tree to the minimum. */ -#define EB_BUCKET_KEY_PRECISION 0 /* 1024msec */ +#define EB_BUCKET_KEY_PRECISION 0 /* TBD: modify to 10 */ /* From expiration time to bucket-key */ #define EB_BUCKET_KEY(exptime) ((exptime) >> EB_BUCKET_KEY_PRECISION) diff --git a/src/module.c b/src/module.c index 37b0df4ebfc..1d0c628a315 100644 --- a/src/module.c +++ b/src/module.c @@ -5271,7 +5271,10 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { /* Handle XX and NX */ if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) { - int exists = hashTypeExists(key->value, field->ptr); + int isHashDeleted; + int exists = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted); + /* hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); if (((flags & REDISMODULE_HASH_XX) && !exists) || ((flags & REDISMODULE_HASH_NX) && exists)) { @@ -5282,7 +5285,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { /* Handle deletion if value is REDISMODULE_HASH_DELETE. */ if (value == REDISMODULE_HASH_DELETE) { - count += hashTypeDelete(key->value, field->ptr); + count += hashTypeDelete(key->value, field->ptr, 1); if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field); continue; } @@ -5374,14 +5377,22 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { /* Query the hash for existence or value object. */ if (flags & REDISMODULE_HASH_EXISTS) { existsptr = va_arg(ap,int*); - if (key->value) - *existsptr = hashTypeExists(key->value,field->ptr); - else + if (key->value) { + int isHashDeleted; + *existsptr = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted); + /* hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); + } else { *existsptr = 0; + } } else { + int isHashDeleted; valueptr = va_arg(ap,RedisModuleString**); if (key->value) { - *valueptr = hashTypeGetValueObject(key->value,field->ptr); + *valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr, &isHashDeleted); + + /* Currently hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); if (*valueptr) { robj *decoded = getDecodedObject(*valueptr); decrRefCount(*valueptr); diff --git a/src/networking.c b/src/networking.c index 245e2ebdd8c..885863a18e5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3757,7 +3757,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) { * 1. Make sure there are no "holes" and all the arguments are set. * 2. If the original argument vector was longer than the one we * want to end with, it's up to the caller to set c->argc and - * free the no longer used objects on c->argv. */ + * free the no longer used objects on c->argv. + * 3. To remove argument at i'th index, pass NULL as new value + */ void rewriteClientCommandArgument(client *c, int i, robj *newval) { robj *oldval; retainOriginalCommandVector(c); @@ -3775,9 +3777,18 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } oldval = c->argv[i]; if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); - if (newval) c->argv_len_sum += getStringObjectLen(newval); - c->argv[i] = newval; - incrRefCount(newval); + + if (newval) { + c->argv[i] = newval; + incrRefCount(newval); + c->argv_len_sum += getStringObjectLen(newval); + } else { + /* move the remaining arguments one step left */ + for (int j = i+1; j < c->argc; j++) { + c->argv[j-1] = c->argv[j]; + } + c->argv[--c->argc] = NULL; + } if (oldval) decrRefCount(oldval); /* If this is the command name make sure to fix c->cmd. */ diff --git a/src/rdb.c b/src/rdb.c index 6053cf40ac8..7ec20dfd35b 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2236,7 +2236,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, size_t fieldLen; sds value, field; - uint64_t expire, minExpire = EB_EXPIRE_TIME_INVALID; + uint64_t expireAt, minExpire = EB_EXPIRE_TIME_INVALID; mstime_t now = mstime(); dict *dupSearchDict = NULL; ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL); @@ -2266,14 +2266,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, len--; /* read the TTL */ - if (rdbLoadLenByRef(rdb, NULL, &expire) == -1) { + if (rdbLoadLenByRef(rdb, NULL, &expireAt) == -1) { serverLog(LL_WARNING, "failed reading hash TTL"); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); return NULL; } - if (expire > EB_EXPIRE_TIME_MAX) { - rdbReportCorruptRDB("invalid expire time: %llu", (unsigned long long)expire); + if (expireAt > EB_EXPIRE_TIME_MAX) { + rdbReportCorruptRDB("invalid expireAt time: %llu", (unsigned long long)expireAt); decrRefCount(o); return NULL; } @@ -2306,11 +2306,26 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, * Expired hash fields on the master are silently discarded. * Note that if all fields in a hash has expired, the hash would not be * created in memory (because it is created on the first valid field), and - * thus the key would be discarded as an "empty key" */ - if (expire != 0 && iAmMaster() && ((mstime_t)expire < now) && /* note: expire was saved to RDB as unix-time in milliseconds */ + * thus the key would be discarded as an "expired hash" */ + if (expireAt != 0 && iAmMaster() && ((mstime_t)expireAt < now) && !(rdbflags & RDBFLAGS_AOF_PREAMBLE)) { - /* TODO: consider replication (like in rdbLoadAddKeyToDb) */ + /* TODO: aggregate HDELs and send them to replicas. */ + if (rdbflags & RDBFLAGS_FEED_REPL && db) { + /* Caller should have created replication backlog, + * and now this path only works when rebooting, + * so we don't have replicas yet. */ + serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0); + robj keyobj, fieldObj; + initStaticStringObject(keyobj,key); + initStaticStringObject(fieldObj, field); + robj *argv[3]; + argv[0] = shared.hdel; + argv[1] = &keyobj; + argv[2] = &fieldObj; + replicationFeedSlaves(server.slaves,db->id,argv,3); + } + server.rdb_last_load_hash_fields_expired++; sdsfree(field); sdsfree(value); @@ -2318,7 +2333,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, } /* keep the nearest expiration to connect listpack object to db expiry */ - if ((expire != 0) && (expire < minExpire)) minExpire = expire; + if ((expireAt != 0) && (expireAt < minExpire)) minExpire = expireAt; /* store the values read - either to listpack or dict */ if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { @@ -2340,7 +2355,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* check if the values can be saved to listpack (or should convert to dict encoding) */ if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value || - !lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expire))) + !lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expireAt))) { /* convert to hash */ hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); @@ -2358,7 +2373,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* don't add the values to the new hash: the next if will catch and the values will be added there */ } else { - listpackExAddNew(o, field, value, expire); + listpackExAddNew(o, field, value, expireAt); sdsfree(field); sdsfree(value); } @@ -2367,10 +2382,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, if (o->encoding == OBJ_ENCODING_HT) { /* WA for check-rdb mode, when there's no DB so can't attach expired items to ebuckets, * or when no expiry was not set for this field */ - if ((db == NULL) || (expire == 0)) { + if ((db == NULL) || (expireAt == 0)) { hashTypeSet(db, o, field, value, 0); } else { - if (hashTypeSetExRdb(db, o, field, value, expire) != C_OK) { + if (hashTypeSetExRdb(db, o, field, value, expireAt) != C_OK) { serverLog(LL_WARNING, "failed adding hash field %s to key %s", field, key); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); @@ -2389,7 +2404,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* check for empty key (if all fields were expired) */ if (hashTypeLength(o, 0) == 0) { decrRefCount(o); - goto emptykey; + goto expiredHash; } if ((db != NULL) && (minExpire != EB_EXPIRE_TIME_INVALID)) hashTypeAddToExpires(db, key, o, minExpire); @@ -2719,31 +2734,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* for TTL listpack, find the minimum expiry */ uint64_t minExpire = hashTypeGetNextTimeToExpire(o); - /* check if need to convert to dict encoding */ - if ((db != NULL) && - (hashTypeLength(o, 0) > server.hash_max_listpack_entries)) /* TODO: each field length is not verified against server.hash_max_listpack_value */ - { - hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); - /* - * hashTypeAddToExpires is presumably called from within - * the convert function (from listpackEx to dict), BUT, - * this call depends on the lpt->meta field to be updated, - * which is not the case here as hashTypeAddToExpires was - * not yet called for the listpack (which is what updating - * its meta). - * Instead, this "manual" call is added here. - * Another approach would be to have the conversion function - * find the minExpire by itself when iterating on the listpack - * instead of relying on the meta and use this value for the - * final ebAdd call. - */ + /* Convert listpack to hash table without registering in global HFE DS, + * if has HFEs, since the listpack is not connected yet to the DB */ + if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) + hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/); + + /* Only now register in global HFE DS, if db provided */ + if (db != NULL) hashTypeAddToExpires(db, key, o, minExpire); - } else if (rdbtype == RDB_TYPE_HASH_LISTPACK_EX) { - /* connect the listpack to the DB-global expiry data structure */ - if ((minExpire != EB_EXPIRE_TIME_INVALID) && (db != NULL)) { /* DB can be NULL when checking rdb */ - hashTypeAddToExpires(db, key, o, minExpire); - } - } + break; default: /* totally unreachable */ @@ -3134,6 +3133,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, emptykey: if (error) *error = RDB_LOAD_ERR_EMPTY_KEY; return NULL; +expiredHash: + if (error) *error = RDB_LOAD_ERR_EXPIRED_HASH; + return NULL; } /* Mark that we are loading in the global state and setup the fields @@ -3563,6 +3565,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if(empty_keys_skipped++ < 10) serverLog(LL_NOTICE, "rdbLoadObject skipping empty key: %s", key); sdsfree(key); + } else if (error == RDB_LOAD_ERR_EXPIRED_HASH) { + /* Valid flow. Continue. */ + sdsfree(key); } else { sdsfree(key); goto eoferr; diff --git a/src/rdb.h b/src/rdb.h index 2f4c49954aa..73a6a5b67c1 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -119,8 +119,9 @@ /* When rdbLoadObject() returns NULL, the err flag is * set to hold the type of error that occurred */ -#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ -#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */ +#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ +#define RDB_LOAD_ERR_EXPIRED_HASH 2 /* Expired hash since all its fields are expired */ +#define RDB_LOAD_ERR_OTHER 3 /* Any other errors */ ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len); int rdbSaveType(rio *rdb, unsigned char type); diff --git a/src/server.c b/src/server.c index 4a39fd07b04..fa02678d11f 100644 --- a/src/server.c +++ b/src/server.c @@ -1959,6 +1959,8 @@ void createSharedObjects(void) { shared.persist = createStringObject("PERSIST",7); shared.set = createStringObject("SET",3); shared.eval = createStringObject("EVAL",4); + shared.hpexpireat = createStringObject("HPEXPIREAT",10); + shared.hdel = createStringObject("HDEL",4); /* Shared command argument */ shared.left = createStringObject("left",4); diff --git a/src/server.h b/src/server.h index 8e02a042439..3d082957a35 100644 --- a/src/server.h +++ b/src/server.h @@ -1317,7 +1317,8 @@ struct sharedObjectsStruct { *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, - *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, + *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, + *hdel, *hpexpireat, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, @@ -3178,8 +3179,8 @@ typedef struct listpackEx { void hashTypeConvert(robj *o, int enc, ebuckets *hexpires); void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end); -int hashTypeExists(robj *o, sds key); -int hashTypeDelete(robj *o, sds key); +int hashTypeExists(redisDb *db, robj *o, sds key, int *isHashDeleted); +int hashTypeDelete(robj *o, void *key, int isSdsField); unsigned long hashTypeLength(const robj *o, int subtractExpiredFields); hashTypeIterator *hashTypeInitIterator(robj *subject); void hashTypeReleaseIterator(hashTypeIterator *hi); @@ -3195,18 +3196,18 @@ void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr, unsigned int *vlen, long long *vll, uint64_t *expireTime); sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what); hfield hashTypeCurrentObjectNewHfield(hashTypeIterator *hi); -robj *hashTypeGetValueObject(robj *o, sds field); +robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted); int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags); robj *hashTypeDup(robj *o, sds newkey, uint64_t *minHashExpire); uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o); void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTime); void hashTypeFree(robj *o); int hashTypeIsExpired(const robj *o, uint64_t expireAt); +uint64_t hashTypeGetMinExpire(robj *o); unsigned char *hashTypeListpackGetLp(robj *o); uint64_t hashTypeGetMinExpire(robj *o); void hashTypeUpdateKeyRef(robj *o, sds newkey); ebuckets *hashTypeGetDictMetaHFE(dict *d); -void listpackExExpire(robj *o, ExpireInfo *info); int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at); uint64_t hashTypeGetMinExpire(robj *keyObj); uint64_t hashTypeGetNextTimeToExpire(robj *o); diff --git a/src/sort.c b/src/sort.c index 426ff0c1d05..d45c380ac39 100644 --- a/src/sort.c +++ b/src/sort.c @@ -94,7 +94,12 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { /* Retrieve value from hash by the field name. The returned object * is a new object with refcount already incremented. */ - o = hashTypeGetValueObject(o, fieldobj->ptr); + int isHashDeleted; + o = hashTypeGetValueObject(db, o, fieldobj->ptr, &isHashDeleted); + + if (isHashDeleted) + goto noobj; + } else { if (o->type != OBJ_STRING) goto noobj; diff --git a/src/t_hash.c b/src/t_hash.c index 8f5f0ee3f65..4642cb196e5 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -14,6 +14,18 @@ * update the expiration time of the hash object in global HFE DS. */ #define HASH_NEW_EXPIRE_DIFF_THRESHOLD max(4000, 1<encoding == OBJ_ENCODING_LISTPACK_EX); - uint64_t min = EB_EXPIRE_TIME_INVALID; + uint64_t expired = 0, min = EB_EXPIRE_TIME_INVALID; + unsigned char *ptr; listpackEx *lpt = o->ptr; - struct lpFingArgs r = { - .max_to_search = info->maxToExpire, - .expire_time = info->now - }; + ptr = lpFirst(lpt->lp); - lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); - info->itemsExpired += r.expired; + while (ptr != NULL && (info->itemsExpired < info->maxToExpire)) { + long long val; + int64_t flen; + unsigned char intbuf[LP_INTBUF_SIZE], *fref; + + fref = lpGet(ptr, &flen, intbuf); + + ptr = lpNext(lpt->lp, ptr); + serverAssert(ptr); + ptr = lpNext(lpt->lp, ptr); + serverAssert(ptr && lpGetIntegerValue(ptr, &val)); + + /* Fields are ordered by expiry time. If we reached to a non-expired + * or a non-volatile field, we know rest is not yet expired. */ + if (val == HASH_LP_NO_TTL || (uint64_t) val > info->now) + break; + + propagateHashFieldDeletion(db, ((listpackEx *) o->ptr)->key, (char *)((fref) ? fref : intbuf), flen); + + ptr = lpNext(lpt->lp, ptr); + + info->itemsExpired++; + expired++; + } - /* Delete all the expired fields in one go */ - if (r.expired > 0) - lpt->lp = lpDeleteRange(lpt->lp, 0, r.expired * 3); + if (expired) + lpt->lp = lpDeleteRange(lpt->lp, 0, expired * 3); min = hashTypeGetNextTimeToExpire(o); info->nextExpireTime = (min != EB_EXPIRE_TIME_INVALID) ? min : 0; @@ -524,7 +560,8 @@ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, } if (prevExpire == EB_EXPIRE_TIME_INVALID) { - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) + /* For fields without expiry, LT condition is considered valid */ + if (ex->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; } else { if (((ex->expireSetCond == HFE_GT) && (prevExpire >= expireAt)) || @@ -539,7 +576,7 @@ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, /* if expiration time is in the past */ if (unlikely(checkAlreadyExpired(expireAt))) { - hashTypeDelete(ex->hashObj, field); + hashTypeDelete(ex->hashObj, field, 1); ex->fieldDeleted++; return HSETEX_DELETED; } @@ -615,13 +652,14 @@ void hashTypeTryConversion(redisDb *db, robj *o, robj **argv, int start, int end hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); } -/* Get the value from a listpack encoded hash, identified by field. - * Returns -1 when the field cannot be found. */ -int hashTypeGetFromListpack(robj *o, sds field, +/* Get the value from a listpack encoded hash, identified by field. */ +GetFieldRes hashTypeGetFromListpack(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, - long long *vll) + long long *vll, + uint64_t *expiredAt) { + *expiredAt = EB_EXPIRE_TIME_INVALID; unsigned char *zl, *fptr = NULL, *vptr = NULL; if (o->encoding == OBJ_ENCODING_LISTPACK) { @@ -649,9 +687,8 @@ int hashTypeGetFromListpack(robj *o, sds field, h = lpNext(lpt->lp, vptr); serverAssert(h && lpGetIntegerValue(h, &expire)); - - if (hashTypeIsExpired(o, expire)) - return -1; + if (expire != HASH_LP_NO_TTL) + *expiredAt = expire; } } } else { @@ -660,96 +697,156 @@ int hashTypeGetFromListpack(robj *o, sds field, if (vptr != NULL) { *vstr = lpGetValue(vptr, vlen, vll); - return 0; + return GETF_OK; } - return -1; + return GETF_NOT_FOUND; } /* Get the value from a hash table encoded hash, identified by field. * Returns NULL when the field cannot be found, otherwise the SDS value * is returned. */ -sds hashTypeGetFromHashTable(robj *o, sds field) { +GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value, uint64_t *expiredAt) { dictEntry *de; + *expiredAt = EB_EXPIRE_TIME_INVALID; + serverAssert(o->encoding == OBJ_ENCODING_HT); de = dictFind(o->ptr, field); - if (de == NULL) return NULL; + if (de == NULL) + return GETF_NOT_FOUND; - /* Check if the field is expired */ - if (hfieldIsExpired(dictGetKey(de))) return NULL; - - return dictGetVal(de); + *expiredAt = hfieldGetExpireTime(dictGetKey(de)); + *value = (sds) dictGetVal(de); + return GETF_OK; } /* Higher level function of hashTypeGet*() that returns the hash value - * associated with the specified field. If the field is found C_OK - * is returned, otherwise C_ERR. The returned object is returned by - * reference in either *vstr and *vlen if it's returned in string form, - * or stored in *vll if it's returned as a number. + * associated with the specified field. * - * If *vll is populated *vstr is set to NULL, so the caller - * can always check the function return by checking the return value - * for C_OK and checking if vll (or vstr) is NULL. */ -int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll) { + * Returned: + * - GetFieldRes: OK: Return Field's valid value + * NOT_FOUND: Field was not found. + * EXPIRED: Field is expired and Lazy deleted + * EXPIRED_HASH: Returned only if the field is the last one in the + * hash and the hash is deleted. + * - vstr, vlen : if string, ref in either *vstr and *vlen if it's + * returned in string form, + * - vll : or stored in *vll if it's returned as a number. + * If *vll is populated *vstr is set to NULL, so the caller can + * always check the function return by checking the return value + * for GETF_OK and checking if vll (or vstr) is NULL. + * + */ +GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vstr, + unsigned int *vlen, long long *vll) { + uint64_t expiredAt; + sds key; + GetFieldRes res; if (o->encoding == OBJ_ENCODING_LISTPACK || - o->encoding == OBJ_ENCODING_LISTPACK_EX) - { + o->encoding == OBJ_ENCODING_LISTPACK_EX) { *vstr = NULL; - if (hashTypeGetFromListpack(o, field, vstr, vlen, vll) == 0) - return C_OK; + res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, &expiredAt); + + if (res == GETF_NOT_FOUND) + return GETF_NOT_FOUND; + } else if (o->encoding == OBJ_ENCODING_HT) { - sds value; - if ((value = hashTypeGetFromHashTable(o, field)) != NULL) { - *vstr = (unsigned char*) value; - *vlen = sdslen(value); - return C_OK; - } + sds value = NULL; + res = hashTypeGetFromHashTable(o, field, &value, &expiredAt); + + if (res == GETF_NOT_FOUND) + return GETF_NOT_FOUND; + + *vstr = (unsigned char*) value; + *vlen = sdslen(value); } else { serverPanic("Unknown hash encoding"); } - return C_ERR; + + /* Don't expire anything while loading. It will be done later. */ + if (server.loading) + return GETF_OK; + + if (server.lazy_expire_disabled) + return GETF_OK; + + if ((server.masterhost) && (server.current_client && (server.current_client->flags & CLIENT_MASTER))) + return GETF_OK; + + if (expiredAt >= (uint64_t) commandTimeSnapshot()) + return GETF_OK; + + /* Got expired. Extract attached key from LISTPACK_EX/HT */ + if (o->encoding == OBJ_ENCODING_LISTPACK_EX) + key = ((listpackEx *) o->ptr)->key; + else + key = ((dictExpireMetadata *) dictMetadata((dict*)o->ptr))->key; + + /* delete the field and propagate the deletion */ + serverAssert(hashTypeDelete(o, field, 1) == 1); + propagateHashFieldDeletion(db, key, field, sdslen(field)); + + /* If the field is the last one in the hash, then the hash will be deleted */ + if (hashTypeLength(o, 0) == 0) { + robj *keyObj = createStringObject(key, sdslen(key)); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id); + dbDelete(db,keyObj); + decrRefCount(keyObj); + return GETF_EXPIRED_HASH; + } + + return GETF_EXPIRED; } /* Like hashTypeGetValue() but returns a Redis object, which is useful for * interaction with the hash type outside t_hash.c. * The function returns NULL if the field is not found in the hash. Otherwise - * a newly allocated string object with the value is returned. */ -robj *hashTypeGetValueObject(robj *o, sds field) { + * a newly allocated string object with the value is returned. + * + * isHashDeleted - If attempted to access expired field and it's the last field + * in the hash, then the hash will as well be deleted. In this case, + * isHashDeleted will be set to 1. + */ +robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted) { unsigned char *vstr; unsigned int vlen; long long vll; - if (hashTypeGetValue(o,field,&vstr,&vlen,&vll) == C_ERR) return NULL; - if (vstr) return createStringObject((char*)vstr,vlen); - else return createStringObjectFromLongLong(vll); -} + *isHashDeleted = 0; /*default*/ + GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll); -/* Higher level function using hashTypeGet*() to return the length of the - * object associated with the requested field, or 0 if the field does not - * exist. */ -size_t hashTypeGetValueLength(robj *o, sds field) { - size_t len = 0; - unsigned char *vstr = NULL; - unsigned int vlen = UINT_MAX; - long long vll = LLONG_MAX; + if (res == GETF_OK) { + if (vstr) return createStringObject((char*)vstr,vlen); + else return createStringObjectFromLongLong(vll); + } - if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) - len = vstr ? vlen : sdigits10(vll); + if (res == GETF_EXPIRED_HASH) + *isHashDeleted = 1; - return len; + /* GETF_EXPIRED_HASH, GETF_EXPIRED, GETF_NOT_FOUND */ + return NULL; } -/* Test if the specified field exists in the given hash. Returns 1 if the field - * exists, and 0 when it doesn't. */ -int hashTypeExists(robj *o, sds field) { +/* Test if the specified field exists in the given hash. If the field is + * expired (HFE), then it will be lazy deleted + * + * Returns 1 if the field exists, and 0 when it doesn't. + * + * isHashDeleted - If attempted to access expired field and it is the last field + * in the hash, then the hash will as well be deleted. In this case, + * isHashDeleted will be set to 1. + */ +int hashTypeExists(redisDb *db, robj *o, sds field, int *isHashDeleted) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - return hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK; + GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll); + *isHashDeleted = (res == GETF_EXPIRED_HASH) ? 1 : 0; + return (res == GETF_OK) ? 1 : 0; } /* Add a new field, overwrite the old with the new value if it already exists. @@ -759,8 +856,9 @@ int hashTypeExists(robj *o, sds field) { * caller retains ownership of the strings passed. However this behavior * can be effected by passing appropriate flags (possibly bitwise OR-ed): * - * HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function. - * HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function. + * HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function. + * HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function. + * HASH_SET_KEEP_FIELD -- keep original field along with TTL if already exists * * When the flags are used the caller does not need to release the passed * SDS string(s). It's up to the function to use the string to create a new @@ -770,8 +868,9 @@ int hashTypeExists(robj *o, sds field) { * semantics of copying the values if needed. * */ -#define HASH_SET_TAKE_FIELD (1<<0) -#define HASH_SET_TAKE_VALUE (1<<1) +#define HASH_SET_TAKE_FIELD (1<<0) +#define HASH_SET_TAKE_VALUE (1<<1) +#define HASH_SET_KEEP_FIELD (1<<2) #define HASH_SET_COPY 0 int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags) { HashTypeSet set = {value, flags}; @@ -817,7 +916,8 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* If field doesn't have expiry metadata attached */ if (!hfieldIsExpireAttached(hfOld)) { - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) { + /* For fields without expiry, LT condition is considered valid */ + if (ex->expireSetCond & (HFE_XX | HFE_GT)) { hfieldFree(hfNew); return HSETEX_NO_CONDITION_MET; } @@ -851,7 +951,7 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* field has invalid expiry. No need to ebRemove() */ /* Check XX|LT|GT */ - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) + if (ex->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; } @@ -864,7 +964,7 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* if expiration time is in the past */ if (unlikely(checkAlreadyExpired(expireAt))) { - hashTypeDelete(ex->hashObj, field); + hashTypeDelete(ex->hashObj, field, 1); ex->fieldDeleted++; return HSETEX_DELETED; } @@ -898,22 +998,22 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, /* Check if the field is too long for listpack, and convert before adding the item. * This is needed for HINCRBY* case since in other commands this is handled early by * hashTypeTryConversion, so this check will be a NOP. */ - if (isSetKeyValue && (o->encoding == OBJ_ENCODING_LISTPACK || - o->encoding == OBJ_ENCODING_LISTPACK_EX)) + if (o->encoding == OBJ_ENCODING_LISTPACK || + o->encoding == OBJ_ENCODING_LISTPACK_EX) { - if (sdslen(field) > server.hash_max_listpack_value || - sdslen(setKeyVal->value) > server.hash_max_listpack_value) + if ( (isSetKeyValue) && + (sdslen(field) > server.hash_max_listpack_value || + sdslen(setKeyVal->value) > server.hash_max_listpack_value) ) + { hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); + } else { + res = hashTypeSetExListpack(db, o, field, setKeyVal, expireAt, exInfo); + goto SetExDone; /*done*/ + } } - if (o->encoding == OBJ_ENCODING_LISTPACK || - o->encoding == OBJ_ENCODING_LISTPACK_EX) - { - res = hashTypeSetExListpack(db, o, field, setKeyVal, expireAt, exInfo); - goto SetExDone; - } else if (o->encoding != OBJ_ENCODING_HT) { + if (o->encoding != OBJ_ENCODING_HT) serverPanic("Unknown hash encoding"); - } /*** now deal with HT ***/ hfield newField; @@ -936,13 +1036,17 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, /* If field already exists, then update "field". "Value" will be set afterward */ if (de == NULL) { - /* If attached TTL to the old field, then remove it from hash's private ebuckets */ - hfield oldField = dictGetKey(existing); - hfieldPersist(o, oldField); - - hfieldFree(oldField); + if (flags & HASH_SET_KEEP_FIELD) { + /* Not keep old field along with TTL */ + hfieldFree(newField); + } else { + /* If attached TTL to the old field, then remove it from hash's private ebuckets */ + hfield oldField = dictGetKey(existing); + hfieldPersist(o, oldField); + hfieldFree(oldField); + dictSetKey(ht, existing, newField); + } sdsfree(dictGetVal(existing)); - dictSetKey(ht, existing, newField); res = HSET_UPDATE; de = existing; } @@ -1009,13 +1113,11 @@ void initDictExpireMetadata(sds key, robj *o) { * done by function hashTypeSetExDone(). */ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cmd, FieldSetCond fieldSetCond, - FieldGet fieldGet, ExpireSetCond expireSetCond, - HashTypeSetEx *ex) + ExpireSetCond expireSetCond, HashTypeSetEx *ex) { dict *ht = o->ptr; ex->fieldSetCond = fieldSetCond; - ex->fieldGet = fieldGet; /* TODO */ ex->expireSetCond = expireSetCond; ex->minExpire = EB_EXPIRE_TIME_INVALID; ex->c = c; @@ -1027,6 +1129,7 @@ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cm ex->fieldUpdated = 0; ex->minExpireFields = EB_EXPIRE_TIME_INVALID; + /* Take care that HASH support expiration */ if (ex->hashObj->encoding == OBJ_ENCODING_LISTPACK) { hashTypeConvert(ex->hashObj, OBJ_ENCODING_LISTPACK_EX, &c->db->hexpires); @@ -1107,13 +1210,16 @@ void hashTypeSetExDone(HashTypeSetEx *ex) { /* Check if the field is too long for listpack, and convert before adding the item. * This is needed for HINCRBY* case since in other commands this is handled early by * hashTypeTryConversion, so this check will be a NOP. */ -static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *s, - uint64_t expireAt, HashTypeSetEx *ex) +static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *setParams, + uint64_t expireAt, HashTypeSetEx *exParams) { int res = HSETEX_OK; unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL; if (o->encoding == OBJ_ENCODING_LISTPACK) { + /* If reached here, then no need to set expiration. Otherwise, as precond + * listpack is converted to listpackex by hashTypeSetExInit() */ + unsigned char *zl = o->ptr; fptr = lpFirst(zl); if (fptr != NULL) { @@ -1125,14 +1231,14 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS res = HSET_UPDATE; /* Replace value */ - zl = lpReplace(zl, &vptr, (unsigned char *) s->value, sdslen(s->value)); + zl = lpReplace(zl, &vptr, (unsigned char *) setParams->value, sdslen(setParams->value)); } } if (res != HSET_UPDATE) { /* Push new field/value pair onto the tail of the listpack */ zl = lpAppend(zl, (unsigned char*)field, sdslen(field)); - zl = lpAppend(zl, (unsigned char*)s->value, sdslen(s->value)); + zl = lpAppend(zl, (unsigned char*)setParams->value, sdslen(setParams->value)); } o->ptr = zl; goto out; @@ -1148,11 +1254,11 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS vptr = lpNext(lpt->lp, fptr); serverAssert(vptr != NULL); - if (s) { + if (setParams) { /* Replace value */ lpt->lp = lpReplace(lpt->lp, &vptr, - (unsigned char *) s->value, - sdslen(s->value)); + (unsigned char *) setParams->value, + sdslen(setParams->value)); fptr = lpPrev(lpt->lp, vptr); serverAssert(fptr != NULL); @@ -1161,8 +1267,11 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS tptr = lpNext(lpt->lp, vptr); serverAssert(tptr && lpGetIntegerValue(tptr, &expireTime)); - if (ex) { - res = hashTypeSetExpiryListpack(ex, field, fptr, vptr, tptr, + /* Keep, update or clear TTL */ + if (setParams && setParams->flags & HASH_SET_KEEP_FIELD) { + /* keep old field along with TTL */ + } else if (exParams) { + res = hashTypeSetExpiryListpack(exParams, field, fptr, vptr, tptr, expireAt); if (res != HSETEX_OK) goto out; @@ -1174,9 +1283,9 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS } if (!fptr) { - if (s) { - listpackExAddNew(o, field, s->value, - ex ? expireAt : HASH_LP_NO_TTL); + if (setParams) { + listpackExAddNew(o, field, setParams->value, + exParams ? expireAt : HASH_LP_NO_TTL); } else { res = HSETEX_NO_FIELD; } @@ -1191,9 +1300,12 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS } /* Delete an element from a hash. - * Return 1 on deleted and 0 on not found. */ -int hashTypeDelete(robj *o, sds field) { + * + * Return 1 on deleted and 0 on not found. + * isSdsField - 1 if the field is sds, 0 if it is hfield */ +int hashTypeDelete(robj *o, void *field, int isSdsField) { int deleted = 0; + int fieldLen = (isSdsField) ? sdslen((sds)field) : hfieldlen((hfield)field); if (o->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl, *fptr; @@ -1201,7 +1313,7 @@ int hashTypeDelete(robj *o, sds field) { zl = o->ptr; fptr = lpFirst(zl); if (fptr != NULL) { - fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1); + fptr = lpFind(zl, fptr, (unsigned char*)field, fieldLen, 1); if (fptr != NULL) { /* Delete both of the key and the value. */ zl = lpDeleteRangeWithEntry(zl,&fptr,2); @@ -1215,7 +1327,7 @@ int hashTypeDelete(robj *o, sds field) { fptr = lpFirst(lpt->lp); if (fptr != NULL) { - fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); + fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, fieldLen, 2); if (fptr != NULL) { /* Delete field, value and ttl */ lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); @@ -1224,9 +1336,11 @@ int hashTypeDelete(robj *o, sds field) { } } else if (o->encoding == OBJ_ENCODING_HT) { /* dictDelete() will call dictHfieldDestructor() */ + dictUseStoredKeyApi((dict*)o->ptr, isSdsField ? 0 : 1); if (dictDelete((dict*)o->ptr, field) == C_OK) { deleted = 1; } + dictUseStoredKeyApi((dict*)o->ptr, 0); } else { serverPanic("Unknown hash encoding"); @@ -1296,6 +1410,7 @@ void hashTypeReleaseIterator(hashTypeIterator *hi) { /* Move to the next entry in the hash. Return C_OK when the next entry * could be found and C_ERR when the iterator reaches the end. */ int hashTypeNext(hashTypeIterator *hi, int skipExpiredFields) { + hi->expire_time = EB_EXPIRE_TIME_INVALID; if (hi->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl; unsigned char *fptr, *vptr; @@ -1363,8 +1478,11 @@ int hashTypeNext(hashTypeIterator *hi, int skipExpiredFields) { hi->tptr = tptr; hi->expire_time = (expire_time != HASH_LP_NO_TTL) ? (uint64_t) expire_time : EB_EXPIRE_TIME_INVALID; } else if (hi->encoding == OBJ_ENCODING_HT) { + while ((hi->de = dictNext(hi->di)) != NULL) { - if (skipExpiredFields && hfieldIsExpired(dictGetKey(hi->de))) + hi->expire_time = hfieldGetExpireTime(dictGetKey(hi->de)); + /* this condition still valid if expire_time equals EB_EXPIRE_TIME_INVALID */ + if (skipExpiredFields && ((mstime_t)hi->expire_time < commandTimeSnapshot())) continue; return C_OK; } @@ -1417,11 +1535,8 @@ void hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, char **str, si *len = sdslen(val); } - if (expireTime) { - if (!key) key = dictGetKey(hi->de); - *expireTime = hfieldGetExpireTime( key ); - } - + if (expireTime) + *expireTime = hi->expire_time; } /* Higher level function of hashTypeCurrent*() that returns the hash value @@ -1446,7 +1561,6 @@ void hashTypeCurrentObject(hashTypeIterator *hi, { *vstr = NULL; hashTypeCurrentFromListpack(hi, what, vstr, vlen, vll, expireTime); - /* TODO-HFE: Handle expireTime */ } else if (hi->encoding == OBJ_ENCODING_HT) { char *ele; size_t eleLen; @@ -1534,7 +1648,6 @@ void hashTypeConvertListpack(robj *o, int enc) { dict = dictCreate(&mstrHashDictType); /* Presize the dict to avoid rehashing */ - /* TODO: activeExpire list pack. Should be small */ dictExpand(dict,hashTypeLength(o, 0)); while (hashTypeNext(hi, 0) != C_ERR) { @@ -1618,7 +1731,7 @@ void hashTypeConvertListpackEx(robj *o, int enc, ebuckets *hexpires) { } } -/* NOTE: hexpires can be NULL (Won't attempt to register in global HFE DS) */ +/* NOTE: hexpires can be NULL (Won't register in global HFE DS) */ void hashTypeConvert(robj *o, int enc, ebuckets *hexpires) { if (o->encoding == OBJ_ENCODING_LISTPACK) { hashTypeConvertListpack(o, enc); @@ -1785,11 +1898,10 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { info = (ExpireInfo){ .maxToExpire = activeExpireCtx->fieldsToExpireQuota, - .ctx = hashObj, .now = commandTimeSnapshot(), .itemsExpired = 0}; - listpackExExpire(hashObj, &info); + listpackExExpire(activeExpireCtx->db, hashObj, &info); server.stat_expired_hash_fields += info.itemsExpired; keystr = ((listpackEx*)hashObj->ptr)->key; } else { @@ -1798,11 +1910,16 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { dict *d = hashObj->ptr; dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); + OnFieldExpireCtx onFieldExpireCtx = { + .hashObj = hashObj, + .db = activeExpireCtx->db + }; + info = (ExpireInfo){ - .maxToExpire = activeExpireCtx->fieldsToExpireQuota, - .onExpireItem = onFieldExpire, - .ctx = hashObj, - .now = commandTimeSnapshot() + .maxToExpire = activeExpireCtx->fieldsToExpireQuota, + .onExpireItem = onFieldExpire, + .ctx = &onFieldExpireCtx, + .now = commandTimeSnapshot() }; ebExpire(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, &info); @@ -1817,7 +1934,6 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { if (hashTypeLength(hashObj, 0) == 0) { robj *key = createStringObject(keystr, sdslen(keystr)); dbDelete(activeExpireCtx->db, key); - //notifyKeyspaceEvent(NOTIFY_HASH,"xxxxxxxxx",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key, activeExpireCtx->db->id); server.dirty++; signalModifiedKey(NULL, &server.db[0], key); @@ -1957,12 +2073,19 @@ uint64_t hashTypeDbActiveExpire(redisDb *db, uint32_t maxFieldsToExpire) { void hashTypeFree(robj *o) { switch (o->encoding) { case OBJ_ENCODING_HT: + /* Verify hash is not registered in global HFE ds */ + if (isDictWithMetaHFE((dict*)o->ptr)) { + dictExpireMetadata *m = (dictExpireMetadata *)dictMetadata((dict*)o->ptr); + serverAssert(m->expireMeta.trash == 1); + } dictRelease((dict*) o->ptr); break; case OBJ_ENCODING_LISTPACK: lpFree(o->ptr); break; case OBJ_ENCODING_LISTPACK_EX: + /* Verify hash is not registered in global HFE ds */ + serverAssert(((listpackEx *) o->ptr)->meta.trash == 1); listpackExFree(o->ptr); break; default: @@ -1994,19 +2117,27 @@ ebuckets *hashTypeGetDictMetaHFE(dict *d) { *----------------------------------------------------------------------------*/ void hsetnxCommand(client *c) { + int isHashDeleted; robj *o; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeExists(o, c->argv[2]->ptr)) { + if (hashTypeExists(c->db, o, c->argv[2]->ptr, &isHashDeleted)) { addReply(c, shared.czero); - } else { - hashTypeTryConversion(c->db, o,c->argv,2,3); - hashTypeSet(c->db, o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); - addReply(c, shared.cone); - signalModifiedKey(c,c->db,c->argv[1]); - notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); - server.dirty++; + return; } + + /* Field expired and in turn hash deleted. Create new one! */ + if (isHashDeleted) { + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); + } + + hashTypeTryConversion(c->db, o,c->argv,2,3); + hashTypeSet(c->db, o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); + addReply(c, shared.cone); + signalModifiedKey(c,c->db,c->argv[1]); + notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); + server.dirty++; } void hsetCommand(client *c) { @@ -2047,14 +2178,21 @@ void hincrbyCommand(client *c) { if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeGetValue(o,c->argv[2]->ptr,&vstr,&vlen,&value) == C_OK) { + + GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&value); + if (res == GETF_OK) { if (vstr) { if (string2ll((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not an integer"); return; } } /* Else hashTypeGetValue() already stored it into &value */ + } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { + value = 0; } else { + /* Field expired and in turn hash deleted. Create new one! */ + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); value = 0; } @@ -2066,7 +2204,7 @@ void hincrbyCommand(client *c) { } value += incr; new = sdsfromlonglong(value); - hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); + hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_FIELD); addReplyLongLong(c,value); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id); @@ -2087,7 +2225,8 @@ void hincrbyfloatCommand(client *c) { return; } if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeGetValue(o,c->argv[2]->ptr,&vstr,&vlen,&ll) == C_OK) { + GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll); + if (res == GETF_OK) { if (vstr) { if (string2ld((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not a float"); @@ -2096,7 +2235,12 @@ void hincrbyfloatCommand(client *c) { } else { value = (long double)ll; } + } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { + value = 0; } else { + /* Field expired and in turn hash deleted. Create new one! */ + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); value = 0; } @@ -2109,7 +2253,7 @@ void hincrbyfloatCommand(client *c) { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN); new = sdsnewlen(buf,len); - hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); + hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_FIELD); addReplyBulkCBuffer(c,buf,len); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id); @@ -2125,17 +2269,18 @@ void hincrbyfloatCommand(client *c) { decrRefCount(newobj); } -static void addHashFieldToReply(client *c, robj *o, sds field) { +static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field) { if (o == NULL) { addReplyNull(c); - return; + return GETF_NOT_FOUND; } unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) { + GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll); + if (res == GETF_OK) { if (vstr) { addReplyBulkCBuffer(c, vstr, vlen); } else { @@ -2144,6 +2289,7 @@ static void addHashFieldToReply(client *c, robj *o, sds field) { } else { addReplyNull(c); } + return res; } void hgetCommand(client *c) { @@ -2156,6 +2302,7 @@ void hgetCommand(client *c) { } void hmgetCommand(client *c) { + GetFieldRes res = GETF_OK; robj *o; int i; @@ -2165,8 +2312,17 @@ void hmgetCommand(client *c) { if (checkType(c,o,OBJ_HASH)) return; addReplyArrayLen(c, c->argc-2); - for (i = 2; i < c->argc; i++) { - addHashFieldToReply(c, o, c->argv[i]->ptr); + for (i = 2; i < c->argc ; i++) { + + res = addHashFieldToReply(c, o, c->argv[i]->ptr); + + /* If hash got lazy expired since all fields are expired (o is invalid), + * then fill the rest with trivial nulls and return */ + if (res == GETF_EXPIRED_HASH) { + for ( ++i ; i < c->argc; i++) + addReply(c, shared.null[c->resp]); + return; + } } } @@ -2178,7 +2334,7 @@ void hdelCommand(client *c) { checkType(c,o,OBJ_HASH)) return; for (j = 2; j < c->argc; j++) { - if (hashTypeDelete(o,c->argv[j]->ptr)) { + if (hashTypeDelete(o,c->argv[j]->ptr, 1)) { deleted++; if (hashTypeLength(o, 0) == 0) { dbDelete(c->db,c->argv[1]); @@ -2209,10 +2365,22 @@ void hlenCommand(client *c) { void hstrlenCommand(client *c) { robj *o; + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; - addReplyLongLong(c,hashTypeGetValueLength(o,c->argv[2]->ptr)); + + GetFieldRes res = hashTypeGetValue(c->db, o, c->argv[2]->ptr, &vstr, &vlen, &vll); + + if (res == GETF_NOT_FOUND || res == GETF_EXPIRED || res == GETF_EXPIRED_HASH) { + addReply(c, shared.czero); + return; + } + + size_t len = vstr ? vlen : sdigits10(vll); + addReplyLongLong(c,len); } static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) { @@ -2295,10 +2463,11 @@ void hgetallCommand(client *c) { void hexistsCommand(client *c) { robj *o; + int isHashDeleted; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; - addReply(c, hashTypeExists(o,c->argv[2]->ptr) ? shared.cone : shared.czero); + addReply(c, hashTypeExists(c->db, o,c->argv[2]->ptr, &isHashDeleted) ? shared.cone : shared.czero); } void hscanCommand(client *c) { @@ -2669,14 +2838,36 @@ int hfieldIsExpired(hfield field) { /*----------------------------------------------------------------------------- * Hash Field Expiration (HFE) *----------------------------------------------------------------------------*/ -/* Called during active expiration of hash-fields */ +static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen) { + robj *argv[] = { + shared.hdel, + createStringObject((char*) key, sdslen(key)), + createStringObject(field, fieldLen) + }; + + enterExecutionUnit(1, 0); + int prev_replication_allowed = server.replication_allowed; + server.replication_allowed = 1; + alsoPropagate(db->id,argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); + server.replication_allowed = prev_replication_allowed; + exitExecutionUnit(); + + /* Propagate the HDEL command */ + postExecutionUnitOperations(); + + decrRefCount(argv[1]); + decrRefCount(argv[2]); +} + +/* Called during active expiration of hash-fields. Propagate to replica & Delete. */ static ExpireAction onFieldExpire(eItem item, void *ctx) { + OnFieldExpireCtx *expCtx = ctx; hfield hf = item; - robj *hashobj = (robj *) ctx; - dictUseStoredKeyApi((dict *)hashobj->ptr, 1); - hashTypeDelete(hashobj, hf); + dict *d = expCtx->hashObj->ptr; + dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); + propagateHashFieldDeletion(expCtx->db, dictExpireMeta->key, hf, hfieldlen(hf)); + serverAssert(hashTypeDelete(expCtx->hashObj, hf, 0) == 1); server.stat_expired_hash_fields++; - dictUseStoredKeyApi((dict *)hashobj->ptr, 0); return ACT_REMOVE_EXP_ITEM; } @@ -2888,17 +3079,31 @@ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime HashTypeSetEx exCtx; hashTypeSetExInit(keyArg, hashObj, c, c->db, cmd, FIELD_DONT_CREATE2, - FIELD_GET_NONE, expireSetCond, &exCtx); addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { sds field = c->argv[numFieldsAt+i+1]->ptr; SetExRes res = hashTypeSetEx(c->db, hashObj, field, NULL, expire, &exCtx); addReplyLongLong(c,res); } hashTypeSetExDone(&exCtx); + + /* rewrite command for the replica sake */ + + /* Propagate as HPEXPIREAT millisecond-timestamp. Rewrite only if not already */ + if (c->cmd->proc != hpexpireatCommand) { + rewriteClientCommandArgument(c,0,shared.hpexpireat); + } + + /* rewrite expiration time to unix time in msec */ + if (basetime != 0 || unit == UNIT_SECONDS) { + robj *expireObj = createStringObjectFromLongLong(expire); + rewriteClientCommandArgument(c, 2, expireObj); + decrRefCount(expireObj); + } } /* HPEXPIRE key milliseconds [ NX | XX | GT | LT] numfields */ diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl index 6f7a31d2187..d2908ede8fa 100644 --- a/tests/integration/psync2-master-restart.tcl +++ b/tests/integration/psync2-master-restart.tcl @@ -179,6 +179,17 @@ start_server {} { $master set $j somevalue px 10 } + ##### hash-field-expiration + # Hashes of type OBJ_ENCODING_LISTPACK_EX won't be discarded during + # RDB load, even if they are expired. + $master hset myhash1 f1 v1 f2 v2 f3 v3 + $master hpexpire myhash1 10 FIELDS 3 f1 f2 f3 + # Hashes of type RDB_TYPE_HASH_METADATA will be discarded during RDB load. + $master config set hash-max-listpack-entries 0 + $master hset myhash2 f1 v1 f2 v2 + $master hpexpire myhash2 10 FIELDS 2 f1 f2 + $master config set hash-max-listpack-entries 1 + after 20 wait_for_condition 500 100 { @@ -209,6 +220,8 @@ start_server {} { assert {[status $master sync_partial_ok] == 0} assert {[status $master sync_full] == 1} assert {[status $master rdb_last_load_keys_expired] == 2048} + + assert {[status $master rdb_last_load_hash_fields_expired] == 2} assert {[status $replica sync_full] == 1} set digest [$master debug digest] diff --git a/tests/support/util.tcl b/tests/support/util.tcl index a40c630df43..fcdac8c947c 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -293,6 +293,9 @@ proc findKeyWithType {r type} { proc createComplexDataset {r ops {opt {}}} { set useexpire [expr {[lsearch -exact $opt useexpire] != -1}] + # TODO: Remove usehexpire on next commit, when RDB will support replication + set usehexpire [expr {[lsearch -exact $opt usehexpire] != -1}] + if {[lsearch -exact $opt usetag] != -1} { set tag "{t}" } else { @@ -386,6 +389,10 @@ proc createComplexDataset {r ops {opt {}}} { {hash} { randpath {{*}$r hset $k $f $v} \ {{*}$r hdel $k $f} + + if { [{*}$r hexists $k $f] && $usehexpire && rand() < 0.5} { + {*}$r hexpire $k 1000 FIELDS 1 $f + } } } } diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 1ba0e62faf0..52f1fa75cc7 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -124,7 +124,8 @@ start_server {tags {"other"}} { if {$::accurate} {set numops 10000} else {set numops 1000} test {Check consistency of different data types after a reload} { r flushdb - createComplexDataset r $numops usetag + # TODO: integrate usehexpire following next commit that will support replication + createComplexDataset r $numops {usetag usehexpire} if {$::ignoredigest} { set _ 1 } else { diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 08d0203ea27..72b7daa5032 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -64,6 +64,19 @@ proc cmp_hrandfield_result {hash_name expected_result} { } } +proc dumpAllHashes {client} { + set keyAndFields(0,0) 0 + unset keyAndFields + # keep keys sorted for comparison + foreach key [lsort [$client keys *]] { + set fields [$client hgetall $key] + foreach f $fields { + set keyAndFields($key,$f) [$client hpexpiretime $key FIELDS 1 $f] + } + } + return [array get keyAndFields] +} + proc hrandfieldTest {activeExpireConfig} { r debug set-active-expire $activeExpireConfig r del myhash @@ -155,16 +168,16 @@ start_server {tags {"external:skip needs:debug"}} { test "HPEXPIRE(AT) - Test 'LT' flag ($type)" { r del myhash - r hset myhash field1 value1 field2 value2 + r hset myhash field1 value1 field2 value2 field3 value3 assert_equal [r hpexpire myhash 1000 NX FIELDS 1 field1] [list $E_OK] assert_equal [r hpexpire myhash 2000 NX FIELDS 1 field2] [list $E_OK] - assert_equal [r hpexpire myhash 1500 LT FIELDS 2 field1 field2] [list $E_FAIL $E_OK] + assert_equal [r hpexpire myhash 1500 LT FIELDS 3 field1 field2 field3] [list $E_FAIL $E_OK $E_OK] r del myhash - r hset myhash field1 value1 field2 value2 + r hset myhash field1 value1 field2 value2 field3 value3 assert_equal [r hpexpireat myhash [expr {([clock seconds]+1000)*1000}] NX FIELDS 1 field1] [list $E_OK] assert_equal [r hpexpireat myhash [expr {([clock seconds]+2000)*1000}] NX FIELDS 1 field2] [list $E_OK] - assert_equal [r hpexpireat myhash [expr {([clock seconds]+1500)*1000}] LT FIELDS 2 field1 field2] [list $E_FAIL $E_OK] + assert_equal [r hpexpireat myhash [expr {([clock seconds]+1500)*1000}] LT FIELDS 3 field1 field2 field3] [list $E_FAIL $E_OK $E_OK] } test "HPEXPIREAT - field not exists or TTL is in the past ($type)" { @@ -190,46 +203,41 @@ start_server {tags {"external:skip needs:debug"}} { assert_error {*invalid expire time*} {r hpexpire myhash [expr (1<<48) - [clock milliseconds] + 100 ] FIELDS 1 f1} } - test "Lazy - doesn't delete hash that all its fields got expired ($type)" { + test "Lazy Expire - fields are lazy deleted ($type)" { + + # TODO remove the SELECT once dbid will be embedded inside dict/listpack + r select 0 r debug set-active-expire 0 - r flushall + r del myhash - set hash_sizes {1 15 16 17 31 32 33 40} - foreach h $hash_sizes { - for {set i 1} {$i <= $h} {incr i} { - # random expiration time - r hset hrand$h f$i v$i - r hpexpire hrand$h [expr {50 + int(rand() * 50)}] FIELDS 1 f$i - assert_equal 1 [r HEXISTS hrand$h f$i] + r hset myhash f1 v1 f2 v2 f3 v3 + r hpexpire myhash 1 NX FIELDS 3 f1 f2 f3 + after 5 - # same expiration time - r hset same$h f$i v$i - r hpexpire same$h 100 FIELDS 1 f$i - assert_equal 1 [r HEXISTS same$h f$i] + # Verify that still exists even if all fields are expired + assert_equal 1 [r EXISTS myhash] - # same expiration time - r hset mix$h f$i v$i fieldWithoutExpire$i v$i - r hpexpire mix$h 100 FIELDS 1 f$i - assert_equal 1 [r HEXISTS mix$h f$i] - } - } + # Verify that len counts also expired fields + assert_equal 3 [r HLEN myhash] - after 150 + # Trying access to expired field should delete it. Len should be updated + assert_equal 0 [r hexists myhash f1] + assert_equal 2 [r HLEN myhash] + + # Trying access another expired field should delete it. Len should be updated + assert_equal "" [r hget myhash f2] + assert_equal 1 [r HLEN myhash] + + # Trying access last expired field should delete it. hash shouldn't exists afterward. + assert_equal 0 [r hstrlen myhash f3] + assert_equal 0 [r HLEN myhash] + assert_equal 0 [r EXISTS myhash] - # Verify that all fields got expired but keys wasn't lazy deleted - foreach h $hash_sizes { - for {set i 1} {$i <= $h} {incr i} { - assert_equal 0 [r HEXISTS mix$h f$i] - } - assert_equal 1 [r EXISTS hrand$h] - assert_equal 1 [r EXISTS same$h] - assert_equal [expr $h * 2] [r HLEN mix$h] - } # Restore default r debug set-active-expire 1 } - test "Active - deletes hash that all its fields got expired ($type)" { + test "Active Expire - deletes hash that all its fields got expired ($type)" { r flushall set hash_sizes {1 15 16 17 31 32 33 40} @@ -375,7 +383,7 @@ start_server {tags {"external:skip needs:debug"}} { assert_range [r httl myhash FIELDS 1 field1] 4 5 } - test "Lazy expire - delete hash with expired fields ($type)" { + test "Lazy Expire - delete hash with expired fields ($type)" { r del myhash r debug set-active-expire 0 r hset myhash k v @@ -403,7 +411,7 @@ start_server {tags {"external:skip needs:debug"}} { } - test "Lazy expire - HLEN does count expired fields ($type)" { + test "Lazy Expire - HLEN does count expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 @@ -432,7 +440,7 @@ start_server {tags {"external:skip needs:debug"}} { r debug set-active-expire 1 } - test "Lazy expire - HSCAN does not report expired fields ($type)" { + test "Lazy Expire - HSCAN does not report expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 @@ -493,7 +501,7 @@ start_server {tags {"external:skip needs:debug"}} { r debug set-active-expire 1 } - test "Lazy expire - verify various HASH commands handling expired fields ($type)" { + test "Lazy Expire - verify various HASH commands handling expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 r del h1 h2 h3 h4 h5 h18 @@ -676,7 +684,21 @@ start_server {tags {"external:skip needs:debug"}} { wait_for_condition 20 10 { [r exists myhash] == 0 } else { fail "'myhash' should be expired" } } {} {singledb:skip} - test "HPERSIST - Returns empty array if key does not exist" { + test "HMGET - returns empty entries if fields or hash expired ($type)" { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 v1 f2 v2 f3 v3 + r hset h2 f1 v1 f2 v2 f3 v3 + r hpexpire h1 10000000 NX FIELDS 1 f1 + r hpexpire h1 1 NX FIELDS 2 f2 f3 + r hpexpire h2 1 NX FIELDS 3 f1 f2 f3 + after 5 + assert_equal [r hmget h1 f1 f2 f3] {v1 {} {}} + assert_equal [r hmget h2 f1 f2 f3] {{} {} {}} + r debug set-active-expire 1 + } + + test "HPERSIST - Returns empty array if key does not exist ($type)" { r del myhash # Make sure we can distinguish between an empty array and a null response r readraw 1 @@ -743,6 +765,53 @@ start_server {tags {"external:skip needs:debug"}} { assert_equal [lsort [r hgetall myhash]] "1 2 3 a b c" assert_equal [r hexpiretime myhash FIELDS 3 a b c] {-1 -1 -1} } + + test {HINCRBY - discards pending expired field and reset its value} { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 10 f2 2 + r hset h2 f1 10 + assert_equal [r HINCRBY h1 f1 2] 12 + assert_equal [r HINCRBY h2 f1 2] 12 + r HPEXPIRE h1 10 FIELDS 1 f1 + r HPEXPIRE h2 10 FIELDS 1 f1 + after 15 + assert_equal [r HINCRBY h1 f1 1] 1 + assert_equal [r HINCRBY h2 f1 1] 1 + r debug set-active-expire 1 + } + + test {HINCRBY - preserve expiration time of the field} { + r del h1 + r hset h1 f1 10 + r hpexpire h1 20 FIELDS 1 f1 + assert_equal [r HINCRBY h1 f1 2] 12 + assert_range [r HPTTL h1 FIELDS 1 f1] 1 20 + } + + + test {HINCRBYFLOAT - discards pending expired field and reset its value} { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 10 f2 2 + r hset h2 f1 10 + assert_equal [r HINCRBYFLOAT h1 f1 2] 12 + assert_equal [r HINCRBYFLOAT h2 f1 2] 12 + r HPEXPIRE h1 10 FIELDS 1 f1 + r HPEXPIRE h2 10 FIELDS 1 f1 + after 15 + assert_equal [r HINCRBYFLOAT h1 f1 1] 1 + assert_equal [r HINCRBYFLOAT h2 f1 1] 1 + r debug set-active-expire 1 + } + + test {HINCRBYFLOAT - preserve expiration time of the field} { + r del h1 + r hset h1 f1 10 + r hpexpire h1 20 FIELDS 1 f1 + assert_equal [r HINCRBYFLOAT h1 f1 2.5] 12.5 + assert_range [r HPTTL h1 FIELDS 1 f1] 1 20 + } } r config set hash-max-listpack-entries 512 @@ -878,3 +947,211 @@ start_server {tags {"external:skip needs:debug"}} { } } } + +start_server {tags {"external:skip needs:debug"}} { + foreach type {listpack ht} { + if {$type eq "ht"} { + r config set hash-max-listpack-entries 0 + } else { + r config set hash-max-listpack-entries 512 + } + + test "Command rewrite and expired hash fields are propagated to replica ($type)" { + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + + set aof [get_last_incr_aof_path r] + r hset h1 f1 v1 f2 v2 + + assert_equal [r hpexpire h1 10 FIELDS 1 f1] [list $E_OK] + + r hpexpire h1 11 FIELDS 1 f2 + r hpexpire h1 12 FIELDS 1 non_exists_field + r hset h2 f1 v1 f2 v2 f3 v3 f4 v4 + r hpexpire h2 12 FIELDS 2 f1 non_exists_field + r hpexpire h2 13 FIELDS 1 f2 + r hpexpireat h2 [expr [clock seconds]*1000+100000] LT FIELDS 1 f3 + r hexpireat h2 [expr [clock seconds]+10] NX FIELDS 1 f4 + + wait_for_condition 50 100 { + [r hexists h2 f2] eq 0 + } else { + fail "Field wasn't deleted" + } + + # Assert that each TTL-related command are persisted with absolute timestamps in AOF + assert_aof_content $aof { + {select *} + {hset h1 f1 v1 f2 v2} + {hpexpireat h1 * FIELDS 1 f1} + {hpexpireat h1 * FIELDS 1 f2} + {hset h2 f1 v1 f2 v2 f3 v3 f4 v4} + {hpexpireat h2 * FIELDS 2 f1 non_exists_field} + {hpexpireat h2 * FIELDS 1 f2} + {hpexpireat h2 * FIELDS 1 f3} + {hpexpireat h2 * FIELDS 1 f4} + {hdel h1 f1} + {hdel h1 f2} + {hdel h2 f1} + {hdel h2 f2} + } + } + } + + test "Lazy Expire - fields are lazy deleted and propagated to replicas ($type)" { + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + # TODO remove the SELECT once dbid will be embedded inside dict/listpack + r select 0 + + r debug set-active-expire 0 + set aof [get_last_incr_aof_path r] + + r select 0 + r debug set-active-expire 0 + r del myhash + + r hset myhash f1 v1 f2 v2 f3 v3 + r hpexpire myhash 1 NX FIELDS 3 f1 f2 f3 + after 5 + + # Verify that still exists even if all fields are expired + assert_equal 1 [r EXISTS myhash] + + # Verify that len counts also expired fields + assert_equal 3 [r HLEN myhash] + + # Trying access to expired field should delete it. Len should be updated + assert_equal 0 [r hexists myhash f1] + assert_equal 2 [r HLEN myhash] + + # Trying access another expired field should delete it. Len should be updated + assert_equal "" [r hget myhash f2] + assert_equal 1 [r HLEN myhash] + + # Trying access last expired field should delete it. hash shouldn't exists afterward. + assert_equal 0 [r hstrlen myhash f3] + assert_equal 0 [r HLEN myhash] + assert_equal 0 [r EXISTS myhash] + + wait_for_condition 50 100 { [r exists h1] == 0 } else { fail "hash h1 wasn't deleted" } + + # HDEL are propagated as expected + assert_aof_content $aof { + {select *} + {hset myhash f1 v1 f2 v2 f3 v3} + {hpexpireat myhash * NX FIELDS 3 f1 f2 f3} + {hdel myhash f1} + {hdel myhash f2} + {hdel myhash f3} + } + r debug set-active-expire 1 + } + } + + # Start a new server with empty data and AOF file. + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + + # Based on test at expire.tcl: " All time-to-live(TTL) in commands are propagated as absolute ..." + test {All TTLs in commands are propagated as absolute timestamp in milliseconds in AOF} { + + set aof [get_last_incr_aof_path r] + + r hset h1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6 + r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1 + r hpexpireat h1 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2 + r hpexpire h1 100000 NX FIELDS 3 f3 f4 f5 + r hexpire h1 100000 FIELDS 1 f6 + r hset h5 f1 v1 + + assert_aof_content $aof { + {select *} + {hset h1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6} + {hpexpireat h1 * FIELDS 1 f1} + {hpexpireat h1 * FIELDS 1 f2} + {hpexpireat h1 * NX FIELDS 3 f3 f4 f5} + {hpexpireat h1 * FIELDS 1 f6} + {hset h5 f1 v1} + } + + array set keyAndFields1 [dumpAllHashes r] + # Let some time pass and reload data from AOF + after 2000 + r debug loadaof + array set keyAndFields2 [dumpAllHashes r] + + # Assert that absolute TTLs are the same + assert_equal [array get keyAndFields1] [array get keyAndFields2] + + } {} {needs:debug} + } + + # Based on test, with same name, at expire.tcl: + test {All TTL in commands are propagated as absolute timestamp in replication stream} { + # Make sure that both relative and absolute expire commands are propagated + # Consider also comment of the test, with same name, at expire.tcl + + r flushall ; # Clean up keyspace to avoid interference by keys from other tests + set repl [attach_to_replication_stream] + + r hset h1 f1 v1 + r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1 + r hset h2 f2 v2 + r hpexpireat h2 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2 + r hset h3 f3 v3 f4 v4 + r hexpire h3 100 FIELDS 3 f3 f4 non_exists_field + + assert_replication_stream $repl { + {select *} + {hset h1 f1 v1} + {hpexpireat h1 * NX FIELDS 1 f1} + {hset h2 f2 v2} + {hpexpireat h2 * NX FIELDS 1 f2} + {hset h3 f3 v3 f4 v4} + {hpexpireat h3 * FIELDS 3 f3 f4 non_exists_field} + } + close_replication_stream $repl + } {} {needs:repl} + + # Start another server to test replication of TTLs + start_server {tags {needs:repl external:skip}} { + # Set the outer layer server as primary + set primary [srv -1 client] + set primary_host [srv -1 host] + set primary_port [srv -1 port] + # Set this inner layer server as replica + set replica [srv 0 client] + + # Server should have role slave + $replica replicaof $primary_host $primary_port + wait_for_condition 50 100 { + [s 0 role] eq {slave} + } else { + fail "Replication not started." + } + + # Based on test, with same name, at expire.tcl + test {For all replicated TTL-related commands, absolute expire times are identical on primary and replica} { + # Apply each TTL-related command to a unique key on primary + $primary flushall + $primary hset h1 f v + $primary hexpireat h1 [expr [clock seconds]+10000] FIELDS 1 f + $primary hset h2 f v + $primary hpexpireat h2 [expr [clock milliseconds]+100000] FIELDS 1 f + $primary hset h3 f v + $primary hexpire h3 100 NX FIELDS 1 f + $primary hset h4 f v + $primary hpexpire h4 100000 NX FIELDS 1 f + $primary hset h5 f v + $primary hpexpireat h5 [expr [clock milliseconds]-100000] FIELDS 1 f + $primary hset h9 f v + + # Wait for replica to get the keys and TTLs + assert {[$primary wait 1 0] == 1} + + # Verify absolute TTLs are identical on primary and replica for all keys + # This is because TTLs are always replicated as absolute values + assert_equal [dumpAllHashes $primary] [dumpAllHashes $replica] + } + } + } +} +