From 7384d66b7e664155286a37c1044141292b781e27 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 18 Dec 2018 12:57:19 +0100 Subject: [PATCH] Bump etcd Picks up https://github.com/etcd-io/etcd/pull/10199. --- go.etcd.io/etcd/raft/OWNERS | 19 +++++++++++ go.etcd.io/etcd/raft/log.go | 14 ++++---- go.etcd.io/etcd/raft/node.go | 14 ++++++-- go.etcd.io/etcd/raft/raft.go | 58 +++++++++++++++++++++++++++------ go.etcd.io/etcd/raft/rawnode.go | 33 +++++++++++++++++++ go.etcd.io/etcd/raft/status.go | 32 ++++++++++-------- go.etcd.io/etcd/raft/util.go | 11 ++++--- 7 files changed, 146 insertions(+), 35 deletions(-) create mode 100644 go.etcd.io/etcd/raft/OWNERS diff --git a/go.etcd.io/etcd/raft/OWNERS b/go.etcd.io/etcd/raft/OWNERS new file mode 100644 index 0000000000..ab781066e2 --- /dev/null +++ b/go.etcd.io/etcd/raft/OWNERS @@ -0,0 +1,19 @@ +approvers: +- heyitsanthony +- philips +- fanminshi +- gyuho +- mitake +- jpbetz +- xiang90 +- bdarnell +reviewers: +- heyitsanthony +- philips +- fanminshi +- gyuho +- mitake +- jpbetz +- xiang90 +- bdarnell +- tschottdorf diff --git a/go.etcd.io/etcd/raft/log.go b/go.etcd.io/etcd/raft/log.go index 50f28f87b3..03f83e61c4 100644 --- a/go.etcd.io/etcd/raft/log.go +++ b/go.etcd.io/etcd/raft/log.go @@ -39,7 +39,9 @@ type raftLog struct { logger Logger - maxMsgSize uint64 + // maxNextEntsSize is the maximum number aggregate byte size of the messages + // returned from calls to nextEnts. + maxNextEntsSize uint64 } // newLog returns log using the given storage and default options. It @@ -51,14 +53,14 @@ func newLog(storage Storage, logger Logger) *raftLog { // newLogWithSize returns a log using the given storage and max // message size. -func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog { +func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ - storage: storage, - logger: logger, - maxMsgSize: maxMsgSize, + storage: storage, + logger: logger, + maxNextEntsSize: maxNextEntsSize, } firstIndex, err := storage.FirstIndex() if err != nil { @@ -149,7 +151,7 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - ents, err := l.slice(off, l.committed+1, l.maxMsgSize) + ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } diff --git a/go.etcd.io/etcd/raft/node.go b/go.etcd.io/etcd/raft/node.go index f67628fd36..749db98758 100644 --- a/go.etcd.io/etcd/raft/node.go +++ b/go.etcd.io/etcd/raft/node.go @@ -129,7 +129,8 @@ type Node interface { Tick() // Campaign causes the Node to transition to candidate state and start campaigning to become leader. Campaign(ctx context.Context) error - // Propose proposes that data be appended to the log. + // Propose proposes that data be appended to the log. Note that proposals can be lost without + // notice, therefore it is user's job to ensure proposal retries. Propose(ctx context.Context, data []byte) error // ProposeConfChange proposes config change. // At most one ConfChange can be in the process of going through consensus. @@ -174,7 +175,16 @@ type Node interface { Status() Status // ReportUnreachable reports the given node is not reachable for the last send. ReportUnreachable(id uint64) - // ReportSnapshot reports the status of the sent snapshot. + // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower + // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure. + // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a + // snapshot (for e.g., while streaming it from leader to follower), should be reported to the + // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft + // log probes until the follower can apply the snapshot and advance its state. If the follower + // can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any + // updates from the leader. Therefore, it is crucial that the application ensures that any + // failure in snapshot sending is caught and reported back to the leader; so it can resume raft + // log probing in the follower. ReportSnapshot(id uint64, status SnapshotStatus) // Stop performs any necessary termination of the Node. Stop() diff --git a/go.etcd.io/etcd/raft/raft.go b/go.etcd.io/etcd/raft/raft.go index bf0a8983c4..211a3b0e2e 100644 --- a/go.etcd.io/etcd/raft/raft.go +++ b/go.etcd.io/etcd/raft/raft.go @@ -154,6 +154,9 @@ type Config struct { // throughput during normal replication. Note: math.MaxUint64 for unlimited, // 0 for at most one entry per message. MaxSizePerMsg uint64 + // MaxCommittedSizePerReady limits the size of the committed entries which + // can be applied. + MaxCommittedSizePerReady uint64 // MaxUncommittedEntriesSize limits the aggregate byte size of the // uncommitted entries that may be appended to a leader's log. Once this // limit is exceeded, proposals will begin to return ErrProposalDropped @@ -224,6 +227,12 @@ func (c *Config) validate() error { c.MaxUncommittedEntriesSize = noLimit } + // default MaxCommittedSizePerReady to MaxSizePerMsg because they were + // previously the same parameter. + if c.MaxCommittedSizePerReady == 0 { + c.MaxCommittedSizePerReady = c.MaxSizePerMsg + } + if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } @@ -316,7 +325,7 @@ func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } - raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg) + raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) @@ -635,17 +644,27 @@ func (r *raft) reset(term uint64) { r.readOnly = newReadOnly(r.readOnly.option) } -func (r *raft) appendEntry(es ...pb.Entry) { +func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { li := r.raftLog.lastIndex() for i := range es { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) } + // Track the size of this uncommitted proposal. + if !r.increaseUncommittedSize(es) { + r.logger.Debugf( + "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", + r.id, + ) + // Drop the proposal. + return false + } // use latest "last" index after truncate/append li = r.raftLog.append(es...) r.getProgress(r.id).maybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() + return true } // tickElection is run by followers and candidates after r.electionTimeout. @@ -731,6 +750,11 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader + // Followers enter replicate mode when they've been successfully probed + // (perhaps after having received a snapshot as a result). The leader is + // trivially in this state. Note that r.reset() has initialized this + // progress with the last index already. + r.prs[r.id].becomeReplicate() // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's @@ -739,7 +763,16 @@ func (r *raft) becomeLeader() { // could be expensive. r.pendingConfIndex = r.raftLog.lastIndex() - r.appendEntry(pb.Entry{Data: nil}) + emptyEnt := pb.Entry{Data: nil} + if !r.appendEntry(emptyEnt) { + // This won't happen because we just called reset() above. + r.logger.Panic("empty entry was dropped") + } + // As a special case, don't count the initial empty entry towards the + // uncommitted log quota. This is because we want to preserve the + // behavior of allowing one entry larger than quota if the current + // usage is zero. + r.reduceUncommittedSize([]pb.Entry{emptyEnt}) r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -970,10 +1003,6 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } - if !r.increaseUncommittedSize(m.Entries) { - r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) - return ErrProposalDropped - } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -986,7 +1015,10 @@ func stepLeader(r *raft, m pb.Message) error { } } } - r.appendEntry(m.Entries...) + + if !r.appendEntry(m.Entries...) { + return ErrProposalDropped + } r.bcastAppend() return nil case pb.MsgReadIndex: @@ -1046,7 +1078,13 @@ func stepLeader(r *raft, m pb.Message) error { pr.becomeReplicate() case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort(): r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + // Transition back to replicating state via probing state + // (which takes the snapshot into account). If we didn't + // move to replicating state, that would only happen with + // the next round of appends (but there may not be a next + // round for a while, exposing an inconsistent RaftStatus). pr.becomeProbe() + pr.becomeReplicate() case pr.State == ProgressStateReplicate: pr.ins.freeTo(m.Index) } @@ -1490,7 +1528,7 @@ func (r *raft) abortLeaderTransfer() { func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { var s uint64 for _, e := range ents { - s += uint64(e.Size()) + s += uint64(PayloadSize(e)) } if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize { @@ -1513,7 +1551,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { var s uint64 for _, e := range ents { - s += uint64(e.Size()) + s += uint64(PayloadSize(e)) } if s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft diff --git a/go.etcd.io/etcd/raft/rawnode.go b/go.etcd.io/etcd/raft/rawnode.go index 4a4ec2e946..d7a272d143 100644 --- a/go.etcd.io/etcd/raft/rawnode.go +++ b/go.etcd.io/etcd/raft/rawnode.go @@ -236,6 +236,39 @@ func (rn *RawNode) Status() *Status { return &status } +// StatusWithoutProgress returns a Status without populating the Progress field +// (and returns the Status as a value to avoid forcing it onto the heap). This +// is more performant if the Progress is not required. See WithProgress for an +// allocation-free way to introspect the Progress. +func (rn *RawNode) StatusWithoutProgress() Status { + return getStatusWithoutProgress(rn.raft) +} + +// ProgressType indicates the type of replica a Progress corresponds to. +type ProgressType byte + +const ( + // ProgressTypePeer accompanies a Progress for a regular peer replica. + ProgressTypePeer ProgressType = iota + // ProgressTypeLearner accompanies a Progress for a learner replica. + ProgressTypeLearner +) + +// WithProgress is a helper to introspect the Progress for this node and its +// peers. +func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) { + for id, pr := range rn.raft.prs { + pr := *pr + pr.ins = nil + visitor(id, ProgressTypePeer, pr) + } + for id, pr := range rn.raft.learnerPrs { + pr := *pr + pr.ins = nil + visitor(id, ProgressTypeLearner, pr) + } +} + // ReportUnreachable reports the given node is not reachable for the last send. func (rn *RawNode) ReportUnreachable(id uint64) { _ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id}) diff --git a/go.etcd.io/etcd/raft/status.go b/go.etcd.io/etcd/raft/status.go index 6d4aa7ba5f..9feca7c03b 100644 --- a/go.etcd.io/etcd/raft/status.go +++ b/go.etcd.io/etcd/raft/status.go @@ -32,29 +32,35 @@ type Status struct { LeadTransferee uint64 } -// getStatus gets a copy of the current raft status. -func getStatus(r *raft) Status { +func getProgressCopy(r *raft) map[uint64]Progress { + prs := make(map[uint64]Progress) + for id, p := range r.prs { + prs[id] = *p + } + + for id, p := range r.learnerPrs { + prs[id] = *p + } + return prs +} + +func getStatusWithoutProgress(r *raft) Status { s := Status{ ID: r.id, LeadTransferee: r.leadTransferee, } - s.HardState = r.hardState() s.SoftState = *r.softState() - s.Applied = r.raftLog.applied + return s +} +// getStatus gets a copy of the current raft status. +func getStatus(r *raft) Status { + s := getStatusWithoutProgress(r) if s.RaftState == StateLeader { - s.Progress = make(map[uint64]Progress) - for id, p := range r.prs { - s.Progress[id] = *p - } - - for id, p := range r.learnerPrs { - s.Progress[id] = *p - } + s.Progress = getProgressCopy(r) } - return s } diff --git a/go.etcd.io/etcd/raft/util.go b/go.etcd.io/etcd/raft/util.go index 1a7a1e9ac3..c145d26dd7 100644 --- a/go.etcd.io/etcd/raft/util.go +++ b/go.etcd.io/etcd/raft/util.go @@ -77,10 +77,7 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { var buf bytes.Buffer fmt.Fprintf(&buf, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index) if m.Reject { - fmt.Fprintf(&buf, " Rejected") - if m.RejectHint != 0 { - fmt.Fprintf(&buf, "(Hint:%d)", m.RejectHint) - } + fmt.Fprintf(&buf, " Rejected (Hint: %d)", m.RejectHint) } if m.Commit != 0 { fmt.Fprintf(&buf, " Commit:%d", m.Commit) @@ -101,6 +98,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { return buf.String() } +// PayloadSize is the size of the payload of this Entry. Notably, it does not +// depend on its Index or Term. +func PayloadSize(e pb.Entry) int { + return len(e.Data) +} + // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string {