From 26d050a74d7117a22cb4e8a42078a54a52720d7e Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Tue, 21 May 2024 15:06:48 +0300 Subject: [PATCH 1/7] HFE to support AOF and replicas --- src/aof.c | 67 ++++-- src/db.c | 2 + src/module.c | 2 +- src/networking.c | 19 +- src/object.c | 6 +- src/server.c | 4 + src/server.h | 7 +- src/t_hash.c | 311 +++++++++++++++++++------- tests/support/util.tcl | 6 + tests/unit/other.tcl | 2 +- tests/unit/type/hash-field-expire.tcl | 310 ++++++++++++++++++++++--- 11 files changed, 585 insertions(+), 151 deletions(-) diff --git a/src/aof.c b/src/aof.c index 9632d9c5b33..b0f82cb38d0 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1939,7 +1939,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { * * The function returns 0 on error, non-zero on success. */ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { - if (hi->encoding == OBJ_ENCODING_LISTPACK) { + if ((hi->encoding == OBJ_ENCODING_LISTPACK) || (hi->encoding == OBJ_ENCODING_LISTPACK_EX)) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; @@ -1963,37 +1963,64 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { /* Emit the commands needed to rebuild a hash object. * The function returns 0 on error, 1 on success. */ int rewriteHashObject(rio *r, robj *key, robj *o) { + int res = 0; /*fail*/ + hashTypeIterator *hi; long long count = 0, items = hashTypeLength(o, 0); + /* Is expected also hash-fields with expiration (HFE) ? */ + int isHFE = hashTypeGetMinExpire(o) != EB_EXPIRE_TIME_INVALID; hi = hashTypeInitIterator(o); - while (hashTypeNext(hi, 0) != C_ERR) { - if (count == 0) { - int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? - AOF_REWRITE_ITEMS_PER_CMD : items; - if (!rioWriteBulkCount(r,'*',2+cmd_items*2) || - !rioWriteBulkString(r,"HMSET",5) || - !rioWriteBulkObject(r,key)) - { - hashTypeReleaseIterator(hi); - return 0; + if (!isHFE) { + while (hashTypeNext(hi, 0) != C_ERR) { + if (count == 0) { + int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? + AOF_REWRITE_ITEMS_PER_CMD : items; + if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) || + !rioWriteBulkString(r, "HMSET", 5) || + !rioWriteBulkObject(r, key)) + goto reHashEnd; } + + if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) || + !rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) + goto reHashEnd; + + if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; + items--; } + } else { + while (hashTypeNext(hi, 0) != C_ERR) { + if (hi->expire_time == EB_EXPIRE_TIME_INVALID) + { + if (!rioWriteBulkCount(r, '*', 4) || + !rioWriteBulkString(r, "HMSET", 5) || + !rioWriteBulkObject(r, key)) + goto reHashEnd; + } else { + /* HSETF key PXAT msec FVS 1 field value */ + char cmd[] = "*8\r\n$5\r\nHSETF\r\n"; + if ((!rioWrite(r, cmd, sizeof(cmd) - 1)) || + (!rioWriteBulkObject(r, key)) || + (!rioWriteBulkString(r, "PXAT", 4)) || + (!rioWriteBulkLongLong(r, hi->expire_time)) || + (!rioWriteBulkString(r, "FVS", 3)) || + (!rioWriteBulkString(r, "1", 1))) + goto reHashEnd; + } - if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) || - !rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) - { - hashTypeReleaseIterator(hi); - return 0; + if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) || + !rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) + goto reHashEnd; } - if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; - items--; } - hashTypeReleaseIterator(hi); + res = 1; /* success */ - return 1; +reHashEnd: + hashTypeReleaseIterator(hi); + return res; } /* Helper for rewriteStreamObject() that generates a bulk string into the diff --git a/src/db.c b/src/db.c index 2b234e90a0f..0e680aea947 100644 --- a/src/db.c +++ b/src/db.c @@ -1764,11 +1764,13 @@ void swapMainDbWithTempDb(redisDb *tempDb) { * remain in the same DB they were. */ activedb->keys = newdb->keys; activedb->expires = newdb->expires; + activedb->hexpires = newdb->hexpires; activedb->avg_ttl = newdb->avg_ttl; activedb->expires_cursor = newdb->expires_cursor; newdb->keys = aux.keys; newdb->expires = aux.expires; + newdb->hexpires = aux.hexpires; newdb->avg_ttl = aux.avg_ttl; newdb->expires_cursor = aux.expires_cursor; diff --git a/src/module.c b/src/module.c index 2f3a81515db..853756789ad 100644 --- a/src/module.c +++ b/src/module.c @@ -5282,7 +5282,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { /* Handle deletion if value is REDISMODULE_HASH_DELETE. */ if (value == REDISMODULE_HASH_DELETE) { - count += hashTypeDelete(key->value, field->ptr); + count += hashTypeDelete(key->value, field->ptr, 1); if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field); continue; } diff --git a/src/networking.c b/src/networking.c index 245e2ebdd8c..885863a18e5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3757,7 +3757,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) { * 1. Make sure there are no "holes" and all the arguments are set. * 2. If the original argument vector was longer than the one we * want to end with, it's up to the caller to set c->argc and - * free the no longer used objects on c->argv. */ + * free the no longer used objects on c->argv. + * 3. To remove argument at i'th index, pass NULL as new value + */ void rewriteClientCommandArgument(client *c, int i, robj *newval) { robj *oldval; retainOriginalCommandVector(c); @@ -3775,9 +3777,18 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } oldval = c->argv[i]; if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); - if (newval) c->argv_len_sum += getStringObjectLen(newval); - c->argv[i] = newval; - incrRefCount(newval); + + if (newval) { + c->argv[i] = newval; + incrRefCount(newval); + c->argv_len_sum += getStringObjectLen(newval); + } else { + /* move the remaining arguments one step left */ + for (int j = i+1; j < c->argc; j++) { + c->argv[j-1] = c->argv[j]; + } + c->argv[--c->argc] = NULL; + } if (oldval) decrRefCount(oldval); /* If this is the command name make sure to fix c->cmd. */ diff --git a/src/object.c b/src/object.c index 296c8baaeac..a70705b4585 100644 --- a/src/object.c +++ b/src/object.c @@ -490,8 +490,10 @@ void dismissHashObject(robj *o, size_t size_hint) { /* Dismiss hash table memory. */ dismissMemory(d->ht_table[0], DICTHT_SIZE(d->ht_size_exp[0])*sizeof(dictEntry*)); dismissMemory(d->ht_table[1], DICTHT_SIZE(d->ht_size_exp[1])*sizeof(dictEntry*)); - } else if (o->encoding == OBJ_ENCODING_LISTPACK) { - dismissMemory(o->ptr, lpBytes((unsigned char*)o->ptr)); + } else if (o->encoding == OBJ_ENCODING_LISTPACK || + o->encoding == OBJ_ENCODING_LISTPACK_EX) { + unsigned char *lp = hashTypeListpackGetLp(o); + dismissMemory(lp, lpBytes(lp)); } else { serverPanic("Unknown hash encoding type"); } diff --git a/src/server.c b/src/server.c index 382beddedfc..9f8ed2fa614 100644 --- a/src/server.c +++ b/src/server.c @@ -1955,6 +1955,10 @@ void createSharedObjects(void) { shared.persist = createStringObject("PERSIST",7); shared.set = createStringObject("SET",3); shared.eval = createStringObject("EVAL",4); + shared.hpexpireat = createStringObject("HPEXPIREAT",10); + shared.hsetfpxat = createStringObject("PXAT",4); + shared.hgetfpxat = createStringObject("PXAT",4); + shared.hdel = createStringObject("HDEL",4); /* Shared command argument */ shared.left = createStringObject("left",4); diff --git a/src/server.h b/src/server.h index 70a778c2532..674a9bfda8b 100644 --- a/src/server.h +++ b/src/server.h @@ -1317,7 +1317,8 @@ struct sharedObjectsStruct { *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, - *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, + *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, + *hdel, *hpexpireat, *hsetfpxat, *hgetfpxat, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, @@ -3174,7 +3175,7 @@ typedef struct listpackEx { void hashTypeConvert(robj *o, int enc, ebuckets *hexpires); void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end); int hashTypeExists(robj *o, sds key); -int hashTypeDelete(robj *o, sds key); +int hashTypeDelete(robj *o, void *key, int isSdsField /*0=hfield, 1=sds*/); unsigned long hashTypeLength(const robj *o, int subtractExpiredFields); hashTypeIterator *hashTypeInitIterator(robj *subject); void hashTypeReleaseIterator(hashTypeIterator *hi); @@ -3197,11 +3198,11 @@ uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o); void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTime); void hashTypeFree(robj *o); int hashTypeIsExpired(const robj *o, uint64_t expireAt); +uint64_t hashTypeGetMinExpire(robj *o); unsigned char *hashTypeListpackGetLp(robj *o); uint64_t hashTypeGetMinExpire(robj *o); void hashTypeUpdateKeyRef(robj *o, sds newkey); ebuckets *hashTypeGetDictMetaHFE(dict *d); -void listpackExExpire(robj *o, ExpireInfo *info); int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at); uint64_t hashTypeGetMinExpire(robj *keyObj); uint64_t hashTypeGetNextTimeToExpire(robj *o); diff --git a/src/t_hash.c b/src/t_hash.c index 1826c778792..04e23605391 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -14,6 +14,12 @@ * update the expiration time of the hash object in global HFE DS. */ #define HASH_NEW_EXPIRE_DIFF_THRESHOLD max(4000, 1<encoding == OBJ_ENCODING_LISTPACK_EX); uint64_t min = EB_EXPIRE_TIME_INVALID; - unsigned char *ptr, *field, *s; + unsigned char *ptr, *pTuple, *s; listpackEx *lpt = o->ptr; ptr = lpFirst(lpt->lp); while (ptr != NULL && (info->itemsExpired < info->maxToExpire)) { long long val; - field = ptr; + pTuple = ptr; + unsigned int flen; + long long fnum; + char *fref = (char *) lpGetValue(ptr, &flen, &fnum); + ptr = lpNext(lpt->lp, ptr); serverAssert(ptr); ptr = lpNext(lpt->lp, ptr); @@ -403,12 +420,19 @@ void listpackExExpire(robj *o, ExpireInfo *info) { serverAssert(!s); /* Fields are ordered by expiry time. If we reached to a non-expired - * field or a non-volatile field, we know rest is not yet expired. */ + * or a non-volatile field, we know rest is not yet expired. */ if (val == HASH_LP_NO_TTL || (uint64_t) val > info->now) break; - lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &field, 3); - ptr = field; + if (!fref) { + flen = ll2string(buf, sizeof(buf), fnum); + fref = buf; + } + + propagateHashFieldDeletion(db, ((listpackEx *) o->ptr)->key, fref, flen); + + lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &pTuple, 3); + ptr = pTuple; info->itemsExpired++; } @@ -604,7 +628,8 @@ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, } if (prevExpire == EB_EXPIRE_TIME_INVALID) { - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) + /* For fields without expiry, LT condition is considered valid */ + if (ex->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; } else { if (((ex->expireSetCond == HFE_GT) && (prevExpire >= expireAt)) || @@ -619,7 +644,7 @@ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, /* if expiration time is in the past */ if (unlikely(checkAlreadyExpired(expireAt))) { - hashTypeDelete(ex->hashObj, field); + hashTypeDelete(ex->hashObj, field, 1); ex->fieldDeleted++; return HSETEX_DELETED; } @@ -697,7 +722,7 @@ void hashTypeTryConversion(redisDb *db, robj *o, robj **argv, int start, int end /* Get the value from a listpack encoded hash, identified by field. * Returns -1 when the field cannot be found. */ -int hashTypeGetFromListpack(robj *o, sds field, +GetFieldRes hashTypeGetFromListpack(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll) @@ -734,7 +759,7 @@ int hashTypeGetFromListpack(robj *o, sds field, serverAssert(h == NULL); if (hashTypeIsExpired(o, expire)) - return -1; + return GET_FIELD_EXPIRED; } } } else { @@ -743,28 +768,29 @@ int hashTypeGetFromListpack(robj *o, sds field, if (vptr != NULL) { *vstr = lpGetValue(vptr, vlen, vll); - return 0; + return GET_FIELD_OK; } - return -1; + return GET_FIELD_NOT_FOUND; } /* Get the value from a hash table encoded hash, identified by field. * Returns NULL when the field cannot be found, otherwise the SDS value * is returned. */ -sds hashTypeGetFromHashTable(robj *o, sds field) { +GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value) { dictEntry *de; serverAssert(o->encoding == OBJ_ENCODING_HT); de = dictFind(o->ptr, field); - if (de == NULL) return NULL; + if (de == NULL) return GET_FIELD_NOT_FOUND; /* Check if the field is expired */ - if (hfieldIsExpired(dictGetKey(de))) return NULL; + if (hfieldIsExpired(dictGetKey(de))) return GET_FIELD_EXPIRED; - return dictGetVal(de); + *value = (sds) dictGetVal(de); + return GET_FIELD_OK; } /* Higher level function of hashTypeGet*() that returns the hash value @@ -777,22 +803,69 @@ sds hashTypeGetFromHashTable(robj *o, sds field) { * can always check the function return by checking the return value * for C_OK and checking if vll (or vstr) is NULL. */ int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll) { + sds key; + GetFieldRes res; if (o->encoding == OBJ_ENCODING_LISTPACK || - o->encoding == OBJ_ENCODING_LISTPACK_EX) - { + o->encoding == OBJ_ENCODING_LISTPACK_EX) { *vstr = NULL; - if (hashTypeGetFromListpack(o, field, vstr, vlen, vll) == 0) + res = hashTypeGetFromListpack(o, field, vstr, vlen, vll); + + if (res == GET_FIELD_OK) return C_OK; + + if (res == GET_FIELD_NOT_FOUND) + return C_ERR; + + /* If reached here then res == GET_FIELD_EXPIRED. Extract key. */ + serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); + listpackEx *lpt = o->ptr; + key = lpt->key; + } else if (o->encoding == OBJ_ENCODING_HT) { - sds value; - if ((value = hashTypeGetFromHashTable(o, field)) != NULL) { + sds value = NULL; + res = hashTypeGetFromHashTable(o, field, &value); + if (res == GET_FIELD_OK) { *vstr = (unsigned char*) value; *vlen = sdslen(value); return C_OK; } + + if (res == GET_FIELD_NOT_FOUND) + return C_ERR; + + /* If reached here then res == GET_FIELD_EXPIRED. Extract key. */ + serverAssert(isDictWithMetaHFE((dict*)o->ptr)); + key = ((dictExpireMetadata *) dictMetadata((dict*)o->ptr))->key; + } else { serverPanic("Unknown hash encoding"); } + + /* If in replica then return C_OK (discard hash-field expiry logic) */ + if ((server.masterhost) && (server.current_client && (server.current_client->flags & CLIENT_MASTER))) + return C_OK; + + /* TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO + * Once dbid will be embedded inside HASH, extract it from there and remove + * this hack that is aligned with testing. + * + * Going to be my next commit. + */ + redisDb *db = server.db+0; + + /* delete the field and propagate the deletion */ + serverAssert(hashTypeDelete(o, field, 1) == 1); + propagateHashFieldDeletion(db, key, field, sdslen(field)); + + if (hashTypeLength(o, 0) == 0) { + + /* create temporary robj with the key */ + robj *keyObj = createStringObject(key, sdslen(key)); + + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id); + dbDelete(db,keyObj); + decrRefCount(keyObj); + } return C_ERR; } @@ -900,7 +973,8 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* If field doesn't have expiry metadata attached */ if (!hfieldIsExpireAttached(hfOld)) { - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) { + /* For fields without expiry, LT condition is considered valid */ + if (ex->expireSetCond & (HFE_XX | HFE_GT)) { hfieldFree(hfNew); return HSETEX_NO_CONDITION_MET; } @@ -934,7 +1008,7 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* field has invalid expiry. No need to ebRemove() */ /* Check XX|LT|GT */ - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) + if (ex->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; } @@ -947,7 +1021,7 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* if expiration time is in the past */ if (unlikely(checkAlreadyExpired(expireAt))) { - hashTypeDelete(ex->hashObj, field); + hashTypeDelete(ex->hashObj, field, 1); ex->fieldDeleted++; return HSETEX_DELETED; } @@ -1092,13 +1166,11 @@ void initDictExpireMetadata(sds key, robj *o) { * done by function hashTypeSetExDone(). */ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cmd, FieldSetCond fieldSetCond, - FieldGet fieldGet, ExpireSetCond expireSetCond, - HashTypeSetEx *ex) + ExpireSetCond expireSetCond, HashTypeSetEx *ex) { dict *ht = o->ptr; ex->fieldSetCond = fieldSetCond; - ex->fieldGet = fieldGet; /* TODO */ ex->expireSetCond = expireSetCond; ex->minExpire = EB_EXPIRE_TIME_INVALID; ex->c = c; @@ -1110,6 +1182,7 @@ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cm ex->fieldUpdated = 0; ex->minExpireFields = EB_EXPIRE_TIME_INVALID; + /* Take care that HASH support expiration */ if (ex->hashObj->encoding == OBJ_ENCODING_LISTPACK) { hashTypeConvert(ex->hashObj, OBJ_ENCODING_LISTPACK_EX, &c->db->hexpires); @@ -1254,8 +1327,9 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS /* Delete an element from a hash. * Return 1 on deleted and 0 on not found. */ -int hashTypeDelete(robj *o, sds field) { +int hashTypeDelete(robj *o, void *field, int isSdsField /*0=hfield, 1=sds*/) { int deleted = 0; + int fieldLen = (isSdsField) ? sdslen((sds)field) : hfieldlen((hfield)field); if (o->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl, *fptr; @@ -1263,7 +1337,7 @@ int hashTypeDelete(robj *o, sds field) { zl = o->ptr; fptr = lpFirst(zl); if (fptr != NULL) { - fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1); + fptr = lpFind(zl, fptr, (unsigned char*)field, fieldLen, 1); if (fptr != NULL) { /* Delete both of the key and the value. */ zl = lpDeleteRangeWithEntry(zl,&fptr,2); @@ -1277,7 +1351,7 @@ int hashTypeDelete(robj *o, sds field) { fptr = lpFirst(lpt->lp); if (fptr != NULL) { - fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); + fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, fieldLen, 2); if (fptr != NULL) { /* Delete field, value and ttl */ lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); @@ -1286,9 +1360,11 @@ int hashTypeDelete(robj *o, sds field) { } } else if (o->encoding == OBJ_ENCODING_HT) { /* dictDelete() will call dictHfieldDestructor() */ + dictUseStoredKeyApi((dict*)o->ptr, isSdsField ? 0 : 1); if (dictDelete((dict*)o->ptr, field) == C_OK) { deleted = 1; } + dictUseStoredKeyApi((dict*)o->ptr, 0); } else { serverPanic("Unknown hash encoding"); @@ -1358,6 +1434,7 @@ void hashTypeReleaseIterator(hashTypeIterator *hi) { /* Move to the next entry in the hash. Return C_OK when the next entry * could be found and C_ERR when the iterator reaches the end. */ int hashTypeNext(hashTypeIterator *hi, int skipExpiredFields) { + hi->expire_time = EB_EXPIRE_TIME_INVALID; if (hi->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl; unsigned char *fptr, *vptr; @@ -1427,8 +1504,11 @@ int hashTypeNext(hashTypeIterator *hi, int skipExpiredFields) { hi->tptr = tptr; hi->expire_time = (expire_time != HASH_LP_NO_TTL) ? (uint64_t) expire_time : EB_EXPIRE_TIME_INVALID; } else if (hi->encoding == OBJ_ENCODING_HT) { + while ((hi->de = dictNext(hi->di)) != NULL) { - if (skipExpiredFields && hfieldIsExpired(dictGetKey(hi->de))) + hi->expire_time = hfieldGetExpireTime(dictGetKey(hi->de)); + /* this condition still valid if expire_time equals EB_EXPIRE_TIME_INVALID */ + if (skipExpiredFields && ((mstime_t)hi->expire_time < commandTimeSnapshot())) continue; return C_OK; } @@ -1481,11 +1561,8 @@ void hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, char **str, si *len = sdslen(val); } - if (expireTime) { - if (!key) key = dictGetKey(hi->de); - *expireTime = hfieldGetExpireTime( key ); - } - + if (expireTime) + *expireTime = hi->expire_time; } /* Higher level function of hashTypeCurrent*() that returns the hash value @@ -1510,7 +1587,6 @@ void hashTypeCurrentObject(hashTypeIterator *hi, { *vstr = NULL; hashTypeCurrentFromListpack(hi, what, vstr, vlen, vll, expireTime); - /* TODO-HFE: Handle expireTime */ } else if (hi->encoding == OBJ_ENCODING_HT) { char *ele; size_t eleLen; @@ -1598,7 +1674,6 @@ void hashTypeConvertListpack(robj *o, int enc) { dict = dictCreate(&mstrHashDictType); /* Presize the dict to avoid rehashing */ - /* TODO: activeExpire list pack. Should be small */ dictExpand(dict,hashTypeLength(o, 0)); while (hashTypeNext(hi, 0) != C_ERR) { @@ -1849,11 +1924,10 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { info = (ExpireInfo){ .maxToExpire = activeExpireCtx->fieldsToExpireQuota, - .ctx = hashObj, .now = commandTimeSnapshot(), .itemsExpired = 0}; - listpackExExpire(hashObj, &info); + listpackExExpire(activeExpireCtx->db, hashObj, &info); server.stat_expired_hash_fields += info.itemsExpired; keystr = ((listpackEx*)hashObj->ptr)->key; } else { @@ -1862,10 +1936,15 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { dict *d = hashObj->ptr; dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); + OnFieldExpireCtx onFieldExpireCtx = { + .hashObj = hashObj, + .db = activeExpireCtx->db + }; + info = (ExpireInfo){ .maxToExpire = activeExpireCtx->fieldsToExpireQuota, .onExpireItem = onFieldExpire, - .ctx = hashObj, + .ctx = &onFieldExpireCtx, .now = commandTimeSnapshot() }; @@ -1881,7 +1960,6 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { if (hashTypeLength(hashObj, 0) == 0) { robj *key = createStringObject(keystr, sdslen(keystr)); dbDelete(activeExpireCtx->db, key); - //notifyKeyspaceEvent(NOTIFY_HASH,"xxxxxxxxx",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key, activeExpireCtx->db->id); server.dirty++; signalModifiedKey(NULL, &server.db[0], key); @@ -2242,7 +2320,7 @@ void hdelCommand(client *c) { checkType(c,o,OBJ_HASH)) return; for (j = 2; j < c->argc; j++) { - if (hashTypeDelete(o,c->argv[j]->ptr)) { + if (hashTypeDelete(o,c->argv[j]->ptr, 1)) { deleted++; if (hashTypeLength(o, 0) == 0) { dbDelete(c->db,c->argv[1]); @@ -2733,14 +2811,36 @@ int hfieldIsExpired(hfield field) { /*----------------------------------------------------------------------------- * Hash Field Expiration (HFE) *----------------------------------------------------------------------------*/ -/* Called during active expiration of hash-fields */ +static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen) { + robj *argv[] = { + shared.hdel, + createStringObject((char*) key, sdslen(key)), + createStringObject(field, fieldLen) + }; + + enterExecutionUnit(1, 0); + int prev_replication_allowed = server.replication_allowed; + server.replication_allowed = 1; + alsoPropagate(db->id,argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); + server.replication_allowed = prev_replication_allowed; + exitExecutionUnit(); + + /* Propagate the HDEL command */ + postExecutionUnitOperations(); + + decrRefCount(argv[1]); + decrRefCount(argv[2]); +} + +/* Called during active expiration of hash-fields. Propagate to replica & Delete. */ static ExpireAction onFieldExpire(eItem item, void *ctx) { + OnFieldExpireCtx *expCtx = ctx; hfield hf = item; - robj *hashobj = (robj *) ctx; - dictUseStoredKeyApi((dict *)hashobj->ptr, 1); - hashTypeDelete(hashobj, hf); + dict *d = expCtx->hashObj->ptr; + dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); + propagateHashFieldDeletion(expCtx->db, dictExpireMeta->key, hf, hfieldlen(hf)); + serverAssert(hashTypeDelete(expCtx->hashObj, hf, 0) == 1); server.stat_expired_hash_fields++; - dictUseStoredKeyApi((dict *)hashobj->ptr, 0); return ACT_REMOVE_EXP_ITEM; } @@ -2954,17 +3054,31 @@ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime HashTypeSetEx exCtx; hashTypeSetExInit(keyArg, hashObj, c, c->db, cmd, FIELD_DONT_CREATE2, - FIELD_GET_NONE, expireSetCond, &exCtx); addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { sds field = c->argv[numFieldsAt+i+1]->ptr; SetExRes res = hashTypeSetEx(c->db, hashObj, field, NULL, expire, &exCtx); addReplyLongLong(c,res); } hashTypeSetExDone(&exCtx); + + /**** do some more rewrite for the replica sake ****/ + + /* Propagate as HPEXPIREAT millisecond-timestamp. Rewrite only if not already */ + if (c->cmd->proc != hpexpireatCommand) { + rewriteClientCommandArgument(c,0,shared.hpexpireat); + } + + /* rewrite expiration time to unix time in msec */ + if (basetime != 0 || unit == UNIT_SECONDS) { + robj *expireObj = createStringObjectFromLongLong(expire); + rewriteClientCommandArgument(c, 2, expireObj); + decrRefCount(expireObj); + } } /* HPEXPIRE key milliseconds [ NX | XX | GT | LT] numfields */ @@ -3263,14 +3377,15 @@ static void updateGlobalHfeDs(redisDb *db, robj *o,uint64_t minExpire, uint64_t } /* Parse hgetf command arguments. */ -static int hgetfParseArgs(client *c, int *flags, uint64_t *expireAt, - int *firstFieldPos, int *fieldCount) +static int hgetfParseAndRewriteArgs(client *c, int *flags, uint64_t *expireAt, + int *firstFieldPos, int *fieldCount) { *flags = 0; *firstFieldPos = -1; *fieldCount = -1; for (int i = 2; i < c->argc; i++) { + int flag = 0; if (!strcasecmp(c->argv[i]->ptr, "fields")) { long val; @@ -3299,19 +3414,19 @@ static int hgetfParseArgs(client *c, int *flags, uint64_t *expireAt, } else if (!strcasecmp(c->argv[i]->ptr, "NX")) { if (*flags & (HFE_CMD_XX | HFE_CMD_GT | HFE_CMD_LT)) goto err_condition; - *flags |= HFE_CMD_NX; + flag = HFE_CMD_NX; } else if (!strcasecmp(c->argv[i]->ptr, "XX")) { if (*flags & (HFE_CMD_NX | HFE_CMD_GT | HFE_CMD_LT)) goto err_condition; - *flags |= HFE_CMD_XX; + flag = HFE_CMD_XX; } else if (!strcasecmp(c->argv[i]->ptr, "GT")) { if (*flags & (HFE_CMD_NX | HFE_CMD_XX | HFE_CMD_LT)) goto err_condition; - *flags |= HFE_CMD_GT; + flag = HFE_CMD_GT; } else if (!strcasecmp(c->argv[i]->ptr, "LT")) { if (*flags & (HFE_CMD_NX | HFE_CMD_XX | HFE_CMD_GT)) goto err_condition; - *flags |= HFE_CMD_LT; + flag = HFE_CMD_LT; } else if (!strcasecmp(c->argv[i]->ptr, "EX")) { if (*flags & (HFE_CMD_EXAT | HFE_CMD_PX | HFE_CMD_PXAT | HFE_CMD_PERSIST)) goto err_expiration; @@ -3319,7 +3434,7 @@ static int hgetfParseArgs(client *c, int *flags, uint64_t *expireAt, if (i >= c->argc - 1) goto err_missing_expire; - *flags |= HFE_CMD_EX; + flag = HFE_CMD_EX; i++; if (validateExpire(c, UNIT_SECONDS, c->argv[i], commandTimeSnapshot(), expireAt) != C_OK) @@ -3332,7 +3447,7 @@ static int hgetfParseArgs(client *c, int *flags, uint64_t *expireAt, if (i >= c->argc - 1) goto err_missing_expire; - *flags |= HFE_CMD_PX; + flag = HFE_CMD_PX; i++; if (validateExpire(c, UNIT_MILLISECONDS, c->argv[i], commandTimeSnapshot(), expireAt) != C_OK) @@ -3344,7 +3459,7 @@ static int hgetfParseArgs(client *c, int *flags, uint64_t *expireAt, if (i >= c->argc - 1) goto err_missing_expire; - *flags |= HFE_CMD_EXAT; + flag = HFE_CMD_EXAT; i++; if (validateExpire(c, UNIT_SECONDS, c->argv[i], 0, expireAt) != C_OK) return C_ERR; @@ -3355,18 +3470,28 @@ static int hgetfParseArgs(client *c, int *flags, uint64_t *expireAt, if (i >= c->argc - 1) goto err_missing_expire; - *flags |= HFE_CMD_PXAT; + flag = HFE_CMD_PXAT; i++; if (validateExpire(c, UNIT_MILLISECONDS, c->argv[i], 0, expireAt) != C_OK) return C_ERR; } else if (!strcasecmp(c->argv[i]->ptr, "PERSIST")) { if (*flags & (HFE_CMD_EX | HFE_CMD_EXAT | HFE_CMD_PX | HFE_CMD_PXAT)) goto err_expiration; - *flags |= HFE_CMD_PERSIST; + flag = HFE_CMD_PERSIST; } else { addReplyErrorFormat(c, "unknown argument: %s", (char*) c->argv[i]->ptr); return C_ERR; } + + *flags |= flag; + + /* Rewrite the command for replica’s sake to unix time in msec */ + if (flag & (HFE_CMD_PX|HFE_CMD_EX|HFE_CMD_EXAT)) { + rewriteClientCommandArgument(c, i - 1, shared.hgetfpxat); /* modify to PXAT */ + robj *expireObj = createStringObjectFromLongLong(*expireAt); + rewriteClientCommandArgument(c, i, expireObj); + decrRefCount(expireObj); + } } /* FIELDS argument is mandatory. */ @@ -3480,7 +3605,7 @@ static int hgetfReplyValueAndSetExpiry(client *c, robj *o, sds field, int flag, /* if expiration time is in the past */ if (checkAlreadyExpired(expireAt)) { - hashTypeDelete(o, field); + hashTypeDelete(o, field, 1); return 1; } @@ -3530,7 +3655,8 @@ void hgetfCommand(client *c) { int numFields = 0; uint64_t expireAt = EB_EXPIRE_TIME_INVALID; - if (hgetfParseArgs(c, &flags, &expireAt, &firstFieldPos, &numFields) != C_OK) + if (hgetfParseAndRewriteArgs(c, &flags, &expireAt, &firstFieldPos, + &numFields) != C_OK) return; /* Read the hash object */ @@ -3704,7 +3830,7 @@ static int hsetfSetFieldAndReply(client *c, robj *o, sds field, sds value, * discarded. */ listpackExPersist(o, field, fptr, vptr); } else if (checkAlreadyExpired(expireAt)) { - hashTypeDelete(o, field); + hashTypeDelete(o, field, 1); } else { if (*minPrevExp > expireAt) *minPrevExp = expireAt; @@ -3772,7 +3898,7 @@ static int hsetfSetFieldAndReply(client *c, robj *o, sds field, sds value, hfieldPersist(o, hf); } else if (checkAlreadyExpired(expireAt)) { /* if expiration time is in the past */ - hashTypeDelete(o, field); + hashTypeDelete(o, field, 1); } else { if (*minPrevExp > expireAt) *minPrevExp = expireAt; @@ -3790,9 +3916,9 @@ static int hsetfSetFieldAndReply(client *c, robj *o, sds field, sds value, } } -/* Parse hsetf command arguments. */ -static int hsetfParseArgs(client *c, int *flags, uint64_t *expireAt, - int *firstFieldPos, int *fieldCount) +/* Parse hsetf command arguments. Also rewrite few of the arguments for replica. */ +static int hsetfParseAndRewriteArgs(client *c, int *flags, uint64_t *expireAt, + int *firstFieldPos, int *fieldCount) { long val; @@ -3800,7 +3926,9 @@ static int hsetfParseArgs(client *c, int *flags, uint64_t *expireAt, *firstFieldPos = -1; *fieldCount = -1; - for (int i = 2; i < c->argc; i++) { + for (int i = 2 ; i < c->argc ; ++i) { + int flag = 0; + if (!strcasecmp(c->argv[i]->ptr, "fvs")) { if (*firstFieldPos != -1) { addReplyErrorFormat(c, "multiple FVS argument"); @@ -3827,19 +3955,19 @@ static int hsetfParseArgs(client *c, int *flags, uint64_t *expireAt, } else if (!strcasecmp(c->argv[i]->ptr, "NX")) { if (*flags & (HFE_CMD_XX | HFE_CMD_GT | HFE_CMD_LT)) goto err_condition; - *flags |= HFE_CMD_NX; + flag = HFE_CMD_NX; } else if (!strcasecmp(c->argv[i]->ptr, "XX")) { if (*flags & (HFE_CMD_NX | HFE_CMD_GT | HFE_CMD_LT)) goto err_condition; - *flags |= HFE_CMD_XX; + flag = HFE_CMD_XX; } else if (!strcasecmp(c->argv[i]->ptr, "GT")) { if (*flags & (HFE_CMD_NX | HFE_CMD_XX | HFE_CMD_LT)) goto err_condition; - *flags |= HFE_CMD_GT; + flag = HFE_CMD_GT; } else if (!strcasecmp(c->argv[i]->ptr, "LT")) { if (*flags & (HFE_CMD_NX | HFE_CMD_XX | HFE_CMD_GT)) goto err_condition; - *flags |= HFE_CMD_LT; + flag = HFE_CMD_LT; } else if (!strcasecmp(c->argv[i]->ptr, "EX")) { if (*flags & (HFE_CMD_EXAT | HFE_CMD_PX | HFE_CMD_PXAT | HFE_CMD_KEEPTTL)) goto err_expiration; @@ -3847,7 +3975,7 @@ static int hsetfParseArgs(client *c, int *flags, uint64_t *expireAt, if (i >= c->argc - 1) goto err_missing_expire; - *flags |= HFE_CMD_EX; + flag = HFE_CMD_EX; i++; if (validateExpire(c, UNIT_SECONDS, c->argv[i], commandTimeSnapshot(), expireAt) != C_OK) @@ -3860,7 +3988,7 @@ static int hsetfParseArgs(client *c, int *flags, uint64_t *expireAt, if (i >= c->argc - 1) goto err_missing_expire; - *flags |= HFE_CMD_PX; + flag = HFE_CMD_PX; i++; if (validateExpire(c, UNIT_MILLISECONDS, c->argv[i], commandTimeSnapshot(), expireAt) != C_OK) @@ -3872,7 +4000,7 @@ static int hsetfParseArgs(client *c, int *flags, uint64_t *expireAt, if (i >= c->argc - 1) goto err_missing_expire; - *flags |= HFE_CMD_EXAT; + flag = HFE_CMD_EXAT; i++; if (validateExpire(c, UNIT_SECONDS, c->argv[i], 0, expireAt) != C_OK) return C_ERR; @@ -3883,36 +4011,46 @@ static int hsetfParseArgs(client *c, int *flags, uint64_t *expireAt, if (i >= c->argc - 1) goto err_missing_expire; - *flags |= HFE_CMD_PXAT; + flag = HFE_CMD_PXAT; i++; if (validateExpire(c, UNIT_MILLISECONDS, c->argv[i], 0, expireAt) != C_OK) return C_ERR; } else if (!strcasecmp(c->argv[i]->ptr, "KEEPTTL")) { if (*flags & (HFE_CMD_EX | HFE_CMD_EXAT | HFE_CMD_PX | HFE_CMD_PXAT)) goto err_expiration; - *flags |= HFE_CMD_KEEPTTL; + flag = HFE_CMD_KEEPTTL; } else if (!strcasecmp(c->argv[i]->ptr, "DC")) { - *flags |= HFE_CMD_DC; + flag = HFE_CMD_DC; } else if (!strcasecmp(c->argv[i]->ptr, "DCF")) { if (*flags & HFE_CMD_DOF) goto err_field_condition; - *flags |= HFE_CMD_DCF; + flag = HFE_CMD_DCF; } else if (!strcasecmp(c->argv[i]->ptr, "DOF")) { if (*flags & HFE_CMD_DCF) goto err_field_condition; - *flags |= HFE_CMD_DOF; + flag = HFE_CMD_DOF; } else if (!strcasecmp(c->argv[i]->ptr, "GETNEW")) { if (*flags & HFE_CMD_GETOLD) goto err_return_condition; - *flags |= HFE_CMD_GETNEW; + flag = HFE_CMD_GETNEW; } else if (!strcasecmp(c->argv[i]->ptr, "GETOLD")) { if (*flags & HFE_CMD_GETNEW) goto err_return_condition; - *flags |= HFE_CMD_GETOLD; + flag = HFE_CMD_GETOLD; } else { addReplyErrorFormat(c, "unknown argument: %s", (char*) c->argv[i]->ptr); return C_ERR; } + + *flags |= flag; + + /* Rewrite the command for replica’s sake to unix time in msec */ + if (flag & (HFE_CMD_PX|HFE_CMD_EX|HFE_CMD_EXAT)) { + rewriteClientCommandArgument(c, i - 1, shared.hsetfpxat); /* modify to PXAT */ + robj *expireObj = createStringObjectFromLongLong(*expireAt); + rewriteClientCommandArgument(c, i, expireObj); + decrRefCount(expireObj); + } } /* FVS argument is mandatory. */ @@ -3966,7 +4104,8 @@ void hsetfCommand(client *c) { int numFields = 0; uint64_t expireAt = EB_EXPIRE_TIME_INVALID; - if (hsetfParseArgs(c, &flags, &expireAt, &firstFieldPos, &numFields) != C_OK) + if (hsetfParseAndRewriteArgs(c, &flags, &expireAt, &firstFieldPos, + &numFields) != C_OK) return; hashObj = lookupKeyWrite(c->db, c->argv[1]); diff --git a/tests/support/util.tcl b/tests/support/util.tcl index a40c630df43..b0690b7e552 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -293,6 +293,8 @@ proc findKeyWithType {r type} { proc createComplexDataset {r ops {opt {}}} { set useexpire [expr {[lsearch -exact $opt useexpire] != -1}] + set usehexpire [expr {[lsearch -exact $opt usehexpire] != -1}] + if {[lsearch -exact $opt usetag] != -1} { set tag "{t}" } else { @@ -386,6 +388,10 @@ proc createComplexDataset {r ops {opt {}}} { {hash} { randpath {{*}$r hset $k $f $v} \ {{*}$r hdel $k $f} + + if { [{*}$r hexists $k $f] && $usehexpire && rand() < 0.5} { + {*}$r hexpire $k 1000 FIELDS 1 $f + } } } } diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 1ba0e62faf0..f5f1058342d 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -124,7 +124,7 @@ start_server {tags {"other"}} { if {$::accurate} {set numops 10000} else {set numops 1000} test {Check consistency of different data types after a reload} { r flushdb - createComplexDataset r $numops usetag + createComplexDataset r $numops {usetag usehexpire} if {$::ignoredigest} { set _ 1 } else { diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 949f729fa2c..dcee66103cc 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -56,6 +56,19 @@ proc cmp_hrandfield_result {hash_name expected_result} { } } +proc getKeyFieldsExpiry {redis} { + set keyAndFields1(0,0) 0 + unset keyAndFields1 + # keep keys sorted for comparison + foreach key [lsort [$redis keys *]] { + set fields [$redis hgetall $key] + foreach f $fields { + set keyAndFields1($key,$f) [$redis hpexpiretime $key FIELDS 1 $f] + } + } + return [array get keyAndFields1] +} + proc hrandfieldTest {activeExpireConfig} { r debug set-active-expire $activeExpireConfig r del myhash @@ -136,16 +149,16 @@ start_server {tags {"external:skip needs:debug"}} { test "HPEXPIRE(AT) - Test 'LT' flag ($type)" { r del myhash - r hset myhash field1 value1 field2 value2 + r hset myhash field1 value1 field2 value2 field3 value3 assert_equal [r hpexpire myhash 1000 NX FIELDS 1 field1] [list $E_OK] assert_equal [r hpexpire myhash 2000 NX FIELDS 1 field2] [list $E_OK] - assert_equal [r hpexpire myhash 1500 LT FIELDS 2 field1 field2] [list $E_FAIL $E_OK] + assert_equal [r hpexpire myhash 1500 LT FIELDS 3 field1 field2 field3] [list $E_FAIL $E_OK $E_OK] r del myhash - r hset myhash field1 value1 field2 value2 + r hset myhash field1 value1 field2 value2 field3 value3 assert_equal [r hpexpireat myhash [expr {([clock seconds]+1000)*1000}] NX FIELDS 1 field1] [list $E_OK] assert_equal [r hpexpireat myhash [expr {([clock seconds]+2000)*1000}] NX FIELDS 1 field2] [list $E_OK] - assert_equal [r hpexpireat myhash [expr {([clock seconds]+1500)*1000}] LT FIELDS 2 field1 field2] [list $E_FAIL $E_OK] + assert_equal [r hpexpireat myhash [expr {([clock seconds]+1500)*1000}] LT FIELDS 3 field1 field2 field3] [list $E_FAIL $E_OK $E_OK] } test "HPEXPIREAT - field not exists or TTL is in the past ($type)" { @@ -171,46 +184,41 @@ start_server {tags {"external:skip needs:debug"}} { assert_error {*invalid expire time*} {r hpexpire myhash [expr (1<<48) - [clock milliseconds] + 100 ] FIELDS 1 f1} } - test "Lazy - doesn't delete hash that all its fields got expired ($type)" { + test "Lazy Expire - fields are lazy deleted ($type)" { + + # TODO remove the SELECT once dbid will be embedded inside dict/listpack + r select 0 r debug set-active-expire 0 - r flushall + r del myhash - set hash_sizes {1 15 16 17 31 32 33 40} - foreach h $hash_sizes { - for {set i 1} {$i <= $h} {incr i} { - # random expiration time - r hset hrand$h f$i v$i - r hpexpire hrand$h [expr {50 + int(rand() * 50)}] FIELDS 1 f$i - assert_equal 1 [r HEXISTS hrand$h f$i] + r hset myhash f1 v1 f2 v2 f3 v3 + r hpexpire myhash 1 NX FIELDS 3 f1 f2 f3 + after 5 - # same expiration time - r hset same$h f$i v$i - r hpexpire same$h 100 FIELDS 1 f$i - assert_equal 1 [r HEXISTS same$h f$i] + # Verify that still exists even if all fields are expired + assert_equal 1 [r EXISTS myhash] - # same expiration time - r hset mix$h f$i v$i fieldWithoutExpire$i v$i - r hpexpire mix$h 100 FIELDS 1 f$i - assert_equal 1 [r HEXISTS mix$h f$i] - } - } + # Verify that len counts also expired fields + assert_equal 3 [r HLEN myhash] - after 150 + # Trying access to expired field should delete it. Len should be updated + assert_equal 0 [r hexists myhash f1] + assert_equal 2 [r HLEN myhash] + + # Trying access another expired field should delete it. Len should be updated + assert_equal "" [r hget myhash f2] + assert_equal 1 [r HLEN myhash] + + # Trying access last expired field should delete it. hash shouldn't exists afterward. + assert_equal 0 [r hstrlen myhash f3] + assert_equal 0 [r HLEN myhash] + assert_equal 0 [r EXISTS myhash] - # Verify that all fields got expired but keys wasn't lazy deleted - foreach h $hash_sizes { - for {set i 1} {$i <= $h} {incr i} { - assert_equal 0 [r HEXISTS mix$h f$i] - } - assert_equal 1 [r EXISTS hrand$h] - assert_equal 1 [r EXISTS same$h] - assert_equal [expr $h * 2] [r HLEN mix$h] - } # Restore default r debug set-active-expire 1 } - test "Active - deletes hash that all its fields got expired ($type)" { + test "Active Expire - deletes hash that all its fields got expired ($type)" { r flushall set hash_sizes {1 15 16 17 31 32 33 40} @@ -1236,3 +1244,237 @@ start_server {tags {"external:skip needs:debug"}} { r config set hash-max-listpack-value 64 } } + +start_server {tags {"external:skip needs:debug"}} { + foreach type {listpack ht} { + if {$type eq "ht"} { + r config set hash-max-listpack-entries 0 + } else { + r config set hash-max-listpack-entries 512 + } + + test "Command rewrite and expired hash fields are propagated to replica ($type)" { + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + + set aof [get_last_incr_aof_path r] + r hset h1 f1 v1 f2 v2 + + assert_equal [r hpexpire h1 10 FIELDS 1 f1] [list $E_OK] + + r hpexpire h1 11 FIELDS 1 f2 + r hpexpire h1 12 FIELDS 1 non_exists_field + r hset h2 f1 v1 f2 v2 f3 v3 f4 v4 + r hpexpire h2 12 FIELDS 2 f1 non_exists_field + r hpexpire h2 13 FIELDS 1 f2 + r hpexpireat h2 [expr [clock seconds]*1000+100000] LT FIELDS 1 f3 + r hexpireat h2 [expr [clock seconds]+10] NX FIELDS 1 f4 + r hsetf h3 PX 14 NX FVS 1 f1 v1 + r hsetf h4 PX 10000 FVS 1 f1 v1 + r hgetf h4 EX 10000 FIELDS 1 f1 + r hgetf h4 PX 15 FIELDS 1 f1 + + wait_for_condition 50 100 { + [r exists h1] eq 0 + } else { + fail "hash h1 wasn't deleted" + } + + # Assert that each TTL-related command are persisted with absolute timestamps in AOF + assert_aof_content $aof { + {select *} + {hset h1 f1 v1 f2 v2} + {hpexpireat h1 * FIELDS 1 f1} + {hpexpireat h1 * FIELDS 1 f2} + {hset h2 f1 v1 f2 v2 f3 v3 f4 v4} + {hpexpireat h2 * FIELDS 2 f1 non_exists_field} + {hpexpireat h2 * FIELDS 1 f2} + {hpexpireat h2 * FIELDS 1 f3} + {hpexpireat h2 * FIELDS 1 f4} + {hsetf h3 PXAT * FVS 1 f1 v1} + {hsetf h4 PXAT * FVS 1 f1 v1} + {hgetf h4 PXAT * FIELDS 1 f1} + {hgetf h4 PXAT * FIELDS 1 f1} + {hdel h1 f1} + {hdel h1 f2} + {hdel h2 f1} + {hdel h2 f2} + {hdel h3 f1} + {hdel h4 f1} + } + } + } + + test "Lazy Expire - fields are lazy deleted and propagated to replicas ($type)" { + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + # TODO remove the SELECT once dbid will be embedded inside dict/listpack + r select 0 + + r debug set-active-expire 0 + set aof [get_last_incr_aof_path r] + + r select 0 + r debug set-active-expire 0 + r del myhash + + r hset myhash f1 v1 f2 v2 f3 v3 + r hpexpire myhash 1 NX FIELDS 3 f1 f2 f3 + after 5 + + # Verify that still exists even if all fields are expired + assert_equal 1 [r EXISTS myhash] + + # Verify that len counts also expired fields + assert_equal 3 [r HLEN myhash] + + # Trying access to expired field should delete it. Len should be updated + assert_equal 0 [r hexists myhash f1] + assert_equal 2 [r HLEN myhash] + + # Trying access another expired field should delete it. Len should be updated + assert_equal "" [r hget myhash f2] + assert_equal 1 [r HLEN myhash] + + # Trying access last expired field should delete it. hash shouldn't exists afterward. + assert_equal 0 [r hstrlen myhash f3] + assert_equal 0 [r HLEN myhash] + assert_equal 0 [r EXISTS myhash] + + wait_for_condition 50 100 { [r exists h1] == 0 } else { fail "hash h1 wasn't deleted" } + + # HDEL are propagated as expected + assert_aof_content $aof { + {select *} + {hset myhash f1 v1 f2 v2 f3 v3} + {hpexpireat myhash * NX FIELDS 3 f1 f2 f3} + {hdel myhash f1} + {hdel myhash f2} + {hdel myhash f3} + } + r debug set-active-expire 1 + } + } + + # Start a new server with empty data and AOF file. + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + + # Based on test at expire.tcl: " All time-to-live(TTL) in commands are propagated as absolute ..." + test {All TTLs in commands are propagated as absolute timestamp in milliseconds in AOF} { + + set aof [get_last_incr_aof_path r] + + r hset h1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6 + r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1 + r hpexpireat h1 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2 + r hpexpire h1 100000 NX FIELDS 3 f3 f4 f5 + r hexpire h1 100000 FIELDS 1 f6 + r hsetf h2 GETNEW EX 100 LT FVS 1 f1 v1 + r hsetf h3 GETOLD DOF PX 100000 NX FVS 2 f1 v1 f2 v2 + r hsetf h3 KEEPTTL FVS 2 f1 v3 f2 v4 + r hsetf h4 EX 10000 FVS 1 f1 v1 + r hsetf h4 EX 100 LT FVS 1 f1 v1 + r hset h5 f1 v1 + r hgetf h5 EX 100 LT FIELDS 1 f1 + + assert_aof_content $aof { + {select *} + {hset h1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6} + {hpexpireat h1 * FIELDS 1 f1} + {hpexpireat h1 * FIELDS 1 f2} + {hpexpireat h1 * NX FIELDS 3 f3 f4 f5} + {hpexpireat h1 * FIELDS 1 f6} + {hsetf h2 GETNEW PXAT * LT FVS 1 f1 v1} + {hsetf h3 GETOLD DOF PXAT * NX FVS 2 f1 v1 f2 v2} + {hsetf h3 KEEPTTL FVS 2 f1 v3 f2 v4} + {hsetf h4 PXAT * FVS 1 f1 v1} + {hsetf h4 PXAT * LT FVS 1 f1 v1} + {hset h5 f1 v1} + {hgetf h5 PXAT * LT FIELDS 1 f1} + } + + array set keyAndFields1 [getKeyFieldsExpiry r] + # Let some time pass and reload data from AOF + after 2000 + r debug loadaof + array set keyAndFields2 [getKeyFieldsExpiry r] + + # Assert that absolute TTLs are the same + assert_equal [array get keyAndFields1] [array get keyAndFields2] + + } {} {needs:debug} + } + + # Based on test, with same name, at expire.tcl: + test {All TTL in commands are propagated as absolute timestamp in replication stream} { + # Make sure that both relative and absolute expire commands are propagated + # Consider also comment of the test, with same name, at expire.tcl + + r flushall ; # Clean up keyspace to avoid interference by keys from other tests + set repl [attach_to_replication_stream] + + r hset h1 f1 v1 + r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1 + r hset h2 f2 v2 + r hpexpireat h2 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2 + r hset h3 f3 v3 f4 v4 + r hexpire h3 100 FIELDS 3 f3 f4 non_exists_field + + assert_replication_stream $repl { + {select *} + {hset h1 f1 v1} + {hpexpireat h1 * NX FIELDS 1 f1} + {hset h2 f2 v2} + {hpexpireat h2 * NX FIELDS 1 f2} + {hset h3 f3 v3 f4 v4} + {hpexpireat h3 * FIELDS 3 f3 f4 non_exists_field} + } + close_replication_stream $repl + } {} {needs:repl} + + # Start another server to test replication of TTLs + start_server {tags {needs:repl external:skip}} { + # Set the outer layer server as primary + set primary [srv -1 client] + set primary_host [srv -1 host] + set primary_port [srv -1 port] + # Set this inner layer server as replica + set replica [srv 0 client] + + # Server should have role slave + $replica replicaof $primary_host $primary_port + wait_for_condition 50 100 { + [s 0 role] eq {slave} + } else { + fail "Replication not started." + } + + # Based on test, with same name, at expire.tcl + test {For all replicated TTL-related commands, absolute expire times are identical on primary and replica} { + # Apply each TTL-related command to a unique key on primary + $primary flushall + $primary hset h1 f v + $primary hexpireat h1 [expr [clock seconds]+10000] FIELDS 1 f + $primary hset h2 f v + $primary hpexpireat h2 [expr [clock milliseconds]+100000] FIELDS 1 f + $primary hset h3 f v + $primary hexpire h3 100 NX FIELDS 1 f + $primary hset h4 f v + $primary hpexpire h4 100000 NX FIELDS 1 f + $primary hset h5 f v + $primary hpexpireat h5 [expr [clock milliseconds]-100000] FIELDS 1 f + $primary hsetf h6 EX 100 LT FVS 1 f1 v1 + $primary hsetf h7 PXAT [expr [clock seconds]*1000+100000] LT FVS 1 f1 v1 + $primary hsetf h8 EXAT [expr [clock seconds]+100] FVS 1 f1 v1 + $primary hset h9 f v + $primary hgetf h9 EX 100 LT FIELDS 1 f + + # Wait for replica to get the keys and TTLs + assert {[$primary wait 1 0] == 1} + + # Verify absolute TTLs are identical on primary and replica for all keys + # This is because TTLs are always replicated as absolute values + assert_equal [getKeyFieldsExpiry $primary] [getKeyFieldsExpiry $replica] + } + } + } +} + From 9b5bd8f2eb54391b196f4c88928cad278580d896 Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Tue, 21 May 2024 16:25:22 +0300 Subject: [PATCH 2/7] fix flaky test --- tests/unit/type/hash-field-expire.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index dcee66103cc..0fd1b2d945e 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -1274,7 +1274,7 @@ start_server {tags {"external:skip needs:debug"}} { r hgetf h4 PX 15 FIELDS 1 f1 wait_for_condition 50 100 { - [r exists h1] eq 0 + [r exists h4] eq 0 } else { fail "hash h1 wasn't deleted" } From 156e21464239b2f2540150f796440d6f006ab59e Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Thu, 23 May 2024 14:52:59 +0300 Subject: [PATCH 3/7] Place del hooks for hashObj (instead of keep dbid in it) --- src/db.c | 5 ++ src/module.c | 6 +- src/server.h | 6 +- src/sort.c | 2 +- src/t_hash.c | 95 ++++++++++++++------------- tests/support/util.tcl | 1 + tests/unit/other.tcl | 1 + tests/unit/type/hash-field-expire.tcl | 20 +++--- 8 files changed, 73 insertions(+), 63 deletions(-) diff --git a/src/db.c b/src/db.c index 79aec556200..328e37a3158 100644 --- a/src/db.c +++ b/src/db.c @@ -276,6 +276,11 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt old = dictGetVal(de); } kvstoreDictSetVal(db->keys, slot, de, val); + + /* if hash with HFEs, take care to remove from global HFE DS */ + if (old->type == OBJ_HASH) + hashTypeRemoveFromExpires(&db->hexpires, old); + if (server.lazyfree_lazy_server_del) { freeObjAsync(key,old,db->id); } else { diff --git a/src/module.c b/src/module.c index 853756789ad..c35d67eb05d 100644 --- a/src/module.c +++ b/src/module.c @@ -5271,7 +5271,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { /* Handle XX and NX */ if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) { - int exists = hashTypeExists(key->value, field->ptr); + int exists = hashTypeExists(key->db, key->value, field->ptr); if (((flags & REDISMODULE_HASH_XX) && !exists) || ((flags & REDISMODULE_HASH_NX) && exists)) { @@ -5375,13 +5375,13 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { if (flags & REDISMODULE_HASH_EXISTS) { existsptr = va_arg(ap,int*); if (key->value) - *existsptr = hashTypeExists(key->value,field->ptr); + *existsptr = hashTypeExists(key->db, key->value,field->ptr); else *existsptr = 0; } else { valueptr = va_arg(ap,RedisModuleString**); if (key->value) { - *valueptr = hashTypeGetValueObject(key->value,field->ptr); + *valueptr = hashTypeGetValueObject(key->db, key->value,field->ptr); if (*valueptr) { robj *decoded = getDecodedObject(*valueptr); decrRefCount(*valueptr); diff --git a/src/server.h b/src/server.h index 674a9bfda8b..c63814be14d 100644 --- a/src/server.h +++ b/src/server.h @@ -3174,8 +3174,8 @@ typedef struct listpackEx { void hashTypeConvert(robj *o, int enc, ebuckets *hexpires); void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end); -int hashTypeExists(robj *o, sds key); -int hashTypeDelete(robj *o, void *key, int isSdsField /*0=hfield, 1=sds*/); +int hashTypeExists(redisDb *db, robj *o, sds key); +int hashTypeDelete(robj *o, void *key, int isSdsField); unsigned long hashTypeLength(const robj *o, int subtractExpiredFields); hashTypeIterator *hashTypeInitIterator(robj *subject); void hashTypeReleaseIterator(hashTypeIterator *hi); @@ -3191,7 +3191,7 @@ void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr, unsigned int *vlen, long long *vll, uint64_t *expireTime); sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what); hfield hashTypeCurrentObjectNewHfield(hashTypeIterator *hi); -robj *hashTypeGetValueObject(robj *o, sds field); +robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field); int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags); robj *hashTypeDup(robj *o, sds newkey, uint64_t *minHashExpire); uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o); diff --git a/src/sort.c b/src/sort.c index 426ff0c1d05..4b226ab5bc1 100644 --- a/src/sort.c +++ b/src/sort.c @@ -94,7 +94,7 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { /* Retrieve value from hash by the field name. The returned object * is a new object with refcount already incremented. */ - o = hashTypeGetValueObject(o, fieldobj->ptr); + o = hashTypeGetValueObject(db, o, fieldobj->ptr); } else { if (o->type != OBJ_STRING) goto noobj; diff --git a/src/t_hash.c b/src/t_hash.c index b3ce0e24683..6d1294a884b 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -392,20 +392,21 @@ static uint64_t listpackExGetMinExpire(robj *o) { /* Walk over fields and delete the expired ones. */ void listpackExExpire(redisDb *db, robj *o, ExpireInfo *info) { - char buf[LONG_STR_SIZE]; serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); uint64_t min = EB_EXPIRE_TIME_INVALID; unsigned char *ptr, *pTuple; listpackEx *lpt = o->ptr; ptr = lpFirst(lpt->lp); + while (ptr != NULL && (info->itemsExpired < info->maxToExpire)) { long long val; + int64_t flen; + unsigned char intbuf[LP_INTBUF_SIZE], *fref; - pTuple = ptr; - unsigned int flen; - long long fnum; - char *fref = (char *) lpGetValue(ptr, &flen, &fnum); + pTuple = ptr; /* keep aside ref to begining of the tuple*/ + + fref = lpGet(ptr, &flen, intbuf); ptr = lpNext(lpt->lp, ptr); serverAssert(ptr); @@ -417,12 +418,7 @@ void listpackExExpire(redisDb *db, robj *o, ExpireInfo *info) { if (val == HASH_LP_NO_TTL || (uint64_t) val > info->now) break; - if (!fref) { - flen = ll2string(buf, sizeof(buf), fnum); - fref = buf; - } - - propagateHashFieldDeletion(db, ((listpackEx *) o->ptr)->key, fref, flen); + propagateHashFieldDeletion(db, ((listpackEx *) o->ptr)->key, (char *)((fref) ? fref : intbuf), flen); lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &pTuple, 3); ptr = pTuple; @@ -783,8 +779,11 @@ GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value) { * * If *vll is populated *vstr is set to NULL, so the caller * can always check the function return by checking the return value - * for C_OK and checking if vll (or vstr) is NULL. */ -int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll) { + * for C_OK and checking if vll (or vstr) is NULL. + * + * If field is expired (GET_FIELD_EXPIRED), then it will be lazy deleted. + */ +int hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll) { sds key; GetFieldRes res; if (o->encoding == OBJ_ENCODING_LISTPACK || @@ -827,14 +826,6 @@ int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vle if ((server.masterhost) && (server.current_client && (server.current_client->flags & CLIENT_MASTER))) return C_OK; - /* TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO - * Once dbid will be embedded inside HASH, extract it from there and remove - * this hack that is aligned with testing. - * - * Going to be my next commit. - */ - redisDb *db = server.db+0; - /* delete the field and propagate the deletion */ serverAssert(hashTypeDelete(o, field, 1) == 1); propagateHashFieldDeletion(db, key, field, sdslen(field)); @@ -855,12 +846,12 @@ int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vle * interaction with the hash type outside t_hash.c. * The function returns NULL if the field is not found in the hash. Otherwise * a newly allocated string object with the value is returned. */ -robj *hashTypeGetValueObject(robj *o, sds field) { +robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field) { unsigned char *vstr; unsigned int vlen; long long vll; - if (hashTypeGetValue(o,field,&vstr,&vlen,&vll) == C_ERR) return NULL; + if (hashTypeGetValue(db, o,field,&vstr,&vlen,&vll) == C_ERR) return NULL; if (vstr) return createStringObject((char*)vstr,vlen); else return createStringObjectFromLongLong(vll); } @@ -868,26 +859,29 @@ robj *hashTypeGetValueObject(robj *o, sds field) { /* Higher level function using hashTypeGet*() to return the length of the * object associated with the requested field, or 0 if the field does not * exist. */ -size_t hashTypeGetValueLength(robj *o, sds field) { +size_t hashTypeGetValueLength(redisDb *db, robj *o, sds field) { size_t len = 0; unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) + if (hashTypeGetValue(db, o, field, &vstr, &vlen, &vll) == C_OK) len = vstr ? vlen : sdigits10(vll); return len; } -/* Test if the specified field exists in the given hash. Returns 1 if the field - * exists, and 0 when it doesn't. */ -int hashTypeExists(robj *o, sds field) { +/* Test if the specified field exists in the given hash. If the field is + * expired (HFE), then it will be lazy deleted + * + * Returns 1 if the field exists, and 0 when it doesn't. + */ +int hashTypeExists(redisDb *db, robj *o, sds field) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - return hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK; + return hashTypeGetValue(db, o, field, &vstr, &vlen, &vll) == C_OK; } /* Add a new field, overwrite the old with the new value if it already exists. @@ -1305,8 +1299,10 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS } /* Delete an element from a hash. - * Return 1 on deleted and 0 on not found. */ -int hashTypeDelete(robj *o, void *field, int isSdsField /*0=hfield, 1=sds*/) { + * + * Return 1 on deleted and 0 on not found. + * isSdsField - 1 if the field is sds, 0 if it is hfield */ +int hashTypeDelete(robj *o, void *field, int isSdsField) { int deleted = 0; int fieldLen = (isSdsField) ? sdslen((sds)field) : hfieldlen((hfield)field); @@ -1914,15 +1910,15 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); OnFieldExpireCtx onFieldExpireCtx = { - .hashObj = hashObj, - .db = activeExpireCtx->db + .hashObj = hashObj, + .db = activeExpireCtx->db }; info = (ExpireInfo){ - .maxToExpire = activeExpireCtx->fieldsToExpireQuota, - .onExpireItem = onFieldExpire, - .ctx = &onFieldExpireCtx, - .now = commandTimeSnapshot() + .maxToExpire = activeExpireCtx->fieldsToExpireQuota, + .onExpireItem = onFieldExpire, + .ctx = &onFieldExpireCtx, + .now = commandTimeSnapshot() }; ebExpire(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, &info); @@ -2076,12 +2072,19 @@ uint64_t hashTypeDbActiveExpire(redisDb *db, uint32_t maxFieldsToExpire) { void hashTypeFree(robj *o) { switch (o->encoding) { case OBJ_ENCODING_HT: + /* Verify hash is not registered in global HFE ds */ + if (isDictWithMetaHFE((dict*)o->ptr)) { + dictExpireMetadata *m = (dictExpireMetadata *)dictMetadata((dict*)o->ptr); + serverAssert(m->expireMeta.trash == 1); + } dictRelease((dict*) o->ptr); break; case OBJ_ENCODING_LISTPACK: lpFree(o->ptr); break; case OBJ_ENCODING_LISTPACK_EX: + /* Verify hash is not registered in global HFE ds */ + serverAssert(((listpackEx *) o->ptr)->meta.trash == 1); listpackExFree(o->ptr); break; default: @@ -2116,7 +2119,7 @@ void hsetnxCommand(client *c) { robj *o; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeExists(o, c->argv[2]->ptr)) { + if (hashTypeExists(c->db, o, c->argv[2]->ptr)) { addReply(c, shared.czero); } else { hashTypeTryConversion(c->db, o,c->argv,2,3); @@ -2166,7 +2169,7 @@ void hincrbyCommand(client *c) { if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeGetValue(o,c->argv[2]->ptr,&vstr,&vlen,&value) == C_OK) { + if (hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&value) == C_OK) { if (vstr) { if (string2ll((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not an integer"); @@ -2206,7 +2209,7 @@ void hincrbyfloatCommand(client *c) { return; } if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeGetValue(o,c->argv[2]->ptr,&vstr,&vlen,&ll) == C_OK) { + if (hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll) == C_OK) { if (vstr) { if (string2ld((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not a float"); @@ -2254,7 +2257,7 @@ static void addHashFieldToReply(client *c, robj *o, sds field) { unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) { + if (hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll) == C_OK) { if (vstr) { addReplyBulkCBuffer(c, vstr, vlen); } else { @@ -2331,7 +2334,7 @@ void hstrlenCommand(client *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; - addReplyLongLong(c,hashTypeGetValueLength(o,c->argv[2]->ptr)); + addReplyLongLong(c,hashTypeGetValueLength(c->db, o,c->argv[2]->ptr)); } static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) { @@ -2417,7 +2420,7 @@ void hexistsCommand(client *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; - addReply(c, hashTypeExists(o,c->argv[2]->ptr) ? shared.cone : shared.czero); + addReply(c, hashTypeExists(c->db, o,c->argv[2]->ptr) ? shared.cone : shared.czero); } void hscanCommand(client *c) { @@ -2790,9 +2793,9 @@ int hfieldIsExpired(hfield field) { *----------------------------------------------------------------------------*/ static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen) { robj *argv[] = { - shared.hdel, - createStringObject((char*) key, sdslen(key)), - createStringObject(field, fieldLen) + shared.hdel, + createStringObject((char*) key, sdslen(key)), + createStringObject(field, fieldLen) }; enterExecutionUnit(1, 0); diff --git a/tests/support/util.tcl b/tests/support/util.tcl index b0690b7e552..fcdac8c947c 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -293,6 +293,7 @@ proc findKeyWithType {r type} { proc createComplexDataset {r ops {opt {}}} { set useexpire [expr {[lsearch -exact $opt useexpire] != -1}] + # TODO: Remove usehexpire on next commit, when RDB will support replication set usehexpire [expr {[lsearch -exact $opt usehexpire] != -1}] if {[lsearch -exact $opt usetag] != -1} { diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index f5f1058342d..52f1fa75cc7 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -124,6 +124,7 @@ start_server {tags {"other"}} { if {$::accurate} {set numops 10000} else {set numops 1000} test {Check consistency of different data types after a reload} { r flushdb + # TODO: integrate usehexpire following next commit that will support replication createComplexDataset r $numops {usetag usehexpire} if {$::ignoredigest} { set _ 1 diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 7c47b734201..59627254efd 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -56,17 +56,17 @@ proc cmp_hrandfield_result {hash_name expected_result} { } } -proc getKeyFieldsExpiry {redis} { - set keyAndFields1(0,0) 0 - unset keyAndFields1 +proc dumpAllHashes {client} { + set keyAndFields(0,0) 0 + unset keyAndFields # keep keys sorted for comparison - foreach key [lsort [$redis keys *]] { - set fields [$redis hgetall $key] + foreach key [lsort [$client keys *]] { + set fields [$client hgetall $key] foreach f $fields { - set keyAndFields1($key,$f) [$redis hpexpiretime $key FIELDS 1 $f] + set keyAndFields($key,$f) [$client hpexpiretime $key FIELDS 1 $f] } } - return [array get keyAndFields1] + return [array get keyAndFields] } proc hrandfieldTest {activeExpireConfig} { @@ -1416,11 +1416,11 @@ start_server {tags {"external:skip needs:debug"}} { {hgetf h5 PXAT * LT FIELDS 1 f1} } - array set keyAndFields1 [getKeyFieldsExpiry r] + array set keyAndFields1 [dumpAllHashes r] # Let some time pass and reload data from AOF after 2000 r debug loadaof - array set keyAndFields2 [getKeyFieldsExpiry r] + array set keyAndFields2 [dumpAllHashes r] # Assert that absolute TTLs are the same assert_equal [array get keyAndFields1] [array get keyAndFields2] @@ -1497,7 +1497,7 @@ start_server {tags {"external:skip needs:debug"}} { # Verify absolute TTLs are identical on primary and replica for all keys # This is because TTLs are always replicated as absolute values - assert_equal [getKeyFieldsExpiry $primary] [getKeyFieldsExpiry $replica] + assert_equal [dumpAllHashes $primary] [dumpAllHashes $replica] } } } From 3e3aed56bba19c24c4d33f4c2dd78ca6d5650c34 Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Thu, 23 May 2024 20:38:13 +0300 Subject: [PATCH 4/7] add expiry to digest --- src/debug.c | 5 +++++ src/t_hash.c | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/debug.c b/src/debug.c index 84e96aa1420..1ae88717da5 100644 --- a/src/debug.c +++ b/src/debug.c @@ -204,13 +204,18 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) unsigned char eledigest[20]; sds sdsele; + /* field */ memset(eledigest,0,20); sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY); mixDigest(eledigest,sdsele,sdslen(sdsele)); sdsfree(sdsele); + /* val */ sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); mixDigest(eledigest,sdsele,sdslen(sdsele)); sdsfree(sdsele); + /* field expire */ + if (hi->expire_time != EB_EXPIRE_TIME_INVALID) + xorDigest(eledigest,"!!hexpire!!",11); xorDigest(digest,eledigest,20); } hashTypeReleaseIterator(hi); diff --git a/src/t_hash.c b/src/t_hash.c index 6d1294a884b..8a8585e05bc 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -29,7 +29,6 @@ static ExpireAction hashTypeActiveExpire(eItem hashObj, void *ctx); static void hfieldPersist(robj *hashObj, hfield field); static void updateGlobalHfeDs(redisDb *db, robj *o, uint64_t minExpire, uint64_t minExpireFields); static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen); -static void listpackExExpire(redisDb *db, robj *o, ExpireInfo *info); /* hash dictType funcs */ static int dictHfieldKeyCompare(dict *d, const void *key1, const void *key2); From 088430db7e37dbba81b79c8ef6abafe3da3ec043 Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Sun, 26 May 2024 11:02:27 +0300 Subject: [PATCH 5/7] RDB load expired fields sync to replicas --- src/debug.c | 2 +- src/ebuckets.c | 2 +- src/ebuckets.h | 2 +- src/module.c | 4 +- src/rdb.c | 78 +++++++++++---------- src/rdb.h | 5 +- src/t_hash.c | 8 ++- tests/integration/psync2-master-restart.tcl | 11 +++ tests/unit/type/hash-field-expire.tcl | 4 +- 9 files changed, 67 insertions(+), 49 deletions(-) diff --git a/src/debug.c b/src/debug.c index 1ae88717da5..4b5f730610f 100644 --- a/src/debug.c +++ b/src/debug.c @@ -213,7 +213,7 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); mixDigest(eledigest,sdsele,sdslen(sdsele)); sdsfree(sdsele); - /* field expire */ + /* hash-field expiration (HFE) */ if (hi->expire_time != EB_EXPIRE_TIME_INVALID) xorDigest(eledigest,"!!hexpire!!",11); xorDigest(digest,eledigest,20); diff --git a/src/ebuckets.c b/src/ebuckets.c index 7235862850c..279ee98dcad 100644 --- a/src/ebuckets.c +++ b/src/ebuckets.c @@ -190,7 +190,7 @@ static inline uint64_t raxKey2BucketKey(unsigned char *raxKey) { * Before: [segHdr] -> {item1,..,item16} -> [..] * After: [segHdr] -> {newItem} -> [nextSegHdr] -> {item1,..,item16} -> [..] * - * Take care to persist `segHdr` to be the same instance after the change. + * Taken care to persist `segHdr` to be the same instance after the change. * This is important because the rax tree is pointing to it. */ static int ebSegAddExtended(EbucketsType *type, FirstSegHdr *firstSegHdr, eItem newItem) { /* Allocate nextSegHdr and let it take the items of first segment header */ diff --git a/src/ebuckets.h b/src/ebuckets.h index f3033733061..66954131a02 100644 --- a/src/ebuckets.h +++ b/src/ebuckets.h @@ -139,7 +139,7 @@ * The idea of it is to trim the rax tree depth, avoid having too many branches, * and reduce frequent modifications of the tree to the minimum. */ -#define EB_BUCKET_KEY_PRECISION 0 /* 1024msec */ +#define EB_BUCKET_KEY_PRECISION 0 /* TBD: modify to 10 */ /* From expiration time to bucket-key */ #define EB_BUCKET_KEY(exptime) ((exptime) >> EB_BUCKET_KEY_PRECISION) diff --git a/src/module.c b/src/module.c index c35d67eb05d..a59852e2e0c 100644 --- a/src/module.c +++ b/src/module.c @@ -5375,13 +5375,13 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { if (flags & REDISMODULE_HASH_EXISTS) { existsptr = va_arg(ap,int*); if (key->value) - *existsptr = hashTypeExists(key->db, key->value,field->ptr); + *existsptr = hashTypeExists(key->db,key->value,field->ptr); else *existsptr = 0; } else { valueptr = va_arg(ap,RedisModuleString**); if (key->value) { - *valueptr = hashTypeGetValueObject(key->db, key->value,field->ptr); + *valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr); if (*valueptr) { robj *decoded = getDecodedObject(*valueptr); decrRefCount(*valueptr); diff --git a/src/rdb.c b/src/rdb.c index 6053cf40ac8..b5e9d724033 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2236,7 +2236,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, size_t fieldLen; sds value, field; - uint64_t expire, minExpire = EB_EXPIRE_TIME_INVALID; + uint64_t expireAt, minExpire = EB_EXPIRE_TIME_INVALID; mstime_t now = mstime(); dict *dupSearchDict = NULL; ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL); @@ -2266,14 +2266,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, len--; /* read the TTL */ - if (rdbLoadLenByRef(rdb, NULL, &expire) == -1) { + if (rdbLoadLenByRef(rdb, NULL, &expireAt) == -1) { serverLog(LL_WARNING, "failed reading hash TTL"); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); return NULL; } - if (expire > EB_EXPIRE_TIME_MAX) { - rdbReportCorruptRDB("invalid expire time: %llu", (unsigned long long)expire); + if (expireAt > EB_EXPIRE_TIME_MAX) { + rdbReportCorruptRDB("invalid expireAt time: %llu", (unsigned long long)expireAt); decrRefCount(o); return NULL; } @@ -2306,11 +2306,25 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, * Expired hash fields on the master are silently discarded. * Note that if all fields in a hash has expired, the hash would not be * created in memory (because it is created on the first valid field), and - * thus the key would be discarded as an "empty key" */ - if (expire != 0 && iAmMaster() && ((mstime_t)expire < now) && /* note: expire was saved to RDB as unix-time in milliseconds */ + * thus the key would be discarded as an "expired hash" */ + if (expireAt != 0 && iAmMaster() && ((mstime_t)expireAt < now) && !(rdbflags & RDBFLAGS_AOF_PREAMBLE)) { - /* TODO: consider replication (like in rdbLoadAddKeyToDb) */ + if (rdbflags & RDBFLAGS_FEED_REPL && db) { + /* Caller should have created replication backlog, + * and now this path only works when rebooting, + * so we don't have replicas yet. */ + serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0); + robj keyobj, fieldObj; + initStaticStringObject(keyobj,key); + initStaticStringObject(fieldObj, field); + robj *argv[3]; + argv[0] = shared.hdel; + argv[1] = &keyobj; + argv[2] = &fieldObj; + replicationFeedSlaves(server.slaves,db->id,argv,3); + } + server.rdb_last_load_hash_fields_expired++; sdsfree(field); sdsfree(value); @@ -2318,7 +2332,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, } /* keep the nearest expiration to connect listpack object to db expiry */ - if ((expire != 0) && (expire < minExpire)) minExpire = expire; + if ((expireAt != 0) && (expireAt < minExpire)) minExpire = expireAt; /* store the values read - either to listpack or dict */ if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { @@ -2340,7 +2354,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* check if the values can be saved to listpack (or should convert to dict encoding) */ if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value || - !lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expire))) + !lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expireAt))) { /* convert to hash */ hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); @@ -2358,7 +2372,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* don't add the values to the new hash: the next if will catch and the values will be added there */ } else { - listpackExAddNew(o, field, value, expire); + listpackExAddNew(o, field, value, expireAt); sdsfree(field); sdsfree(value); } @@ -2367,10 +2381,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, if (o->encoding == OBJ_ENCODING_HT) { /* WA for check-rdb mode, when there's no DB so can't attach expired items to ebuckets, * or when no expiry was not set for this field */ - if ((db == NULL) || (expire == 0)) { + if ((db == NULL) || (expireAt == 0)) { hashTypeSet(db, o, field, value, 0); } else { - if (hashTypeSetExRdb(db, o, field, value, expire) != C_OK) { + if (hashTypeSetExRdb(db, o, field, value, expireAt) != C_OK) { serverLog(LL_WARNING, "failed adding hash field %s to key %s", field, key); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); @@ -2389,7 +2403,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* check for empty key (if all fields were expired) */ if (hashTypeLength(o, 0) == 0) { decrRefCount(o); - goto emptykey; + goto expiredHash; } if ((db != NULL) && (minExpire != EB_EXPIRE_TIME_INVALID)) hashTypeAddToExpires(db, key, o, minExpire); @@ -2719,31 +2733,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* for TTL listpack, find the minimum expiry */ uint64_t minExpire = hashTypeGetNextTimeToExpire(o); - /* check if need to convert to dict encoding */ - if ((db != NULL) && - (hashTypeLength(o, 0) > server.hash_max_listpack_entries)) /* TODO: each field length is not verified against server.hash_max_listpack_value */ - { - hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); - /* - * hashTypeAddToExpires is presumably called from within - * the convert function (from listpackEx to dict), BUT, - * this call depends on the lpt->meta field to be updated, - * which is not the case here as hashTypeAddToExpires was - * not yet called for the listpack (which is what updating - * its meta). - * Instead, this "manual" call is added here. - * Another approach would be to have the conversion function - * find the minExpire by itself when iterating on the listpack - * instead of relying on the meta and use this value for the - * final ebAdd call. - */ + /* Convert listpac to hash table without register in global HFE DS + * since the listpack is not connected yet to the DB */ + if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) + hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/); + + /* Only now register in global HFE DS, if db provided */ + if (db != NULL) hashTypeAddToExpires(db, key, o, minExpire); - } else if (rdbtype == RDB_TYPE_HASH_LISTPACK_EX) { - /* connect the listpack to the DB-global expiry data structure */ - if ((minExpire != EB_EXPIRE_TIME_INVALID) && (db != NULL)) { /* DB can be NULL when checking rdb */ - hashTypeAddToExpires(db, key, o, minExpire); - } - } + break; default: /* totally unreachable */ @@ -3134,6 +3132,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, emptykey: if (error) *error = RDB_LOAD_ERR_EMPTY_KEY; return NULL; +expiredHash: + if (error) *error = RDB_LOAD_ERR_EXPIRED_HASH; + return NULL; } /* Mark that we are loading in the global state and setup the fields @@ -3563,6 +3564,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if(empty_keys_skipped++ < 10) serverLog(LL_NOTICE, "rdbLoadObject skipping empty key: %s", key); sdsfree(key); + } else if (error == RDB_LOAD_ERR_EXPIRED_HASH) { + /* Valid flow. Continue. */ + sdsfree(key); } else { sdsfree(key); goto eoferr; diff --git a/src/rdb.h b/src/rdb.h index 2f4c49954aa..73a6a5b67c1 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -119,8 +119,9 @@ /* When rdbLoadObject() returns NULL, the err flag is * set to hold the type of error that occurred */ -#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ -#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */ +#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ +#define RDB_LOAD_ERR_EXPIRED_HASH 2 /* Expired hash since all its fields are expired */ +#define RDB_LOAD_ERR_OTHER 3 /* Any other errors */ ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len); int rdbSaveType(rio *rdb, unsigned char type); diff --git a/src/t_hash.c b/src/t_hash.c index 2952c949d7a..0e43d9567cb 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -797,7 +797,7 @@ robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field) { unsigned int vlen; long long vll; - if (hashTypeGetValue(db, o,field,&vstr,&vlen,&vll) == C_ERR) return NULL; + if (hashTypeGetValue(db,o,field,&vstr,&vlen,&vll) == C_ERR) return NULL; if (vstr) return createStringObject((char*)vstr,vlen); else return createStringObjectFromLongLong(vll); } @@ -1676,7 +1676,7 @@ void hashTypeConvertListpackEx(robj *o, int enc, ebuckets *hexpires) { } } -/* NOTE: hexpires can be NULL (Won't attempt to register in global HFE DS) */ +/* NOTE: hexpires can be NULL (Won't register in global HFE DS) */ void hashTypeConvert(robj *o, int enc, ebuckets *hexpires) { if (o->encoding == OBJ_ENCODING_LISTPACK) { hashTypeConvertListpack(o, enc); @@ -4025,7 +4025,9 @@ void hsetfCommand(client *c) { return; hashObj = lookupKeyWrite(c->db, c->argv[1]); - if (!hashObj) { + if (hashObj) { + if (checkType(c, hashObj, OBJ_HASH)) return; + } else { /* Don't create the object if command has DC or DCF arguments */ if (flags & HFE_CMD_DC || flags & HFE_CMD_DCF) { addReplyOrErrorObject(c, shared.null[c->resp]); diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl index 6f7a31d2187..a74352d618f 100644 --- a/tests/integration/psync2-master-restart.tcl +++ b/tests/integration/psync2-master-restart.tcl @@ -179,6 +179,15 @@ start_server {} { $master set $j somevalue px 10 } + ##### hash-field-expiration + # Hashes of type OBJ_ENCODING_LISTPACK_EX won't be discarded during + # RDB load, even if they are expired. + $master hsetf myhash1 PX 10 FVS 3 f1 v1 f2 v2 f3 v3 + # Hashes of type RDB_TYPE_HASH_METADATA will be discarded during RDB load. + $master config set hash-max-listpack-entries 0 + $master hsetf myhash2 PX 10 FVS 2 f1 v1 f2 v2 + $master config set hash-max-listpack-entries 1 + after 20 wait_for_condition 500 100 { @@ -209,6 +218,8 @@ start_server {} { assert {[status $master sync_partial_ok] == 0} assert {[status $master sync_full] == 1} assert {[status $master rdb_last_load_keys_expired] == 2048} + + assert {[status $master rdb_last_load_hash_fields_expired] == 2} assert {[status $replica sync_full] == 1} set digest [$master debug digest] diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index dcbe7c986ef..d2e91c41607 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -1286,12 +1286,12 @@ start_server {tags {"external:skip needs:debug"}} { r config resetstat r del myhash r hset myhash f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 - r hpexpire myhash 100 1 f1 f2 f3 + r hpexpire myhash 100 FIELDS 3 f1 f2 f3 assert_match [get_hashes_with_expiry_fields r] 1 r hset myhash2 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 assert_match [get_hashes_with_expiry_fields r] 1 - r hpexpire myhash2 100 1 f1 f2 f3 + r hpexpire myhash2 100 FIELDS 3 f1 f2 f3 assert_match [get_hashes_with_expiry_fields r] 2 wait_for_condition 50 50 { From 502964d6768660817866dc1e53fb21638fb4e7f1 Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Mon, 27 May 2024 15:10:21 +0300 Subject: [PATCH 6/7] Extend lazy-expire + preserve expiry after hincrby --- src/aof.c | 1 - src/ebuckets.c | 23 ++- src/module.c | 6 +- src/rdb.c | 5 +- src/server.h | 2 +- src/sort.c | 7 +- src/t_hash.c | 255 ++++++++++++++++---------- tests/unit/type/hash-field-expire.tcl | 55 +++++- 8 files changed, 234 insertions(+), 120 deletions(-) diff --git a/src/aof.c b/src/aof.c index b4d97ccd853..100f0ccefdc 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1968,7 +1968,6 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { hashTypeIterator *hi; long long count = 0, items = hashTypeLength(o, 0); - /* Is expected also hash-fields with expiration (HFE) ? */ int isHFE = hashTypeGetMinExpire(o) != EB_EXPIRE_TIME_INVALID; hi = hashTypeInitIterator(o); diff --git a/src/ebuckets.c b/src/ebuckets.c index 279ee98dcad..387aef88c93 100644 --- a/src/ebuckets.c +++ b/src/ebuckets.c @@ -23,6 +23,12 @@ * #define EB_VALIDATE_DEBUG 1 */ +#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) +#define EB_VALIDATE_STRUCTURE(eb, type) ebValidate(eb, type) +#else +#define EB_VALIDATE_STRUCTURE(eb, type) // Do nothing +#endif + /*** BENCHMARK * * To benchmark ebuckets creation and active-expire with 10 million items, apply @@ -1390,9 +1396,8 @@ int ebRemove(ebuckets *eb, EbucketsType *type, eItem item) { if (res) type->getExpireMeta(item)->trash = 1; -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + EB_VALIDATE_STRUCTURE(*eb, type); + return res; } @@ -1435,9 +1440,9 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime) { /* Add item to rax */ res = ebAddToRax(eb, type, item, EB_BUCKET_KEY(expireTime)); } -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + + EB_VALIDATE_STRUCTURE(*eb, type); + return res; } @@ -1521,9 +1526,9 @@ void ebExpire(ebuckets *eb, EbucketsType *type, ExpireInfo *info) { ebAdd(eb, type, updateList, ebGetMetaExpTime(mItem)); updateList = next; } -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + + EB_VALIDATE_STRUCTURE(*eb, type); + return; } diff --git a/src/module.c b/src/module.c index a59852e2e0c..1cad6f72c43 100644 --- a/src/module.c +++ b/src/module.c @@ -5379,9 +5379,13 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { else *existsptr = 0; } else { + int isHashDeleted; valueptr = va_arg(ap,RedisModuleString**); if (key->value) { - *valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr); + *valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr, &isHashDeleted); + + /* Currently hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); if (*valueptr) { robj *decoded = getDecodedObject(*valueptr); decrRefCount(*valueptr); diff --git a/src/rdb.c b/src/rdb.c index b5e9d724033..69b5a543adf 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2310,6 +2310,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, if (expireAt != 0 && iAmMaster() && ((mstime_t)expireAt < now) && !(rdbflags & RDBFLAGS_AOF_PREAMBLE)) { + /* TODO: aggregate HDELs and send them to replicas. */ if (rdbflags & RDBFLAGS_FEED_REPL && db) { /* Caller should have created replication backlog, * and now this path only works when rebooting, @@ -2733,8 +2734,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* for TTL listpack, find the minimum expiry */ uint64_t minExpire = hashTypeGetNextTimeToExpire(o); - /* Convert listpac to hash table without register in global HFE DS - * since the listpack is not connected yet to the DB */ + /* Convert listpack to hash table without register in global HFE DS, + * if has HFEs, since the listpack is not connected yet to the DB */ if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/); diff --git a/src/server.h b/src/server.h index bedddb6d328..ed0e6e5d774 100644 --- a/src/server.h +++ b/src/server.h @@ -3191,7 +3191,7 @@ void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr, unsigned int *vlen, long long *vll, uint64_t *expireTime); sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what); hfield hashTypeCurrentObjectNewHfield(hashTypeIterator *hi); -robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field); +robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted); int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags); robj *hashTypeDup(robj *o, sds newkey, uint64_t *minHashExpire); uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o); diff --git a/src/sort.c b/src/sort.c index 4b226ab5bc1..d45c380ac39 100644 --- a/src/sort.c +++ b/src/sort.c @@ -94,7 +94,12 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { /* Retrieve value from hash by the field name. The returned object * is a new object with refcount already incremented. */ - o = hashTypeGetValueObject(db, o, fieldobj->ptr); + int isHashDeleted; + o = hashTypeGetValueObject(db, o, fieldobj->ptr, &isHashDeleted); + + if (isHashDeleted) + goto noobj; + } else { if (o->type != OBJ_STRING) goto noobj; diff --git a/src/t_hash.c b/src/t_hash.c index fb80eff0b6f..513ac8a71c9 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -14,10 +14,16 @@ * update the expiration time of the hash object in global HFE DS. */ #define HASH_NEW_EXPIRE_DIFF_THRESHOLD max(4000, 1<hexpires); } -/* Get the value from a listpack encoded hash, identified by field. - * Returns -1 when the field cannot be found. */ +/* Get the value from a listpack encoded hash, identified by field. */ GetFieldRes hashTypeGetFromListpack(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, - long long *vll) + long long *vll, + uint64_t *expiredAt) { + *expiredAt = EB_EXPIRE_TIME_INVALID; unsigned char *zl, *fptr = NULL, *vptr = NULL; if (o->encoding == OBJ_ENCODING_LISTPACK) { @@ -680,9 +687,8 @@ GetFieldRes hashTypeGetFromListpack(robj *o, sds field, h = lpNext(lpt->lp, vptr); serverAssert(h && lpGetIntegerValue(h, &expire)); - - if (hashTypeIsExpired(o, expire)) - return GET_FIELD_EXPIRED; + if (expire != HASH_LP_NO_TTL) + *expiredAt = expire; } } } else { @@ -691,142 +697,151 @@ GetFieldRes hashTypeGetFromListpack(robj *o, sds field, if (vptr != NULL) { *vstr = lpGetValue(vptr, vlen, vll); - return GET_FIELD_OK; + return GETF_OK; } - return GET_FIELD_NOT_FOUND; + return GETF_NOT_FOUND; } /* Get the value from a hash table encoded hash, identified by field. * Returns NULL when the field cannot be found, otherwise the SDS value * is returned. */ -GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value) { +GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value, uint64_t *expiredAt) { dictEntry *de; + *expiredAt = EB_EXPIRE_TIME_INVALID; + serverAssert(o->encoding == OBJ_ENCODING_HT); de = dictFind(o->ptr, field); - if (de == NULL) return GET_FIELD_NOT_FOUND; - - /* Check if the field is expired */ - if (hfieldIsExpired(dictGetKey(de))) return GET_FIELD_EXPIRED; + if (de == NULL) + return GETF_NOT_FOUND; + *expiredAt = hfieldGetExpireTime(dictGetKey(de)); *value = (sds) dictGetVal(de); - return GET_FIELD_OK; + return GETF_OK; } /* Higher level function of hashTypeGet*() that returns the hash value - * associated with the specified field. If the field is found C_OK - * is returned, otherwise C_ERR. The returned object is returned by - * reference in either *vstr and *vlen if it's returned in string form, - * or stored in *vll if it's returned as a number. + * associated with the specified field. * - * If *vll is populated *vstr is set to NULL, so the caller - * can always check the function return by checking the return value - * for C_OK and checking if vll (or vstr) is NULL. + * Returned: + * - GetFieldRes: OK: Return Field's valid value + * NOT_FOUND: Field was not found. + * EXPIRED: Field is expired and Lazy deleted + * EXPIRED_HASH: Returned only if the field is the last one in the + * hash and the hash is deleted. + * - vstr, vlen : if string, ref in either *vstr and *vlen if it's + * returned in string form, + * - vll : or stored in *vll if it's returned as a number. + * If *vll is populated *vstr is set to NULL, so the caller can + * always check the function return by checking the return value + * for GETF_OK and checking if vll (or vstr) is NULL. * - * If field is expired (GET_FIELD_EXPIRED), then it will be lazy deleted. */ -int hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll) { +GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vstr, + unsigned int *vlen, long long *vll) { + uint64_t expiredAt; sds key; GetFieldRes res; if (o->encoding == OBJ_ENCODING_LISTPACK || o->encoding == OBJ_ENCODING_LISTPACK_EX) { *vstr = NULL; - res = hashTypeGetFromListpack(o, field, vstr, vlen, vll); + res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, &expiredAt); - if (res == GET_FIELD_OK) - return C_OK; - - if (res == GET_FIELD_NOT_FOUND) - return C_ERR; - - /* If reached here then res == GET_FIELD_EXPIRED. Extract key. */ - serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); - listpackEx *lpt = o->ptr; - key = lpt->key; + if (res == GETF_NOT_FOUND) + return GETF_NOT_FOUND; + key = ((listpackEx *) o->ptr)->key; } else if (o->encoding == OBJ_ENCODING_HT) { sds value = NULL; - res = hashTypeGetFromHashTable(o, field, &value); - if (res == GET_FIELD_OK) { - *vstr = (unsigned char*) value; - *vlen = sdslen(value); - return C_OK; - } + res = hashTypeGetFromHashTable(o, field, &value, &expiredAt); - if (res == GET_FIELD_NOT_FOUND) - return C_ERR; + if (res == GETF_NOT_FOUND) + return GETF_NOT_FOUND; - /* If reached here then res == GET_FIELD_EXPIRED. Extract key. */ - serverAssert(isDictWithMetaHFE((dict*)o->ptr)); + + *vstr = (unsigned char*) value; + *vlen = sdslen(value); key = ((dictExpireMetadata *) dictMetadata((dict*)o->ptr))->key; } else { serverPanic("Unknown hash encoding"); } - /* If in replica then return C_OK (discard hash-field expiry logic) */ + /* Don't expire anything while loading. It will be done later. */ + if (server.loading) + return GETF_OK; + + if (server.lazy_expire_disabled) + return GETF_OK; + if ((server.masterhost) && (server.current_client && (server.current_client->flags & CLIENT_MASTER))) - return C_OK; + return GETF_OK; + + if (expiredAt >= (uint64_t) commandTimeSnapshot()) + return GETF_OK; /* delete the field and propagate the deletion */ serverAssert(hashTypeDelete(o, field, 1) == 1); propagateHashFieldDeletion(db, key, field, sdslen(field)); + /* If the field is the last one in the hash, then the hash will be deleted */ if (hashTypeLength(o, 0) == 0) { - - /* create temporary robj with the key */ robj *keyObj = createStringObject(key, sdslen(key)); - notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id); dbDelete(db,keyObj); decrRefCount(keyObj); + return GETF_EXPIRED_HASH; } - return C_ERR; + + return GETF_EXPIRED; } /* Like hashTypeGetValue() but returns a Redis object, which is useful for * interaction with the hash type outside t_hash.c. * The function returns NULL if the field is not found in the hash. Otherwise - * a newly allocated string object with the value is returned. */ -robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field) { + * a newly allocated string object with the value is returned. + * + * isHashDeleted - If attempted to access expired field and it's the last field + * in the hash, then the hash will as well be deleted. In this case, + * isHashDeleted will be set to 1. + */ +robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted) { unsigned char *vstr; unsigned int vlen; long long vll; - if (hashTypeGetValue(db,o,field,&vstr,&vlen,&vll) == C_ERR) return NULL; - if (vstr) return createStringObject((char*)vstr,vlen); - else return createStringObjectFromLongLong(vll); -} + *isHashDeleted = 0; /*default*/ + GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll); -/* Higher level function using hashTypeGet*() to return the length of the - * object associated with the requested field, or 0 if the field does not - * exist. */ -size_t hashTypeGetValueLength(redisDb *db, robj *o, sds field) { - size_t len = 0; - unsigned char *vstr = NULL; - unsigned int vlen = UINT_MAX; - long long vll = LLONG_MAX; + if (res == GETF_OK) { + if (vstr) return createStringObject((char*)vstr,vlen); + else return createStringObjectFromLongLong(vll); + } - if (hashTypeGetValue(db, o, field, &vstr, &vlen, &vll) == C_OK) - len = vstr ? vlen : sdigits10(vll); + if (res == GETF_EXPIRED_HASH) + *isHashDeleted = 1; - return len; + /* GETF_EXPIRED_HASH, GETF_EXPIRED, GETF_NOT_FOUND */ + return NULL; } /* Test if the specified field exists in the given hash. If the field is * expired (HFE), then it will be lazy deleted * * Returns 1 if the field exists, and 0 when it doesn't. + * + * Note: This function will delete the field if it is expired (HFE). If it is the + * last field in the hash, then the hash will be deleted as well. */ int hashTypeExists(redisDb *db, robj *o, sds field) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - return hashTypeGetValue(db, o, field, &vstr, &vlen, &vll) == C_OK; + return (hashTypeGetValue(db, o, field, &vstr, &vlen, &vll) == GETF_OK) ? 1 : 0; } /* Add a new field, overwrite the old with the new value if it already exists. @@ -836,8 +851,9 @@ int hashTypeExists(redisDb *db, robj *o, sds field) { * caller retains ownership of the strings passed. However this behavior * can be effected by passing appropriate flags (possibly bitwise OR-ed): * - * HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function. - * HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function. + * HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function. + * HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function. + * HASH_SET_KEEP_FIELD -- keep original field along with TTL if already exists * * When the flags are used the caller does not need to release the passed * SDS string(s). It's up to the function to use the string to create a new @@ -847,8 +863,9 @@ int hashTypeExists(redisDb *db, robj *o, sds field) { * semantics of copying the values if needed. * */ -#define HASH_SET_TAKE_FIELD (1<<0) -#define HASH_SET_TAKE_VALUE (1<<1) +#define HASH_SET_TAKE_FIELD (1<<0) +#define HASH_SET_TAKE_VALUE (1<<1) +#define HASH_SET_KEEP_FIELD (1<<2) #define HASH_SET_COPY 0 int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags) { HashTypeSet set = {value, flags}; @@ -1014,13 +1031,18 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, /* If field already exists, then update "field". "Value" will be set afterward */ if (de == NULL) { - /* If attached TTL to the old field, then remove it from hash's private ebuckets */ - hfield oldField = dictGetKey(existing); - hfieldPersist(o, oldField); - hfieldFree(oldField); - sdsfree(dictGetVal(existing)); - dictSetKey(ht, existing, newField); + if (flags & HASH_SET_KEEP_FIELD) { + /* Not keep old field along with TTL */ + hfieldFree(newField); + } else { + /* If attached TTL to the old field, then remove it from hash's private ebuckets */ + hfield oldField = dictGetKey(existing); + hfieldPersist(o, oldField); + hfieldFree(oldField); + sdsfree(dictGetVal(existing)); + dictSetKey(ht, existing, newField); + } res = HSET_UPDATE; de = existing; } @@ -1184,13 +1206,16 @@ void hashTypeSetExDone(HashTypeSetEx *ex) { /* Check if the field is too long for listpack, and convert before adding the item. * This is needed for HINCRBY* case since in other commands this is handled early by * hashTypeTryConversion, so this check will be a NOP. */ -static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *s, - uint64_t expireAt, HashTypeSetEx *ex) +static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *setParams, + uint64_t expireAt, HashTypeSetEx *exParams) { int res = HSETEX_OK; unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL; if (o->encoding == OBJ_ENCODING_LISTPACK) { + /* If reached here, then only for setting field with value */ + serverAssert(setParams != NULL); + unsigned char *zl = o->ptr; fptr = lpFirst(zl); if (fptr != NULL) { @@ -1202,14 +1227,14 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS res = HSET_UPDATE; /* Replace value */ - zl = lpReplace(zl, &vptr, (unsigned char *) s->value, sdslen(s->value)); + zl = lpReplace(zl, &vptr, (unsigned char *) setParams->value, sdslen(setParams->value)); } } if (res != HSET_UPDATE) { /* Push new field/value pair onto the tail of the listpack */ zl = lpAppend(zl, (unsigned char*)field, sdslen(field)); - zl = lpAppend(zl, (unsigned char*)s->value, sdslen(s->value)); + zl = lpAppend(zl, (unsigned char*)setParams->value, sdslen(setParams->value)); } o->ptr = zl; goto out; @@ -1225,11 +1250,11 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS vptr = lpNext(lpt->lp, fptr); serverAssert(vptr != NULL); - if (s) { + if (setParams) { /* Replace value */ lpt->lp = lpReplace(lpt->lp, &vptr, - (unsigned char *) s->value, - sdslen(s->value)); + (unsigned char *) setParams->value, + sdslen(setParams->value)); fptr = lpPrev(lpt->lp, vptr); serverAssert(fptr != NULL); @@ -1238,8 +1263,11 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS tptr = lpNext(lpt->lp, vptr); serverAssert(tptr && lpGetIntegerValue(tptr, &expireTime)); - if (ex) { - res = hashTypeSetExpiryListpack(ex, field, fptr, vptr, tptr, + /* Keep, update or clear TTL */ + if (setParams && setParams->flags & HASH_SET_KEEP_FIELD) { + /* keep old field along with TTL */ + } else if (exParams) { + res = hashTypeSetExpiryListpack(exParams, field, fptr, vptr, tptr, expireAt); if (res != HSETEX_OK) goto out; @@ -1251,9 +1279,9 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS } if (!fptr) { - if (s) { - listpackExAddNew(o, field, s->value, - ex ? expireAt : HASH_LP_NO_TTL); + if (setParams) { + listpackExAddNew(o, field, setParams->value, + exParams ? expireAt : HASH_LP_NO_TTL); } else { res = HSETEX_NO_FIELD; } @@ -2138,14 +2166,21 @@ void hincrbyCommand(client *c) { if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&value) == C_OK) { + + GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&value); + if (res == GETF_OK) { if (vstr) { if (string2ll((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not an integer"); return; } } /* Else hashTypeGetValue() already stored it into &value */ + } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { + value = 0; } else { + /* Field expired and in turn hash deleted by hashTypeGetValue(). Create new one! */ + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); value = 0; } @@ -2157,7 +2192,7 @@ void hincrbyCommand(client *c) { } value += incr; new = sdsfromlonglong(value); - hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); + hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_FIELD); addReplyLongLong(c,value); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id); @@ -2178,7 +2213,8 @@ void hincrbyfloatCommand(client *c) { return; } if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll) == C_OK) { + GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll); + if (res == GETF_OK) { if (vstr) { if (string2ld((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not a float"); @@ -2187,7 +2223,12 @@ void hincrbyfloatCommand(client *c) { } else { value = (long double)ll; } + } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { + value = 0; } else { + /* Field expired and in turn hash deleted by hashTypeGetValue(). Create new one! */ + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); value = 0; } @@ -2200,7 +2241,7 @@ void hincrbyfloatCommand(client *c) { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN); new = sdsnewlen(buf,len); - hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); + hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_FIELD); addReplyBulkCBuffer(c,buf,len); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id); @@ -2226,7 +2267,7 @@ static void addHashFieldToReply(client *c, robj *o, sds field) { unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - if (hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll) == C_OK) { + if (hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll) == GETF_OK) { if (vstr) { addReplyBulkCBuffer(c, vstr, vlen); } else { @@ -2300,10 +2341,22 @@ void hlenCommand(client *c) { void hstrlenCommand(client *c) { robj *o; + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; - addReplyLongLong(c,hashTypeGetValueLength(c->db, o,c->argv[2]->ptr)); + + GetFieldRes res = hashTypeGetValue(c->db, o, c->argv[2]->ptr, &vstr, &vlen, &vll); + + if (res == GETF_NOT_FOUND || res == GETF_EXPIRED || res == GETF_EXPIRED_HASH) { + addReply(c, shared.czero); + return; + } + + size_t len = vstr ? vlen : sdigits10(vll); + addReplyLongLong(c,len); } static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) { diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 9039397d986..677fe9b235e 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -383,7 +383,7 @@ start_server {tags {"external:skip needs:debug"}} { assert_range [r httl myhash FIELDS 1 field1] 4 5 } - test "Lazy expire - delete hash with expired fields ($type)" { + test "Lazy Expire - delete hash with expired fields ($type)" { r del myhash r debug set-active-expire 0 r hset myhash k v @@ -411,7 +411,7 @@ start_server {tags {"external:skip needs:debug"}} { } - test "Lazy expire - HLEN does count expired fields ($type)" { + test "Lazy Expire - HLEN does count expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 @@ -440,7 +440,7 @@ start_server {tags {"external:skip needs:debug"}} { r debug set-active-expire 1 } - test "Lazy expire - HSCAN does not report expired fields ($type)" { + test "Lazy Expire - HSCAN does not report expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 @@ -501,7 +501,7 @@ start_server {tags {"external:skip needs:debug"}} { r debug set-active-expire 1 } - test "Lazy expire - verify various HASH commands handling expired fields ($type)" { + test "Lazy Expire - verify various HASH commands handling expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 r del h1 h2 h3 h4 h5 h18 @@ -751,6 +751,53 @@ start_server {tags {"external:skip needs:debug"}} { assert_equal [lsort [r hgetall myhash]] "1 2 3 a b c" assert_equal [r hexpiretime myhash FIELDS 3 a b c] {-1 -1 -1} } + + test {HINCRBY - discards pending expired field and reset its value} { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 10 f2 2 + r hset h2 f1 10 + assert_equal [r HINCRBY h1 f1 2] 12 + assert_equal [r HINCRBY h2 f1 2] 12 + r HPEXPIRE h1 10 FIELDS 1 f1 + r HPEXPIRE h2 10 FIELDS 1 f1 + after 15 + assert_equal [r HINCRBY h1 f1 1] 1 + assert_equal [r HINCRBY h2 f1 1] 1 + r debug set-active-expire 1 + } + + test {HINCRBY - preserve expiration time of the field} { + r del h1 + r hset h1 f1 10 + r hpexpire h1 20 FIELDS 1 f1 + assert_equal [r HINCRBY h1 f1 2] 12 + assert_range [r HPTTL h1 FIELDS 1 f1] 1 20 + } + + + test {HINCRBYFLOAT - discards pending expired field and reset its value} { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 10 f2 2 + r hset h2 f1 10 + assert_equal [r HINCRBYFLOAT h1 f1 2] 12 + assert_equal [r HINCRBYFLOAT h2 f1 2] 12 + r HPEXPIRE h1 10 FIELDS 1 f1 + r HPEXPIRE h2 10 FIELDS 1 f1 + after 15 + assert_equal [r HINCRBYFLOAT h1 f1 1] 1 + assert_equal [r HINCRBYFLOAT h2 f1 1] 1 + r debug set-active-expire 1 + } + + test {HINCRBYFLOAT - preserve expiration time of the field} { + r del h1 + r hset h1 f1 10 + r hpexpire h1 20 FIELDS 1 f1 + assert_equal [r HINCRBYFLOAT h1 f1 2.5] 12.5 + assert_range [r HPTTL h1 FIELDS 1 f1] 1 20 + } } r config set hash-max-listpack-entries 512 From a5c4bac2badad720532c0ffdd503d26e547bc4f8 Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Mon, 27 May 2024 19:28:00 +0300 Subject: [PATCH 7/7] PR fixes --- src/aof.c | 3 +- src/module.c | 15 +++- src/rdb.c | 2 +- src/server.h | 2 +- src/t_hash.c | 103 ++++++++++++++++---------- tests/unit/type/hash-field-expire.tcl | 16 +++- 6 files changed, 93 insertions(+), 48 deletions(-) diff --git a/src/aof.c b/src/aof.c index 100f0ccefdc..17e72febb0f 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1999,8 +1999,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) ) goto reHashEnd; - if (hi->expire_time != EB_EXPIRE_TIME_INVALID) - { + if (hi->expire_time != EB_EXPIRE_TIME_INVALID) { char cmd[] = "*6\r\n$10\r\nHPEXPIREAT\r\n"; if ( (!rioWrite(r, cmd, sizeof(cmd) - 1)) || (!rioWriteBulkObject(r, key)) || diff --git a/src/module.c b/src/module.c index 1cad6f72c43..d4eefdd99a6 100644 --- a/src/module.c +++ b/src/module.c @@ -5271,7 +5271,10 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { /* Handle XX and NX */ if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) { - int exists = hashTypeExists(key->db, key->value, field->ptr); + int isHashDeleted; + int exists = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted); + /* hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); if (((flags & REDISMODULE_HASH_XX) && !exists) || ((flags & REDISMODULE_HASH_NX) && exists)) { @@ -5374,10 +5377,14 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { /* Query the hash for existence or value object. */ if (flags & REDISMODULE_HASH_EXISTS) { existsptr = va_arg(ap,int*); - if (key->value) - *existsptr = hashTypeExists(key->db,key->value,field->ptr); - else + if (key->value) { + int isHashDeleted; + *existsptr = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted); + /* hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); + } else { *existsptr = 0; + } } else { int isHashDeleted; valueptr = va_arg(ap,RedisModuleString**); diff --git a/src/rdb.c b/src/rdb.c index 69b5a543adf..7ec20dfd35b 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2734,7 +2734,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* for TTL listpack, find the minimum expiry */ uint64_t minExpire = hashTypeGetNextTimeToExpire(o); - /* Convert listpack to hash table without register in global HFE DS, + /* Convert listpack to hash table without registering in global HFE DS, * if has HFEs, since the listpack is not connected yet to the DB */ if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/); diff --git a/src/server.h b/src/server.h index ed0e6e5d774..7c607f293d2 100644 --- a/src/server.h +++ b/src/server.h @@ -3174,7 +3174,7 @@ typedef struct listpackEx { void hashTypeConvert(robj *o, int enc, ebuckets *hexpires); void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end); -int hashTypeExists(redisDb *db, robj *o, sds key); +int hashTypeExists(redisDb *db, robj *o, sds key, int *isHashDeleted); int hashTypeDelete(robj *o, void *key, int isSdsField); unsigned long hashTypeLength(const robj *o, int subtractExpiredFields); hashTypeIterator *hashTypeInitIterator(robj *subject); diff --git a/src/t_hash.c b/src/t_hash.c index 513ac8a71c9..4642cb196e5 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -753,7 +753,6 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs if (res == GETF_NOT_FOUND) return GETF_NOT_FOUND; - key = ((listpackEx *) o->ptr)->key; } else if (o->encoding == OBJ_ENCODING_HT) { sds value = NULL; res = hashTypeGetFromHashTable(o, field, &value, &expiredAt); @@ -761,11 +760,8 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs if (res == GETF_NOT_FOUND) return GETF_NOT_FOUND; - *vstr = (unsigned char*) value; *vlen = sdslen(value); - key = ((dictExpireMetadata *) dictMetadata((dict*)o->ptr))->key; - } else { serverPanic("Unknown hash encoding"); } @@ -783,6 +779,12 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs if (expiredAt >= (uint64_t) commandTimeSnapshot()) return GETF_OK; + /* Got expired. Extract attached key from LISTPACK_EX/HT */ + if (o->encoding == OBJ_ENCODING_LISTPACK_EX) + key = ((listpackEx *) o->ptr)->key; + else + key = ((dictExpireMetadata *) dictMetadata((dict*)o->ptr))->key; + /* delete the field and propagate the deletion */ serverAssert(hashTypeDelete(o, field, 1) == 1); propagateHashFieldDeletion(db, key, field, sdslen(field)); @@ -833,15 +835,18 @@ robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted * * Returns 1 if the field exists, and 0 when it doesn't. * - * Note: This function will delete the field if it is expired (HFE). If it is the - * last field in the hash, then the hash will be deleted as well. + * isHashDeleted - If attempted to access expired field and it is the last field + * in the hash, then the hash will as well be deleted. In this case, + * isHashDeleted will be set to 1. */ -int hashTypeExists(redisDb *db, robj *o, sds field) { +int hashTypeExists(redisDb *db, robj *o, sds field, int *isHashDeleted) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - return (hashTypeGetValue(db, o, field, &vstr, &vlen, &vll) == GETF_OK) ? 1 : 0; + GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll); + *isHashDeleted = (res == GETF_EXPIRED_HASH) ? 1 : 0; + return (res == GETF_OK) ? 1 : 0; } /* Add a new field, overwrite the old with the new value if it already exists. @@ -993,22 +998,22 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, /* Check if the field is too long for listpack, and convert before adding the item. * This is needed for HINCRBY* case since in other commands this is handled early by * hashTypeTryConversion, so this check will be a NOP. */ - if (isSetKeyValue && (o->encoding == OBJ_ENCODING_LISTPACK || - o->encoding == OBJ_ENCODING_LISTPACK_EX)) + if (o->encoding == OBJ_ENCODING_LISTPACK || + o->encoding == OBJ_ENCODING_LISTPACK_EX) { - if (sdslen(field) > server.hash_max_listpack_value || - sdslen(setKeyVal->value) > server.hash_max_listpack_value) + if ( (isSetKeyValue) && + (sdslen(field) > server.hash_max_listpack_value || + sdslen(setKeyVal->value) > server.hash_max_listpack_value) ) + { hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); + } else { + res = hashTypeSetExListpack(db, o, field, setKeyVal, expireAt, exInfo); + goto SetExDone; /*done*/ + } } - if (o->encoding == OBJ_ENCODING_LISTPACK || - o->encoding == OBJ_ENCODING_LISTPACK_EX) - { - res = hashTypeSetExListpack(db, o, field, setKeyVal, expireAt, exInfo); - goto SetExDone; - } else if (o->encoding != OBJ_ENCODING_HT) { + if (o->encoding != OBJ_ENCODING_HT) serverPanic("Unknown hash encoding"); - } /*** now deal with HT ***/ hfield newField; @@ -1031,7 +1036,6 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, /* If field already exists, then update "field". "Value" will be set afterward */ if (de == NULL) { - if (flags & HASH_SET_KEEP_FIELD) { /* Not keep old field along with TTL */ hfieldFree(newField); @@ -1040,9 +1044,9 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, hfield oldField = dictGetKey(existing); hfieldPersist(o, oldField); hfieldFree(oldField); - sdsfree(dictGetVal(existing)); dictSetKey(ht, existing, newField); } + sdsfree(dictGetVal(existing)); res = HSET_UPDATE; de = existing; } @@ -1213,8 +1217,8 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL; if (o->encoding == OBJ_ENCODING_LISTPACK) { - /* If reached here, then only for setting field with value */ - serverAssert(setParams != NULL); + /* If reached here, then no need to set expiration. Otherwise, as precond + * listpack is converted to listpackex by hashTypeSetExInit() */ unsigned char *zl = o->ptr; fptr = lpFirst(zl); @@ -2113,19 +2117,27 @@ ebuckets *hashTypeGetDictMetaHFE(dict *d) { *----------------------------------------------------------------------------*/ void hsetnxCommand(client *c) { + int isHashDeleted; robj *o; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeExists(c->db, o, c->argv[2]->ptr)) { + if (hashTypeExists(c->db, o, c->argv[2]->ptr, &isHashDeleted)) { addReply(c, shared.czero); - } else { - hashTypeTryConversion(c->db, o,c->argv,2,3); - hashTypeSet(c->db, o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); - addReply(c, shared.cone); - signalModifiedKey(c,c->db,c->argv[1]); - notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); - server.dirty++; + return; + } + + /* Field expired and in turn hash deleted. Create new one! */ + if (isHashDeleted) { + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); } + + hashTypeTryConversion(c->db, o,c->argv,2,3); + hashTypeSet(c->db, o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); + addReply(c, shared.cone); + signalModifiedKey(c,c->db,c->argv[1]); + notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); + server.dirty++; } void hsetCommand(client *c) { @@ -2178,7 +2190,7 @@ void hincrbyCommand(client *c) { } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { value = 0; } else { - /* Field expired and in turn hash deleted by hashTypeGetValue(). Create new one! */ + /* Field expired and in turn hash deleted. Create new one! */ o = createHashObject(); dbAdd(c->db,c->argv[1],o); value = 0; @@ -2226,7 +2238,7 @@ void hincrbyfloatCommand(client *c) { } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { value = 0; } else { - /* Field expired and in turn hash deleted by hashTypeGetValue(). Create new one! */ + /* Field expired and in turn hash deleted. Create new one! */ o = createHashObject(); dbAdd(c->db,c->argv[1],o); value = 0; @@ -2257,17 +2269,18 @@ void hincrbyfloatCommand(client *c) { decrRefCount(newobj); } -static void addHashFieldToReply(client *c, robj *o, sds field) { +static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field) { if (o == NULL) { addReplyNull(c); - return; + return GETF_NOT_FOUND; } unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - if (hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll) == GETF_OK) { + GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll); + if (res == GETF_OK) { if (vstr) { addReplyBulkCBuffer(c, vstr, vlen); } else { @@ -2276,6 +2289,7 @@ static void addHashFieldToReply(client *c, robj *o, sds field) { } else { addReplyNull(c); } + return res; } void hgetCommand(client *c) { @@ -2288,6 +2302,7 @@ void hgetCommand(client *c) { } void hmgetCommand(client *c) { + GetFieldRes res = GETF_OK; robj *o; int i; @@ -2297,8 +2312,17 @@ void hmgetCommand(client *c) { if (checkType(c,o,OBJ_HASH)) return; addReplyArrayLen(c, c->argc-2); - for (i = 2; i < c->argc; i++) { - addHashFieldToReply(c, o, c->argv[i]->ptr); + for (i = 2; i < c->argc ; i++) { + + res = addHashFieldToReply(c, o, c->argv[i]->ptr); + + /* If hash got lazy expired since all fields are expired (o is invalid), + * then fill the rest with trivial nulls and return */ + if (res == GETF_EXPIRED_HASH) { + for ( ++i ; i < c->argc; i++) + addReply(c, shared.null[c->resp]); + return; + } } } @@ -2439,10 +2463,11 @@ void hgetallCommand(client *c) { void hexistsCommand(client *c) { robj *o; + int isHashDeleted; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; - addReply(c, hashTypeExists(c->db, o,c->argv[2]->ptr) ? shared.cone : shared.czero); + addReply(c, hashTypeExists(c->db, o,c->argv[2]->ptr, &isHashDeleted) ? shared.cone : shared.czero); } void hscanCommand(client *c) { diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 677fe9b235e..72b7daa5032 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -684,7 +684,21 @@ start_server {tags {"external:skip needs:debug"}} { wait_for_condition 20 10 { [r exists myhash] == 0 } else { fail "'myhash' should be expired" } } {} {singledb:skip} - test "HPERSIST - Returns empty array if key does not exist" { + test "HMGET - returns empty entries if fields or hash expired ($type)" { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 v1 f2 v2 f3 v3 + r hset h2 f1 v1 f2 v2 f3 v3 + r hpexpire h1 10000000 NX FIELDS 1 f1 + r hpexpire h1 1 NX FIELDS 2 f2 f3 + r hpexpire h2 1 NX FIELDS 3 f1 f2 f3 + after 5 + assert_equal [r hmget h1 f1 f2 f3] {v1 {} {}} + assert_equal [r hmget h2 f1 f2 f3] {{} {} {}} + r debug set-active-expire 1 + } + + test "HPERSIST - Returns empty array if key does not exist ($type)" { r del myhash # Make sure we can distinguish between an empty array and a null response r readraw 1