Skip to content

Commit

Permalink
dm: fix leader did not retire after delete the key (#11604)
Browse files Browse the repository at this point in the history
close #11602
  • Loading branch information
GMHDBJD authored Sep 24, 2024
1 parent 25676cf commit d3aef11
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 4 deletions.
16 changes: 12 additions & 4 deletions dm/pkg/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
e.l.Debug("begin to campaign", zap.Stringer("current member", e.info))

err2 := elec.Campaign(ctx2, e.infoStr)
failpoint.Inject("mockCapaignSucceedButReturnErr", func() {
if err2 == nil {
err2 = errors.New("mock campaign succeed but return error")
time.Sleep(time.Second)
}
})
if err2 != nil {
// because inner commit may return undetermined error, we try to delete the election key manually
deleted, err3 := e.ClearSessionIfNeeded(ctx, e.ID())
Expand All @@ -282,6 +288,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
var (
leaderKey string
leaderInfo *CampaignerInfo
revision int64
)
eleObserveCh := elec.Observe(ctx2)

Expand All @@ -300,6 +307,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
e.l.Info("get response from election observe", zap.String("key", string(resp.Kvs[0].Key)), zap.String("value", string(resp.Kvs[0].Value)))
leaderKey = string(resp.Kvs[0].Key)
leaderInfo, err = getCampaignerInfo(resp.Kvs[0].Value)
revision = resp.Header.Revision
if err != nil {
// this should never happened
e.l.Error("fail to get leader information", zap.String("value", string(resp.Kvs[0].Value)), zap.Error(err))
Expand Down Expand Up @@ -330,7 +338,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio

e.l.Info("become leader", zap.Stringer("current member", e.info))
e.notifyLeader(ctx, leaderInfo) // become the leader now
e.watchLeader(ctx, session, leaderKey, elec)
e.watchLeader(ctx, session, leaderKey, elec, revision)
e.l.Info("retire from leader", zap.Stringer("current member", e.info))
e.notifyLeader(ctx, nil) // need to re-campaign
oldLeaderID = ""
Expand Down Expand Up @@ -359,8 +367,8 @@ func (e *Election) notifyLeader(ctx context.Context, leaderInfo *CampaignerInfo)
}
}

func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election) {
e.l.Debug("watch leader key", zap.String("key", key))
func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election, revision int64) {
e.l.Debug("watch leader key", zap.String("key", key), zap.Int64("revision", revision), zap.Stringer("current member", e.info))

e.campaignMu.Lock()
e.resignCh = make(chan struct{})
Expand All @@ -374,7 +382,7 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session

wCtx, cancel := context.WithCancel(ctx)
defer cancel()
wch := e.cli.Watch(wCtx, key)
wch := e.cli.Watch(wCtx, key, clientv3.WithRev(revision+1))

for {
if e.evictLeader.Load() {
Expand Down
79 changes: 79 additions & 0 deletions dm/pkg/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,3 +412,82 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) {
c.Assert(err, IsNil)
wg.Wait()
}

func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) {
var (
timeout = 5 * time.Second
sessionTTL = 60
key = "unit-test/election-succeed-but-return-error"
ID1 = "member1"
ID2 = "member2"
addr1 = "127.0.0.1:1"
addr2 = "127.0.0.1:2"
)
cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil)
c.Assert(err, IsNil)
defer cli.Close()

ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()

e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1, addr1, t.notifyBlockTime)
c.Assert(err, IsNil)
defer e1.Close()

// e1 should become the leader
select {
case leader := <-e1.LeaderNotify():
c.Assert(leader.ID, Equals, ID1)
case <-time.After(timeout):
c.Fatal("leader campaign timeout")
}
c.Assert(e1.IsLeader(), IsTrue)
_, leaderID, leaderAddr, err := e1.LeaderInfo(ctx1)
c.Assert(err, IsNil)
c.Assert(leaderID, Equals, e1.ID())
c.Assert(leaderAddr, Equals, addr1)

// start e2
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2, addr2, t.notifyBlockTime)
c.Assert(err, IsNil)
defer e2.Close()
select {
case leader := <-e2.leaderCh:
c.Assert(leader.ID, Equals, ID1)
case <-time.After(timeout):
c.Fatal("leader campaign timeout")
}
// but the leader should still be e1
_, leaderID, leaderAddr, err = e2.LeaderInfo(ctx2)
c.Assert(err, IsNil)
c.Assert(leaderID, Equals, e1.ID())
c.Assert(leaderAddr, Equals, addr1)
c.Assert(e2.IsLeader(), IsFalse)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return()`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr")

e1.Close() // stop the campaign for e1
c.Assert(e1.IsLeader(), IsFalse)

// e2 should become the leader
select {
case leader := <-e2.LeaderNotify():
c.Assert(leader.ID, Equals, ID2)
case <-time.After(timeout):
c.Fatal("leader campaign timeout")
}

// the leader retired after deleted the key
select {
case err2 := <-e2.ErrorNotify():
c.Fatalf("delete the leader key should not get an error, %v", err2)
case leader := <-e2.LeaderNotify():
c.Assert(leader, IsNil)
case <-time.After(timeout):
c.Fatal("leader retire timeout")
}
}

0 comments on commit d3aef11

Please sign in to comment.