From 623d8ef1614bcd92758eaf4ae716a657b0278453 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Thu, 20 Aug 2020 09:44:38 -0700 Subject: [PATCH] Add OnExit handler which can be used for manual memory management (#183) Add a new OnExit handler, which is called every time an accepted value by Ristretto is let go. This is useful for manual memory management. Move Calloc from Badger over to Ristretto in z package, so Ristretto, Badger and Dgraph can all use it. --- cache.go | 45 +++++++++++++++++++++----- cache_test.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++ store.go | 29 +++++++++++------ store_test.go | 14 +++++--- z/calloc.go | 71 ++++++++++++++++++++++++++++++++++++++++ z/calloc_32bit.go | 12 +++++++ z/calloc_64bit.go | 12 +++++++ z/calloc_nocgo.go | 19 +++++++++++ z/calloc_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 335 insertions(+), 23 deletions(-) create mode 100644 z/calloc.go create mode 100644 z/calloc_32bit.go create mode 100644 z/calloc_64bit.go create mode 100644 z/calloc_nocgo.go create mode 100644 z/calloc_test.go diff --git a/cache.go b/cache.go index c9f3cac3..c6d26067 100644 --- a/cache.go +++ b/cache.go @@ -55,6 +55,8 @@ type Cache struct { onEvict itemCallback // onReject is called when an item is rejected via admission policy. onReject itemCallback + // onExit is called whenever a value goes out of scope from the cache. + onExit (func(interface{})) // KeyToHash function is used to customize the key hashing algorithm. // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. @@ -105,6 +107,10 @@ type Config struct { OnEvict func(item *Item) // OnReject is called for every rejection done via the policy. OnReject func(item *Item) + // OnExit is called whenever a value is removed from cache. This can be + // used to do manual memory deallocation. Would also be called on eviction + // and rejection of the value. + OnExit func(val interface{}) // KeyToHash function is used to customize the key hashing algorithm. // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. @@ -149,13 +155,28 @@ func NewCache(config *Config) (*Cache, error) { policy: policy, getBuf: newRingBuffer(policy, config.BufferItems), setBuf: make(chan *Item, setBufSize), - onEvict: config.OnEvict, - onReject: config.OnReject, keyToHash: config.KeyToHash, stop: make(chan struct{}), cost: config.Cost, cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2), } + cache.onExit = func(val interface{}) { + if config.OnExit != nil && val != nil { + config.OnExit(val) + } + } + cache.onEvict = func(item *Item) { + if config.OnEvict != nil { + config.OnEvict(item) + } + cache.onExit(item.Value) + } + cache.onReject = func(item *Item) { + if config.OnReject != nil { + config.OnReject(item) + } + cache.onExit(item.Value) + } if cache.keyToHash == nil { cache.keyToHash = z.KeyToHash } @@ -232,7 +253,8 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration } // cost is eventually updated. The expiration must also be immediately updated // to prevent items from being prematurely removed from the map. - if c.store.Update(i) { + if prev, ok := c.store.Update(i); ok { + c.onExit(prev) i.flag = itemUpdate } // Attempt to send item to policy. @@ -258,7 +280,8 @@ func (c *Cache) Del(key interface{}) { } keyHash, conflictHash := c.keyToHash(key) // Delete immediately. - c.store.Del(keyHash, conflictHash) + _, prev := c.store.Del(keyHash, conflictHash) + c.onExit(prev) // If we've set an item, it would be applied slightly later. // So we must push the same item to `setBuf` with the deletion flag. // This ensures that if a set is followed by a delete, it will be @@ -297,7 +320,12 @@ func (c *Cache) Clear() { loop: for { select { - case <-c.setBuf: + case i := <-c.setBuf: + if i.flag != itemUpdate { + // In itemUpdate, the value is already set in the store. So, no need to call + // onEvict here. + c.onEvict(i) + } default: break loop } @@ -305,7 +333,7 @@ loop: // Clear value hashmap and policy data. c.policy.Clear() - c.store.Clear() + c.store.Clear(c.onEvict) // Only reset metrics if they're enabled. if c.Metrics != nil { c.Metrics.Clear() @@ -357,7 +385,7 @@ func (c *Cache) processItems() { c.store.Set(i) c.Metrics.add(keyAdd, i.Key, 1) trackAdmission(i.Key) - } else if c.onReject != nil { + } else { c.onReject(i) } for _, victim := range victims { @@ -370,7 +398,8 @@ func (c *Cache) processItems() { case itemDelete: c.policy.Del(i.Key) // Deals with metrics updates. - c.store.Del(i.Key, i.Conflict) + _, val := c.store.Del(i.Key, i.Conflict) + c.onExit(val) } case <-c.cleanupTicker.C: c.store.Cleanup(c.policy, onEvict) diff --git a/cache_test.go b/cache_test.go index 3ed8a3d2..70b1d33f 100644 --- a/cache_test.go +++ b/cache_test.go @@ -3,9 +3,11 @@ package ristretto import ( "fmt" "math/rand" + "runtime" "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -682,3 +684,83 @@ func TestDropUpdates(t *testing.T) { test() } } + +func TestRistrettoCalloc(t *testing.T) { + maxCacheSize := 1 << 20 + config := &Config{ + // Use 5% of cache memory for storing counters. + NumCounters: int64(float64(maxCacheSize) * 0.05 * 2), + MaxCost: int64(float64(maxCacheSize) * 0.95), + BufferItems: 64, + Metrics: true, + OnExit: func(val interface{}) { + z.Free(val.([]byte)) + }, + } + r, err := NewCache(config) + require.NoError(t, err) + defer r.Close() + + var wg sync.WaitGroup + for i := 0; i < runtime.NumCPU(); i++ { + wg.Add(1) + go func() { + defer wg.Done() + rd := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 10000; i++ { + k := rd.Intn(10000) + v := z.Calloc(256) + rd.Read(v) + if !r.Set(k, v, 256) { + z.Free(v) + } + if rd.Intn(10) == 0 { + r.Del(k) + } + } + }() + } + wg.Wait() + r.Clear() + require.Zero(t, atomic.LoadInt64(&z.NumAllocBytes)) +} + +func TestRistrettoCallocTTL(t *testing.T) { + maxCacheSize := 1 << 20 + config := &Config{ + // Use 5% of cache memory for storing counters. + NumCounters: int64(float64(maxCacheSize) * 0.05 * 2), + MaxCost: int64(float64(maxCacheSize) * 0.95), + BufferItems: 64, + Metrics: true, + OnExit: func(val interface{}) { + z.Free(val.([]byte)) + }, + } + r, err := NewCache(config) + require.NoError(t, err) + defer r.Close() + + var wg sync.WaitGroup + for i := 0; i < runtime.NumCPU(); i++ { + wg.Add(1) + go func() { + defer wg.Done() + rd := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 10000; i++ { + k := rd.Intn(10000) + v := z.Calloc(256) + rd.Read(v) + if !r.SetWithTTL(k, v, 256, time.Second) { + z.Free(v) + } + if rd.Intn(10) == 0 { + r.Del(k) + } + } + }() + } + wg.Wait() + time.Sleep(5 * time.Second) + require.Zero(t, atomic.LoadInt64(&z.NumAllocBytes)) +} diff --git a/store.go b/store.go index 1dba0ca1..e42a98b7 100644 --- a/store.go +++ b/store.go @@ -48,11 +48,11 @@ type store interface { Del(uint64, uint64) (uint64, interface{}) // Update attempts to update the key with a new value and returns true if // successful. - Update(*Item) bool + Update(*Item) (interface{}, bool) // Cleanup removes items that have an expired TTL. Cleanup(policy policy, onEvict itemCallback) // Clear clears all contents of the store. - Clear() + Clear(onEvict itemCallback) } // newStore returns the default store implementation. @@ -99,7 +99,7 @@ func (sm *shardedMap) Del(key, conflict uint64) (uint64, interface{}) { return sm.shards[key%numShards].Del(key, conflict) } -func (sm *shardedMap) Update(newItem *Item) bool { +func (sm *shardedMap) Update(newItem *Item) (interface{}, bool) { return sm.shards[newItem.Key%numShards].Update(newItem) } @@ -107,9 +107,9 @@ func (sm *shardedMap) Cleanup(policy policy, onEvict itemCallback) { sm.expiryMap.cleanup(sm, policy, onEvict) } -func (sm *shardedMap) Clear() { +func (sm *shardedMap) Clear(onEvict itemCallback) { for i := uint64(0); i < numShards; i++ { - sm.shards[i].Clear() + sm.shards[i].Clear(onEvict) } } @@ -202,16 +202,16 @@ func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) { return item.conflict, item.value } -func (m *lockedMap) Update(newItem *Item) bool { +func (m *lockedMap) Update(newItem *Item) (interface{}, bool) { m.Lock() item, ok := m.data[newItem.Key] if !ok { m.Unlock() - return false + return nil, false } if newItem.Conflict != 0 && (newItem.Conflict != item.conflict) { m.Unlock() - return false + return nil, false } m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration) @@ -223,11 +223,20 @@ func (m *lockedMap) Update(newItem *Item) bool { } m.Unlock() - return true + return item.value, true } -func (m *lockedMap) Clear() { +func (m *lockedMap) Clear(onEvict itemCallback) { m.Lock() + i := &Item{} + if onEvict != nil { + for _, si := range m.data { + i.Key = si.key + i.Conflict = si.conflict + i.Value = si.value + onEvict(i) + } + } m.data = make(map[uint64]storeItem) m.Unlock() } diff --git a/store_test.go b/store_test.go index 9aa20114..ee490982 100644 --- a/store_test.go +++ b/store_test.go @@ -66,7 +66,7 @@ func TestStoreClear(t *testing.T) { } s.Set(&it) } - s.Clear() + s.Clear(nil) for i := uint64(0); i < 1000; i++ { key, conflict := z.KeyToHash(i) val, ok := s.Get(key, conflict) @@ -85,7 +85,8 @@ func TestStoreUpdate(t *testing.T) { } s.Set(&i) i.Value = 2 - require.True(t, s.Update(&i)) + _, ok := s.Update(&i) + require.True(t, ok) val, ok := s.Get(key, conflict) require.True(t, ok) @@ -96,7 +97,8 @@ func TestStoreUpdate(t *testing.T) { require.Equal(t, 2, val.(int)) i.Value = 3 - require.True(t, s.Update(&i)) + _, ok = s.Update(&i) + require.True(t, ok) val, ok = s.Get(key, conflict) require.True(t, ok) @@ -108,7 +110,8 @@ func TestStoreUpdate(t *testing.T) { Conflict: conflict, Value: 2, } - require.False(t, s.Update(&i)) + _, ok = s.Update(&i) + require.False(t, ok) val, ok = s.Get(key, conflict) require.False(t, ok) require.Nil(t, val) @@ -137,7 +140,8 @@ func TestStoreCollision(t *testing.T) { require.True(t, ok) require.NotEqual(t, 2, val.(int)) - require.False(t, s.Update(&i)) + _, ok = s.Update(&i) + require.False(t, ok) val, ok = s.Get(1, 0) require.True(t, ok) require.NotEqual(t, 2, val.(int)) diff --git a/z/calloc.go b/z/calloc.go new file mode 100644 index 00000000..339badf6 --- /dev/null +++ b/z/calloc.go @@ -0,0 +1,71 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package z + +// #include +import "C" +import ( + "sync/atomic" + "unsafe" +) + +// The go:linkname directives provides backdoor access to private functions in +// the runtime. Below we're accessing the throw function. + +//go:linkname throw runtime.throw +func throw(s string) + +// TODO(peter): Rather than relying an C malloc/free, we could fork the Go +// runtime page allocator and allocate large chunks of memory using mmap or +// similar. + +var NumAllocBytes int64 + +func NumAllocsMB() int64 { + return atomic.LoadInt64(&NumAllocBytes) / (1 << 20) +} + +// New allocates a slice of size n. The returned slice is from manually managed +// memory and MUST be released by calling Free. Failure to do so will result in +// a memory leak. +func Calloc(n int) []byte { + if n == 0 { + return make([]byte, 0) + } + // We need to be conscious of the Cgo pointer passing rules: + // + // https://golang.org/cmd/cgo/#hdr-Passing_pointers + // + // ... + // Note: the current implementation has a bug. While Go code is permitted + // to write nil or a C pointer (but not a Go pointer) to C memory, the + // current implementation may sometimes cause a runtime error if the + // contents of the C memory appear to be a Go pointer. Therefore, avoid + // passing uninitialized C memory to Go code if the Go code is going to + // store pointer values in it. Zero out the memory in C before passing it + // to Go. + ptr := C.calloc(C.size_t(n), 1) + if ptr == nil { + // NB: throw is like panic, except it guarantees the process will be + // terminated. The call below is exactly what the Go runtime invokes when + // it cannot allocate memory. + throw("out of memory") + } + atomic.AddInt64(&NumAllocBytes, int64(n)) + // Interpret the C pointer as a pointer to a Go array, then slice. + return (*[MaxArrayLen]byte)(unsafe.Pointer(ptr))[:n:n] +} + +// Free frees the specified slice. +func Free(b []byte) { + if sz := cap(b); sz != 0 { + if len(b) == 0 { + b = b[:cap(b)] + } + ptr := unsafe.Pointer(&b[0]) + C.free(ptr) + atomic.AddInt64(&NumAllocBytes, -int64(sz)) + } +} diff --git a/z/calloc_32bit.go b/z/calloc_32bit.go new file mode 100644 index 00000000..db36d985 --- /dev/null +++ b/z/calloc_32bit.go @@ -0,0 +1,12 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build 386 amd64p32 arm armbe mips mipsle mips64p32 mips64p32le ppc sparc + +package z + +const ( + // MaxArrayLen is a safe maximum length for slices on this architecture. + MaxArrayLen = 1<<31 - 1 +) diff --git a/z/calloc_64bit.go b/z/calloc_64bit.go new file mode 100644 index 00000000..7e2c5da7 --- /dev/null +++ b/z/calloc_64bit.go @@ -0,0 +1,12 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build amd64 arm64 arm64be ppc64 ppc64le mips64 mips64le s390x sparc64 + +package z + +const ( + // MaxArrayLen is a safe maximum length for slices on this architecture. + MaxArrayLen = 1<<50 - 1 +) diff --git a/z/calloc_nocgo.go b/z/calloc_nocgo.go new file mode 100644 index 00000000..01c1123f --- /dev/null +++ b/z/calloc_nocgo.go @@ -0,0 +1,19 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build !cgo + +package z + +// Provides versions of New and Free when cgo is not available (e.g. cross +// compilation). + +// New allocates a slice of size n. +func Calloc(n int) []byte { + return make([]byte, n) +} + +// Free frees the specified slice. +func Free(b []byte) { +} diff --git a/z/calloc_test.go b/z/calloc_test.go new file mode 100644 index 00000000..7a65ee84 --- /dev/null +++ b/z/calloc_test.go @@ -0,0 +1,74 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package z + +import ( + "sync" + "testing" + "time" + + "math/rand" +) + +// $ go test -failfast -run xxx -bench . -benchmem -count 10 > out.txt +// $ benchstat out.txt +// name time/op +// Allocation/Pool-8 200µs ± 5% +// Allocation/Calloc-8 100µs ±11% +// +// name alloc/op +// Allocation/Pool-8 477B ±29% +// Allocation/Calloc-8 4.00B ± 0% +// +// name allocs/op +// Allocation/Pool-8 1.00 ± 0% +// Allocation/Calloc-8 0.00 +func BenchmarkAllocation(b *testing.B) { + b.Run("Pool", func(b *testing.B) { + pool := sync.Pool{ + New: func() interface{} { + return make([]byte, 4<<10) + }, + } + b.RunParallel(func(pb *testing.PB) { + source := rand.NewSource(time.Now().UnixNano()) + r := rand.New(source) + for pb.Next() { + x := pool.Get().([]byte) + sz := r.Intn(100) << 10 + if len(x) < sz { + x = make([]byte, sz) + } + r.Read(x) + pool.Put(x) + } + }) + }) + + b.Run("Calloc", func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + source := rand.NewSource(time.Now().UnixNano()) + r := rand.New(source) + for pb.Next() { + sz := r.Intn(100) << 10 + x := Calloc(sz) + r.Read(x) + Free(x) + } + }) + }) +}