kvserver: avoid hanging proposal after leader goes down #46045

Merged · 4 commits · Mar 17, 2020
9 changes: 6 additions & 3 deletions pkg/cmd/roachtest/gossip.go
@@ -32,8 +32,10 @@ import (

 func registerGossip(r *testRegistry) {
 	runGossipChaos := func(ctx context.Context, t *test, c *cluster) {
+		args := startArgs("--args=--vmodule=*=1")
 		c.Put(ctx, cockroach, "./cockroach", c.All())
-		c.Start(ctx, t, c.All())
+		c.Start(ctx, t, c.All(), args)
+		waitForFullReplication(t, c.Conn(ctx, 1))

 		gossipNetwork := func(node int) string {
 			const query = `
@@ -65,6 +67,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
 				if i == deadNode {
 					continue
 				}
+				c.l.Printf("%d: checking gossip\n", i)
 				s := gossipNetwork(i)
 				if !initialized {
 					deadNodeStr := fmt.Sprint(deadNode)
@@ -88,7 +91,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
 					return false
 				}
 			}
-			fmt.Printf("gossip ok: %s (%0.0fs)\n", expected, timeutil.Since(start).Seconds())
+			c.l.Printf("gossip ok: %s (%0.0fs)\n", expected, timeutil.Since(start).Seconds())
 			return true
 		}

@@ -109,7 +112,7 @@ SELECT string_agg(source_id::TEXT || ':' || target_id::TEXT, ',')
 			deadNode = nodes.randNode()[0]
 			c.Stop(ctx, c.Node(deadNode))
 			waitForGossip()
-			c.Start(ctx, t, c.Node(deadNode))
+			c.Start(ctx, t, c.Node(deadNode), args)
 		}
 	}

2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/test.go
@@ -196,7 +196,7 @@ func (t *test) GetStatus() string {
 	defer t.mu.Unlock()
 	status, ok := t.mu.status[t.runnerID]
 	if ok {
-		return fmt.Sprintf("%s (set %s ago)", status.msg, timeutil.Now().Sub(status.time))
+		return fmt.Sprintf("%s (set %s ago)", status.msg, timeutil.Now().Sub(status.time).Round(time.Second))
 	}
 	return "N/A"
 }
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -305,6 +305,10 @@ type Replica struct {
 		// evaluation and is consumed by the Raft processing thread. Once
 		// consumed, commands are proposed through Raft and moved to the
 		// proposals map.
+		//
+		// Access to proposalBuf must occur *without* holding the mutex.
+		// Instead, the buffer internally holds a reference to mu and will use
+		// it appropriately.
 		proposalBuf propBuf
 		// proposals stores the Raft in-flight commands which originated at
 		// this Replica, i.e. all commands for which propose has been called,
@@ -838,6 +842,13 @@ func (r *Replica) raftStatusRLocked() *raft.Status {
 	return nil
 }

+func (r *Replica) raftBasicStatusRLocked() raft.BasicStatus {
+	if rg := r.mu.internalRaftGroup; rg != nil {
+		return rg.BasicStatus()
+	}
+	return raft.BasicStatus{}
+}
+
 // State returns a copy of the internal state of the Replica, along with some
 // auxiliary information.
 func (r *Replica) State() storagepb.RangeInfo {
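The new comment on proposalBuf pins down a locking contract: callers insert into the buffer without holding the Replica mutex, because the buffer holds a reference to that mutex and takes it internally. A minimal standalone sketch of that pattern (toy types for illustration only, not the actual propBuf, which uses a more elaborate RWMutex scheme):

package main

import "sync"

// owner plays the role of Replica: it owns the mutex that its buffer uses.
type owner struct {
	mu  sync.Mutex
	buf toyBuf
}

// toyBuf plays the role of propBuf: it holds a reference to its owner's
// mutex and acquires it itself on every insert.
type toyBuf struct {
	mu      *sync.Mutex
	entries [][]byte
}

func (b *toyBuf) Insert(data []byte) {
	b.mu.Lock() // the buffer takes the lock internally
	defer b.mu.Unlock()
	b.entries = append(b.entries, data)
}

func main() {
	o := &owner{}
	o.buf.mu = &o.mu

	// Correct: the caller does NOT hold o.mu across Insert.
	o.buf.Insert([]byte("cmd"))

	// Incorrect (would deadlock, since sync.Mutex is not reentrant):
	//   o.mu.Lock()
	//   o.buf.Insert([]byte("cmd"))
}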
15 changes: 9 additions & 6 deletions pkg/kv/kvserver/replica_proposal_buf.go
@@ -346,7 +346,8 @@ func (b *propBuf) flushRLocked() error {

 func (b *propBuf) flushLocked() error {
 	return b.p.withGroupLocked(func(raftGroup *raft.RawNode) error {
-		return b.FlushLockedWithRaftGroup(raftGroup)
+		_, err := b.FlushLockedWithRaftGroup(raftGroup)
+		return err
 	})
 }

@@ -356,7 +357,9 @@ func (b *propBuf) flushLocked() error {
 //
 // If raftGroup is non-nil (the common case) then the commands will also be
 // proposed to the RawNode. This initiates Raft replication of the commands.
-func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
+//
+// Returns the number of proposals handed to the RawNode.
+func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) (int, error) {
 	// Before returning, make sure to forward the lease index base to at least
 	// the proposer's currently applied lease index. This ensures that if the
 	// lease applied index advances outside of this proposer's control (i.e.
@@ -374,7 +377,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 	defer b.arr.adjustSize(used)
 	if used == 0 {
 		// The buffer is empty. Nothing to do.
-		return nil
+		return 0, nil
 	} else if used > b.arr.len() {
 		// The buffer is full and at least one writer has tried to allocate
 		// on top of the full buffer, so notify them to try again.
@@ -481,9 +484,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 		}
 	}
 	if firstErr != nil {
-		return firstErr
+		return 0, firstErr
 	}
-	return proposeBatch(raftGroup, b.p.replicaID(), ents)
+	return used, proposeBatch(raftGroup, b.p.replicaID(), ents)
 }

 func (b *propBuf) forwardLeaseIndexBase(v uint64) {
@@ -521,7 +524,7 @@ func proposeBatch(raftGroup *raft.RawNode, replID roachpb.ReplicaID, ents []raft
 // The representative example of this is a caller that wants to flush the buffer
 // into the proposals map before canceling all proposals.
 func (b *propBuf) FlushLockedWithoutProposing() {
-	if err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil {
+	if _, err := b.FlushLockedWithRaftGroup(nil /* raftGroup */); err != nil {
 		log.Fatalf(context.Background(), "unexpected error: %+v", err)
 	}
 }
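FlushLockedWithRaftGroup now reports how many proposals it handed to the RawNode, so a caller can tell "the buffer was empty" apart from "proposals were submitted but produced no Ready". A stripped-down sketch of that shape (a hypothetical toy, not the real buffer):

package main

import "fmt"

// flush hands every pending entry to propose and reports how many it handed
// off, mirroring the new (int, error) return of FlushLockedWithRaftGroup.
func flush(pending [][]byte, propose func([]byte) error) (int, error) {
	for _, ent := range pending {
		if err := propose(ent); err != nil {
			return 0, err
		}
	}
	return len(pending), nil
}

func main() {
	noop := func([]byte) error { return nil }

	n, _ := flush(nil, noop)
	fmt.Println(n) // 0: nothing was pending, no reason to unquiesce

	n, _ = flush([][]byte{[]byte("a"), []byte("b")}, noop)
	fmt.Println(n) // 2: work was submitted, even if raft silently dropped it
}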
41 changes: 39 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
@@ -319,6 +319,9 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (index int64, pE
 	// Insert into the proposal buffer, which passes the command to Raft to be
 	// proposed. The proposal buffer assigns the command a maximum lease index
 	// when it sequences it.
+	//
+	// NB: we must not hold r.mu while using the proposal buffer, see comment
+	// on the field.
 	maxLeaseIndex, err := r.mu.proposalBuf.Insert(p, data)
 	if err != nil {
 		return 0, roachpb.NewError(err)
@@ -408,13 +411,27 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	leaderID := r.mu.leaderID
 	lastLeaderID := leaderID
 	err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
-		if err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup); err != nil {
+		numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup)
+		if err != nil {
 			return false, err
 		}
 		if hasReady = raftGroup.HasReady(); hasReady {
 			rd = raftGroup.Ready()
 		}
-		return hasReady /* unquiesceAndWakeLeader */, nil
+		// We unquiesce if we have a Ready (= there's work to do). We also have
+		// to unquiesce if we just flushed some proposals but there isn't a
+		// Ready, which can happen if the proposals got dropped (raft does this
+		// if it doesn't know who the leader is). And, for extra defense in depth,
+		// we also unquiesce if there are outstanding proposals.
+		//
+		// NB: if we had the invariant that the group can only be in quiesced
+		// state if it knows the leader (state.Lead) AND we knew that raft would
+		// never give us an empty ready here (i.e. the only reason to drop a
+		// proposal is not knowing the leader) then numFlushed would not be
+		// necessary. The latter is likely true but we don't want to rely on
+		// it. The former is maybe true, but there's no easy way to enforce it.
+		unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0
+		return unquiesceAndWakeLeader, nil
 	})
 	r.mu.Unlock()
 	if err == errRemoved {
@@ -1333,6 +1350,26 @@ func (r *Replica) withRaftGroupLocked(
 	unquiesce, err := func(rangeID roachpb.RangeID, raftGroup *raft.RawNode) (bool, error) {
 		return f(raftGroup)
 	}(r.RangeID, r.mu.internalRaftGroup)
+	if r.mu.internalRaftGroup.BasicStatus().Lead == 0 {
+		// If we don't know the leader, unquiesce unconditionally. As a
+		// follower, we can't wake up the leader if we don't know who that is,
+		// so we should find out now before someone needs us to unquiesce.
+		//
+		// This situation should occur rarely or never (ever since we got
+		// stricter about validating incoming Quiesce requests) but it's good
+		// defense-in-depth.
+		//
+		// Note that unquiesceAndWakeLeaderLocked won't manage to wake up the
+		// leader since it's unknown to this replica, and at the time of writing
+		// the heuristics for campaigning are defensive (won't campaign if there
+		// is a live leaseholder). But if we are trying to unquiesce because
+		// this follower was asked to propose something, then this means that a
+		// request is going to have to wait until the leader next contacts us,
+		// or, in the worst case, an election timeout. This is not ideal - if a
+		// node holds a live lease, we should direct the client to it
+		// immediately.
+		unquiesce = true
+	}
 	if unquiesce {
 		r.unquiesceAndWakeLeaderLocked()
 	}
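The comment in handleRaftReadyRaftMuLocked leans on the fact that etcd/raft drops a proposal outright when the node doesn't know of a leader, which is exactly the case where numFlushed can be positive without any Ready being produced. A small standalone sketch of that behavior (it assumes a go.etcd.io/etcd/raft version whose NewRawNode takes only a *Config):

package main

import (
	"fmt"

	"go.etcd.io/etcd/raft"
)

func main() {
	rn, err := raft.NewRawNode(&raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         raft.NewMemoryStorage(),
		MaxSizePerMsg:   1 << 20,
		MaxInflightMsgs: 256,
	})
	if err != nil {
		panic(err)
	}
	// This node is a follower that has never heard from a leader, so raft
	// refuses to accept the proposal rather than buffering it.
	err = rn.Propose([]byte("payload"))
	fmt.Println(err == raft.ErrProposalDropped) // true: dropped, no Ready produced
}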
8 changes: 0 additions & 8 deletions pkg/kv/kvserver/replica_raft_quiesce.go
@@ -21,14 +21,6 @@ import (
 	"go.etcd.io/etcd/raft/raftpb"
 )

-// mark the replica as quiesced. Returns true if the Replica is successfully
-// quiesced and false otherwise.
-func (r *Replica) quiesce() bool {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	return r.quiesceLocked()
-}
-
 func (r *Replica) quiesceLocked() bool {
 	ctx := r.AnnotateCtx(context.TODO())
 	if r.hasPendingProposalsRLocked() {
22 changes: 17 additions & 5 deletions pkg/kv/kvserver/store_raft.go
@@ -218,11 +218,23 @@ func (s *Store) processRaftRequestWithReplica(
 		if req.Message.Type != raftpb.MsgHeartbeat {
 			log.Fatalf(ctx, "unexpected quiesce: %+v", req)
 		}
-		status := r.RaftStatus()
-		if status != nil && status.Term == req.Message.Term && status.Commit == req.Message.Commit {
-			if r.quiesce() {
-				return nil
-			}
+		// If another replica tells us to quiesce, we verify that according to
+		// it, we are fully caught up, and that we believe it to be the leader.
+		// If we didn't do this, this replica could only unquiesce by means of
+		// an election, which means that the request prompting the unquiesce
+		// would end up with latency on the order of an election timeout.
+		//
+		// There are additional checks in quiesceLocked() that prevent us from
+		// quiescing if there's outstanding work.
+		r.mu.Lock()
+		status := r.raftBasicStatusRLocked()
+		ok := status.Term == req.Message.Term &&
+			status.Commit == req.Message.Commit &&
+			status.Lead == req.Message.From &&
+			r.quiesceLocked()
+		r.mu.Unlock()
+		if ok {
+			return nil
 		}
 		if log.V(4) {
 			log.Infof(ctx, "not quiescing: local raft status is %+v, incoming quiesce message is %+v", status, req.Message)
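The quiesce handshake now requires agreement on all three of term, commit index, and leadership before the follower will quiesce. A self-contained sketch of just that predicate, without the quiesceLocked() side effects, using the raft.BasicStatus and raftpb.Message types that appear in this PR:

package main

import (
	"fmt"

	"go.etcd.io/etcd/raft"
	"go.etcd.io/etcd/raft/raftpb"
)

// shouldQuiesce mirrors the check in processRaftRequestWithReplica: only
// quiesce if, per local raft state, we are caught up to the sender's term
// and commit index and we agree that the sender is the leader.
func shouldQuiesce(status raft.BasicStatus, msg raftpb.Message) bool {
	return status.Term == msg.Term &&
		status.Commit == msg.Commit &&
		status.Lead == msg.From
}

func main() {
	var status raft.BasicStatus
	status.Term = 7
	status.Commit = 42
	status.Lead = 2

	fmt.Println(shouldQuiesce(status, raftpb.Message{From: 2, Term: 7, Commit: 42})) // true
	fmt.Println(shouldQuiesce(status, raftpb.Message{From: 3, Term: 7, Commit: 42})) // false: sender isn't our leader
	fmt.Println(shouldQuiesce(status, raftpb.Message{From: 2, Term: 7, Commit: 41})) // false: commit indexes disagree
}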