diff --git a/fastcache.go b/fastcache.go index c0ef724..cf6198f 100644 --- a/fastcache.go +++ b/fastcache.go @@ -51,6 +51,9 @@ type Stats struct { // BigStats contains stats for GetBig/SetBig methods. BigStats + + // TTLStats contains stats for GetWithTTL/SetWithTTL methods. + TTLStats } // Reset resets s, so it may be re-used again in Cache.UpdateStats. @@ -104,6 +107,10 @@ func (bs *BigStats) reset() { type Cache struct { buckets [bucketsCount]bucket + ttlmu sync.RWMutex + ttl map[uint64]int64 + ttlgc sync.Once + ttlStats TTLStats bigStats BigStats } @@ -117,11 +124,14 @@ func New(maxBytes int) *Cache { if maxBytes <= 0 { panic(fmt.Errorf("maxBytes must be greater than 0; got %d", maxBytes)) } - var c Cache + var c = Cache{ + ttl: make(map[uint64]int64), + } maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount) for i := range c.buckets[:] { c.buckets[i].Init(maxBucketBytes) } + c.runTTLGC() return &c } @@ -140,6 +150,10 @@ func New(maxBytes int) *Cache { // k and v contents may be modified after returning from Set. func (c *Cache) Set(k, v []byte) { h := xxhash.Sum64(k) + c.setWithHash(k, v, h) +} + +func (c *Cache) setWithHash(k, v []byte, h uint64) { idx := h % bucketsCount c.buckets[idx].Set(k, v, h) } @@ -153,6 +167,10 @@ func (c *Cache) Set(k, v []byte) { // k contents may be modified after returning from Get. func (c *Cache) Get(dst, k []byte) []byte { h := xxhash.Sum64(k) + return c.getWithHash(dst, k, h) +} + +func (c *Cache) getWithHash(dst, k []byte, h uint64) []byte { idx := h % bucketsCount dst, _ = c.buckets[idx].Get(dst, k, h, true) return dst @@ -190,12 +208,17 @@ func (c *Cache) UpdateStats(s *Stats) { for i := range c.buckets[:] { c.buckets[i].UpdateStats(s) } + s.GetBigCalls += atomic.LoadUint64(&c.bigStats.GetBigCalls) s.SetBigCalls += atomic.LoadUint64(&c.bigStats.SetBigCalls) s.TooBigKeyErrors += atomic.LoadUint64(&c.bigStats.TooBigKeyErrors) s.InvalidMetavalueErrors += atomic.LoadUint64(&c.bigStats.InvalidMetavalueErrors) s.InvalidValueLenErrors += atomic.LoadUint64(&c.bigStats.InvalidValueLenErrors) s.InvalidValueHashErrors += atomic.LoadUint64(&c.bigStats.InvalidValueHashErrors) + + s.GetWithTTLCalls += atomic.LoadUint64(&c.ttlStats.GetWithTTLCalls) + s.SetWithTTLCalls += atomic.LoadUint64(&c.ttlStats.SetWithTTLCalls) + s.MissesWithTTL += atomic.LoadUint64(&c.ttlStats.MissesWithTTL) } type bucket struct { diff --git a/fastcache_ttl.go b/fastcache_ttl.go new file mode 100644 index 0000000..23aa31c --- /dev/null +++ b/fastcache_ttl.go @@ -0,0 +1,104 @@ +package fastcache + +import ( + "sync/atomic" + "time" + + xxhash "github.com/cespare/xxhash/v2" +) + +// TTLStats contains stats for {G,S}etWithTTL methods. +type TTLStats struct { + // GetWithTTLCalls is the number of GetWithTTL calls. + GetWithTTLCalls uint64 + + // SetWithTTLCalls is the number of SetWithTTL calls. + SetWithTTLCalls uint64 + + // MissesWithTTL is the number of GetWithTTL calls to missing items. + MissesWithTTL uint64 +} + +func (ts *TTLStats) reset() { + atomic.StoreUint64(&ts.GetWithTTLCalls, 0) + atomic.StoreUint64(&ts.SetWithTTLCalls, 0) + atomic.StoreUint64(&ts.MissesWithTTL, 0) +} + +// SetWithTTL stores (k, v) in the cache with TTL. +// +// GetWithTTL must be used for reading the stored entry. +// +// The stored entry may be evicted at any time either due to cache +// overflow or due to unlikely hash collision. +// Pass higher maxBytes value to New if the added items disappear +// frequently. +// +// (k, v) entries with summary size exceeding 64KB aren't stored in the cache. +// SetBig can be used for storing entries exceeding 64KB, but we do not support +// TTLs for big entries currently. +// +// k and v contents may be modified after returning from SetWithTTL. +func (c *Cache) SetWithTTL(k, v []byte, ttl time.Duration) { + atomic.AddUint64(&c.ttlStats.SetWithTTLCalls, 1) + + h := xxhash.Sum64(k) + deadBySec := time.Now().Add(ttl).Unix() + c.ttlmu.Lock() + c.ttl[h] = deadBySec + c.ttlmu.Unlock() + c.setWithHash(k, v, h) +} + +// GetWithTTL appends value by the key k to dst and returns the result. +// +// GetWithTTL allocates new byte slice for the returned value if dst is nil. +// +// GetWithTTL returns only values stored in c via SetWithTTL. +// +// k contents may be modified after returning from GetWithTTL. +func (c *Cache) GetWithTTL(dst, k []byte) []byte { + atomic.AddUint64(&c.ttlStats.GetWithTTLCalls, 1) + h := xxhash.Sum64(k) + c.ttlmu.RLock() + deadBySec, exists := c.ttl[h] + c.ttlmu.RUnlock() + if !exists || !ttlValid(deadBySec) { + atomic.AddUint64(&c.ttlStats.MissesWithTTL, 1) + return dst + } + return c.getWithHash(dst, k, h) +} + +// HasWithTTL returns true if entry for the given key k exists in the cache and not dead yet. +func (c *Cache) HasWithTTL(k []byte) bool { + h := xxhash.Sum64(k) + c.ttlmu.RLock() + deadBySec, exists := c.ttl[h] + c.ttlmu.RUnlock() + return exists && ttlValid(deadBySec) +} + +func ttlValid(deadBySec int64) bool { + return time.Until(time.Unix(deadBySec, 0)) > 0 +} + +func (c *Cache) runTTLGC() { + c.ttlgc.Do(c.ttlGCRoutine) +} + +func (c *Cache) ttlGCRoutine() { + go func() { + for { + c.ttlmu.Lock() + for k, deadBySec := range c.ttl { + if !ttlValid(deadBySec) { + delete(c.ttl, k) + } + } + c.ttlmu.Unlock() + + time.Sleep(30 * time.Second) + } + }() +} diff --git a/fastcache_ttl_test.go b/fastcache_ttl_test.go new file mode 100644 index 0000000..1fc3a3c --- /dev/null +++ b/fastcache_ttl_test.go @@ -0,0 +1,292 @@ +package fastcache + +import ( + "fmt" + "runtime" + "sync" + "testing" + "time" +) + +func TestTTLCacheSmall(t *testing.T) { + c := New(1) + defer c.Reset() + + v := c.GetWithTTL(nil, []byte("aaa")) + if len(v) != 0 { + t.Fatalf("unexpected non-empty value obtained from small cache: %q", v) + } + + c.SetWithTTL([]byte("key"), []byte("value"), 3*time.Second) + v = c.GetWithTTL(nil, []byte("key")) + if string(v) != "value" { + t.Fatalf("unexpected value obtained; got %q; want %q", v, "value") + } + + v = c.GetWithTTL(nil, nil) + if len(v) != 0 { + t.Fatalf("unexpected non-empty value obtained from small cache: %q", v) + } + v = c.GetWithTTL(nil, []byte("aaa")) + if len(v) != 0 { + t.Fatalf("unexpected non-empty value obtained from small cache: %q", v) + } + + c.SetWithTTL([]byte("aaa"), []byte("bbb"), 3*time.Second) + v = c.GetWithTTL(nil, []byte("aaa")) + if string(v) != "bbb" { + t.Fatalf("unexpected value obtained; got %q; want %q", v, "bbb") + } + + c.Reset() + v = c.GetWithTTL(nil, []byte("aaa")) + if len(v) != 0 { + t.Fatalf("unexpected non-empty value obtained from empty cache: %q", v) + } + + // Test empty value + k := []byte("empty") + c.SetWithTTL(k, nil, 3*time.Second) + v = c.GetWithTTL(nil, k) + if len(v) != 0 { + t.Fatalf("unexpected non-empty value obtained from empty entry: %q", v) + } + if !c.Has(k) { + t.Fatalf("cannot find empty entry for key %q", k) + } + if c.Has([]byte("foobar")) { + t.Fatalf("non-existing entry found in the cache") + } +} + +func TestTTLCacheWrap(t *testing.T) { + c := New(bucketsCount * chunkSize * 1.5) + defer c.Reset() + + calls := uint64(5e6) + + for i := uint64(0); i < calls; i++ { + k := []byte(fmt.Sprintf("key %d", i)) + v := []byte(fmt.Sprintf("value %d", i)) + c.SetWithTTL(k, v, 3*time.Second) + vv := c.GetWithTTL(nil, k) + if string(vv) != string(v) { + t.Fatalf("unexpected value for key %q; got %q; want %q", k, vv, v) + } + } + for i := uint64(0); i < calls/10; i++ { + x := i * 10 + k := []byte(fmt.Sprintf("key %d", x)) + v := []byte(fmt.Sprintf("value %d", x)) + vv := c.GetWithTTL(nil, k) + if len(vv) > 0 && string(v) != string(vv) { + t.Fatalf("unexpected value for key %q; got %q; want %q", k, vv, v) + } + } + + var s Stats + c.UpdateStats(&s) + getCalls := calls + calls/10 + if s.GetWithTTLCalls != getCalls { + t.Fatalf("unexpected number of getCalls; got %d; want %d", s.GetWithTTLCalls, getCalls) + } + if s.SetWithTTLCalls != calls { + t.Fatalf("unexpected number of setCalls; got %d; want %d", s.SetWithTTLCalls, calls) + } + if s.MissesWithTTL == 0 || s.MissesWithTTL >= calls/10 { + t.Fatalf("unexpected number of misses; got %d; it should be between 0 and %d", s.MissesWithTTL, calls/10) + } + if s.Collisions != 0 { + t.Fatalf("unexpected number of collisions; got %d; want 0", s.Collisions) + } + if s.EntriesCount < calls/5 { + t.Fatalf("unexpected number of items; got %d; cannot be smaller than %d", s.EntriesCount, calls/5) + } + if s.BytesSize < 1024 { + t.Fatalf("unexpected number of bytesSize; got %d; cannot be smaller than %d", s.BytesSize, 1024) + } +} + +func TestTTLCacheDel(t *testing.T) { + c := New(1024) + defer c.Reset() + for i := 0; i < 100; i++ { + k := []byte(fmt.Sprintf("key %d", i)) + v := []byte(fmt.Sprintf("value %d", i)) + c.SetWithTTL(k, v, 3*time.Second) + vv := c.GetWithTTL(nil, k) + if string(vv) != string(v) { + t.Fatalf("unexpected value for key %q; got %q; want %q", k, vv, v) + } + c.Del(k) + vv = c.GetWithTTL(nil, k) + if len(vv) > 0 { + t.Fatalf("unexpected non-empty value got for key %q: %q", k, vv) + } + } +} + +func TestTTLCacheBigKeyValue(t *testing.T) { + c := New(1024) + defer c.Reset() + + // Both key and value exceed 64Kb + k := make([]byte, 90*1024) + v := make([]byte, 100*1024) + c.SetWithTTL(k, v, 3*time.Second) + vv := c.GetWithTTL(nil, k) + if len(vv) > 0 { + t.Fatalf("unexpected non-empty value got for key %q: %q", k, vv) + } + + // len(key) + len(value) > 64Kb + k = make([]byte, 40*1024) + v = make([]byte, 40*1024) + c.SetWithTTL(k, v, 3*time.Second) + vv = c.GetWithTTL(nil, k) + if len(vv) > 0 { + t.Fatalf("unexpected non-empty value got for key %q: %q", k, vv) + } +} + +func TestTTLCacheSetGetSerial(t *testing.T) { + itemsCount := 10000 + c := New(30 * itemsCount) + defer c.Reset() + if err := testTTLCacheGetSet(c, itemsCount); err != nil { + t.Fatalf("unexpected error: %s", err) + } +} + +func TestTTLCacheGetSetConcurrent(t *testing.T) { + itemsCount := 10000 + const gorotines = 10 + c := New(30 * itemsCount * gorotines) + defer c.Reset() + + ch := make(chan error, gorotines) + for i := 0; i < gorotines; i++ { + go func() { + ch <- testTTLCacheGetSet(c, itemsCount) + }() + } + for i := 0; i < gorotines; i++ { + select { + case err := <-ch: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } + } +} + +func testTTLCacheGetSet(c *Cache, itemsCount int) error { + for i := 0; i < itemsCount; i++ { + k := []byte(fmt.Sprintf("key %d", i)) + v := []byte(fmt.Sprintf("value %d", i)) + c.SetWithTTL(k, v, 3*time.Second) + vv := c.GetWithTTL(nil, k) + if string(vv) != string(v) { + return fmt.Errorf("unexpected value for key %q after insertion; got %q; want %q", k, vv, v) + } + } + misses := 0 + for i := 0; i < itemsCount; i++ { + k := []byte(fmt.Sprintf("key %d", i)) + vExpected := fmt.Sprintf("value %d", i) + v := c.GetWithTTL(nil, k) + if string(v) != string(vExpected) { + if len(v) > 0 { + return fmt.Errorf("unexpected value for key %q after all insertions; got %q; want %q", k, v, vExpected) + } + misses++ + } + } + if misses >= itemsCount/100 { + return fmt.Errorf("too many cache misses; got %d; want less than %d", misses, itemsCount/100) + } + + time.Sleep(3 * time.Second) + misses = 0 + for i := 0; i < itemsCount; i++ { + k := []byte(fmt.Sprintf("key %d", i)) + vExpected := fmt.Sprintf("value %d", i) + v := c.GetWithTTL(nil, k) + if string(v) != string(vExpected) { + if len(v) > 0 { + return fmt.Errorf("unexpected value for key %q after all insertions; got %q; want %q", k, v, vExpected) + } + misses++ + } + } + if misses != itemsCount { + return fmt.Errorf("at least one dead item returned: expected %d missses after sleep, got %d", itemsCount, misses) + } + return nil +} + +func TestTTLCacheResetUpdateStatsSetConcurrent(t *testing.T) { + c := New(12334) + + stopCh := make(chan struct{}) + + // run workers for cache reset + var resettersWG sync.WaitGroup + for i := 0; i < 10; i++ { + resettersWG.Add(1) + go func() { + defer resettersWG.Done() + for { + select { + case <-stopCh: + return + default: + c.Reset() + runtime.Gosched() + } + } + }() + } + + // run workers for update cache stats + var statsWG sync.WaitGroup + for i := 0; i < 10; i++ { + statsWG.Add(1) + go func() { + defer statsWG.Done() + var s Stats + for { + select { + case <-stopCh: + return + default: + c.UpdateStats(&s) + runtime.Gosched() + } + } + }() + } + + // run workers for setting data to cache + var settersWG sync.WaitGroup + for i := 0; i < 10; i++ { + settersWG.Add(1) + go func() { + defer settersWG.Done() + for j := 0; j < 100; j++ { + key := []byte(fmt.Sprintf("key_%d", j)) + value := []byte(fmt.Sprintf("value_%d", j)) + c.SetWithTTL(key, value, 3*time.Second) + runtime.Gosched() + } + }() + } + + // wait for setters + settersWG.Wait() + close(stopCh) + statsWG.Wait() + resettersWG.Wait() +} diff --git a/fastcache_ttl_timing_test.go b/fastcache_ttl_timing_test.go new file mode 100644 index 0000000..aa75ab0 --- /dev/null +++ b/fastcache_ttl_timing_test.go @@ -0,0 +1,97 @@ +package fastcache + +import ( + "fmt" + "time" + "testing" +) + +func BenchmarkCacheSetWithTTL(b *testing.B) { + const items = 1 << 16 + const ttl = 3 * time.Second + c := New(12 * items) + defer c.Reset() + b.ReportAllocs() + b.SetBytes(items) + b.RunParallel(func(pb *testing.PB) { + k := []byte("\x00\x00\x00\x00") + v := []byte("xyza") + for pb.Next() { + for i := 0; i < items; i++ { + k[0]++ + if k[0] == 0 { + k[1]++ + } + c.SetWithTTL(k, v, ttl) + } + } + }) +} + +func BenchmarkCacheGetWithTTL(b *testing.B) { + const items = 1 << 16 + const ttl = 3 * time.Second + + c := New(12 * items) + defer c.Reset() + k := []byte("\x00\x00\x00\x00") + v := []byte("xyza") + for i := 0; i < items; i++ { + k[0]++ + if k[0] == 0 { + k[1]++ + } + c.SetWithTTL(k, v, ttl) + } + + b.ReportAllocs() + b.SetBytes(items) + b.RunParallel(func(pb *testing.PB) { + var buf []byte + k := []byte("\x00\x00\x00\x00") + for pb.Next() { + for i := 0; i < items; i++ { + k[0]++ + if k[0] == 0 { + k[1]++ + } + buf = c.GetWithTTL(buf[:0], k) + if string(buf) != string(v) { + panic(fmt.Errorf("BUG: invalid value obtained; got %q; want %q", buf, v)) + } + } + } + }) +} + +func BenchmarkCacheHasWithTTL(b *testing.B) { + const items = 1 << 16 + const ttl = 3 * time.Second + c := New(12 * items) + defer c.Reset() + k := []byte("\x00\x00\x00\x00") + for i := 0; i < items; i++ { + k[0]++ + if k[0] == 0 { + k[1]++ + } + c.SetWithTTL(k, nil, ttl) + } + + b.ReportAllocs() + b.SetBytes(items) + b.RunParallel(func(pb *testing.PB) { + k := []byte("\x00\x00\x00\x00") + for pb.Next() { + for i := 0; i < items; i++ { + k[0]++ + if k[0] == 0 { + k[1]++ + } + if !c.HasWithTTL(k) { + panic(fmt.Errorf("BUG: missing value for key %q", k)) + } + } + } + }) +} \ No newline at end of file