Skip to content

Commit

Permalink
Add iterator capability to ebuckets DS
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Sep 5, 2024
1 parent 569584d commit 802c3d6
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 2 deletions.
131 changes: 129 additions & 2 deletions src/ebuckets.c
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,90 @@ uint64_t ebGetExpireTime(EbucketsType *type, eItem item) {
return ebGetMetaExpTime(meta);
}

/* Init ebuckets iterator
*
* This is a non-safe iterator. Any modification to ebuckets will invalidate the
* iterator. Calling this function takes care to reference the first item
* in ebuckets with minimal expiration time. If not items to iterate, then
* iter->currItem will be NULL */
void ebStartIter(EbucketsIterator *iter, ebuckets eb, EbucketsType *type) {
iter->eb = eb;
iter->type = type;

if (ebIsEmpty(eb)) {
iter->currItem = NULL;
} else if (ebIsList(eb)) {
iter->currItem = ebGetListPtr(type, eb);
iter->itemsCurrBucket = type->getExpireMeta(iter->currItem)->numItems;
iter->isRax = 0;
} else {
rax *rax = ebGetRaxPtr(eb);
raxStart(&iter->raxIter, rax);
raxSeek(&iter->raxIter, "^", NULL, 0);
raxNext(&iter->raxIter);
FirstSegHdr *firstSegHdr = iter->raxIter.data;
iter->itemsCurrBucket = firstSegHdr->totalItems;
iter->currItem = firstSegHdr->head;
iter->isRax = 1;
}
}

/* Advance iterator to the next item.
*
* Return: 0 if reached EOF. Update `iter->currItem` to NULL
* 1 Otherwise. Update `iter->currItem`
*/
int ebNext(EbucketsIterator *iter) {
if (iter->currItem == NULL)
return 0;

eItem item = iter->currItem;
ExpireMeta *meta = iter->type->getExpireMeta(item);
if (iter->isRax) {
if (meta->lastItemBucket) {
if (raxNext(&iter->raxIter)) {
FirstSegHdr *firstSegHdr = iter->raxIter.data;
iter->currItem = firstSegHdr->head;
iter->itemsCurrBucket = firstSegHdr->totalItems;
} else {
iter->currItem = NULL;
}
} else if (meta->lastInSegment) {
NextSegHdr *nextSegHdr = meta->next;
iter->currItem = nextSegHdr->head;
} else {
iter->currItem = meta->next;
}
} else {
iter->currItem = meta->next;
}
return (iter->currItem == NULL) ? 0 : 1;
}

/* Advance iterator to the next ebucket.
*
* Return: 0 if reached EOF. Set `iter->itemsCurrBucket`, `iter->currItem` to NULL
* 1 Otherwise. Update `iter->currItem` and `iter->itemsCurrBucket`
*/
int ebNextBucket(EbucketsIterator *iter) {
if (iter->currItem == NULL)
return 0;
iter->currItem = NULL;
if ((iter->isRax) && (raxNext(&iter->raxIter))) {
FirstSegHdr *currSegHdr = iter->raxIter.data;
iter->currItem = currSegHdr->head;
iter->itemsCurrBucket = currSegHdr->totalItems;
} else {
iter->itemsCurrBucket = 0;
}
return 1;
}

void ebStopIter(EbucketsIterator *iter) {
if (iter->isRax)
raxStop(&iter->raxIter);
}

/*** Unit tests ***/

#ifdef REDIS_TEST
Expand Down Expand Up @@ -2117,6 +2201,50 @@ int ebucketsTest(int argc, char **argv, int flags) {
}
#endif

TEST("basic iterator test") {
MyItem *items[100];
for (uint32_t numItems = 0 ; numItems < sizeof(items)/sizeof(items[0]) ; ++numItems) {
ebuckets eb = NULL;
EbucketsIterator iter;

/* Create and add items to ebuckets */
for (uint32_t i = 0; i < numItems; i++) {
items[i] = zmalloc(sizeof(MyItem));
ebAdd(&eb, &myEbucketsType, items[i], i);
}

/* iterate items */
ebStartIter(&iter, eb, &myEbucketsType);
for (uint32_t i = 0; i < numItems; i++) {
assert(iter.currItem == items[i]);
int res = ebNext(&iter);
if (i+1<numItems) {
assert(res == 1);
assert(iter.currItem != NULL);
} else {
assert(res == 0);
assert(iter.currItem == NULL);
}
}
ebStopIter(&iter);

/* iterate buckets */
ebStartIter(&iter, eb, &myEbucketsType);
uint32_t countItems = 0;

uint32_t countBuckets = 0;
while (1) {
countItems += iter.itemsCurrBucket;
if (!ebNextBucket(&iter)) break;
countBuckets++;
}
ebStopIter(&iter);
assert(countItems == numItems);
if (numItems>=8) assert(numItems/8 >= countBuckets);
ebDestroy(&eb, &myEbucketsType, NULL);
}
}

TEST("list - Create a single item, get TTL, and remove") {
MyItem *singleItem = zmalloc(sizeof(MyItem));
ebuckets eb = NULL;
Expand Down Expand Up @@ -2146,9 +2274,8 @@ int ebucketsTest(int argc, char **argv, int flags) {
assert(ebRemove(&eb, &myEbucketsType, items[i]));
}

for (int i = 0 ; i < EB_LIST_MAX_ITEMS ; i++) {
for (int i = 0 ; i < EB_LIST_MAX_ITEMS ; i++)
zfree(items[i]);
}

ebDestroy(&eb, &myEbucketsType, NULL);
}
Expand Down
22 changes: 22 additions & 0 deletions src/ebuckets.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,20 @@ typedef struct ExpireInfo {
EB_EXPIRE_TIME_INVALID if none left. */
} ExpireInfo;

/* Iterator to traverse ebuckets items */
typedef struct EbucketsIterator {
/* private data of iterator */
ebuckets eb;
EbucketsType *type;
raxIterator raxIter;
int isRax;

/* public read only */
eItem currItem; /* Current item ref. Use ebGetMetaExpTime()
on `currItem` to get expiration time.*/
uint64_t itemsCurrBucket; /* Number of items in current bucket. */
} EbucketsIterator;

/* ebuckets API */

static inline ebuckets ebCreate(void) { return NULL; } /* Empty ebuckets */
Expand All @@ -281,6 +295,14 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime);

uint64_t ebGetExpireTime(EbucketsType *type, eItem item);

void ebStartIter(EbucketsIterator *iter, ebuckets eb, EbucketsType *type);

void ebStopIter(EbucketsIterator *iter);

int ebNext(EbucketsIterator *iter);

int ebNextBucket(EbucketsIterator *iter);

typedef eItem (ebDefragFunction)(const eItem item);
eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *fn);

Expand Down

0 comments on commit 802c3d6

Please sign in to comment.