Skip to content

Commit

Permalink
planner: simplify the structure of value in binding cache (#49952)
Browse files Browse the repository at this point in the history
ref #48875
  • Loading branch information
qw4990 authored Jan 2, 2024
1 parent 3d939d4 commit 3f7b017
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 100 deletions.
98 changes: 15 additions & 83 deletions pkg/bindinfo/bind_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ func (key bindCacheKey) Hash() []byte {
return hack.Slice(string(key))
}

func calcBindCacheKVMem(key bindCacheKey, value []*BindRecord) int64 {
func calcBindCacheKVMem(key bindCacheKey, value *BindRecord) int64 {
var valMem int64
for _, bindRecord := range value {
valMem += int64(bindRecord.size())
}
valMem += int64(value.size())
return int64(len(key.Hash())) + valMem
}

Expand All @@ -65,35 +63,19 @@ func newBindCache() *bindCache {
// Note: Only other functions of the bindCache file can use this function.
// Don't use this function directly in other files in bindinfo package.
// The return value is not read-only, but it is only can be used in other functions which are also in the bind_cache.go.
func (c *bindCache) get(key bindCacheKey) []*BindRecord {
func (c *bindCache) get(key bindCacheKey) *BindRecord {
value, hit := c.cache.Get(key)
if !hit {
return nil
}
typedValue := value.([]*BindRecord)
typedValue := value.(*BindRecord)
return typedValue
}

// getCopiedVal gets a copied cache item according to cache key.
// The return value can be modified.
// If you want to modify the return value, use the 'getCopiedVal' function rather than 'get' function.
// We use the copy on write way to operate the bindRecord in cache for safety and accuracy of memory usage.
func (c *bindCache) getCopiedVal(key bindCacheKey) []*BindRecord {
bindRecords := c.get(key)
if bindRecords != nil {
copiedRecords := make([]*BindRecord, len(bindRecords))
for i, bindRecord := range bindRecords {
copiedRecords[i] = bindRecord.shallowCopy()
}
return copiedRecords
}
return bindRecords
}

// set inserts an item to the cache. It's not thread-safe.
// Only other functions of the bindCache can use this function.
// The set operation will return error message when the memory usage of binding_cache exceeds its capacity.
func (c *bindCache) set(key bindCacheKey, value []*BindRecord) (ok bool, err error) {
func (c *bindCache) set(key bindCacheKey, value *BindRecord) (ok bool, err error) {
mem := calcBindCacheKVMem(key, value)
if mem > c.memCapacity { // ignore this kv pair if its size is too large
err = errors.New("The memory usage of all available bindings exceeds the cache's mem quota. As a result, all available bindings cannot be held on the cache. Please increase the value of the system variable 'tidb_mem_quota_binding_cache' and execute 'admin reload bindings' to ensure that all bindings exist in the cache and can be used normally")
Expand All @@ -110,7 +92,7 @@ func (c *bindCache) set(key bindCacheKey, value []*BindRecord) (ok bool, err err
if !evicted {
return
}
c.memTracker.Consume(-calcBindCacheKVMem(evictedKey.(bindCacheKey), evictedValue.([]*BindRecord)))
c.memTracker.Consume(-calcBindCacheKVMem(evictedKey.(bindCacheKey), evictedValue.(*BindRecord)))
}
c.memTracker.Consume(mem)
c.cache.Put(key, value)
Expand All @@ -134,33 +116,10 @@ func (c *bindCache) delete(key bindCacheKey) bool {
// GetBinding gets the BindRecord from the cache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
func (c *bindCache) GetBinding(sqlDigest, normalizedSQL, _ string) *BindRecord {
c.lock.Lock()
defer c.lock.Unlock()
bindRecords := c.get(bindCacheKey(sqlDigest))
for _, bindRecord := range bindRecords {
if bindRecord.OriginalSQL == normalizedSQL {
return bindRecord
}
}
return nil
}

// GetBindingBySQLDigest gets the BindRecord from the cache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
func (c *bindCache) GetBindingBySQLDigest(sqlDigest string) (*BindRecord, error) {
func (c *bindCache) GetBinding(sqlDigest string) *BindRecord {
c.lock.Lock()
defer c.lock.Unlock()
bindings := c.get(bindCacheKey(sqlDigest))
if len(bindings) > 1 {
// currently, we only allow one binding for a sql
return nil, errors.New("more than 1 binding matched")
}
if len(bindings) == 0 || len(bindings[0].Bindings) == 0 {
return nil, errors.New("can't find any binding for '" + sqlDigest + "'")
}
return bindings[0], nil
return c.get(bindCacheKey(sqlDigest))
}

// GetAllBindings return all the bindRecords from the bindCache.
Expand All @@ -172,7 +131,7 @@ func (c *bindCache) GetAllBindings() []*BindRecord {
values := c.cache.Values()
bindRecords := make([]*BindRecord, 0, len(values))
for _, vals := range values {
bindRecords = append(bindRecords, vals.([]*BindRecord)...)
bindRecords = append(bindRecords, vals.(*BindRecord))
}
return bindRecords
}
Expand All @@ -183,41 +142,16 @@ func (c *bindCache) SetBinding(sqlDigest string, meta *BindRecord) (err error) {
c.lock.Lock()
defer c.lock.Unlock()
cacheKey := bindCacheKey(sqlDigest)
metas := c.getCopiedVal(cacheKey)
for i := range metas {
if metas[i].OriginalSQL == meta.OriginalSQL {
metas[i] = meta
}
}
_, err = c.set(cacheKey, []*BindRecord{meta})
_, err = c.set(cacheKey, meta)
return
}

// RemoveBinding removes the BindRecord which has same originSQL with specified BindRecord.
// The function is thread-safe.
func (c *bindCache) RemoveBinding(sqlDigest string, meta *BindRecord) {
func (c *bindCache) RemoveBinding(sqlDigest string, _ *BindRecord) {
c.lock.Lock()
defer c.lock.Unlock()
metas := c.getCopiedVal(bindCacheKey(sqlDigest))
if metas == nil {
return
}

for i := len(metas) - 1; i >= 0; i-- {
if metas[i].isSame(meta) {
metas[i] = metas[i].remove(meta)
if len(metas[i].Bindings) == 0 {
metas = append(metas[:i], metas[i+1:]...)
}
if len(metas) == 0 {
c.delete(bindCacheKey(sqlDigest))
return
}
}
}
// This function can guarantee the memory usage for the cache will never grow up.
// So we don't need to handle the return value here.
_, _ = c.set(bindCacheKey(sqlDigest), metas)
c.delete(bindCacheKey(sqlDigest))
}

// SetMemCapacity sets the memory capacity for the cache.
Expand Down Expand Up @@ -258,11 +192,9 @@ func (c *bindCache) Copy() (newCache *bindCache, err error) {
for _, key := range keys {
cacheKey := key.(bindCacheKey)
v := c.get(cacheKey)
bindRecords := make([]*BindRecord, len(v))
copy(bindRecords, v)
// The memory usage of cache has been handled at the beginning of this function.
// So we don't need to handle the return value here.
_, _ = newCache.set(cacheKey, bindRecords)
if _, err := newCache.set(cacheKey, v); err != nil {
return nil, err
}
}
return newCache, err
}
8 changes: 3 additions & 5 deletions pkg/bindinfo/bind_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ func TestBindCache(t *testing.T) {
variable.MemQuotaBindingCache.Store(200)
bindCache := newBindCache()

value := make([][]*BindRecord, 3)
value := make([]*BindRecord, 3)
key := make([]bindCacheKey, 3)
var bigKey string
for i := 0; i < 3; i++ {
cacheKey := strings.Repeat(strconv.Itoa(i), 50)
key[i] = bindCacheKey(hack.Slice(cacheKey))
record := &BindRecord{OriginalSQL: cacheKey, Db: ""}
value[i] = []*BindRecord{record}
value[i] = &BindRecord{OriginalSQL: cacheKey, Db: ""}
bigKey += cacheKey

require.Equal(t, int64(100), calcBindCacheKVMem(key[i], value[i]))
Expand Down Expand Up @@ -68,8 +67,7 @@ func TestBindCache(t *testing.T) {
require.NotNil(t, result)

bigBindCacheKey := bindCacheKey(hack.Slice(bigKey))
bigRecord := &BindRecord{OriginalSQL: bigKey, Db: ""}
bigBindCacheValue := []*BindRecord{bigRecord}
bigBindCacheValue := &BindRecord{OriginalSQL: bigKey, Db: ""}
require.Equal(t, int64(300), calcBindCacheKVMem(bigBindCacheKey, bigBindCacheValue))
ok, err = bindCache.set(bigBindCacheKey, bigBindCacheValue)
require.False(t, ok)
Expand Down
25 changes: 17 additions & 8 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (h *globalBindingHandle) Update(fullLoad bool) (err error) {
continue
}

oldRecord := newCache.GetBinding(sqlDigest, meta.OriginalSQL, meta.Db)
oldRecord := newCache.GetBinding(sqlDigest)
newRecord := merge(oldRecord, meta).removeDeletedBindings()
if len(newRecord.Bindings) > 0 {
err = newCache.SetBinding(sqlDigest, newRecord)
Expand All @@ -253,7 +253,7 @@ func (h *globalBindingHandle) Update(fullLoad bool) (err error) {
} else {
newCache.RemoveBinding(sqlDigest, newRecord)
}
updateMetrics(metrics.ScopeGlobal, oldRecord, newCache.GetBinding(sqlDigest, meta.OriginalSQL, meta.Db), true)
updateMetrics(metrics.ScopeGlobal, oldRecord, newCache.GetBinding(sqlDigest), true)
}
if memExceededErr != nil {
// When the memory capacity of bing_cache is not enough,
Expand Down Expand Up @@ -364,10 +364,16 @@ func (h *globalBindingHandle) DropGlobalBinding(originalSQL, db string, binding

// DropGlobalBindingByDigest drop BindRecord to the storage and BindRecord int the cache.
func (h *globalBindingHandle) DropGlobalBindingByDigest(sqlDigest string) (deletedRows uint64, err error) {
if sqlDigest == "" {
return 0, errors.New("sql digest is empty")
}
oldRecord, err := h.GetGlobalBindingBySQLDigest(sqlDigest)
if err != nil {
return 0, err
}
if oldRecord == nil {
return 0, errors.Errorf("can't find any binding for '%s'", sqlDigest)
}
return h.DropGlobalBinding(oldRecord.OriginalSQL, strings.ToLower(oldRecord.Db), nil)
}

Expand Down Expand Up @@ -446,6 +452,9 @@ func (h *globalBindingHandle) SetGlobalBindingStatusByDigest(newStatus, sqlDiges
if err != nil {
return false, err
}
if oldRecord == nil {
return false, errors.Errorf("can't find any binding for '%s'", sqlDigest)
}
return h.SetGlobalBindingStatus(oldRecord.OriginalSQL, nil, newStatus)
}

Expand Down Expand Up @@ -554,13 +563,13 @@ func (h *globalBindingHandle) Size() int {
}

// GetGlobalBinding returns the BindRecord of the (normalizedSQL,db) if BindRecord exist.
func (h *globalBindingHandle) GetGlobalBinding(sqlDigest, normalizedSQL, db string) *BindRecord {
return h.getCache().GetBinding(sqlDigest, normalizedSQL, db)
func (h *globalBindingHandle) GetGlobalBinding(sqlDigest, _, _ string) *BindRecord {
return h.getCache().GetBinding(sqlDigest)
}

// GetGlobalBindingBySQLDigest returns the BindRecord of the sql digest.
func (h *globalBindingHandle) GetGlobalBindingBySQLDigest(sqlDigest string) (*BindRecord, error) {
return h.getCache().GetBindingBySQLDigest(sqlDigest)
return h.getCache().GetBinding(sqlDigest), nil
}

// GetAllGlobalBindings returns all bind records in cache.
Expand Down Expand Up @@ -626,7 +635,7 @@ func (h *globalBindingHandle) setGlobalCacheBinding(sqlDigest string, meta *Bind
if err0 != nil {
logutil.BgLogger().Warn("BindHandle.setGlobalCacheBindRecord", zap.String("category", "sql-bind"), zap.Error(err0))
}
oldRecord := newCache.GetBinding(sqlDigest, meta.OriginalSQL, meta.Db)
oldRecord := newCache.GetBinding(sqlDigest)
err1 := newCache.SetBinding(sqlDigest, meta)
if err1 != nil && err0 == nil {
logutil.BgLogger().Warn("BindHandle.setGlobalCacheBindRecord", zap.String("category", "sql-bind"), zap.Error(err1))
Expand All @@ -641,10 +650,10 @@ func (h *globalBindingHandle) removeGlobalCacheBinding(sqlDigest string, meta *B
if err != nil {
logutil.BgLogger().Warn("", zap.String("category", "sql-bind"), zap.Error(err))
}
oldRecord := newCache.GetBinding(sqlDigest, meta.OriginalSQL, meta.Db)
oldRecord := newCache.GetBinding(sqlDigest)
newCache.RemoveBinding(sqlDigest, meta)
h.setCache(newCache) // TODO: update it in place
updateMetrics(metrics.ScopeGlobal, oldRecord, newCache.GetBinding(sqlDigest, meta.OriginalSQL, meta.Db), false)
updateMetrics(metrics.ScopeGlobal, oldRecord, newCache.GetBinding(sqlDigest), false)
}

func copyBindRecordUpdateMap(oldMap map[string]*bindRecordUpdate) map[string]*bindRecordUpdate {
Expand Down
15 changes: 11 additions & 4 deletions pkg/bindinfo/session_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand Down Expand Up @@ -72,7 +73,7 @@ func NewSessionBindingHandle() SessionBindingHandle {
// appendSessionBinding adds the BindRecord to the cache, all the stale bindMetas are
// removed from the cache after this operation.
func (h *sessionBindingHandle) appendSessionBinding(sqlDigest string, meta *BindRecord) {
oldRecord := h.ch.GetBinding(sqlDigest, meta.OriginalSQL, meta.Db)
oldRecord := h.ch.GetBinding(sqlDigest)
err := h.ch.SetBinding(sqlDigest, meta)
if err != nil {
logutil.BgLogger().Warn("SessionHandle.appendBindRecord", zap.String("category", "sql-bind"), zap.Error(err))
Expand Down Expand Up @@ -125,21 +126,27 @@ func (h *sessionBindingHandle) DropSessionBinding(originalSQL, db string, bindin

// DropSessionBindingByDigest drop BindRecord in the cache.
func (h *sessionBindingHandle) DropSessionBindingByDigest(sqlDigest string) error {
if sqlDigest == "" {
return errors.New("sql digest is empty")
}
oldRecord, err := h.GetSessionBindingBySQLDigest(sqlDigest)
if err != nil {
return err
}
if oldRecord == nil {
return errors.Errorf("can't find any binding for '%s'", sqlDigest)
}
return h.DropSessionBinding(oldRecord.OriginalSQL, strings.ToLower(oldRecord.Db), nil)
}

// GetSessionBinding return the BindMeta of the (normdOrigSQL,db) if BindMeta exist.
func (h *sessionBindingHandle) GetSessionBinding(sqlDigest, normdOrigSQL, db string) *BindRecord {
return h.ch.GetBinding(sqlDigest, normdOrigSQL, db)
func (h *sessionBindingHandle) GetSessionBinding(sqlDigest, _, _ string) *BindRecord {
return h.ch.GetBinding(sqlDigest)
}

// GetSessionBindingBySQLDigest return all BindMeta corresponding to sqlDigest.
func (h *sessionBindingHandle) GetSessionBindingBySQLDigest(sqlDigest string) (*BindRecord, error) {
return h.ch.GetBindingBySQLDigest(sqlDigest)
return h.ch.GetBinding(sqlDigest), nil
}

// GetAllSessionBindings return all session bind info.
Expand Down

0 comments on commit 3f7b017

Please sign in to comment.