syncer: sync region leaders between PD leader and follower (tikv#2591)
Signed-off-by: JmPotato <ghzpotato@gmail.com>
JmPotato authored Aug 19, 2020
1 parent d27a803 commit 7e31082
Showing 4 changed files with 58 additions and 20 deletions.
8 changes: 4 additions & 4 deletions server/cluster/cluster.go
@@ -525,7 +525,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
var saveKV, saveCache, isNew, statsChange bool
var saveKV, saveCache, isNew, needSync bool
if origin == nil {
log.Debug("insert new region",
zap.Uint64("region-id", region.GetID()),
@@ -563,7 +563,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
zap.Uint64("to", region.GetLeader().GetStoreId()),
)
}
saveCache = true
saveCache, needSync = true, true
}
if len(region.GetDownPeers()) > 0 || len(region.GetPendingPeers()) > 0 {
saveCache = true
@@ -584,7 +584,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
region.GetBytesRead() != origin.GetBytesRead() ||
region.GetKeysWritten() != origin.GetKeysWritten() ||
region.GetKeysRead() != origin.GetKeysRead() {
saveCache, statsChange = true, true
saveCache, needSync = true, true
}

if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
@@ -675,7 +675,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
regionEventCounter.WithLabelValues("update_kv").Inc()
}
if saveKV || statsChange {
if saveKV || needSync {
select {
case c.changedRegions <- region:
default:
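
Taken together, the cluster.go hunks rename statsChange to needSync and also set that flag when the leader changes, so a heartbeat that only moves the leader now lands in the changedRegions channel that feeds the region syncer. A minimal, self-contained sketch of the flag logic under those assumptions (not the PD source; regionState and its fields are hypothetical stand-ins for core.RegionInfo):

    package main

    import "fmt"

    // regionState is a hypothetical stand-in for core.RegionInfo.
    type regionState struct {
        leaderStoreID uint64
        bytesWritten  uint64
        bytesRead     uint64
    }

    // needSync mirrors the spirit of the new flag in processRegionHeartbeat:
    // before this commit only stat changes (statsChange) enqueued the region
    // for syncing; now a leader change does too.
    func needSync(origin, current regionState) bool {
        if current.leaderStoreID != origin.leaderStoreID {
            return true // leader moved to another store
        }
        if current.bytesWritten != origin.bytesWritten ||
            current.bytesRead != origin.bytesRead {
            return true // flow statistics changed
        }
        return false
    }

    func main() {
        old := regionState{leaderStoreID: 1, bytesWritten: 100, bytesRead: 50}
        cur := regionState{leaderStoreID: 2, bytesWritten: 100, bytesRead: 50}
        fmt.Println(needSync(old, cur)) // true: only the leader changed
    }
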
14 changes: 11 additions & 3 deletions server/region_syncer/client.go
@@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pkg/errors"
@@ -186,18 +187,25 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
}
stats := resp.GetRegionStats()
regions := resp.GetRegions()
regionLeaders := resp.GetRegionLeaders()
hasStats := len(stats) == len(regions)
for i, r := range regions {
var region *core.RegionInfo
var (
region *core.RegionInfo
regionLeader *metapb.Peer
)
if len(regionLeaders) > i && regionLeaders[i].Id != 0 {
regionLeader = regionLeaders[i]
}
if hasStats {
region = core.NewRegionInfo(r, nil,
region = core.NewRegionInfo(r, regionLeader,
core.SetWrittenBytes(stats[i].BytesWritten),
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
)
} else {
region = core.NewRegionInfo(r, nil)
region = core.NewRegionInfo(r, regionLeader)
}

s.server.GetBasicCluster().CheckAndPutRegion(region)
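
On the follower side, client.go reads the new RegionLeaders field defensively: an older PD leader may send no leaders at all, and the server pads missing leaders with an empty peer (Id == 0). The guard below is a self-contained sketch of that check; peer is a stand-in for metapb.Peer:

    package main

    import "fmt"

    // peer is a stand-in for metapb.Peer; only the Id field matters here.
    type peer struct{ Id uint64 }

    // pickLeader returns the leader for the region at index i, or nil when the
    // response carries no usable entry (shorter slice or an empty placeholder),
    // which is exactly what the pre-change code always passed to NewRegionInfo.
    func pickLeader(regionLeaders []*peer, i int) *peer {
        if len(regionLeaders) > i && regionLeaders[i].Id != 0 {
            return regionLeaders[i]
        }
        return nil
    }

    func main() {
        leaders := []*peer{{Id: 7}, {Id: 0}}
        fmt.Println(pickLeader(leaders, 0)) // &{7}
        fmt.Println(pickLeader(leaders, 1)) // <nil>: placeholder entry, ignored
        fmt.Println(pickLeader(leaders, 2)) // <nil>: response shorter than regions
    }
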
42 changes: 30 additions & 12 deletions server/region_syncer/server.go
@@ -99,6 +99,7 @@ func NewRegionSyncer(s Server) *RegionSyncer {
func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit chan struct{}) {
var requests []*metapb.Region
var stats []*pdpb.RegionStat
var leaders []*metapb.Peer
ticker := time.NewTicker(syncerKeepAliveInterval)
for {
select {
@@ -108,20 +109,23 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
case first := <-regionNotifier:
requests = append(requests, first.GetMeta())
stats := append(stats, first.GetStat())
leaders := append(leaders, first.GetLeader())
startIndex := s.history.GetNextIndex()
s.history.Record(first)
pending := len(regionNotifier)
for i := 0; i < pending && i < maxSyncRegionBatchSize; i++ {
region := <-regionNotifier
requests = append(requests, region.GetMeta())
stats = append(stats, region.GetStat())
leaders = append(leaders, region.GetLeader())
s.history.Record(region)
}
regions := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Regions: requests,
StartIndex: startIndex,
RegionStats: stats,
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Regions: requests,
StartIndex: startIndex,
RegionStats: stats,
RegionLeaders: leaders,
}
s.broadcast(regions)
case <-ticker.C:
@@ -179,17 +183,24 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
start := time.Now()
metas := make([]*metapb.Region, 0, maxSyncRegionBatchSize)
stats := make([]*pdpb.RegionStat, 0, maxSyncRegionBatchSize)
leaders := make([]*metapb.Peer, 0, maxSyncRegionBatchSize)
for syncedIndex, r := range regions {
metas = append(metas, r.GetMeta())
stats = append(stats, r.GetStat())
leader := &metapb.Peer{}
if r.GetLeader() != nil {
leader = r.GetLeader()
}
leaders = append(leaders, leader)
if len(metas) < maxSyncRegionBatchSize && syncedIndex < len(regions)-1 {
continue
}
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Regions: metas,
StartIndex: uint64(lastIndex),
RegionStats: stats,
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Regions: metas,
StartIndex: uint64(lastIndex),
RegionStats: stats,
RegionLeaders: leaders,
}
s.limit.Wait(int64(resp.Size()))
lastIndex += len(metas)
@@ -213,15 +224,22 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
zap.Int("records-length", len(records)))
regions := make([]*metapb.Region, len(records))
stats := make([]*pdpb.RegionStat, len(records))
leaders := make([]*metapb.Peer, len(records))
for i, r := range records {
regions[i] = r.GetMeta()
stats[i] = r.GetStat()
leader := &metapb.Peer{}
if r.GetLeader() != nil {
leader = r.GetLeader()
}
leaders[i] = leader
}
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Regions: regions,
StartIndex: startIndex,
RegionStats: stats,
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Regions: regions,
StartIndex: startIndex,
RegionStats: stats,
RegionLeaders: leaders,
}
return stream.Send(resp)
}
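
Both server paths (the RunServer batching loop and syncHistoryRegion) keep three index-aligned slices — region metas, stats, and now leaders — in one SyncRegionResponse, so a region without a leader contributes an empty placeholder peer rather than nil to keep the slices the same length. A self-contained sketch of that alignment (peer and region here are stand-ins for the metapb/core types):

    package main

    import "fmt"

    type peer struct{ Id uint64 }

    // region is a stand-in for core.RegionInfo with just a leader.
    type region struct{ leader *peer }

    // collectLeaders builds the leaders slice the same way syncHistoryRegion
    // does: one entry per region, with an empty peer as a placeholder so the
    // follower can detect "no leader" via Id == 0.
    func collectLeaders(regions []region) []*peer {
        leaders := make([]*peer, 0, len(regions))
        for _, r := range regions {
            leader := &peer{} // placeholder keeps leaders aligned with regions
            if r.leader != nil {
                leader = r.leader
            }
            leaders = append(leaders, leader)
        }
        return leaders
    }

    func main() {
        rs := []region{{leader: &peer{Id: 3}}, {leader: nil}}
        for i, l := range collectLeaders(rs) {
            fmt.Printf("region %d -> leader id %d\n", i, l.Id) // 3, then 0
        }
    }
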
14 changes: 13 additions & 1 deletion tests/server/region_syncer/region_syncer_test.go
@@ -86,7 +86,10 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
},
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
Peers: []*metapb.Peer{{Id: allocator.alloc(), StoreId: uint64(0)}},
Peers: []*metapb.Peer{
{Id: allocator.alloc(), StoreId: uint64(0)},
{Id: allocator.alloc(), StoreId: uint64(0)},
},
}
regions = append(regions, core.NewRegionInfo(r, r.Peers[0]))
}
@@ -129,6 +132,13 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
c.Assert(err, IsNil)
}

// change the leader of region
for i := 0; i < len(regions); i++ {
regions[i] = regions[i].Clone(core.WithLeader(regions[i].GetPeers()[1]))
err = rc.HandleRegionHeartbeat(regions[i])
c.Assert(err, IsNil)
}

// ensure flush to region storage, we use a duration larger than the
// region storage flush rate limit (3s).
time.Sleep(4 * time.Second)
@@ -142,6 +152,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
r := followerServer.GetServer().GetBasicCluster().GetRegion(region.GetID())
c.Assert(r.GetMeta(), DeepEquals, region.GetMeta())
c.Assert(r.GetStat(), DeepEquals, region.GetStat())
c.Assert(r.GetLeader(), DeepEquals, region.GetLeader())
}

err = leaderServer.Stop()
@@ -155,6 +166,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
r := leaderServer.GetRegionInfoByID(region.GetID())
c.Assert(r.GetMeta(), DeepEquals, region.GetMeta())
c.Assert(r.GetStat(), DeepEquals, region.GetStat())
c.Assert(r.GetLeader(), DeepEquals, region.GetLeader())
}
}

