Skip to content

Commit

Permalink
statistics: use TopN (#2009)
Browse files Browse the repository at this point in the history
  • Loading branch information
Luffbee authored and sre-bot committed Dec 13, 2019
1 parent 234784c commit f7f643c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
11 changes: 11 additions & 0 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ type HotPeerStat struct {
isNew bool
}

// ID returns region ID. Implementing TopNItem.
func (stat *HotPeerStat) ID() uint64 {
return stat.RegionID
}

// Less compares two HotPeerStat.Implementing TopNItem.
func (stat *HotPeerStat) Less(than TopNItem) bool {
rhs := than.(*HotPeerStat)
return stat.BytesRate < rhs.BytesRate
}

// IsNeedDelete to delete the item in cache.
func (stat *HotPeerStat) IsNeedDelete() bool {
return stat.needDelete
Expand Down
57 changes: 22 additions & 35 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
package statistics

import (
"math"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pkg/cache"
"github.com/pingcap/pd/server/core"
)

const (
cacheMaxLen = 1000
hotPeerMaxCount = 400
topNN = 60
topNTTL = 3 * RegionHeartBeatReportInterval * time.Second
hotThresholdRatio = 0.8

rollingWindowsSize = 5

Expand All @@ -38,15 +39,15 @@ const (
// hotPeerCache saves the hot peer's statistics.
type hotPeerCache struct {
kind FlowKind
peersOfStore map[uint64]cache.Cache // storeID -> hot peers
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
}

// NewHotStoresStats creates a HotStoresStats
func NewHotStoresStats(kind FlowKind) *hotPeerCache {
return &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]cache.Cache),
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
}
}
Expand All @@ -55,11 +56,11 @@ func NewHotStoresStats(kind FlowKind) *hotPeerCache {
func (f *hotPeerCache) RegionStats() map[uint64][]*HotPeerStat {
res := make(map[uint64][]*HotPeerStat)
for storeID, peers := range f.peersOfStore {
values := peers.Elems()
values := peers.GetAll()
stat := make([]*HotPeerStat, len(values))
res[storeID] = stat
for i := range values {
stat[i] = values[i].Value.(*HotPeerStat)
stat[i] = values[i].(*HotPeerStat)
}
}
return res
Expand All @@ -78,10 +79,10 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
} else {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
peers = cache.NewCache(cacheMaxLen, cache.TwoQueueCache)
peers = NewTopN(topNN, topNTTL)
f.peersOfStore[item.StoreID] = peers
}
peers.Put(item.RegionID, item)
peers.Put(item)

stores, ok := f.storesOfRegion[item.RegionID]
if !ok {
Expand Down Expand Up @@ -177,7 +178,7 @@ func (f *hotPeerCache) getTotalKeys(region *core.RegionInfo) uint64 {

func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat {
if hotPeers, ok := f.peersOfStore[storeID]; ok {
if v, ok := hotPeers.Peek(regionID); ok {
if v := hotPeers.Get(regionID); v != nil {
return v.(*HotPeerStat)
}
}
Expand All @@ -195,13 +196,20 @@ func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64)
}

func (f *hotPeerCache) calcHotThreshold(stats *StoresStats, storeID uint64) float64 {
var minHotThreshold float64
switch f.kind {
case WriteFlow:
return calculateWriteHotThresholdWithStore(stats, storeID)
minHotThreshold = hotWriteRegionMinBytesRate
case ReadFlow:
return calculateReadHotThresholdWithStore(stats, storeID)
minHotThreshold = hotReadRegionMinBytesRate
}
return 0

tn, ok := f.peersOfStore[storeID]
if !ok || tn.Len() < topNN {
return minHotThreshold
}
tnMin := tn.GetTopNMin().(*HotPeerStat).BytesRate
return math.Max(tnMin*hotThresholdRatio, minHotThreshold)
}

// gets the storeIDs, including old region and new region
Expand Down Expand Up @@ -244,7 +252,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
}
storeID := peer.GetStoreId()
if peers, ok := f.peersOfStore[storeID]; ok {
if stat, ok := peers.Peek(region.GetID()); ok {
if stat := peers.Get(region.GetID()); stat != nil {
return stat.(*HotPeerStat).HotDegree >= hotDegree
}
}
Expand Down Expand Up @@ -280,24 +288,3 @@ func updateHotPeerStat(newItem, oldItem *HotPeerStat, bytesRate float64, hotThre

return newItem
}

// Utils
func calculateWriteHotThresholdWithStore(stats *StoresStats, storeID uint64) float64 {
writeBytes, _ := stats.GetStoreBytesRate(storeID)
hotRegionThreshold := writeBytes / hotPeerMaxCount

if hotRegionThreshold < hotWriteRegionMinBytesRate {
hotRegionThreshold = hotWriteRegionMinBytesRate
}
return hotRegionThreshold
}

func calculateReadHotThresholdWithStore(stats *StoresStats, storeID uint64) float64 {
_, readBytes := stats.GetStoreBytesRate(storeID)
hotRegionThreshold := readBytes / hotPeerMaxCount

if hotRegionThreshold < hotReadRegionMinBytesRate {
hotRegionThreshold = hotReadRegionMinBytesRate
}
return hotRegionThreshold
}

0 comments on commit f7f643c

Please sign in to comment.