diff --git a/ddl/table_split_test.go b/ddl/table_split_test.go index 132255a986dc6..afb33d5b4044f 100644 --- a/ddl/table_split_test.go +++ b/ddl/table_split_test.go @@ -79,7 +79,7 @@ func checkRegionStartWithTableID(c *C, id int64, store kvStore) { c.Assert(err, IsNil) // Region cache may be out of date, so we need to drop this expired region and load it again. - cache.DropRegion(loc.Region) + cache.InvalidateCachedRegion(loc.Region) if bytes.Equal(loc.StartKey, []byte(regionStartKey)) { return } diff --git a/store/tikv/coprocessor_test.go b/store/tikv/coprocessor_test.go index 12a8ed8fd352d..6daec69cf5c0c 100644 --- a/store/tikv/coprocessor_test.go +++ b/store/tikv/coprocessor_test.go @@ -34,6 +34,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) { _, regionIDs, _ := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)} cache := NewRegionCache(pdCli) + defer cache.Close() bo := NewBackoffer(context.Background(), 3000) @@ -96,6 +97,7 @@ func (s *testCoprocessorSuite) TestSplitRegionRanges(c *C) { mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)} cache := NewRegionCache(pdCli) + defer cache.Close() bo := NewBackoffer(context.Background(), 3000) @@ -148,6 +150,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { storeID, regionIDs, peerIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("m")) pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)} cache := NewRegionCache(pdCli) + defer cache.Close() bo := NewBackoffer(context.Background(), 3000) tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), false, false) @@ -161,7 +164,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { regionIDs = append(regionIDs, cluster.AllocID()) peerIDs = append(peerIDs, cluster.AllocID()) cluster.Split(regionIDs[1], regionIDs[2], []byte("q"), []uint64{peerIDs[2]}, storeID) - cache.DropRegion(tasks[1].region) + cache.InvalidateCachedRegion(tasks[1].region) tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), true, false) c.Assert(err, IsNil) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 5d278f7e686bc..245af37af1c5b 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -296,6 +296,7 @@ func (s *tikvStore) Close() error { if s.txnLatches != nil { s.txnLatches.Close() } + s.regionCache.Close() return nil } diff --git a/store/tikv/rawkv.go b/store/tikv/rawkv.go index 692cf77807d10..aea8c9abf52dd 100644 --- a/store/tikv/rawkv.go +++ b/store/tikv/rawkv.go @@ -82,7 +82,15 @@ func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, e // Close closes the client. 
func (c *RawKVClient) Close() error { - c.pdClient.Close() + if c.pdClient != nil { + c.pdClient.Close() + } + if c.regionCache != nil { + c.regionCache.Close() + } + if c.rpcClient == nil { + return nil + } return c.rpcClient.Close() } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index a48207c7b88fb..537113696b36d 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -20,10 +20,11 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/google/btree" + "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/client" "github.com/pingcap/tidb/metrics" @@ -32,8 +33,10 @@ import ( ) const ( - btreeDegree = 32 - rcDefaultRegionCacheTTL = time.Minute * 10 + btreeDegree = 32 + rcDefaultRegionCacheTTLSec = 600 + invalidatedLastAccessTime = -1 + reloadRegionThreshold = 5 ) var ( @@ -46,16 +49,101 @@ var ( tikvRegionCacheCounterWithGetStoreError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "err") ) -// CachedRegion encapsulates {Region, TTL} -type CachedRegion struct { - region *Region - lastAccess int64 +const ( + updated int32 = iota // region is up to date and needs no reload. + needSync // region info needs to be synced/reloaded. +) + +// Region represents a KV region. +type Region struct { + meta *metapb.Region // raw region meta from PD, immutable after init + store unsafe.Pointer // points to the region's store info, see RegionStore + syncFlag int32 // whether the region needs to be synced on next access + lastAccess int64 // last region access time, see checkRegionCacheTTL +} + +// RegionStore represents the region's store info. +// It is stored as an unsafe.Pointer and loaded atomically in one shot. +type RegionStore struct { + workStoreIdx int32 // index of the current work peer in meta.Peers and work store in stores (same idx) + stores []*Store // stores in this region + attemptAfterLoad uint8 // number of peer switch attempts since the region info was loaded +} + +// clone clones the region store struct. +func (r *RegionStore) clone() *RegionStore { + return &RegionStore{ + workStoreIdx: r.workStoreIdx, + stores: r.stores, + attemptAfterLoad: r.attemptAfterLoad, + } +}
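The Region/RegionStore pair above is a copy-on-write scheme: readers load the store pointer atomically, while writers clone the current RegionStore, mutate the clone, and compare-and-swap it back in. A minimal, self-contained sketch of the same pattern follows; the names are hypothetical, not the patch's API.

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

type state struct {
	workIdx int32
	stores  []string
}

type region struct {
	store unsafe.Pointer // *state, read and swapped atomically
}

func (r *region) load() *state { return (*state)(atomic.LoadPointer(&r.store)) }

// switchIdx retries until its copy-on-write update wins the CAS race.
func (r *region) switchIdx(idx int32) {
	for {
		old := r.load()
		if old.workIdx == idx {
			return
		}
		next := &state{workIdx: idx, stores: old.stores} // clone; the slice is shared, never mutated
		if atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(old), unsafe.Pointer(next)) {
			return
		}
	}
}

func main() {
	r := &region{}
	atomic.StorePointer(&r.store, unsafe.Pointer(&state{stores: []string{"s1", "s2"}}))
	r.switchIdx(1)
	fmt.Println(r.load().workIdx) // 1
}

Because a losing writer retries against the freshly loaded pointer, readers never block and never see a half-updated RegionStore.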
+// init initializes the region after construction. +func (r *Region) init(c *RegionCache) { + // The region store pulls its stores from the global store map + // to avoid acquiring storeMu on later accesses. + rs := &RegionStore{ + workStoreIdx: 0, + stores: make([]*Store, 0, len(r.meta.Peers)), + } + for _, p := range r.meta.Peers { + c.storeMu.RLock() + store, exists := c.storeMu.stores[p.StoreId] + c.storeMu.RUnlock() + if !exists { + store = c.getStoreByStoreID(p.StoreId) + } + rs.stores = append(rs.stores, store) + } + atomic.StorePointer(&r.store, unsafe.Pointer(rs)) + + // mark the region as accessed at init time. + r.lastAccess = time.Now().Unix() } -func (c *CachedRegion) isValid() bool { - lastAccess := atomic.LoadInt64(&c.lastAccess) - lastAccessTime := time.Unix(lastAccess, 0) - return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL +func (r *Region) getStore() (store *RegionStore) { + store = (*RegionStore)(atomic.LoadPointer(&r.store)) + return +} + +func (r *Region) compareAndSwapStore(oldStore, newStore *RegionStore) bool { + return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(oldStore), unsafe.Pointer(newStore)) +} + +func (r *Region) checkRegionCacheTTL(ts int64) bool { + for { + lastAccess := atomic.LoadInt64(&r.lastAccess) + if ts-lastAccess > rcDefaultRegionCacheTTLSec { + return false + } + if atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) { + return true + } + } +} + +// invalidate invalidates a region; the next lookup will return no result. +func (r *Region) invalidate() { + atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime) +} + +// scheduleReload schedules a region reload on the next LocateKey. +func (r *Region) scheduleReload() { + oldValue := atomic.LoadInt32(&r.syncFlag) + if oldValue != updated { + return + } + atomic.CompareAndSwapInt32(&r.syncFlag, oldValue, needSync) +} + +// needReload reports whether the region needs to be reloaded. +func (r *Region) needReload() bool { + oldValue := atomic.LoadInt32(&r.syncFlag) + if oldValue == updated { + return false + } + return atomic.CompareAndSwapInt32(&r.syncFlag, oldValue, updated) } // RegionCache caches Regions loaded from PD. @@ -63,14 +151,16 @@ type RegionCache struct { pdClient pd.Client mu struct { - sync.RWMutex - regions map[RegionVerID]*CachedRegion - sorted *btree.BTree + sync.RWMutex // mutex protecting the cached regions + regions map[RegionVerID]*Region // cached regions, organized as a RegionVerID-to-region mapping + sorted *btree.BTree // cached regions, organized as a sorted-key-to-region mapping } storeMu struct { sync.RWMutex stores map[uint64]*Store } + notifyCheckCh chan struct{} + closeCh chan struct{} } // NewRegionCache creates a RegionCache. @@ -78,24 +168,73 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c := &RegionCache{ pdClient: pdClient, } - c.mu.regions = make(map[RegionVerID]*CachedRegion) + c.mu.regions = make(map[RegionVerID]*Region) c.mu.sorted = btree.New(btreeDegree) c.storeMu.stores = make(map[uint64]*Store) + c.notifyCheckCh = make(chan struct{}, 1) + c.closeCh = make(chan struct{}) + go c.asyncCheckAndResolveLoop() return c } +// Close releases the region cache's resources. +func (c *RegionCache) Close() { + close(c.closeCh) +} + +// asyncCheckAndResolveLoop runs in the background and re-resolves failed stores whenever it is notified. +func (c *RegionCache) asyncCheckAndResolveLoop() { + var needCheckStores []*Store + for { + select { + case <-c.closeCh: + return + case <-c.notifyCheckCh: + needCheckStores = needCheckStores[:0] + c.checkAndResolve(needCheckStores) + } + } +} + +// checkAndResolve checks and resolves the addresses of failed stores. +// This method is not thread-safe and must only be used by one goroutine.
+func (c *RegionCache) checkAndResolve(needCheckStores []*Store) { + defer func() { + r := recover() + if r != nil { + logutil.Logger(context.Background()).Error("panic in the checkAndResolve goroutine", + zap.Reflect("r", r), + zap.Stack("stack trace")) + } + }() + + c.storeMu.RLock() + for _, store := range c.storeMu.stores { + state := store.getState() + if state.resolveState == needCheck { + needCheckStores = append(needCheckStores, store) + } + } + c.storeMu.RUnlock() + + for _, store := range needCheckStores { + store.reResolve(c) + } +} + // RPCContext contains data that is needed to send RPC to a region. type RPCContext struct { Region RegionVerID Meta *metapb.Region Peer *metapb.Peer + Store *Store Addr string } // GetStoreID returns StoreID. func (c *RPCContext) GetStoreID() uint64 { - if c.Peer != nil { - return c.Peer.StoreId + if c.Store != nil { + return c.Store.storeID } return 0 } @@ -108,32 +247,32 @@ func (c *RPCContext) String() string { // GetRPCContext returns RPCContext for a region. If it returns nil, the region // must be out of date and already dropped from cache. func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) { - c.mu.RLock() - region := c.getCachedRegion(id) - if region == nil { - c.mu.RUnlock() + ts := time.Now().Unix() + + cachedRegion := c.getCachedRegionWithRLock(id) + if cachedRegion == nil { return nil, nil } - // Note: it is safe to use region.meta and region.peer without clone after - // unlock, because region cache will never update the content of region's meta - // or peer. On the contrary, if we want to use `region` after unlock, then we - // need to clone it to avoid data race. - meta, peer := region.meta, region.peer - c.mu.RUnlock() - addr, err := c.GetStoreAddr(bo, peer.GetStoreId()) + if !cachedRegion.checkRegionCacheTTL(ts) { + return nil, nil + } + + store, peer, addr, err := c.routeStoreInRegion(bo, cachedRegion, ts) if err != nil { - return nil, errors.Trace(err) + return nil, err } - if addr == "" { + if store == nil || len(addr) == 0 { // Store not found, region must be out of date. - c.DropRegion(id) + cachedRegion.invalidate() return nil, nil } + return &RPCContext{ Region: id, - Meta: meta, + Meta: cachedRegion.meta, Peer: peer, + Store: store, Addr: addr, }, nil } @@ -153,28 +292,10 @@ func (l *KeyLocation) Contains(key []byte) bool { // LocateKey searches for the region and range that the key is located. func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error) { - c.mu.RLock() - r := c.searchCachedRegion(key, false) - if r != nil { - loc := &KeyLocation{ - Region: r.VerID(), - StartKey: r.StartKey(), - EndKey: r.EndKey(), - } - c.mu.RUnlock() - return loc, nil - } - c.mu.RUnlock() - - r, err := c.loadRegion(bo, key, false) + r, err := c.findRegionByKey(bo, key, false) if err != nil { - return nil, errors.Trace(err) + return nil, err } - - c.mu.Lock() - defer c.mu.Unlock() - c.insertRegionToCache(r) - return &KeyLocation{ Region: r.VerID(), StartKey: r.StartKey(), @@ -182,31 +303,24 @@ func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error) }, nil } -// LocateEndKey searches for the region and range that the key is located. -// Unlike LocateKey, start key of a region is exclusive and end key is inclusive. 
-func (c *RegionCache) LocateEndKey(bo *Backoffer, key []byte) (*KeyLocation, error) { - c.mu.RLock() - r := c.searchCachedRegion(key, true) - if r != nil { - loc := &KeyLocation{ - Region: r.VerID(), - StartKey: r.StartKey(), - EndKey: r.EndKey(), - } - c.mu.RUnlock() - return loc, nil - } - c.mu.RUnlock() - - r, err := c.loadRegion(bo, key, true) +func (c *RegionCache) loadAndInsertRegion(bo *Backoffer, key []byte) (*Region, error) { + r, err := c.loadRegion(bo, key, false) if err != nil { - return nil, errors.Trace(err) + return nil, err } c.mu.Lock() - defer c.mu.Unlock() c.insertRegionToCache(r) + c.mu.Unlock() + return r, nil +} +// LocateEndKey searches for the region and range that the key is located in. +// Unlike LocateKey, the start key of a region is exclusive and the end key is inclusive. +func (c *RegionCache) LocateEndKey(bo *Backoffer, key []byte) (*KeyLocation, error) { + r, err := c.findRegionByKey(bo, key, true) + if err != nil { + return nil, err + } return &KeyLocation{ Region: r.VerID(), StartKey: r.StartKey(), @@ -214,6 +328,36 @@ func (c *RegionCache) LocateEndKey(bo *Backoffer, key []byte) (*KeyLocation, err }, nil } +func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) (r *Region, err error) { + r = c.searchCachedRegion(key, isEndKey) + if r == nil { + // load the region if it does not exist in the cache or has expired. + lr, err := c.loadRegion(bo, key, isEndKey) + if err != nil { + // no region data; return the error on failure. + return nil, err + } + r = lr + c.mu.Lock() + c.insertRegionToCache(r) + c.mu.Unlock() + } else if r.needReload() { + // reload the region if it is marked as needing a reload. + lr, err := c.loadRegion(bo, key, isEndKey) + if err != nil { + // ignore the error and use the old region info. + logutil.Logger(bo.ctx).Error("load region failure", + zap.ByteString("key", key), zap.Error(err)) + } else { + r = lr + c.mu.Lock() + c.insertRegionToCache(r) + c.mu.Unlock() + } + } + return r, nil +} + // LocateRegionByID searches for the region with ID. func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLocation, error) { c.mu.RLock() @@ -235,8 +379,8 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca } c.mu.Lock() - defer c.mu.Unlock() c.insertRegionToCache(r) + c.mu.Unlock() return &KeyLocation{ Region: r.VerID(), StartKey: r.StartKey(), @@ -284,61 +428,40 @@ func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey [] return regionIDs, nil } -// DropRegion removes a cached Region. -func (c *RegionCache) DropRegion(id RegionVerID) { - c.mu.Lock() - defer c.mu.Unlock() - c.dropRegionFromCache(id) +// InvalidateCachedRegion removes a cached Region. +func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) { + cachedRegion := c.getCachedRegionWithRLock(id) + if cachedRegion == nil { + return + } + tikvRegionCacheCounterWithDropRegionFromCacheOK.Inc() + cachedRegion.invalidate() } // UpdateLeader update some region cache with newer leader info.
func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) { - c.mu.Lock() - defer c.mu.Unlock() - - r := c.getCachedRegion(regionID) + r := c.getCachedRegionWithRLock(regionID) if r == nil { logutil.Logger(context.Background()).Debug("regionCache: cannot find region when updating leader", zap.Uint64("regionID", regionID.GetID()), zap.Uint64("leaderStoreID", leaderStoreID)) return } - - if !r.SwitchPeer(leaderStoreID) { + if !c.switchWorkStore(r, leaderStoreID) { logutil.Logger(context.Background()).Debug("regionCache: cannot find peer when updating leader", zap.Uint64("regionID", regionID.GetID()), zap.Uint64("leaderStoreID", leaderStoreID)) - c.dropRegionFromCache(r.VerID()) + r.invalidate() } } // insertRegionToCache tries to insert the Region to cache. -func (c *RegionCache) insertRegionToCache(r *Region) { - old := c.mu.sorted.ReplaceOrInsert(newBtreeItem(r)) +func (c *RegionCache) insertRegionToCache(cachedRegion *Region) { + old := c.mu.sorted.ReplaceOrInsert(newBtreeItem(cachedRegion)) if old != nil { - delete(c.mu.regions, old.(*btreeItem).region.VerID()) - } - c.mu.regions[r.VerID()] = &CachedRegion{ - region: r, - lastAccess: time.Now().Unix(), - } -} - -// getCachedRegion loads a region from cache. It also checks if the region has -// not been accessed for a long time (maybe out of date). In this case, it -// returns nil so the region will be loaded from PD again. -// Note that it should be called with c.mu.RLock(), and the returned Region -// should not be used after c.mu is RUnlock(). -func (c *RegionCache) getCachedRegion(id RegionVerID) *Region { - cachedRegion, ok := c.mu.regions[id] - if !ok { - return nil - } - if cachedRegion.isValid() { - atomic.StoreInt64(&cachedRegion.lastAccess, time.Now().Unix()) - return cachedRegion.region + delete(c.mu.regions, old.(*btreeItem).cachedRegion.VerID()) } - return nil + c.mu.regions[cachedRegion.VerID()] = cachedRegion } // searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, @@ -346,17 +469,27 @@ func (c *RegionCache) getCachedRegion(id RegionVerID) *Region { // used after c.mu is RUnlock(). // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful when processing in reverse order. 
func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region { + ts := time.Now().Unix() var r *Region + c.mu.RLock() c.mu.sorted.DescendLessOrEqual(newBtreeSearchItem(key), func(item btree.Item) bool { - r = item.(*btreeItem).region + r = item.(*btreeItem).cachedRegion if isEndKey && bytes.Equal(r.StartKey(), key) { r = nil // clear result return true // iterate next item } + if !r.checkRegionCacheTTL(ts) { + r = nil + return true + } return false }) + c.mu.RUnlock() if r != nil && (!isEndKey && r.Contains(key) || isEndKey && r.ContainsByEnd(key)) { - return c.getCachedRegion(r.VerID()) + if !c.hasAvailableStore(r, ts) { + return nil + } + return r } return nil } @@ -367,22 +500,12 @@ func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region { func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { for v, r := range c.mu.regions { if v.id == regionID { - return r.region + return r } } return nil } -func (c *RegionCache) dropRegionFromCache(verID RegionVerID) { - r, ok := c.mu.regions[verID] - if !ok { - return - } - tikvRegionCacheCounterWithDropRegionFromCacheOK.Inc() - c.mu.sorted.Delete(newBtreeItem(r.region)) - delete(c.mu.regions, verID) -} - // loadRegion loads region from pd client, and picks the first peer as leader. // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful when processing in reverse order. func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Region, error) { @@ -423,12 +546,10 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg searchPrev = true continue } - region := &Region{ - meta: meta, - peer: meta.Peers[0], - } + region := &Region{meta: meta} + region.init(c) if leader != nil { - region.SwitchPeer(leader.GetStoreId()) + c.switchWorkStore(region, leader.StoreId) } return region, nil } @@ -461,120 +582,160 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e if len(meta.Peers) == 0 { return nil, errors.New("receive Region with no peer") } - region := &Region{ - meta: meta, - peer: meta.Peers[0], - } + region := &Region{meta: meta} + region.init(c) if leader != nil { - region.SwitchPeer(leader.GetStoreId()) + c.switchWorkStore(region, leader.GetStoreId()) } return region, nil } } -// GetStoreAddr returns a tikv server's address by its storeID. It checks cache -// first, sends request to pd server when necessary. -func (c *RegionCache) GetStoreAddr(bo *Backoffer, id uint64) (string, error) { - c.storeMu.RLock() - if store, ok := c.storeMu.stores[id]; ok { - c.storeMu.RUnlock() - return store.Addr, nil - } - c.storeMu.RUnlock() - return c.ReloadStoreAddr(bo, id) +func (c *RegionCache) getCachedRegionWithRLock(regionID RegionVerID) (r *Region) { + c.mu.RLock() + r = c.mu.regions[regionID] + c.mu.RUnlock() + return } -// ReloadStoreAddr reloads store's address. -func (c *RegionCache) ReloadStoreAddr(bo *Backoffer, id uint64) (string, error) { - addr, err := c.loadStoreAddr(bo, id) - if err != nil || addr == "" { - return "", errors.Trace(err) +// routeStoreInRegion ensures the region has a workable store and returns it. +func (c *RegionCache) routeStoreInRegion(bo *Backoffer, region *Region, ts int64) (workStore *Store, workPeer *metapb.Peer, workAddr string, err error) { +retry: + regionStore := region.getStore() + cachedStore, cachedPeer, cachedIdx := region.WorkStorePeer(regionStore) + + // Most of the time, requests are directly routed to stable leader.
+ // return if the store is a stable leader; no need to retry other nodes. + state := cachedStore.getState() + if cachedStore != nil && state.failedAttempt == 0 && state.lastFailedTime == 0 { + workStore = cachedStore + workAddr, err = c.getStoreAddr(bo, region, workStore, cachedIdx, state) + workPeer = cachedPeer + return } - c.storeMu.Lock() - defer c.storeMu.Unlock() - c.storeMu.stores[id] = &Store{ - ID: id, - Addr: addr, + // try to find and switch to another peer, round-robin, when the old leader hits an error. + newIdx := -1 + storeNum := len(regionStore.stores) + i := (cachedIdx + 1) % storeNum + start := i + for { + store := regionStore.stores[i] + state = store.getState() + if state.Available(ts) { + newIdx = i + break + } + i = (i + 1) % storeNum + if i == start { + break + } + } + if newIdx < 0 { + return + } + newRegionStore := regionStore.clone() + newRegionStore.workStoreIdx = int32(newIdx) + newRegionStore.attemptAfterLoad++ + attemptOverThreshold := newRegionStore.attemptAfterLoad == reloadRegionThreshold + if attemptOverThreshold { + newRegionStore.attemptAfterLoad = 0 } - return addr, nil + if !region.compareAndSwapStore(regionStore, newRegionStore) { + goto retry + } + + // reload the region info after more than reloadRegionThreshold switch attempts + if attemptOverThreshold { + region.scheduleReload() + } + + workStore = newRegionStore.stores[newIdx] + workAddr, err = c.getStoreAddr(bo, region, workStore, newIdx, state) + workPeer = region.meta.Peers[newIdx] + return } -// ClearStoreByID clears store from cache with storeID. -func (c *RegionCache) ClearStoreByID(id uint64) { - c.storeMu.Lock() - defer c.storeMu.Unlock() - delete(c.storeMu.stores, id) +func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store, storeIdx int, state storeState) (addr string, err error) { + switch state.resolveState { + case resolved, needCheck: + addr = store.addr + return + case unresolved: + addr, err = store.initResolve(bo, c) + return + case deleted: + addr = c.changeToActiveStore(region, store, storeIdx) + return + default: + panic("unsupported resolve state") + } } -func (c *RegionCache) loadStoreAddr(bo *Backoffer, id uint64) (string, error) { +func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx int) (addr string) { + c.storeMu.RLock() + store = c.storeMu.stores[store.storeID] + c.storeMu.RUnlock() for { - store, err := c.pdClient.GetStore(bo.ctx, id) - if err != nil { - tikvRegionCacheCounterWithGetStoreError.Inc() - } else { - tikvRegionCacheCounterWithGetStoreOK.Inc() - } - if err != nil { - if errors.Cause(err) == context.Canceled { - return "", errors.Trace(err) - } - err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", id, err) - if err = bo.Backoff(BoPDRPC, err); err != nil { - return "", errors.Trace(err) + oldRegionStore := region.getStore() + newRegionStore := oldRegionStore.clone() + newRegionStore.stores = make([]*Store, 0, len(oldRegionStore.stores)) + for i, s := range oldRegionStore.stores { + if i == storeIdx { + newRegionStore.stores = append(newRegionStore.stores, store) + } else { + newRegionStore.stores = append(newRegionStore.stores, s) } - continue } - if store == nil { - return "", nil + if region.compareAndSwapStore(oldRegionStore, newRegionStore) { + break } - return store.GetAddress(), nil } + addr = store.addr + return }
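routeStoreInRegion's scan is a plain round-robin probe: start just after the current index, stop at the first available store, and give up after one full lap. The same loop in isolation, as a hypothetical helper outside the patch:

package main

import "fmt"

// nextAvailable returns the index of the first available store after cur,
// scanning round-robin, or -1 if a full lap finds none.
func nextAvailable(available []bool, cur int) int {
	n := len(available)
	i := (cur + 1) % n
	start := i
	for {
		if available[i] {
			return i
		}
		i = (i + 1) % n
		if i == start {
			return -1
		}
	}
}

func main() {
	fmt.Println(nextAvailable([]bool{false, true, false}, 0))  // 1
	fmt.Println(nextAvailable([]bool{false, false, false}, 0)) // -1
}

Note that the lap starts after the current index, so the current store is probed last and can be re-selected if every other peer is blacked out.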
-// DropStoreOnSendRequestFail is used for clearing cache when a tikv server does not respond. -func (c *RegionCache) DropStoreOnSendRequestFail(ctx *RPCContext, err error) { - // We need to drop the store only when the request is the first one failed on this store. - // Because too many concurrently requests trying to drop the store will be blocked on the lock. - failedRegionID := ctx.Region - failedStoreID := ctx.Peer.StoreId - c.mu.Lock() - _, ok := c.mu.regions[failedRegionID] - if !ok { - // The failed region is dropped already by another request, we don't need to iterate the regions - // and find regions on the failed store to drop. - c.mu.Unlock() - return - } - for id, r := range c.mu.regions { - if r.region.peer.GetStoreId() == failedStoreID { - c.dropRegionFromCache(id) +// hasAvailableStore checks whether the region has an available store. +// Unlike `routeStoreInRegion`, it only checks and never changes the work store or peer. +func (c *RegionCache) hasAvailableStore(region *Region, ts int64) bool { + regionStore := region.getStore() + for _, store := range regionStore.stores { + state := store.getState() + if state.Available(ts) { + return true } } - c.mu.Unlock() + return false +} - // Store's meta may be out of date. - var failedStoreAddr string +func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) { + var ok bool c.storeMu.Lock() - store, ok := c.storeMu.stores[failedStoreID] + store, ok = c.storeMu.stores[storeID] if ok { - failedStoreAddr = store.Addr - delete(c.storeMu.stores, failedStoreID) + c.storeMu.Unlock() + return } + store = &Store{storeID: storeID} + c.storeMu.stores[storeID] = store c.storeMu.Unlock() - logutil.Logger(context.Background()).Info("drop regions that on the store due to send request fail", - zap.Uint64("store", failedStoreID), - zap.String("store addr", failedStoreAddr), - zap.Error(err)) + return +} + +// OnSendRequestFail marks the failing store when a tikv server does not respond. +func (c *RegionCache) OnSendRequestFail(ctx *RPCContext, err error) { + failedStoreID := ctx.Store.storeID + c.storeMu.RLock() + store, exists := c.storeMu.stores[failedStoreID] + c.storeMu.RUnlock() + if !exists { + return + } + store.markAccess(c.notifyCheckCh, false) } // OnRegionEpochNotMatch removes the old region and inserts new regions into the cache. func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) error { - c.mu.Lock() - defer c.mu.Unlock() - - c.dropRegionFromCache(ctx.Region) - // Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff. for _, meta := range currentRegions { if meta.GetId() == ctx.Region.id && @@ -586,6 +747,9 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr } } + c.mu.Lock() + defer c.mu.Unlock() + needInvalidateOld := true // If the region epoch is not ahead of TiKV's, replace region meta in region cache.
for _, meta := range currentRegions { if _, ok := c.pdClient.(*codecPDClient); ok { @@ -593,12 +757,20 @@ return errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err) } } - region := &Region{ - meta: meta, - peer: meta.Peers[0], - } - region.SwitchPeer(ctx.Peer.GetStoreId()) + region := &Region{meta: meta} + region.init(c) + c.switchWorkStore(region, ctx.Store.storeID) c.insertRegionToCache(region) + if ctx.Region == region.VerID() { + needInvalidateOld = false + } + } + if needInvalidateOld { + cachedRegion, ok := c.mu.regions[ctx.Region] + if ok { + tikvRegionCacheCounterWithDropRegionFromCacheOK.Inc() + cachedRegion.invalidate() + } } return nil } @@ -610,14 +782,14 @@ func (c *RegionCache) PDClient() pd.Client { // btreeItem is BTree's Item that uses []byte to compare. type btreeItem struct { - key []byte - region *Region + key []byte + cachedRegion *Region } -func newBtreeItem(r *Region) *btreeItem { +func newBtreeItem(cr *Region) *btreeItem { return &btreeItem{ - key: r.StartKey(), - region: r, + key: cr.StartKey(), + cachedRegion: cr, } } @@ -631,17 +803,19 @@ func (item *btreeItem) Less(other btree.Item) bool { return bytes.Compare(item.key, other.(*btreeItem).key) < 0 } -// Region stores region's meta and its leader peer. -type Region struct { - meta *metapb.Region - peer *metapb.Peer -} - // GetID returns id. func (r *Region) GetID() uint64 { return r.meta.GetId() } +// WorkStorePeer returns the current work store and work peer. +func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer, idx int) { + idx = int(rs.workStoreIdx) + store = rs.stores[rs.workStoreIdx] + peer = r.meta.Peers[rs.workStoreIdx] + return +} + // RegionVerID is a unique ID that can identify a Region at a specific version. type RegionVerID struct { id uint64 @@ -673,25 +847,34 @@ func (r *Region) EndKey() []byte { return r.meta.EndKey } -// GetContext constructs kvprotopb.Context from region info. -func (r *Region) GetContext() *kvrpcpb.Context { - return &kvrpcpb.Context{ - RegionId: r.meta.Id, - RegionEpoch: r.meta.RegionEpoch, - Peer: r.peer, +// switchWorkStore switches the current work store/peer to the one on the given store. It returns +// false if no peer matches the storeID. +func (c *RegionCache) switchWorkStore(r *Region, targetStoreID uint64) (switchToTarget bool) { + if len(r.meta.Peers) == 0 { + return } -} -// SwitchPeer switches current peer to the one on specific store. It returns -// false if no peer matches the storeID. -func (r *Region) SwitchPeer(storeID uint64) bool { - for _, p := range r.meta.Peers { - if p.GetStoreId() == storeID { - r.peer = p - return true + leaderIdx := 0 + for i, p := range r.meta.Peers { + if p.GetStoreId() == targetStoreID { + leaderIdx = i + switchToTarget = true + break } } - return false + +retry: + // switch to new leader. + oldRegionStore := r.getStore() + if oldRegionStore.workStoreIdx == int32(leaderIdx) { + return + } + newRegionStore := oldRegionStore.clone() + newRegionStore.workStoreIdx = int32(leaderIdx) + if !r.compareAndSwapStore(oldRegionStore, newRegionStore) { + goto retry + } + return } // Contains checks whether the key is in the region, for the maximum region endKey is empty. @@ -709,8 +892,227 @@ func (r *Region) ContainsByEnd(key []byte) bool { (bytes.Compare(key, r.meta.GetEndKey()) <= 0 || len(r.meta.GetEndKey()) == 0) }
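Contains treats a region as the half-open interval [StartKey, EndKey), while ContainsByEnd treats it as (StartKey, EndKey], which is what reverse scans need; an empty end key means unbounded. A standalone sketch of the two predicates, mirroring the logic above with hypothetical helper names:

package main

import (
	"bytes"
	"fmt"
)

// contains: key in [start, end); an empty end means +inf.
func contains(start, end, key []byte) bool {
	return bytes.Compare(start, key) <= 0 &&
		(bytes.Compare(key, end) < 0 || len(end) == 0)
}

// containsByEnd: key in (start, end]; an empty end means +inf.
func containsByEnd(start, end, key []byte) bool {
	return bytes.Compare(start, key) < 0 &&
		(bytes.Compare(key, end) <= 0 || len(end) == 0)
}

func main() {
	start, end := []byte("b"), []byte("m")
	fmt.Println(contains(start, end, []byte("b")))      // true: start key is inclusive
	fmt.Println(containsByEnd(start, end, []byte("b"))) // false: start key is exclusive
	fmt.Println(contains(start, end, []byte("m")))      // false: end key is exclusive
	fmt.Println(containsByEnd(start, end, []byte("m"))) // true: end key is inclusive
}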
-// Store contains a tikv server's address. +// Store contains a kv process's address and its access state. type Store struct { - ID uint64 - Addr string + addr string // loaded store address + storeID uint64 // store's id + state uint64 // packed storeState, accessed atomically (see getState) + resolveMutex sync.Mutex // protects PD from concurrent init requests +} + +// storeState contains store's access info. +type storeState struct { + lastFailedTime uint32 + failedAttempt uint16 + resolveState resolveState + _Align int8 +} + +type resolveState uint8 + +const ( + unresolved resolveState = iota + resolved + needCheck + deleted +) + +// initResolve resolves the address for a store that has never been resolved. +func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err error) { + s.resolveMutex.Lock() + state := s.getState() + defer s.resolveMutex.Unlock() + if state.resolveState != unresolved { + addr = s.addr + return + } + var store *metapb.Store + for { + store, err = c.pdClient.GetStore(bo.ctx, s.storeID) + if err != nil { + tikvRegionCacheCounterWithGetStoreError.Inc() + } else { + tikvRegionCacheCounterWithGetStoreOK.Inc() + } + if err != nil { + // TODO: refine handling of PD error statuses. + if errors.Cause(err) == context.Canceled { + return + } + err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err) + if err = bo.Backoff(BoPDRPC, err); err != nil { + return + } + continue + } + if store == nil { + return + } + addr = store.GetAddress() + s.addr = addr + retry: + state = s.getState() + if state.resolveState != unresolved { + addr = s.addr + return + } + newState := state + newState.resolveState = resolved + if !s.compareAndSwapState(state, newState) { + goto retry + } + return + } +} + +// reResolve tries to re-resolve the address for a store marked as needing a check. +func (s *Store) reResolve(c *RegionCache) { + var addr string + store, err := c.pdClient.GetStore(context.Background(), s.storeID) + if err != nil { + tikvRegionCacheCounterWithGetStoreError.Inc() + } else { + tikvRegionCacheCounterWithGetStoreOK.Inc() + } + if err != nil { + logutil.Logger(context.Background()).Error("loadStore from PD failed", zap.Uint64("id", s.storeID), zap.Error(err)) + // we cannot backoff in the reResolve loop; just proceed to the other stores and wait for the next tick. + return + } + if store == nil { + return + } + + addr = store.GetAddress() + if s.addr != addr { + var state storeState + state.resolveState = resolved + newStore := &Store{storeID: s.storeID, addr: addr} + newStore.state = *(*uint64)(unsafe.Pointer(&state)) + c.storeMu.Lock() + c.storeMu.stores[newStore.storeID] = newStore + c.storeMu.Unlock() + retryMarkDel: + // mark the old store deleted; regions still holding it will switch to the new store (see changeToActiveStore). + oldState := s.getState() + if oldState.resolveState == deleted { + return + } + newState := oldState + newState.resolveState = deleted + if !s.compareAndSwapState(oldState, newState) { + goto retryMarkDel + } + return + } +retryMarkResolved: + oldState := s.getState() + if oldState.resolveState != needCheck { + return + } + newState := oldState + newState.resolveState = resolved + if !s.compareAndSwapState(oldState, newState) { + goto retryMarkResolved + } + return +}
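reResolve publishes a replacement Store whose whole storeState is written through one uint64, which is what lets getState and compareAndSwapState (next hunk) treat the struct atomically. A minimal sketch of the packing trick, assuming, as the patch does, that the struct occupies exactly 8 bytes:

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

type state struct {
	lastFailedTime uint32
	failedAttempt  uint16
	resolveState   uint8
	_pad           int8
}

// compile-time guarantee that state fits in exactly one uint64.
var _ [8]byte = [unsafe.Sizeof(state{})]byte{}

type store struct{ state uint64 }

func (s *store) load() state {
	x := atomic.LoadUint64(&s.state)
	return *(*state)(unsafe.Pointer(&x))
}

func (s *store) cas(old, next state) bool {
	return atomic.CompareAndSwapUint64(&s.state,
		*(*uint64)(unsafe.Pointer(&old)), *(*uint64)(unsafe.Pointer(&next)))
}

func main() {
	var s store
	for { // the same lock-free retry loop markAccess uses
		old := s.load()
		next := old
		next.failedAttempt++
		if s.cas(old, next) {
			break
		}
	}
	fmt.Println(s.load().failedAttempt) // 1
}

The payoff is that several logically related fields are always read and updated together, with no mutex on the hot path.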
+const ( + // maxExponentAttempt caps the exponent used to grow the blackout window. + maxExponentAttempt = 10 + // startBlackoutAfterAttempt is the number of consecutive failed attempts after which a store is blacked out. + startBlackoutAfterAttempt = 20 +) + +func (s *Store) getState() storeState { + var state storeState + if s == nil { + return state + } + x := atomic.LoadUint64(&s.state) + *(*uint64)(unsafe.Pointer(&state)) = x + return state +} + +func (s *Store) compareAndSwapState(oldState, newState storeState) bool { + oldValue, newValue := *(*uint64)(unsafe.Pointer(&oldState)), *(*uint64)(unsafe.Pointer(&newState)) + return atomic.CompareAndSwapUint64(&s.state, oldValue, newValue) +} + +func (s *Store) storeState(newState storeState) { + newValue := *(*uint64)(unsafe.Pointer(&newState)) + atomic.StoreUint64(&s.state, newValue) +} + +// Available reports whether the store is available at ts. +func (state storeState) Available(ts int64) bool { + if state.failedAttempt == 0 || state.lastFailedTime == 0 { + // fast path: the store has had no recent failures. + return true + } + // the first `startBlackoutAfterAttempt` failures can retry immediately. + if state.failedAttempt < startBlackoutAfterAttempt { + return true + } + // more than `startBlackoutAfterAttempt` consecutive failures trigger the blackout logic; + // check the blackout time window to determine whether the store is reachable. + if state.failedAttempt > startBlackoutAfterAttempt+maxExponentAttempt { + state.failedAttempt = startBlackoutAfterAttempt + maxExponentAttempt + } + blackoutDeadline := int64(state.lastFailedTime) + 1*int64(backoffutils.ExponentBase2(uint(state.failedAttempt-startBlackoutAfterAttempt+1))) + return blackoutDeadline <= ts +} + +// markAccess marks the processing result. +func (s *Store) markAccess(notifyCheckCh chan struct{}, success bool) { +retry: + var triggerCheck bool + oldState := s.getState() + if (oldState.failedAttempt == 0 && success) || (!success && oldState.failedAttempt >= (startBlackoutAfterAttempt+maxExponentAttempt)) { + // fast path: already healthy on success, or the failure count has hit its upper bound. + return + } + state := oldState + if !success { + if state.lastFailedTime == 0 { + state.lastFailedTime = uint32(time.Now().Unix()) + } + state.failedAttempt = state.failedAttempt + 1 + if state.resolveState == resolved { + state.resolveState = needCheck + triggerCheck = true + } + } else { + state.lastFailedTime = 0 + state.failedAttempt = 0 + } + if !s.compareAndSwapState(oldState, state) { + goto retry + } + if triggerCheck { + select { + case notifyCheckCh <- struct{}{}: + default: + } + } +} + +// markNeedCheck marks a resolved store for async re-resolution to detect address changes. +func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) { +retry: + oldState := s.getState() + if oldState.resolveState != resolved { + return + } + state := oldState + state.resolveState = needCheck + if !s.compareAndSwapState(oldState, state) { + goto retry + } + select { + case notifyCheckCh <- struct{}{}: + default: + } +}
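markAccess and markNeedCheck both finish with a non-blocking send on notifyCheckCh, whose capacity of 1 coalesces any burst of failures into a single pending wake-up for asyncCheckAndResolveLoop. The idiom in isolation, with hypothetical names:

package main

import (
	"fmt"
	"time"
)

func main() {
	notify := make(chan struct{}, 1) // capacity 1 coalesces bursts of notifications
	done := make(chan struct{})

	go func() { // consumer: one check per wake-up, however many producers fired
		for range notify {
			fmt.Println("checking stores")
		}
		close(done)
	}()

	for i := 0; i < 100; i++ { // producers never block: drop the send if one is already pending
		select {
		case notify <- struct{}{}:
		default:
		}
	}
	time.Sleep(10 * time.Millisecond)
	close(notify)
	<-done
}

A hundred failures therefore cost at most one or two re-resolve passes rather than a hundred, and a slow consumer can never back-pressure the RPC path.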
"github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/mockstore/mocktikv" ) @@ -52,16 +53,66 @@ func (s *testRegionCacheSuite) SetUpTest(c *C) { s.bo = NewBackoffer(context.Background(), 5000) } +func (s *testRegionCacheSuite) TearDownTest(c *C) { + s.cache.Close() +} + func (s *testRegionCacheSuite) storeAddr(id uint64) string { return fmt.Sprintf("store%d", id) } func (s *testRegionCacheSuite) checkCache(c *C, len int) { - c.Assert(s.cache.mu.regions, HasLen, len) - c.Assert(s.cache.mu.sorted.Len(), Equals, len) + ts := time.Now().Unix() + c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, len) + c.Assert(workableRegionsInBtree(s.cache, s.cache.mu.sorted, ts), Equals, len) for _, r := range s.cache.mu.regions { - c.Assert(r.region, DeepEquals, s.cache.searchCachedRegion(r.region.StartKey(), false)) + if r.checkRegionCacheTTL(ts) { + bo := NewBackoffer(context.Background(), 100) + if store, _, _, _ := s.cache.routeStoreInRegion(bo, r, ts); store != nil { + c.Assert(r, DeepEquals, s.cache.searchCachedRegion(r.StartKey(), false)) + } + } + } +} + +func workableRegions(cache *RegionCache, regions map[RegionVerID]*Region, ts int64) (len int) { + for _, region := range regions { + if !region.checkRegionCacheTTL(ts) { + continue + } + bo := NewBackoffer(context.Background(), 100) + store, _, _, _ := cache.routeStoreInRegion(bo, region, ts) + if store != nil { + len++ + } + } + return +} + +func workableRegionsInBtree(cache *RegionCache, t *btree.BTree, ts int64) (len int) { + t.Descend(func(item btree.Item) bool { + r := item.(*btreeItem).cachedRegion + if !r.checkRegionCacheTTL(ts) { + return true + } + bo := NewBackoffer(context.Background(), 100) + store, _, _, _ := cache.routeStoreInRegion(bo, r, ts) + if store != nil { + len++ + } + return true + }) + return +} + +func reachableStore(stores map[uint64]*Store, ts int64) (cnt int) { + for _, store := range stores { + state := store.getState() + if state.Available(ts) { + cnt++ + } } + return } func (s *testRegionCacheSuite) getRegion(c *C, key []byte) *Region { @@ -71,6 +122,7 @@ func (s *testRegionCacheSuite) getRegion(c *C, key []byte) *Region { c.Assert(r, NotNil) return r } + func (s *testRegionCacheSuite) getRegionWithEndKey(c *C, key []byte) *Region { _, err := s.cache.LocateEndKey(s.bo, key) c.Assert(err, IsNil) @@ -209,7 +261,7 @@ func (s *testRegionCacheSuite) TestSplit(c *C) { s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0]) // tikv-server reports `NotInRegion` - s.cache.DropRegion(r.VerID()) + s.cache.InvalidateCachedRegion(r.VerID()) s.checkCache(c, 0) r = s.getRegion(c, []byte("x")) @@ -236,7 +288,7 @@ func (s *testRegionCacheSuite) TestMerge(c *C) { s.cluster.Merge(s.region1, region2) // tikv-server reports `NotInRegion` - s.cache.DropRegion(loc.Region) + s.cache.InvalidateCachedRegion(loc.Region) s.checkCache(c, 0) loc, err = s.cache.LocateKey(s.bo, []byte("x")) @@ -250,7 +302,7 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) { c.Assert(err, IsNil) // connect tikv-server failed, cause drop cache - s.cache.DropRegion(loc.Region) + s.cache.InvalidateCachedRegion(loc.Region) r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) @@ -259,22 +311,46 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) { s.checkCache(c, 1) } -func (s *testRegionCacheSuite) TestRequestFail(c *C) { +func (s *testRegionCacheSuite) TestSendFailBlackout(c *C) { + ts := time.Now().Unix() region := s.getRegion(c, []byte("a")) - ctx, _ := s.cache.GetRPCContext(s.bo, 
region.VerID()) - s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error")) - c.Assert(s.cache.mu.regions, HasLen, 0) - region = s.getRegion(c, []byte("a")) - c.Assert(s.cache.mu.regions, HasLen, 1) - ctx, _ = s.cache.GetRPCContext(s.bo, region.VerID()) - s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error")) - c.Assert(len(s.cache.mu.regions), Equals, 0) + // init with 1 region 2 stores + c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1) + c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) + + // 38 failures round-robin across 2 stores leave 19 failures per store, below the blackout threshold of 20. + for i := 0; i < 38; i++ { + ctx, _ := s.cache.GetRPCContext(s.bo, region.VerID()) + s.cache.OnSendRequestFail(ctx, errors.New("test error")) + } + ts = time.Now().Unix() + c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1) + c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) + + // the 20th consecutive failure per store starts a 1-second blackout. + for i := 0; i < 2; i++ { + // each failure bumps the current store's failedAttempt to 20. + ctx, _ := s.cache.GetRPCContext(s.bo, region.VerID()) + s.cache.OnSendRequestFail(ctx, errors.New("test error")) + } + c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 0) + c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 0) + + // after the 1-second blackout, both stores can be accessed again. + time.Sleep(1 * time.Second) + ts = time.Now().Unix() s.getRegion(c, []byte("a")) - c.Assert(s.cache.mu.regions, HasLen, 1) + c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1) + c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) } -func (s *testRegionCacheSuite) TestRequestFail2(c *C) { +func (s *testRegionCacheSuite) TestSendFailBlackTwoRegion(c *C) { + ts := time.Now().Unix() // key range: ['' - 'm' - 'z'] region2 := s.cluster.AllocID() newPeers := s.cluster.AllocIDs(2) @@ -288,21 +364,38 @@ c.Assert(err, IsNil) c.Assert(loc2.Region.id, Equals, region2) - // Request should fail on region1. - ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region) - c.Assert(s.cache.storeMu.stores, HasLen, 1) + c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) s.checkCache(c, 2) - s.cache.DropStoreOnSendRequestFail(ctx, errors.New("test error")) - // Both region2 and store should be dropped from cache. - c.Assert(s.cache.storeMu.stores, HasLen, 0) - c.Assert(s.cache.searchCachedRegion([]byte("x"), true), IsNil) - s.checkCache(c, 0) + + // send failing requests in 2 regions backed by the same 2 stores. + for i := 0; i < startBlackoutAfterAttempt; i++ { + ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region) + s.cache.OnSendRequestFail(ctx, errors.New("test error")) + } + for i := 0; i < startBlackoutAfterAttempt; i++ { + ctx, _ := s.cache.GetRPCContext(s.bo, loc2.Region) + s.cache.OnSendRequestFail(ctx, errors.New("test error")) + } + + // both regions are now unworkable and both stores are blacked out.
+ c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 0) + c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 0) + + // after sleeping 1 second, the regions recover. + time.Sleep(1 * time.Second) + ts = time.Now().Unix() + c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) + s.getRegion(c, []byte("a")) + s.getRegion(c, []byte("x")) + c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 2) } func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) { // Create a separated region cache to do this test. pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)} cache := NewRegionCache(pdCli) + defer cache.Close() region := createSampleRegion([]byte("k1"), []byte("k2")) region.meta.Id = 1 @@ -322,32 +415,35 @@ } func (s *testRegionCacheSuite) TestDropStoreOnSendRequestFail(c *C) { - regionCnt := 999 - cluster := createClusterWithStoresAndRegions(regionCnt) + ts := time.Now().Unix() + regionCnt, storeCount := 8, 3 + cluster := createClusterWithStoresAndRegions(regionCnt, storeCount) cache := NewRegionCache(mocktikv.NewPDClient(cluster)) + defer cache.Close() loadRegionsToCache(cache, regionCnt) - c.Assert(len(cache.mu.regions), Equals, regionCnt) + c.Assert(workableRegions(cache, cache.mu.regions, ts), Equals, regionCnt) bo := NewBackoffer(context.Background(), 1) loc, err := cache.LocateKey(bo, []byte{}) c.Assert(err, IsNil) - // Drop the regions on one store, should drop only 1/3 of the regions. - rpcCtx, err := cache.GetRPCContext(bo, loc.Region) - c.Assert(err, IsNil) - cache.DropStoreOnSendRequestFail(rpcCtx, errors.New("test error")) - c.Assert(len(cache.mu.regions), Equals, regionCnt*2/3) - - loadRegionsToCache(cache, regionCnt) - c.Assert(len(cache.mu.regions), Equals, regionCnt) + // failing repeatedly on one region makes all of its stores unavailable. + for j := 0; j < 20; j++ { + for i := 0; i < storeCount; i++ { + rpcCtx, err := cache.GetRPCContext(bo, loc.Region) + c.Assert(err, IsNil) + cache.OnSendRequestFail(rpcCtx, errors.New("test error")) + } + } + c.Assert(workableRegions(cache, cache.mu.regions, ts), Equals, 0) } const regionSplitKeyFormat = "t%08d" -func createClusterWithStoresAndRegions(regionCnt int) *mocktikv.Cluster { +func createClusterWithStoresAndRegions(regionCnt, storeCount int) *mocktikv.Cluster { cluster := mocktikv.NewCluster() - _, _, regionID, _ := mocktikv.BootstrapWithMultiStores(cluster, 3) + _, _, regionID, _ := mocktikv.BootstrapWithMultiStores(cluster, storeCount) for i := 0; i < regionCnt; i++ { rawKey := []byte(fmt.Sprintf(regionSplitKeyFormat, i)) ids := cluster.AllocIDs(4) @@ -377,6 +473,7 @@ func (s *testRegionCacheSuite) TestUpdateStoreAddr(c *C) { regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore), } + defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") err := client.Put(testKey, testValue) @@ -449,13 +546,14 @@ func (s *testRegionCacheSuite) TestContainsByEnd(c *C) { func BenchmarkOnRequestFail(b *testing.B) { /* - This benchmark simulate many concurrent requests call DropStoreOnSendRequestFail method + This benchmark simulates many concurrent requests calling the OnSendRequestFail method after failed on a store, validate that on this scene, requests don't get blocked on the RegionCache lock.
*/ - regionCnt := 999 - cluster := createClusterWithStoresAndRegions(regionCnt) + regionCnt, storeCount := 998, 3 + cluster := createClusterWithStoresAndRegions(regionCnt, storeCount) cache := NewRegionCache(mocktikv.NewPDClient(cluster)) + defer cache.Close() loadRegionsToCache(cache, regionCnt) bo := NewBackoffer(context.Background(), 1) loc, err := cache.LocateKey(bo, []byte{}) @@ -464,14 +562,17 @@ func BenchmarkOnRequestFail(b *testing.B) { } region := cache.getRegionByIDFromCache(loc.Region.id) b.ResetTimer() + regionStore := region.getStore() + store, peer, _ := region.WorkStorePeer(regionStore) b.RunParallel(func(pb *testing.PB) { for pb.Next() { rpcCtx := &RPCContext{ Region: loc.Region, Meta: region.meta, - Peer: region.peer, + Peer: peer, + Store: store, } - cache.DropStoreOnSendRequestFail(rpcCtx, nil) + cache.OnSendRequestFail(rpcCtx, nil) } }) if len(cache.mu.regions) != regionCnt*2/3 { diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index a10efc645fcaf..e69e29f6943db 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -143,9 +143,15 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re } return nil, true, nil } + s.onSendSuccess(ctx) return } +func (s *RegionRequestSender) onSendSuccess(ctx *RPCContext) { + store := s.regionCache.getStoreByStoreID(ctx.Store.storeID) + store.markAccess(s.regionCache.notifyCheckCh, true) +} + func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error { // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled { @@ -163,7 +169,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err } } - s.regionCache.DropStoreOnSendRequestFail(ctx, err) + s.regionCache.OnSendRequestFail(ctx, err) // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. 
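onSendSuccess and onSendFail funnel every RPC outcome into Store.markAccess: success zeroes the failure fields, while failure stamps lastFailedTime once per streak and bumps failedAttempt until it saturates. A sequential sketch of just those counter transitions (plain struct, no atomics, hypothetical names):

package main

import (
	"fmt"
	"time"
)

const failCap = 30 // startBlackoutAfterAttempt + maxExponentAttempt in the patch

type state struct {
	lastFailedTime uint32
	failedAttempt  uint16
}

func mark(s state, success bool) state {
	if success {
		return state{} // any success fully heals the store
	}
	if s.failedAttempt >= failCap {
		return s // the failure counter saturates
	}
	if s.lastFailedTime == 0 {
		s.lastFailedTime = uint32(time.Now().Unix()) // stamp only the first failure of a streak
	}
	s.failedAttempt++
	return s
}

func main() {
	var s state
	for i := 0; i < 3; i++ {
		s = mark(s, false)
	}
	fmt.Println(s.failedAttempt) // 3
	s = mark(s, true)
	fmt.Println(s.failedAttempt) // 0
}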
@@ -220,7 +226,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, regi logutil.Logger(context.Background()).Warn("tikv reports `StoreNotMatch` retry later", zap.Stringer("storeNotMatch", storeNotMatch), zap.Stringer("ctx", ctx)) - s.regionCache.ClearStoreByID(ctx.GetStoreID()) + ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh) return true, nil } @@ -254,7 +260,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, regi logutil.Logger(context.Background()).Debug("tikv reports region error", zap.Stringer("regionErr", regionErr), zap.Stringer("ctx", ctx)) - s.regionCache.DropRegion(ctx.Region) + s.regionCache.InvalidateCachedRegion(ctx.Region) return false, nil } diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index a6d1a6bc2c72a..949af3f9eef7f 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -54,6 +54,10 @@ func (s *testRegionRequestSuite) SetUpTest(c *C) { s.regionRequestSender = NewRegionRequestSender(s.cache, client) } +func (s *testRegionRequestSuite) TearDownTest(c *C) { + s.cache.Close() +} + func (s *testRegionRequestSuite) TestOnSendFailedWithStoreRestart(c *C) { req := &tikvrpc.Request{ Type: tikvrpc.CmdRawPut, diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go index 06cbd732cdac8..3a40c844b14e4 100644 --- a/store/tikv/split_test.go +++ b/store/tikv/split_test.go @@ -69,7 +69,7 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) { } s.split(c, loc.Region.id, []byte("b")) - s.store.regionCache.DropRegion(loc.Region) + s.store.regionCache.InvalidateCachedRegion(loc.Region) // mocktikv will panic if it meets a not-in-region key. err = snapshot.batchGetSingleRegion(s.bo, batch, func([]byte, []byte) {})
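For reference, the blackout schedule that OnSendRequestFail feeds and storeState.Available enforces: a store is blacked out once it reaches startBlackoutAfterAttempt (20) consecutive failures, and the window doubles per further failure up to maxExponentAttempt (10) doublings. ExponentBase2 comes from go-grpc-middleware's backoffutils and computes 2^(n-1); it is re-implemented inline here, as an assumption, so the sketch stays self-contained:

package main

import "fmt"

const (
	startBlackoutAfterAttempt = 20
	maxExponentAttempt        = 10
)

// exponentBase2 mirrors backoffutils.ExponentBase2: 2^(n-1) for n >= 1.
func exponentBase2(n uint) uint64 { return (1 << n) >> 1 }

// blackoutSeconds returns how long a store stays blacked out after the
// given number of consecutive failed attempts (0 = not blacked out yet).
func blackoutSeconds(failedAttempt uint16) int64 {
	if failedAttempt < startBlackoutAfterAttempt {
		return 0
	}
	if failedAttempt > startBlackoutAfterAttempt+maxExponentAttempt {
		failedAttempt = startBlackoutAfterAttempt + maxExponentAttempt
	}
	return int64(exponentBase2(uint(failedAttempt - startBlackoutAfterAttempt + 1)))
}

func main() {
	fmt.Println(blackoutSeconds(19)) // 0: still retries immediately
	fmt.Println(blackoutSeconds(20)) // 1: the first blackout window is 1s, matching the tests' 1-second sleeps
	fmt.Println(blackoutSeconds(25)) // 32
	fmt.Println(blackoutSeconds(40)) // 1024: capped at 2^10 seconds
}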