Add OnExit handler which can be used for manual memory management (#183)
Add a new OnExit handler, which is called every time a value accepted by Ristretto is let go. This is useful for manual memory management.

Move Calloc from Badger into Ristretto's z package, so that Ristretto, Badger, and Dgraph can all use it.
manishrjain authored Aug 20, 2020
1 parent 1940d54 commit 623d8ef
Showing 9 changed files with 335 additions and 23 deletions.
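
Before the diffs, a minimal usage sketch of the new handler (not part of the commit): it wires OnExit to z.Free so byte slices obtained from z.Calloc are released whenever the cache lets go of them. The z.Calloc/z.Free signatures and the rule that a failed Set leaves ownership with the caller are taken from the tests added below; the sizing numbers are arbitrary.

package main

import (
	"github.com/dgraph-io/ristretto"
	"github.com/dgraph-io/ristretto/z"
)

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e4,     // arbitrary sizing for this sketch
		MaxCost:     1 << 20, // 1 MB cost budget
		BufferItems: 64,
		// Called whenever an accepted value goes out of scope:
		// eviction, rejection, update, Del, and Clear.
		OnExit: func(val interface{}) {
			z.Free(val.([]byte))
		},
	})
	if err != nil {
		panic(err)
	}
	defer cache.Close()

	buf := z.Calloc(256)
	if !cache.Set("key", buf, 256) {
		// The set was dropped, so the cache never took ownership;
		// the buffer is still ours to free.
		z.Free(buf)
	}
}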
45 changes: 37 additions & 8 deletions cache.go
@@ -55,6 +55,8 @@ type Cache struct {
 	onEvict itemCallback
 	// onReject is called when an item is rejected via admission policy.
 	onReject itemCallback
+	// onExit is called whenever a value goes out of scope from the cache.
+	onExit (func(interface{}))
 	// KeyToHash function is used to customize the key hashing algorithm.
 	// Each key will be hashed using the provided function. If keyToHash value
 	// is not set, the default keyToHash function is used.
@@ -105,6 +107,10 @@ type Config struct {
 	OnEvict func(item *Item)
 	// OnReject is called for every rejection done via the policy.
 	OnReject func(item *Item)
+	// OnExit is called whenever a value is removed from cache. This can be
+	// used to do manual memory deallocation. Would also be called on eviction
+	// and rejection of the value.
+	OnExit func(val interface{})
 	// KeyToHash function is used to customize the key hashing algorithm.
 	// Each key will be hashed using the provided function. If keyToHash value
 	// is not set, the default keyToHash function is used.
@@ -149,13 +155,28 @@ func NewCache(config *Config) (*Cache, error) {
 		policy: policy,
 		getBuf: newRingBuffer(policy, config.BufferItems),
 		setBuf: make(chan *Item, setBufSize),
-		onEvict: config.OnEvict,
-		onReject: config.OnReject,
 		keyToHash: config.KeyToHash,
 		stop: make(chan struct{}),
 		cost: config.Cost,
 		cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
 	}
+	cache.onExit = func(val interface{}) {
+		if config.OnExit != nil && val != nil {
+			config.OnExit(val)
+		}
+	}
+	cache.onEvict = func(item *Item) {
+		if config.OnEvict != nil {
+			config.OnEvict(item)
+		}
+		cache.onExit(item.Value)
+	}
+	cache.onReject = func(item *Item) {
+		if config.OnReject != nil {
+			config.OnReject(item)
+		}
+		cache.onExit(item.Value)
+	}
 	if cache.keyToHash == nil {
 		cache.keyToHash = z.KeyToHash
 	}
@@ -232,7 +253,8 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration
 	}
 	// cost is eventually updated. The expiration must also be immediately updated
 	// to prevent items from being prematurely removed from the map.
-	if c.store.Update(i) {
+	if prev, ok := c.store.Update(i); ok {
+		c.onExit(prev)
 		i.flag = itemUpdate
 	}
 	// Attempt to send item to policy.
@@ -258,7 +280,8 @@ func (c *Cache) Del(key interface{}) {
 	}
 	keyHash, conflictHash := c.keyToHash(key)
 	// Delete immediately.
-	c.store.Del(keyHash, conflictHash)
+	_, prev := c.store.Del(keyHash, conflictHash)
+	c.onExit(prev)
 	// If we've set an item, it would be applied slightly later.
 	// So we must push the same item to `setBuf` with the deletion flag.
 	// This ensures that if a set is followed by a delete, it will be
@@ -297,15 +320,20 @@ func (c *Cache) Clear() {
 loop:
 	for {
 		select {
-		case <-c.setBuf:
+		case i := <-c.setBuf:
+			if i.flag != itemUpdate {
+				// In itemUpdate, the value is already set in the store. So, no need to call
+				// onEvict here.
+				c.onEvict(i)
+			}
 		default:
 			break loop
 		}
 	}
 
 	// Clear value hashmap and policy data.
 	c.policy.Clear()
-	c.store.Clear()
+	c.store.Clear(c.onEvict)
 	// Only reset metrics if they're enabled.
 	if c.Metrics != nil {
 		c.Metrics.Clear()
@@ -357,7 +385,7 @@ func (c *Cache) processItems() {
 					c.store.Set(i)
 					c.Metrics.add(keyAdd, i.Key, 1)
 					trackAdmission(i.Key)
-				} else if c.onReject != nil {
+				} else {
 					c.onReject(i)
 				}
 				for _, victim := range victims {
@@ -370,7 +398,8 @@ func (c *Cache) processItems() {
 
 			case itemDelete:
 				c.policy.Del(i.Key) // Deals with metrics updates.
-				c.store.Del(i.Key, i.Conflict)
+				_, val := c.store.Del(i.Key, i.Conflict)
+				c.onExit(val)
 			}
 		case <-c.cleanupTicker.C:
 			c.store.Cleanup(c.policy, onEvict)
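Taken together, the cache.go changes above funnel every path that drops an accepted value (eviction, rejection, update, Del, Clear) through the single onExit hook. A hedged sketch of the update contract this enables, assuming a cache configured with the OnExit/z.Free callback from the sketch near the top of this page; the sleep only lets the asynchronous set buffer drain, mirroring how the tests below wait:

package cachedemo

import (
	"time"

	"github.com/dgraph-io/ristretto"
	"github.com/dgraph-io/ristretto/z"
)

// ReplaceValue overwrites the value under key 42 and relies on the
// cache, not the caller, to free the displaced buffer.
func ReplaceValue(cache *ristretto.Cache) {
	old := z.Calloc(64)
	if !cache.Set(42, old, 64) {
		z.Free(old) // never stored; still ours to free
		return
	}
	time.Sleep(10 * time.Millisecond) // let the buffered set apply

	// This second Set takes the store.Update path shown above: the
	// previous value is handed back to the cache, which passes it to
	// OnExit, freeing `old` with no bookkeeping here.
	next := z.Calloc(64)
	if !cache.Set(42, next, 64) {
		z.Free(next)
	}
}
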
82 changes: 82 additions & 0 deletions cache_test.go
@@ -3,9 +3,11 @@ package ristretto
 import (
 	"fmt"
 	"math/rand"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -682,3 +684,83 @@ func TestDropUpdates(t *testing.T) {
 		test()
 	}
 }
+
+func TestRistrettoCalloc(t *testing.T) {
+	maxCacheSize := 1 << 20
+	config := &Config{
+		// Use 5% of cache memory for storing counters.
+		NumCounters: int64(float64(maxCacheSize) * 0.05 * 2),
+		MaxCost:     int64(float64(maxCacheSize) * 0.95),
+		BufferItems: 64,
+		Metrics:     true,
+		OnExit: func(val interface{}) {
+			z.Free(val.([]byte))
+		},
+	}
+	r, err := NewCache(config)
+	require.NoError(t, err)
+	defer r.Close()
+
+	var wg sync.WaitGroup
+	for i := 0; i < runtime.NumCPU(); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			rd := rand.New(rand.NewSource(time.Now().UnixNano()))
+			for i := 0; i < 10000; i++ {
+				k := rd.Intn(10000)
+				v := z.Calloc(256)
+				rd.Read(v)
+				if !r.Set(k, v, 256) {
+					z.Free(v)
+				}
+				if rd.Intn(10) == 0 {
+					r.Del(k)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	r.Clear()
+	require.Zero(t, atomic.LoadInt64(&z.NumAllocBytes))
+}
+
+func TestRistrettoCallocTTL(t *testing.T) {
+	maxCacheSize := 1 << 20
+	config := &Config{
+		// Use 5% of cache memory for storing counters.
+		NumCounters: int64(float64(maxCacheSize) * 0.05 * 2),
+		MaxCost:     int64(float64(maxCacheSize) * 0.95),
+		BufferItems: 64,
+		Metrics:     true,
+		OnExit: func(val interface{}) {
+			z.Free(val.([]byte))
+		},
+	}
+	r, err := NewCache(config)
+	require.NoError(t, err)
+	defer r.Close()
+
+	var wg sync.WaitGroup
+	for i := 0; i < runtime.NumCPU(); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			rd := rand.New(rand.NewSource(time.Now().UnixNano()))
+			for i := 0; i < 10000; i++ {
+				k := rd.Intn(10000)
+				v := z.Calloc(256)
+				rd.Read(v)
+				if !r.SetWithTTL(k, v, 256, time.Second) {
+					z.Free(v)
+				}
+				if rd.Intn(10) == 0 {
+					r.Del(k)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	time.Sleep(5 * time.Second)
+	require.Zero(t, atomic.LoadInt64(&z.NumAllocBytes))
+}
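
Both tests end with the same leak check: once the writers finish (and, in the TTL variant, the entries have expired), every z.Calloc must have been balanced by a z.Free, which the z package tracks in the exported z.NumAllocBytes counter. The pattern could be factored into a helper along these lines (requireNoLeaks is illustrative, not part of the commit):

package ristretto

import (
	"sync/atomic"
	"testing"

	"github.com/dgraph-io/ristretto/z"
)

// requireNoLeaks fails the test if the z allocator still has live bytes,
// i.e. some manually managed buffer was never freed.
func requireNoLeaks(t *testing.T) {
	t.Helper()
	if n := atomic.LoadInt64(&z.NumAllocBytes); n != 0 {
		t.Fatalf("leaked %d bytes allocated via z.Calloc", n)
	}
}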
29 changes: 19 additions & 10 deletions store.go
@@ -48,11 +48,11 @@ type store interface {
 	Del(uint64, uint64) (uint64, interface{})
 	// Update attempts to update the key with a new value and returns true if
 	// successful.
-	Update(*Item) bool
+	Update(*Item) (interface{}, bool)
 	// Cleanup removes items that have an expired TTL.
 	Cleanup(policy policy, onEvict itemCallback)
 	// Clear clears all contents of the store.
-	Clear()
+	Clear(onEvict itemCallback)
 }
 
 // newStore returns the default store implementation.
@@ -99,17 +99,17 @@ func (sm *shardedMap) Del(key, conflict uint64) (uint64, interface{}) {
 	return sm.shards[key%numShards].Del(key, conflict)
 }
 
-func (sm *shardedMap) Update(newItem *Item) bool {
+func (sm *shardedMap) Update(newItem *Item) (interface{}, bool) {
 	return sm.shards[newItem.Key%numShards].Update(newItem)
 }
 
 func (sm *shardedMap) Cleanup(policy policy, onEvict itemCallback) {
 	sm.expiryMap.cleanup(sm, policy, onEvict)
 }
 
-func (sm *shardedMap) Clear() {
+func (sm *shardedMap) Clear(onEvict itemCallback) {
 	for i := uint64(0); i < numShards; i++ {
-		sm.shards[i].Clear()
+		sm.shards[i].Clear(onEvict)
 	}
 }
 
@@ -202,16 +202,16 @@ func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) {
 	return item.conflict, item.value
 }
 
-func (m *lockedMap) Update(newItem *Item) bool {
+func (m *lockedMap) Update(newItem *Item) (interface{}, bool) {
 	m.Lock()
 	item, ok := m.data[newItem.Key]
 	if !ok {
 		m.Unlock()
-		return false
+		return nil, false
 	}
 	if newItem.Conflict != 0 && (newItem.Conflict != item.conflict) {
 		m.Unlock()
-		return false
+		return nil, false
 	}
 
 	m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration)
@@ -223,11 +223,20 @@ func (m *lockedMap) Update(newItem *Item) bool {
 	}
 
 	m.Unlock()
-	return true
+	return item.value, true
}
 
-func (m *lockedMap) Clear() {
+func (m *lockedMap) Clear(onEvict itemCallback) {
 	m.Lock()
+	i := &Item{}
+	if onEvict != nil {
+		for _, si := range m.data {
+			i.Key = si.key
+			i.Conflict = si.conflict
+			i.Value = si.value
+			onEvict(i)
+		}
+	}
 	m.data = make(map[uint64]storeItem)
 	m.Unlock()
 }
14 changes: 9 additions & 5 deletions store_test.go
@@ -66,7 +66,7 @@ func TestStoreClear(t *testing.T) {
 		}
 		s.Set(&it)
 	}
-	s.Clear()
+	s.Clear(nil)
 	for i := uint64(0); i < 1000; i++ {
 		key, conflict := z.KeyToHash(i)
 		val, ok := s.Get(key, conflict)
@@ -85,7 +85,8 @@ func TestStoreUpdate(t *testing.T) {
 	}
 	s.Set(&i)
 	i.Value = 2
-	require.True(t, s.Update(&i))
+	_, ok := s.Update(&i)
+	require.True(t, ok)
 
 	val, ok := s.Get(key, conflict)
 	require.True(t, ok)
@@ -96,7 +97,8 @@ func TestStoreUpdate(t *testing.T) {
 	require.Equal(t, 2, val.(int))
 
 	i.Value = 3
-	require.True(t, s.Update(&i))
+	_, ok = s.Update(&i)
+	require.True(t, ok)
 
 	val, ok = s.Get(key, conflict)
 	require.True(t, ok)
@@ -108,7 +110,8 @@ func TestStoreUpdate(t *testing.T) {
 		Conflict: conflict,
 		Value: 2,
 	}
-	require.False(t, s.Update(&i))
+	_, ok = s.Update(&i)
+	require.False(t, ok)
 	val, ok = s.Get(key, conflict)
 	require.False(t, ok)
 	require.Nil(t, val)
@@ -137,7 +140,8 @@ func TestStoreCollision(t *testing.T) {
 	require.True(t, ok)
 	require.NotEqual(t, 2, val.(int))
 
-	require.False(t, s.Update(&i))
+	_, ok = s.Update(&i)
+	require.False(t, ok)
 	val, ok = s.Get(1, 0)
 	require.True(t, ok)
 	require.NotEqual(t, 2, val.(int))