Skip to content

Commit

Permalink
initial ttl support
Browse files Browse the repository at this point in the history
Signed-off-by: Kirill Danshin <kirill@danshin.pro>
  • Loading branch information
kirillDanshin committed May 31, 2019
1 parent 46cbf15 commit 3cfc486
Show file tree
Hide file tree
Showing 4 changed files with 517 additions and 1 deletion.
25 changes: 24 additions & 1 deletion fastcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type Stats struct {

// BigStats contains stats for GetBig/SetBig methods.
BigStats

// TTLStats contains stats for GetWithTTL/SetWithTTL methods.
TTLStats
}

// Reset resets s, so it may be re-used again in Cache.UpdateStats.
Expand Down Expand Up @@ -104,6 +107,10 @@ func (bs *BigStats) reset() {
type Cache struct {
buckets [bucketsCount]bucket

ttlmu sync.RWMutex
ttl map[uint64]int64
ttlgc sync.Once
ttlStats TTLStats
bigStats BigStats
}

Expand All @@ -117,11 +124,14 @@ func New(maxBytes int) *Cache {
if maxBytes <= 0 {
panic(fmt.Errorf("maxBytes must be greater than 0; got %d", maxBytes))
}
var c Cache
var c = Cache{
ttl: make(map[uint64]int64),
}
maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount)
for i := range c.buckets[:] {
c.buckets[i].Init(maxBucketBytes)
}
c.runTTLGC()
return &c
}

Expand All @@ -140,6 +150,10 @@ func New(maxBytes int) *Cache {
// k and v contents may be modified after returning from Set.
func (c *Cache) Set(k, v []byte) {
h := xxhash.Sum64(k)
c.setWithHash(k, v, h)
}

func (c *Cache) setWithHash(k, v []byte, h uint64) {
idx := h % bucketsCount
c.buckets[idx].Set(k, v, h)
}
Expand All @@ -153,6 +167,10 @@ func (c *Cache) Set(k, v []byte) {
// k contents may be modified after returning from Get.
func (c *Cache) Get(dst, k []byte) []byte {
h := xxhash.Sum64(k)
return c.getWithHash(dst, k, h)
}

func (c *Cache) getWithHash(dst, k []byte, h uint64) []byte {
idx := h % bucketsCount
dst, _ = c.buckets[idx].Get(dst, k, h, true)
return dst
Expand Down Expand Up @@ -190,12 +208,17 @@ func (c *Cache) UpdateStats(s *Stats) {
for i := range c.buckets[:] {
c.buckets[i].UpdateStats(s)
}

s.GetBigCalls += atomic.LoadUint64(&c.bigStats.GetBigCalls)
s.SetBigCalls += atomic.LoadUint64(&c.bigStats.SetBigCalls)
s.TooBigKeyErrors += atomic.LoadUint64(&c.bigStats.TooBigKeyErrors)
s.InvalidMetavalueErrors += atomic.LoadUint64(&c.bigStats.InvalidMetavalueErrors)
s.InvalidValueLenErrors += atomic.LoadUint64(&c.bigStats.InvalidValueLenErrors)
s.InvalidValueHashErrors += atomic.LoadUint64(&c.bigStats.InvalidValueHashErrors)

s.GetWithTTLCalls += atomic.LoadUint64(&c.ttlStats.GetWithTTLCalls)
s.SetWithTTLCalls += atomic.LoadUint64(&c.ttlStats.SetWithTTLCalls)
s.MissesWithTTL += atomic.LoadUint64(&c.ttlStats.MissesWithTTL)
}

type bucket struct {
Expand Down
104 changes: 104 additions & 0 deletions fastcache_ttl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package fastcache

import (
"sync/atomic"
"time"

xxhash "github.com/cespare/xxhash/v2"
)

// TTLStats contains stats for {G,S}etWithTTL methods.
type TTLStats struct {
// GetWithTTLCalls is the number of GetWithTTL calls.
GetWithTTLCalls uint64

// SetWithTTLCalls is the number of SetWithTTL calls.
SetWithTTLCalls uint64

// MissesWithTTL is the number of GetWithTTL calls to missing items.
MissesWithTTL uint64
}

func (ts *TTLStats) reset() {
atomic.StoreUint64(&ts.GetWithTTLCalls, 0)
atomic.StoreUint64(&ts.SetWithTTLCalls, 0)
atomic.StoreUint64(&ts.MissesWithTTL, 0)
}

// SetWithTTL stores (k, v) in the cache with TTL.
//
// GetWithTTL must be used for reading the stored entry.
//
// The stored entry may be evicted at any time either due to cache
// overflow or due to unlikely hash collision.
// Pass higher maxBytes value to New if the added items disappear
// frequently.
//
// (k, v) entries with summary size exceeding 64KB aren't stored in the cache.
// SetBig can be used for storing entries exceeding 64KB, but we do not support
// TTLs for big entries currently.
//
// k and v contents may be modified after returning from SetWithTTL.
func (c *Cache) SetWithTTL(k, v []byte, ttl time.Duration) {
atomic.AddUint64(&c.ttlStats.SetWithTTLCalls, 1)

h := xxhash.Sum64(k)
deadBySec := time.Now().Add(ttl).Unix()
c.ttlmu.Lock()
c.ttl[h] = deadBySec
c.ttlmu.Unlock()
c.setWithHash(k, v, h)
}

// GetWithTTL appends value by the key k to dst and returns the result.
//
// GetWithTTL allocates new byte slice for the returned value if dst is nil.
//
// GetWithTTL returns only values stored in c via SetWithTTL.
//
// k contents may be modified after returning from GetWithTTL.
func (c *Cache) GetWithTTL(dst, k []byte) []byte {
atomic.AddUint64(&c.ttlStats.GetWithTTLCalls, 1)
h := xxhash.Sum64(k)
c.ttlmu.RLock()
deadBySec, exists := c.ttl[h]
c.ttlmu.RUnlock()
if !exists || !ttlValid(deadBySec) {
atomic.AddUint64(&c.ttlStats.MissesWithTTL, 1)
return dst
}
return c.getWithHash(dst, k, h)
}

// HasWithTTL returns true if entry for the given key k exists in the cache and not dead yet.
func (c *Cache) HasWithTTL(k []byte) bool {
h := xxhash.Sum64(k)
c.ttlmu.RLock()
deadBySec, exists := c.ttl[h]
c.ttlmu.RUnlock()
return exists && ttlValid(deadBySec)
}

func ttlValid(deadBySec int64) bool {
return time.Until(time.Unix(deadBySec, 0)) > 0
}

func (c *Cache) runTTLGC() {
c.ttlgc.Do(c.ttlGCRoutine)
}

func (c *Cache) ttlGCRoutine() {
go func() {
for {
c.ttlmu.Lock()
for k, deadBySec := range c.ttl {
if !ttlValid(deadBySec) {
delete(c.ttl, k)
}
}
c.ttlmu.Unlock()

time.Sleep(30 * time.Second)
}
}()
}
Loading

0 comments on commit 3cfc486

Please sign in to comment.