Skip to content

Commit

Permalink
statcache: faster LFU (pingcap#45357)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jul 19, 2023
1 parent 5b8a14a commit c6d83d7
Show file tree
Hide file tree
Showing 8 changed files with 445 additions and 1 deletion.
1 change: 1 addition & 0 deletions statistics/handle/cache/internal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_test(
flaky = True,
deps = [
":internal",
"//statistics/handle/cache/internal/lfu",
"//statistics/handle/cache/internal/lru",
"//statistics/handle/cache/internal/mapcache",
"//statistics/handle/cache/internal/testutil",
Expand Down
8 changes: 8 additions & 0 deletions statistics/handle/cache/internal/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/pingcap/tidb/statistics/handle/cache/internal"
"github.com/pingcap/tidb/statistics/handle/cache/internal/lfu"
"github.com/pingcap/tidb/statistics/handle/cache/internal/lru"
"github.com/pingcap/tidb/statistics/handle/cache/internal/mapcache"
"github.com/pingcap/tidb/statistics/handle/cache/internal/testutil"
Expand All @@ -43,6 +44,13 @@ var cases = []struct {
return mapcache.NewMapCache()
},
},
{
name: "LFU",
newFunc: func() internal.StatsCacheInner {
result, _ := lfu.NewLFU(defaultSize)
return result
},
},
}

func BenchmarkCachePut(b *testing.B) {
Expand Down
33 changes: 33 additions & 0 deletions statistics/handle/cache/internal/lfu/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "lfu",
srcs = [
"key_set.go",
"key_set_shard.go",
"lfu_cache.go",
],
importpath = "github.com/pingcap/tidb/statistics/handle/cache/internal/lfu",
visibility = ["//statistics/handle/cache:__subpackages__"],
deps = [
"//statistics",
"//statistics/handle/cache/internal",
"//util/intest",
"@com_github_dgraph_io_ristretto//:ristretto",
"@org_golang_x_exp//maps",
],
)

go_test(
name = "lfu_test",
timeout = "short",
srcs = ["lfu_cache_test.go"],
embed = [":lfu"],
flaky = True,
race = "on",
shard_count = 6,
deps = [
"//statistics/handle/cache/internal/testutil",
"@com_github_stretchr_testify//require",
],
)
52 changes: 52 additions & 0 deletions statistics/handle/cache/internal/lfu/key_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lfu

import (
"sync"

"golang.org/x/exp/maps"
)

type keySet struct {
set map[int64]struct{}
mu sync.RWMutex
}

func (ks *keySet) Add(key int64) {
ks.mu.Lock()
ks.set[key] = struct{}{}
ks.mu.Unlock()
}

func (ks *keySet) Remove(key int64) {
ks.mu.Lock()
delete(ks.set, key)
ks.mu.Unlock()
}

func (ks *keySet) Keys() []int64 {
ks.mu.RLock()
result := maps.Keys(ks.set)
ks.mu.RUnlock()
return result
}

func (ks *keySet) Len() int {
ks.mu.RLock()
result := len(ks.set)
ks.mu.RUnlock()
return result
}
55 changes: 55 additions & 0 deletions statistics/handle/cache/internal/lfu/key_set_shard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lfu

const keySetCnt = 128

type keySetShard struct {
resultKeySet [keySetCnt]keySet
}

func newKeySetShard() *keySetShard {
result := keySetShard{}
for i := 0; i < keySetCnt; i++ {
result.resultKeySet[i] = keySet{
set: make(map[int64]struct{}),
}
}
return &result
}

func (kss *keySetShard) Add(key int64) {
kss.resultKeySet[key%keySetCnt].Add(key)
}

func (kss *keySetShard) Remove(key int64) {
kss.resultKeySet[key%keySetCnt].Remove(key)
}

func (kss *keySetShard) Keys() []int64 {
result := make([]int64, 0, len(kss.resultKeySet))
for idx := range kss.resultKeySet {
result = append(result, kss.resultKeySet[idx].Keys()...)
}
return result
}

func (kss *keySetShard) Len() int {
result := 0
for idx := range kss.resultKeySet {
result += kss.resultKeySet[idx].Len()
}
return result
}
132 changes: 132 additions & 0 deletions statistics/handle/cache/internal/lfu/lfu_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lfu

import (
"sync/atomic"

"github.com/dgraph-io/ristretto"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache/internal"
"github.com/pingcap/tidb/util/intest"
)

// LFU is a LFU based on the ristretto.Cache
type LFU struct {
cache *ristretto.Cache
resultKeySet *keySetShard
cost atomic.Int64
}

// NewLFU creates a new LFU cache.
func NewLFU(totalMemCost int64) (*LFU, error) {
result := &LFU{}
bufferItems := int64(64)
if intest.InTest {
bufferItems = 1
}
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1024 * 1024 * 1024, // TODO(hawkingrei): make it configurable for NumCounters and MaxCost
MaxCost: totalMemCost,
BufferItems: bufferItems,
OnEvict: result.onEvict,
OnExit: result.onExit,
IgnoreInternalCost: intest.InTest,
Metrics: intest.InTest,
})
result.cache = cache
result.resultKeySet = newKeySetShard()
return result, err
}

// Get implements statsCacheInner
func (s *LFU) Get(tid int64, _ bool) (*statistics.Table, bool) {
result, ok := s.cache.Get(tid)
if !ok {
return nil, ok
}
return result.(*statistics.Table), ok
}

// Put implements statsCacheInner
func (s *LFU) Put(tblID int64, tbl *statistics.Table, _ bool) {
ok := s.cache.Set(tblID, tbl, tbl.MemoryUsage().TotalTrackingMemUsage())
if ok { // NOTE: `s.cache` and `s.resultKeySet` may be inconsistent since the update operation is not atomic, but it's acceptable for our scenario
s.resultKeySet.Add(tblID)
s.cost.Add(tbl.MemoryUsage().TotalTrackingMemUsage())
}
}

// Del implements statsCacheInner
func (s *LFU) Del(tblID int64) {
s.cache.Del(tblID)
s.resultKeySet.Remove(tblID)
}

// Cost implements statsCacheInner
func (s *LFU) Cost() int64 {
return s.cost.Load()
}

// Values implements statsCacheInner
func (s *LFU) Values() []*statistics.Table {
result := make([]*statistics.Table, 0, 512)
for _, k := range s.resultKeySet.Keys() {
if value, ok := s.cache.Get(k); ok {
result = append(result, value.(*statistics.Table))
}
}
return result
}

func (s *LFU) onEvict(item *ristretto.Item) {
// We do not need to calculate the cost during onEvict, because the onexit function
// is also called when the evict event occurs.
s.resultKeySet.Remove(int64(item.Key))
}

func (s *LFU) onExit(val interface{}) {
s.cost.Add(-1 * val.(*statistics.Table).MemoryUsage().TotalTrackingMemUsage())
}

// Len implements statsCacheInner
func (s *LFU) Len() int {
return s.resultKeySet.Len()
}

// Front implements statsCacheInner
func (*LFU) Front() int64 {
return 0
}

// Copy implements statsCacheInner
func (s *LFU) Copy() internal.StatsCacheInner {
return s
}

// SetCapacity implements statsCacheInner
func (s *LFU) SetCapacity(maxCost int64) {
s.cache.UpdateMaxCost(maxCost)
}

// wait blocks until all buffered writes have been applied. This ensures a call to Set()
// will be visible to future calls to Get(). it is only used for test.
func (s *LFU) wait() {
s.cache.Wait()
}

func (s *LFU) metrics() *ristretto.Metrics {
return s.cache.Metrics
}
Loading

0 comments on commit c6d83d7

Please sign in to comment.